From f34e0ead39c9ce16fb3f4eade478e1ee1faa1ef0 Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Thu, 22 Jun 2023 14:33:18 +0200 Subject: [PATCH] ns --- src/org/jgroups/protocols/relay/RELAY2.java | 40 +++++---- .../jgroups/protocols/relay/Relay2Header.java | 4 +- .../jgroups/protocols/relay/TopoHeader.java | 2 +- src/org/jgroups/protocols/relay/Topology.java | 90 ++++++++++++++++--- .../org/jgroups/tests/SizeTest.java | 20 +++++ 5 files changed, 127 insertions(+), 29 deletions(-) diff --git a/src/org/jgroups/protocols/relay/RELAY2.java b/src/org/jgroups/protocols/relay/RELAY2.java index b010434440..3d40da418d 100644 --- a/src/org/jgroups/protocols/relay/RELAY2.java +++ b/src/org/jgroups/protocols/relay/RELAY2.java @@ -6,6 +6,7 @@ import org.jgroups.conf.ConfiguratorFactory; import org.jgroups.conf.XmlNode; import org.jgroups.protocols.relay.Topology.MemberInfo; +import org.jgroups.protocols.relay.Topology.Members; import org.jgroups.protocols.relay.config.RelayConfig; import org.jgroups.stack.IpAddress; import org.jgroups.stack.Protocol; @@ -502,7 +503,7 @@ public Object up(Message msg) { if(hdr == null) { TopoHeader topo_hdr=msg.getHeader(TOPO_ID); if(topo_hdr != null) { - handleTopo(topo_hdr, sender, msg, false); + handleTopo(topo_hdr, sender, msg); return null; } } @@ -531,7 +532,7 @@ public void up(MessageBatch batch) { if(hdr == null) { TopoHeader topo_hdr=msg.getHeader(TOPO_ID); if(topo_hdr != null) { - handleTopo(topo_hdr, sender, msg, false); + handleTopo(topo_hdr, sender, msg); it.remove(); } } @@ -564,27 +565,34 @@ public void up(MessageBatch batch) { up_prot.up(batch); } - protected void handleTopo(TopoHeader hdr, Address sender, Message msg, boolean send_rsps_for_entire_view) { + protected void handleTopo(TopoHeader hdr, Address sender, Message msg) { switch(hdr.type()) { case TopoHeader.REQ: - List
targets=send_rsps_for_entire_view? new ArrayList<>(members) : List.of(local_addr); - for(Address t: targets) - sendResponseFor(t, sender); + if(is_site_master) + sendResponseFor(members, sender); break; case TopoHeader.RSP: - MemberInfo mi=msg.getObject(); - topo.handleResponse(mi.site(), mi); + Members mbrs=msg.getObject(); + + System.out.printf("** response from %s: mi=%s\n", sender, mbrs); + if(mbrs.joined != null) { + for(MemberInfo mi: mbrs.joined) + topo.handleResponse(mbrs.site, mi); + } break; } } - protected void sendResponseFor(Address mbr, Address dest) { - SiteAddress my_addr=mbr instanceof SiteMaster? new SiteMaster(((SiteMaster)mbr).getSite()) - : new SiteUUID((UUID)mbr, NameCache.get(mbr), site); - MemberInfo mi=new MemberInfo(this.site, my_addr, - (IpAddress)getPhysicalAddress(mbr), - site_masters.contains(mbr)); - Message rsp=new ObjectMessage(dest, mi).putHeader(TOPO_ID, new TopoHeader(TopoHeader.RSP)); + protected void sendResponseFor(List
mbrs, Address dest) { + Members m=new Members(this.site); + for(Address mbr: mbrs) { + SiteAddress my_addr=mbr instanceof SiteMaster? new SiteMaster(((SiteMaster)mbr).getSite()) + : new SiteUUID((UUID)mbr, NameCache.get(mbr), site); + MemberInfo mi=new MemberInfo(this.site, my_addr, (IpAddress)getPhysicalAddress(mbr), + site_masters.contains(mbr)); + m.addJoined(mi); + } + Message rsp=new ObjectMessage(dest, m).putHeader(TOPO_ID, new TopoHeader(TopoHeader.RSP)); down(rsp); } @@ -846,7 +854,7 @@ protected void deliver(Address dest, Address sender, final Message msg) { try { TopoHeader hdr=msg.getHeader(TOPO_ID); if(hdr != null) { - handleTopo(hdr, sender, msg, dest instanceof SiteMaster); + handleTopo(hdr, sender, msg); return; } Message copy=copy(msg).setDest(dest).setSrc(sender); diff --git a/src/org/jgroups/protocols/relay/Relay2Header.java b/src/org/jgroups/protocols/relay/Relay2Header.java index 99ed84de7d..87f00a93d2 100644 --- a/src/org/jgroups/protocols/relay/Relay2Header.java +++ b/src/org/jgroups/protocols/relay/Relay2Header.java @@ -137,9 +137,9 @@ public void readFrom(DataInput in) throws IOException, ClassNotFoundException { } public String toString() { - return String.format("%s [final dest=%s, original sender=%s, %s%s]", + return String.format("%s [final dest=%s, original sender=%s%s%s]", typeToString(type), final_dest, original_sender, - sites + ", sites,", + sites == null || sites.isEmpty()? "" : String.format(", sites=%s", sites), visited_sites == null || visited_sites.isEmpty()? "" : String.format(", visited=%s", visited_sites)); } diff --git a/src/org/jgroups/protocols/relay/TopoHeader.java b/src/org/jgroups/protocols/relay/TopoHeader.java index 171804a17a..d0d44150b6 100644 --- a/src/org/jgroups/protocols/relay/TopoHeader.java +++ b/src/org/jgroups/protocols/relay/TopoHeader.java @@ -34,6 +34,6 @@ public void readFrom(DataInput in) throws IOException, ClassNotFoundException { } public String toString() { - return type == 0? "topo req" : "topo rsp"; + return type == 0? "TOPO-REQ" : "TOPO-RSP"; } } diff --git a/src/org/jgroups/protocols/relay/Topology.java b/src/org/jgroups/protocols/relay/Topology.java index ecf503f345..2ae36c6540 100644 --- a/src/org/jgroups/protocols/relay/Topology.java +++ b/src/org/jgroups/protocols/relay/Topology.java @@ -10,10 +10,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.util.Collection; -import java.util.Map; -import java.util.Objects; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.function.BiConsumer; @@ -95,7 +92,7 @@ public Topology removeAll(Collection sites) { public Topology adjust(String site, Collection
mbrs) { Set list=cache.get(site); if(list != null && mbrs != null) - list.removeIf(mi -> !mbrs.contains(mi.address())); + list.removeIf(mi -> !mbrs.contains(mi.addr)); return this; } @@ -128,6 +125,84 @@ public interface ResponseHandler { void handle(String site, MemberInfo rsp); } + /** Contains information about joined and left members for a given site */ + public static class Members implements SizeStreamable { + protected String site; + protected List joined; + protected List
left; + + public Members() {} + public Members(String site) {this.site=site;} + + public Members addJoined(MemberInfo mi) { + if(this.joined == null) + this.joined=new ArrayList<>(); + this.joined.add(Objects.requireNonNull(mi)); + return this; + } + + public Members addLeft(Address left) { + if(this.left == null) + this.left=new ArrayList<>(); + this.left.add(Objects.requireNonNull(left)); + return this; + } + + public int serializedSize() { + int size=Util.size(site) + Short.BYTES * 2; + if(joined != null && !joined.isEmpty()) { + for(MemberInfo mi: joined) + size+=mi.serializedSize(); + } + if(left != null && !left.isEmpty()) { + for(Address addr: left) + size+=Util.size(addr); + } + return size; + } + + public void writeTo(DataOutput out) throws IOException { + Bits.writeString(site, out); + if(joined == null || joined.isEmpty()) + out.writeShort(0); + else { + out.writeShort(joined.size()); + for(MemberInfo mi: joined) + mi.writeTo(out); + } + if(left == null || left.isEmpty()) + out.writeShort(0); + else { + out.writeShort(left.size()); + for(Address addr: left) + Util.writeAddress(addr, out); + } + } + + public void readFrom(DataInput in) throws IOException, ClassNotFoundException { + site=Bits.readString(in); + short len=in.readShort(); + if(len > 0) { + joined=new ArrayList<>(len); + for(int i=0; i < len; i++) { + MemberInfo mi=new MemberInfo(); + mi.readFrom(in); + joined.add(mi); + } + } + len=in.readShort(); + if(len > 0) { + left=new ArrayList<>(len); + Address addr=Util.readAddress(in); + left.add(addr); + } + } + + public String toString() { + return String.format("site %s: %d members joined %d left", site, joined == null? 0 : joined.size(), + left == null? 0 : left.size()); + } + } public static class MemberInfo implements SizeStreamable, Comparable { protected String site; @@ -145,11 +220,6 @@ public MemberInfo(String site, Address addr, IpAddress ip_addr, boolean site_mas this.site_master=site_master; } - public String site() {return site;} - public Address address() {return addr;} - public IpAddress ipAddress() {return ip_addr;} - public boolean siteMaster() {return site_master;} - @Override public int hashCode() { return addr.hashCode(); diff --git a/tests/junit-functional/org/jgroups/tests/SizeTest.java b/tests/junit-functional/org/jgroups/tests/SizeTest.java index 2ebb57b2bc..1066866843 100644 --- a/tests/junit-functional/org/jgroups/tests/SizeTest.java +++ b/tests/junit-functional/org/jgroups/tests/SizeTest.java @@ -12,7 +12,9 @@ import org.jgroups.protocols.relay.Relay2Header; import org.jgroups.protocols.relay.SiteMaster; import org.jgroups.protocols.relay.SiteUUID; +import org.jgroups.protocols.relay.Topology; import org.jgroups.protocols.relay.Topology.MemberInfo; +import org.jgroups.protocols.relay.Topology.Members; import org.jgroups.stack.GossipData; import org.jgroups.stack.GossipType; import org.jgroups.stack.IpAddress; @@ -627,6 +629,24 @@ public void testMemberInfo() throws Exception { assert m1.equals(m2); } + public void testMembers() throws Exception { + Members mbrs=new Members(); + _testSize(mbrs); + + mbrs=new Members("nyc"); + _testSize(mbrs); + + Address[] addrs=new Address[5]; + for(int i=0; i < addrs.length; i++) + addrs[i]=Util.createRandomAddress(); + mbrs.addJoined(new MemberInfo("net1", addrs[0], new IpAddress("127.0.0.1", 7000), true)); + mbrs.addJoined(new MemberInfo("net2", addrs[1], new IpAddress("127.0.0.1", 8000), true)); + mbrs.addJoined(new MemberInfo("hf", addrs[2], new IpAddress("127.0.0.1", 9000), false)); + _testSize(mbrs); + mbrs.addLeft(addrs[3]).addLeft(addrs[4]); + _testSize(mbrs); + } + public void testEncryptHeader() throws Exception { EncryptHeader hdr=new EncryptHeader((byte)0, new byte[]{'b','e', 'l', 'a'}, new byte[]{'b', 'a', 'n'});