diff --git a/src/org/jgroups/protocols/relay/RELAY2.java b/src/org/jgroups/protocols/relay/RELAY2.java index b010434440..3d40da418d 100644 --- a/src/org/jgroups/protocols/relay/RELAY2.java +++ b/src/org/jgroups/protocols/relay/RELAY2.java @@ -6,6 +6,7 @@ import org.jgroups.conf.ConfiguratorFactory; import org.jgroups.conf.XmlNode; import org.jgroups.protocols.relay.Topology.MemberInfo; +import org.jgroups.protocols.relay.Topology.Members; import org.jgroups.protocols.relay.config.RelayConfig; import org.jgroups.stack.IpAddress; import org.jgroups.stack.Protocol; @@ -502,7 +503,7 @@ public Object up(Message msg) { if(hdr == null) { TopoHeader topo_hdr=msg.getHeader(TOPO_ID); if(topo_hdr != null) { - handleTopo(topo_hdr, sender, msg, false); + handleTopo(topo_hdr, sender, msg); return null; } } @@ -531,7 +532,7 @@ public void up(MessageBatch batch) { if(hdr == null) { TopoHeader topo_hdr=msg.getHeader(TOPO_ID); if(topo_hdr != null) { - handleTopo(topo_hdr, sender, msg, false); + handleTopo(topo_hdr, sender, msg); it.remove(); } } @@ -564,27 +565,34 @@ public void up(MessageBatch batch) { up_prot.up(batch); } - protected void handleTopo(TopoHeader hdr, Address sender, Message msg, boolean send_rsps_for_entire_view) { + protected void handleTopo(TopoHeader hdr, Address sender, Message msg) { switch(hdr.type()) { case TopoHeader.REQ: - List
targets=send_rsps_for_entire_view? new ArrayList<>(members) : List.of(local_addr); - for(Address t: targets) - sendResponseFor(t, sender); + if(is_site_master) + sendResponseFor(members, sender); break; case TopoHeader.RSP: - MemberInfo mi=msg.getObject(); - topo.handleResponse(mi.site(), mi); + Members mbrs=msg.getObject(); + + System.out.printf("** response from %s: mi=%s\n", sender, mbrs); + if(mbrs.joined != null) { + for(MemberInfo mi: mbrs.joined) + topo.handleResponse(mbrs.site, mi); + } break; } } - protected void sendResponseFor(Address mbr, Address dest) { - SiteAddress my_addr=mbr instanceof SiteMaster? new SiteMaster(((SiteMaster)mbr).getSite()) - : new SiteUUID((UUID)mbr, NameCache.get(mbr), site); - MemberInfo mi=new MemberInfo(this.site, my_addr, - (IpAddress)getPhysicalAddress(mbr), - site_masters.contains(mbr)); - Message rsp=new ObjectMessage(dest, mi).putHeader(TOPO_ID, new TopoHeader(TopoHeader.RSP)); + protected void sendResponseFor(List mbrs, Address dest) { + Members m=new Members(this.site); + for(Address mbr: mbrs) { + SiteAddress my_addr=mbr instanceof SiteMaster? new SiteMaster(((SiteMaster)mbr).getSite()) + : new SiteUUID((UUID)mbr, NameCache.get(mbr), site); + MemberInfo mi=new MemberInfo(this.site, my_addr, (IpAddress)getPhysicalAddress(mbr), + site_masters.contains(mbr)); + m.addJoined(mi); + } + Message rsp=new ObjectMessage(dest, m).putHeader(TOPO_ID, new TopoHeader(TopoHeader.RSP)); down(rsp); } @@ -846,7 +854,7 @@ protected void deliver(Address dest, Address sender, final Message msg) { try { TopoHeader hdr=msg.getHeader(TOPO_ID); if(hdr != null) { - handleTopo(hdr, sender, msg, dest instanceof SiteMaster); + handleTopo(hdr, sender, msg); return; } Message copy=copy(msg).setDest(dest).setSrc(sender); diff --git a/src/org/jgroups/protocols/relay/Relay2Header.java b/src/org/jgroups/protocols/relay/Relay2Header.java index 99ed84de7d..87f00a93d2 100644 --- a/src/org/jgroups/protocols/relay/Relay2Header.java +++ b/src/org/jgroups/protocols/relay/Relay2Header.java @@ -137,9 +137,9 @@ public void readFrom(DataInput in) throws IOException, ClassNotFoundException { } public String toString() { - return String.format("%s [final dest=%s, original sender=%s, %s%s]", + return String.format("%s [final dest=%s, original sender=%s%s%s]", typeToString(type), final_dest, original_sender, - sites + ", sites,", + sites == null || sites.isEmpty()? "" : String.format(", sites=%s", sites), visited_sites == null || visited_sites.isEmpty()? "" : String.format(", visited=%s", visited_sites)); } diff --git a/src/org/jgroups/protocols/relay/TopoHeader.java b/src/org/jgroups/protocols/relay/TopoHeader.java index 171804a17a..d0d44150b6 100644 --- a/src/org/jgroups/protocols/relay/TopoHeader.java +++ b/src/org/jgroups/protocols/relay/TopoHeader.java @@ -34,6 +34,6 @@ public void readFrom(DataInput in) throws IOException, ClassNotFoundException { } public String toString() { - return type == 0? "topo req" : "topo rsp"; + return type == 0? "TOPO-REQ" : "TOPO-RSP"; } } diff --git a/src/org/jgroups/protocols/relay/Topology.java b/src/org/jgroups/protocols/relay/Topology.java index ecf503f345..2ae36c6540 100644 --- a/src/org/jgroups/protocols/relay/Topology.java +++ b/src/org/jgroups/protocols/relay/Topology.java @@ -10,10 +10,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.util.Collection; -import java.util.Map; -import java.util.Objects; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.function.BiConsumer; @@ -95,7 +92,7 @@ public Topology removeAll(Collection