Skip to content

Commit

Permalink
- Added support for asymmetric routing (https://issues.redhat.com/bro…
Browse files Browse the repository at this point in the history
…wse/JGRP-1506)

- Changed logic in Relayer to add/remove routes to/from the routing table; this works for both symmetric and asymmetric routing
- Added history of visited sites to prevent cycles (https://issues.redhat.com/browse/JGRP-1519)
- Using the correct sender when relaying messages to other sites (sender's site was incorrect)
- Implemented forwardind of messages to sites via ForwardingRoutes
  • Loading branch information
belaban committed May 16, 2023
1 parent cdec0cf commit 2885eaa
Show file tree
Hide file tree
Showing 12 changed files with 473 additions and 143 deletions.
5 changes: 3 additions & 2 deletions src/org/jgroups/View.java
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,11 @@ public static List<Address> leftMembers(View one, View two) {
}

public static List<Address> newMembers(View old, View new_view) {
if(old == null || new_view == null)
if(new_view == null)
return null;
List<Address> retval=new ArrayList<>(new_view.getMembers());
retval.removeAll(old.getMembers());
if(old != null)
retval.removeAll(old.getMembers());
return retval;
}

Expand Down
50 changes: 50 additions & 0 deletions src/org/jgroups/protocols/relay/ForwardingRoute.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.jgroups.protocols.relay;

import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* Route which forwards all messages to a given site via a given route.<br/>
* Example: if site HF is reachable via NET1, and NET2 is not directly connected to HF, it needs to have a
* ForwardConfig with to="HF" and gateway="NET1". This means that a message to site HF will be forwarded to NET1,
* which then forwards it to HF.<br/>
* Note that 'to' can be a regular expression and {@link #matches(String)} can be used to match a given site against it.
* @author Bela Ban
* @since 5.2.15
*/
public class ForwardingRoute implements Comparable<ForwardingRoute> {
protected String to; // target site
protected String gateway; // actual site to use for routing (routes.get(gateway); has to point to an existing route
protected Pattern p;

public ForwardingRoute(String to, String gateway) {
this.to=Objects.requireNonNull(to);
this.gateway=Objects.requireNonNull(gateway);
p=Pattern.compile(to);
}

public String to() {return to;}
public ForwardingRoute to(String to) {this.to=Objects.requireNonNull(to); p=Pattern.compile(to); return this;}
public String gateway() {return gateway;}
public ForwardingRoute gateway(String gw) {this.gateway=Objects.requireNonNull(gw); return this;}
public boolean matches(String site) {Matcher m=p.matcher(site); return m.matches();}

@Override
public int compareTo(ForwardingRoute o) {
if(this == o)
return 0;
int rc=gateway.compareTo(o.gateway);
return rc != 0? rc : to.compareTo(o.to);
}

@Override
public boolean equals(Object obj) {
return compareTo((ForwardingRoute)obj) == 0;
}

@Override
public String toString() {
return String.format("to=%s, gw=%s", to, gateway);
}
}
152 changes: 106 additions & 46 deletions src/org/jgroups/protocols/relay/RELAY2.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
Expand Down Expand Up @@ -335,11 +336,6 @@ else if(site_masters_ratio > 1) {
if(site_config == null)
throw new Exception("site configuration for \"" + site + "\" not found in " + config);
log.trace("site configuration:\n" + site_config);

if(!site_config.getForwards().isEmpty())
log.warn(local_addr + ": forwarding routes are currently not supported and will be ignored. This will change " +
"with hierarchical routing (https://issues.redhat.com/browse/JGRP-1506)");

if(enable_address_tagging) {
JChannel ch=getProtocolStack().getChannel();
ch.addAddressGenerator(() -> {
Expand Down Expand Up @@ -442,7 +438,7 @@ public Route getRoute(String site_name) {
* {@link #isSiteMaster()} returns false).
*/
public List<String> getCurrentSites() {
Relayer rel = relayer;
Relayer rel=relayer;
return rel == null ? null : rel.getSiteNames();
}

Expand Down Expand Up @@ -517,13 +513,8 @@ public Object up(Message msg) {

if(hdr == null) {
// forward a multicast message to all bridges except myself, then pass up
if(dest == null && is_site_master && relay_multicasts && !msg.isFlagSet(Message.Flag.NO_RELAY)) {
Address src=msg.getSrc();
SiteUUID sender=new SiteUUID((UUID)msg.getSrc(), NameCache.get(msg.getSrc()), site);
if(src instanceof ExtendedUUID)
sender.addContents((ExtendedUUID)src);
sendToBridges(sender, msg, site);
}
if(dest == null && is_site_master && relay_multicasts && !msg.isFlagSet(Message.Flag.NO_RELAY))
sendToBridges(msg);
return up_prot.up(msg); // pass up
}
if(handleAdminMessage(hdr, msg.src()))
Expand All @@ -544,13 +535,8 @@ public void up(MessageBatch batch) {

if(hdr == null) {
// forward a multicast message to all bridges except myself, then pass up
if(dest == null && is_site_master && relay_multicasts && !msg.isFlagSet(Message.Flag.NO_RELAY)) {
Address src=msg.getSrc();
SiteUUID sender=new SiteUUID((UUID)msg.getSrc(), NameCache.get(msg.getSrc()), site);
if(src instanceof ExtendedUUID)
sender.addContents((ExtendedUUID)src);
sendToBridges(sender, msg, site);
}
if(dest == null && is_site_master && relay_multicasts && !msg.isFlagSet(Message.Flag.NO_RELAY))
sendToBridges(msg);
}
else { // header is not null
if(handleAdminMessage(hdr, batch.sender())) {
Expand Down Expand Up @@ -660,7 +646,8 @@ protected void handleRelayMessage(Relay2Header hdr, Message msg) {
down_prot.down(copy); // multicast locally

// Don't forward: https://issues.redhat.com/browse/JGRP-1519
// sendToBridges(msg.getSrc(), buf, from_site, site_id); // forward to all bridges except self and from
// Changed May 2023: send to all sites - local - hdr.visited_sites
sendToBridges(msg);
}
}

Expand Down Expand Up @@ -732,6 +719,8 @@ protected void route(SiteAddress dest, SiteAddress sender, Message msg) {
}

Route route=tmp.getRoute(target_site, sender);
if(route == null)
route=tmp.getForwardingRouteMatching(target_site, sender);
if(route == null) {
if(suppress_log_no_route != null)
suppress_log_no_route.log(SuppressLog.Level.error, target_site, suppress_time_no_route_errors, sender, target_site);
Expand All @@ -744,20 +733,47 @@ protected void route(SiteAddress dest, SiteAddress sender, Message msg) {
}


/** Sends the message via all bridges excluding the excluded_sites bridges */
protected void sendToBridges(Address sender, final Message msg, String ... excluded_sites) {
/** Sends the message to all sites in the routing table, minus the local site */
protected void sendToBridges(final Message msg) {
Relayer tmp=relayer;
List<Route> routes=tmp != null? tmp.getRoutes(excluded_sites) : null;
Map<String,List<Route>> routes=tmp != null? tmp.routes : null;
if(routes == null)
return;
for(Route route: routes) {
if(log.isTraceEnabled())
log.trace(local_addr + ": relaying multicast message from " + sender + " via route " + route);
try {
route.send(null, sender, msg);
}
catch(Exception ex) {
log.error(local_addr + ": failed relaying message from " + sender + " via route " + route, ex);
Address src=msg.getSrc();
Relay2Header hdr=msg.getHeader(this.id);
Address original_sender=hdr != null && hdr.original_sender != null? hdr.getOriginalSender() :
new SiteUUID((UUID)src, NameCache.get(src), site);
if(src instanceof ExtendedUUID)
((ExtendedUUID)original_sender).addContents((ExtendedUUID)src);

Set<String> visited_sites=new HashSet<>(routes.keySet()), // to be added to the header
sites_to_visit=new HashSet<>(routes.keySet()); // sites to which to forward the message

if(this.site != null) {
visited_sites.add(this.site);
sites_to_visit.remove(this.site); // don't send to the local site
}

if(hdr != null && hdr.hasVisitedSites()) {
visited_sites.addAll(hdr.getVisitedSites());
sites_to_visit.removeAll(hdr.getVisitedSites()); // avoid cycles (https://issues.redhat.com/browse/JGRP-1519)
}

for(String dest_site: sites_to_visit) {
List<Route> val=routes.get(dest_site);
if(val == null)
continue;
// try sending over all routes; break after the first successful send
for(Route route: val) {
if(log.isTraceEnabled())
log.trace(local_addr + ": relaying multicast message from " + original_sender + " via route " + route);
try {
route.send(null, original_sender, msg, visited_sites);
break;
}
catch(Exception ex) {
log.error(local_addr + ": failed relaying message from " + original_sender + " via route " + route, ex);
}
}
}
}
Expand Down Expand Up @@ -857,7 +873,7 @@ protected Message copy(Message msg) {
protected void startRelayer(Relayer rel, String bridge_name) {
try {
log.trace(local_addr + ": became site master; starting bridges");
rel.start(site_config.getBridges(), bridge_name, site);
rel.start(site_config, bridge_name, site);
}
catch(Throwable t) {
log.error(local_addr + ": failed starting relayer", t);
Expand All @@ -875,7 +891,7 @@ protected void notifySiteMasterListener(boolean flag) {
* members which cannot become site masters (can_become_site_master == false). If no site master can be found,
* the first member of the view will be returned (even if it has can_become_site_master == false)
*/
protected List<Address> determineSiteMasters(View view, int max_num_site_masters) {
protected static List<Address> determineSiteMasters(View view, int max_num_site_masters) {
List<Address> retval=new ArrayList<>(view.size());
int selected=0;

Expand Down Expand Up @@ -965,10 +981,12 @@ public static class Relay2Header extends Header {
public static final byte TOPO_REQ = 6;
public static final byte TOPO_RSP = 7;

protected byte type;
protected Address final_dest;
protected Address original_sender;
protected String[] sites; // used with SITES_UP/SITES_DOWN/TOPO_RSP
protected byte type;
protected Address final_dest;
protected Address original_sender;
protected String[] sites; // used with SITES_UP/SITES_DOWN/TOPO_RSP
protected Set<String> visited_sites; // used to record sites to which this msg was already sent
// (https://issues.redhat.com/browse/JGRP-1519)


public Relay2Header() {
Expand Down Expand Up @@ -998,9 +1016,28 @@ public String[] getSites() {
return sites;
}

public Relay2Header addToVisitedSites(String s) {
if(visited_sites == null)
visited_sites=new HashSet<>();
visited_sites.add(s);
return this;
}

public Relay2Header addToVisitedSites(Collection<String> list) {
if(list == null || list.isEmpty())
return this;
for(String s: list)
addToVisitedSites(s);
return this;
}

public boolean hasVisitedSites() {return visited_sites != null && !visited_sites.isEmpty();}
public Set<String> getVisitedSites() {return visited_sites;}

@Override
public int serializedSize() {
return Global.BYTE_SIZE + Util.size(final_dest) + Util.size(original_sender) + sizeOf(sites);
return Global.BYTE_SIZE + Util.size(final_dest) + Util.size(original_sender) +
sizeOf(sites) + sizeOf(visited_sites);
}

@Override
Expand All @@ -1013,6 +1050,11 @@ public void writeTo(DataOutput out) throws IOException {
for(String s: sites)
Bits.writeString(s, out);
}
out.writeInt(visited_sites == null? 0 : visited_sites.size());
if(visited_sites != null) {
for(String s: visited_sites)
Bits.writeString(s, out);
}
}

@Override
Expand All @@ -1021,16 +1063,25 @@ public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
final_dest=Util.readAddress(in);
original_sender=Util.readAddress(in);
int num_elements=in.readInt();
if(num_elements == 0)
return;
sites=new String[num_elements];
for(int i=0; i < sites.length; i++)
sites[i]=Bits.readString(in);
if(num_elements > 0) {
sites=new String[num_elements];
for(int i=0; i < sites.length; i++)
sites[i]=Bits.readString(in);
}
num_elements=in.readInt();
if(num_elements > 0) {
visited_sites=new ConcurrentSkipListSet<>();
for(int i=0; i < num_elements; i++)
visited_sites.add(Bits.readString(in));
}
}

public String toString() {
return typeToString(type) + " [dest=" + final_dest + ", sender=" + original_sender +
(type == TOPO_RSP? ", topos=" : ", sites=") + Arrays.toString(sites) + "]";
return String.format("%s [final dest=%s, original sender=%s, %s%s%s]",
typeToString(type), final_dest, original_sender,
type == TOPO_RSP? "topos=" : "sites=", Arrays.toString(sites),
visited_sites == null || visited_sites.isEmpty()? "" :
String.format(", visited=%s", visited_sites));
}

protected static String typeToString(byte type) {
Expand All @@ -1054,5 +1105,14 @@ protected static int sizeOf(String[] arr) {
}
return retval;
}

protected static int sizeOf(Collection<String> list) {
int retval=Global.INT_SIZE; // number of elements
if(list != null) {
for(String s: list)
retval+=Bits.sizeUTF(s) + 1; // presence bytes
}
return retval;
}
}
}

0 comments on commit 2885eaa

Please sign in to comment.