Skip to content

Commit

Permalink
- Added history of visited sites to prevent cycles (https://issues.re…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed May 12, 2023
1 parent 307544c commit a872f4f
Showing 1 changed file with 102 additions and 40 deletions.
142 changes: 102 additions & 40 deletions src/org/jgroups/protocols/relay/RELAY2.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
Expand Down Expand Up @@ -442,7 +443,7 @@ public Route getRoute(String site_name) {
* {@link #isSiteMaster()} returns false).
*/
public List<String> getCurrentSites() {
Relayer rel = relayer;
Relayer rel=relayer;
return rel == null ? null : rel.getSiteNames();
}

Expand Down Expand Up @@ -517,13 +518,8 @@ public Object up(Message msg) {

if(hdr == null) {
// forward a multicast message to all bridges except myself, then pass up
if(dest == null && is_site_master && relay_multicasts && !msg.isFlagSet(Message.Flag.NO_RELAY)) {
Address src=msg.getSrc();
SiteUUID sender=new SiteUUID((UUID)msg.getSrc(), NameCache.get(msg.getSrc()), site);
if(src instanceof ExtendedUUID)
sender.addContents((ExtendedUUID)src);
sendToBridges(sender, msg, site);
}
if(dest == null && is_site_master && relay_multicasts && !msg.isFlagSet(Message.Flag.NO_RELAY))
sendToBridges(msg);
return up_prot.up(msg); // pass up
}
if(handleAdminMessage(hdr, msg.src()))
Expand All @@ -544,13 +540,8 @@ public void up(MessageBatch batch) {

if(hdr == null) {
// forward a multicast message to all bridges except myself, then pass up
if(dest == null && is_site_master && relay_multicasts && !msg.isFlagSet(Message.Flag.NO_RELAY)) {
Address src=msg.getSrc();
SiteUUID sender=new SiteUUID((UUID)msg.getSrc(), NameCache.get(msg.getSrc()), site);
if(src instanceof ExtendedUUID)
sender.addContents((ExtendedUUID)src);
sendToBridges(sender, msg, site);
}
if(dest == null && is_site_master && relay_multicasts && !msg.isFlagSet(Message.Flag.NO_RELAY))
sendToBridges(msg);
}
else { // header is not null
if(handleAdminMessage(hdr, batch.sender())) {
Expand Down Expand Up @@ -660,7 +651,8 @@ protected void handleRelayMessage(Relay2Header hdr, Message msg) {
down_prot.down(copy); // multicast locally

// Don't forward: https://issues.redhat.com/browse/JGRP-1519
// sendToBridges(msg.getSrc(), buf, from_site, site_id); // forward to all bridges except self and from
// Changed May 2023: send to all sites - local - hdr.visited_sites
sendToBridges(msg);
}
}

Expand Down Expand Up @@ -744,20 +736,46 @@ protected void route(SiteAddress dest, SiteAddress sender, Message msg) {
}


/** Sends the message via all bridges excluding the excluded_sites bridges */
protected void sendToBridges(Address sender, final Message msg, String ... excluded_sites) {
/** Sends the message to all sites in the routing table, minus the local site */
protected void sendToBridges(final Message msg) {
Relayer tmp=relayer;
List<Route> routes=tmp != null? tmp.getRoutes(excluded_sites) : null;
Map<String,List<Route>> routes=tmp != null? tmp.routes : null;
if(routes == null)
return;
for(Route route: routes) {
if(log.isTraceEnabled())
log.trace(local_addr + ": relaying multicast message from " + sender + " via route " + route);
try {
route.send(null, sender, msg);
}
catch(Exception ex) {
log.error(local_addr + ": failed relaying message from " + sender + " via route " + route, ex);
Address src=msg.getSrc();
Relay2Header hdr=msg.getHeader(this.id);
SiteUUID sender=new SiteUUID((UUID)msg.getSrc(), NameCache.get(msg.getSrc()), site);
if(src instanceof ExtendedUUID)
sender.addContents((ExtendedUUID)src);

Set<String> visited_sites=new HashSet<>(routes.keySet()), // to be added to the header
sites_to_visit=new HashSet<>(routes.keySet()); // sites to which to forward the message

if(this.site != null) {
visited_sites.add(this.site);
sites_to_visit.remove(this.site); // don't send to the local site
}

if(hdr != null && hdr.hasVisitedSites()) {
visited_sites.addAll(hdr.getVisitedSites());
sites_to_visit.removeAll(hdr.getVisitedSites()); // avoid cycles (https://issues.redhat.com/browse/JGRP-1519)
}

for(String dest_site: sites_to_visit) {
List<Route> val=routes.get(dest_site);
if(val == null)
continue;
// try sending over all routes; break after the first successful send
for(Route route: val) {
if(log.isTraceEnabled())
log.trace(local_addr + ": relaying multicast message from " + sender + " via route " + route);
try {
route.send(null, sender, msg, visited_sites);
break;
}
catch(Exception ex) {
log.error(local_addr + ": failed relaying message from " + sender + " via route " + route, ex);
}
}
}
}
Expand Down Expand Up @@ -875,7 +893,7 @@ protected void notifySiteMasterListener(boolean flag) {
* members which cannot become site masters (can_become_site_master == false). If no site master can be found,
* the first member of the view will be returned (even if it has can_become_site_master == false)
*/
protected List<Address> determineSiteMasters(View view, int max_num_site_masters) {
protected static List<Address> determineSiteMasters(View view, int max_num_site_masters) {
List<Address> retval=new ArrayList<>(view.size());
int selected=0;

Expand Down Expand Up @@ -965,10 +983,12 @@ public static class Relay2Header extends Header {
public static final byte TOPO_REQ = 6;
public static final byte TOPO_RSP = 7;

protected byte type;
protected Address final_dest;
protected Address original_sender;
protected String[] sites; // used with SITES_UP/SITES_DOWN/TOPO_RSP
protected byte type;
protected Address final_dest;
protected Address original_sender;
protected String[] sites; // used with SITES_UP/SITES_DOWN/TOPO_RSP
protected Set<String> visited_sites; // used to record sites to which this msg was already sent
// (https://issues.redhat.com/browse/JGRP-1519)


public Relay2Header() {
Expand Down Expand Up @@ -998,9 +1018,28 @@ public String[] getSites() {
return sites;
}

public Relay2Header addToVisitedSites(String s) {
if(visited_sites == null)
visited_sites=new HashSet<>();
visited_sites.add(s);
return this;
}

public Relay2Header addToVisitedSites(Collection<String> list) {
if(list == null || list.isEmpty())
return this;
for(String s: list)
addToVisitedSites(s);
return this;
}

public boolean hasVisitedSites() {return visited_sites != null && !visited_sites.isEmpty();}
public Set<String> getVisitedSites() {return visited_sites;}

@Override
public int serializedSize() {
return Global.BYTE_SIZE + Util.size(final_dest) + Util.size(original_sender) + sizeOf(sites);
return Global.BYTE_SIZE + Util.size(final_dest) + Util.size(original_sender) +
sizeOf(sites) + sizeOf(visited_sites);
}

@Override
Expand All @@ -1013,6 +1052,11 @@ public void writeTo(DataOutput out) throws IOException {
for(String s: sites)
Bits.writeString(s, out);
}
out.writeInt(visited_sites == null? 0 : visited_sites.size());
if(visited_sites != null) {
for(String s: visited_sites)
Bits.writeString(s, out);
}
}

@Override
Expand All @@ -1021,16 +1065,25 @@ public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
final_dest=Util.readAddress(in);
original_sender=Util.readAddress(in);
int num_elements=in.readInt();
if(num_elements == 0)
return;
sites=new String[num_elements];
for(int i=0; i < sites.length; i++)
sites[i]=Bits.readString(in);
if(num_elements > 0) {
sites=new String[num_elements];
for(int i=0; i < sites.length; i++)
sites[i]=Bits.readString(in);
}
num_elements=in.readInt();
if(num_elements > 0) {
visited_sites=new ConcurrentSkipListSet<>();
for(int i=0; i < num_elements; i++)
visited_sites.add(Bits.readString(in));
}
}

public String toString() {
return typeToString(type) + " [dest=" + final_dest + ", sender=" + original_sender +
(type == TOPO_RSP? ", topos=" : ", sites=") + Arrays.toString(sites) + "]";
return String.format("%s [dest=%s, sender=%s, %s%s%s]",
typeToString(type), final_dest, original_sender,
type == TOPO_RSP? "topos=" : "sites=", Arrays.toString(sites),
visited_sites == null || visited_sites.isEmpty()? "" :
String.format(", visited=%s", visited_sites));
}

protected static String typeToString(byte type) {
Expand All @@ -1054,5 +1107,14 @@ protected static int sizeOf(String[] arr) {
}
return retval;
}

protected static int sizeOf(Collection<String> list) {
int retval=Global.INT_SIZE; // number of elements
if(list != null) {
for(String s: list)
retval+=Bits.sizeUTF(s) + 1; // presence bytes
}
return retval;
}
}
}

0 comments on commit a872f4f

Please sign in to comment.