Skip to content

Commit

Permalink
- Changed logic in Relayer to add/remove routes to/from the routing t…
Browse files Browse the repository at this point in the history
…able; this works for both symmetric and asymmetric routing
  • Loading branch information
belaban committed May 11, 2023
1 parent 5c645c2 commit 8643441
Showing 1 changed file with 48 additions and 43 deletions.
91 changes: 48 additions & 43 deletions src/org/jgroups/protocols/relay/Relayer.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Maintains bridges and routing table. Does the routing of outgoing messages and dispatches incoming messages to
Expand Down Expand Up @@ -71,8 +73,8 @@ public void start(List<RelayConfig.BridgeConfig> bridge_configs, String bridge_n
return;
}
try {
for(RelayConfig.BridgeConfig bridge_config: bridge_configs) {
Bridge bridge=new Bridge(bridge_config.createChannel(), bridge_config.getClusterName(), bridge_name,
for(RelayConfig.BridgeConfig cfg: bridge_configs) {
Bridge bridge=new Bridge(cfg.createChannel(), cfg.getClusterName(), cfg.getTo(), bridge_name,
() -> new SiteUUID(UUID.randomUUID(), null, my_site_id));
bridges.add(bridge);
}
Expand Down Expand Up @@ -169,16 +171,19 @@ protected static boolean isExcluded(Route route, String... excluded_sites) {


protected class Bridge implements Receiver {
protected JChannel channel;
protected final String cluster_name;
protected View view;
protected JChannel channel;
protected String cluster_name;
protected String to;
protected View view;

protected Bridge(final JChannel ch, final String cluster_name, String channel_name, AddressGenerator addr_generator) throws Exception {
protected Bridge(final JChannel ch, final String cluster_name, String to, String channel_name,
AddressGenerator addr_generator) throws Exception {
this.channel=ch;
channel.setName(channel_name);
channel.setReceiver(this);
channel.addAddressGenerator(addr_generator);
this.cluster_name=cluster_name;
this.to=to;
}

protected void start() throws Exception {
Expand Down Expand Up @@ -218,35 +223,28 @@ public void receive(Message msg) {
relay.handleRelayMessage(hdr, msg);
}

/** The view contains a list of SiteUUIDs. Adjust the routing table based on the SiteUUIDs UUID and site
*/
/** The view contains a list of SiteUUIDs. Adjust the routing table based on the SiteUUIDs and site */
public void viewAccepted(View new_view) {
View old_view=this.view;
Map<String,List<Address>> sites=Util.getSites(new_view, relay.site());
List<String> removed_routes=removedRoutes(old_view, new_view);
Set<String> up=new HashSet<>(), down=new HashSet<>(removed_routes);

this.view=new_view;
// add new routes to routing table:
for(String r: sites.keySet()) {
if(!routes.containsKey(r))
up.add(r);
}
log.trace("[Relayer " + channel.getAddress() + "] view: " + new_view);

Map<String,List<Address>> tmp=extract(new_view);
Set<String> down=new HashSet<>(routes.keySet());
Set<String> up=new HashSet<>();

down.removeAll(tmp.keySet());

routes.keySet().retainAll(tmp.keySet()); // remove all sites which are not in the view

for(Map.Entry<String,List<Address>> entry: tmp.entrySet()) {
String key=entry.getKey();
for(Map.Entry<String,List<Address>> entry: sites.entrySet()) {
String key=entry.getKey();
List<Address> val=entry.getValue();

List<Route> newRoutes;
if(routes.containsKey(key))
newRoutes=new ArrayList<>(routes.get(key));
else {
newRoutes=new ArrayList<>();
if(up != null)
up.add(key);
}
List<Route> existing=routes.get(key);
List<Route> newRoutes=existing != null? new ArrayList<>(existing) : new ArrayList<>();

// Remove routes not in the view anymore:
newRoutes.removeIf(route -> !val.contains(route.siteMaster()));
newRoutes.removeIf(r -> !val.contains(r.siteMaster()));

// Add routes that aren't yet in the routing table:
val.stream().filter(addr -> !contains(newRoutes, addr))
Expand All @@ -255,11 +253,16 @@ public void viewAccepted(View new_view) {
if(newRoutes.isEmpty()) {
routes.remove(key);
down.add(key);
up.remove(key);
}
else
routes.put(key, Collections.unmodifiableList(newRoutes));
routes.put(key, newRoutes);
}

// remove all routes which were dropped between the old and new view:
if(!removed_routes.isEmpty() && log.isTraceEnabled())
log.trace("%s: removing routes %s from routing table", removed_routes);
removed_routes.forEach(routes.keySet()::remove);

if(!down.isEmpty())
relay.sitesChange(true, down.toArray(new String[0]));
if(!up.isEmpty())
Expand All @@ -268,24 +271,26 @@ public void viewAccepted(View new_view) {

@Override
public String toString() {
return String.format("bridge %s", cluster_name);
return String.format("bridge to %s [cluster: %s]", to, cluster_name);
}

protected boolean contains(List<Route> routes, Address addr) {
return routes.stream().anyMatch(route -> route.siteMaster().equals(addr));
}

/** Returns a map containing the site keys and addresses as values */
protected Map<String,List<Address>> extract(View view) {
Map<String,List<Address>> map=new HashMap<>(view.size());
for(Address mbr: view) {
SiteAddress member=(SiteAddress)mbr;
String key=member.getSite();
List<Address> list=map.computeIfAbsent(key, k -> new ArrayList<>());
if(!list.contains(member))
list.add(member);
}
return map;
/** Returns a list of routes that were in old_view, but are no longer in new_view */
protected List<String> removedRoutes(View old_view, View new_view) {
List<String> l=new ArrayList<>();
if(old_view == null)
return l;
List<String> old_routes=Stream.of(old_view.getMembersRaw()).filter(a -> a instanceof SiteUUID)
.map(s -> ((SiteUUID)s).getSite()).collect(Collectors.toList());
List<String> new_routes=Stream.of(new_view.getMembersRaw()).filter(a -> a instanceof SiteUUID)
.map(s -> ((SiteUUID)s).getSite()).collect(Collectors.toList());
old_routes.removeAll(new_routes);
return old_routes;
}


}
}

0 comments on commit 8643441

Please sign in to comment.