Skip to content

Commit

Permalink
ns
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Jun 22, 2023
1 parent 6e54fdb commit f34e0ea
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 29 deletions.
40 changes: 24 additions & 16 deletions src/org/jgroups/protocols/relay/RELAY2.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.jgroups.conf.ConfiguratorFactory;
import org.jgroups.conf.XmlNode;
import org.jgroups.protocols.relay.Topology.MemberInfo;
import org.jgroups.protocols.relay.Topology.Members;
import org.jgroups.protocols.relay.config.RelayConfig;
import org.jgroups.stack.IpAddress;
import org.jgroups.stack.Protocol;
Expand Down Expand Up @@ -502,7 +503,7 @@ public Object up(Message msg) {
if(hdr == null) {
TopoHeader topo_hdr=msg.getHeader(TOPO_ID);
if(topo_hdr != null) {
handleTopo(topo_hdr, sender, msg, false);
handleTopo(topo_hdr, sender, msg);
return null;
}
}
Expand Down Expand Up @@ -531,7 +532,7 @@ public void up(MessageBatch batch) {
if(hdr == null) {
TopoHeader topo_hdr=msg.getHeader(TOPO_ID);
if(topo_hdr != null) {
handleTopo(topo_hdr, sender, msg, false);
handleTopo(topo_hdr, sender, msg);
it.remove();
}
}
Expand Down Expand Up @@ -564,27 +565,34 @@ public void up(MessageBatch batch) {
up_prot.up(batch);
}

protected void handleTopo(TopoHeader hdr, Address sender, Message msg, boolean send_rsps_for_entire_view) {
protected void handleTopo(TopoHeader hdr, Address sender, Message msg) {
switch(hdr.type()) {
case TopoHeader.REQ:
List<Address> targets=send_rsps_for_entire_view? new ArrayList<>(members) : List.of(local_addr);
for(Address t: targets)
sendResponseFor(t, sender);
if(is_site_master)
sendResponseFor(members, sender);
break;
case TopoHeader.RSP:
MemberInfo mi=msg.getObject();
topo.handleResponse(mi.site(), mi);
Members mbrs=msg.getObject();

System.out.printf("** response from %s: mi=%s\n", sender, mbrs);
if(mbrs.joined != null) {
for(MemberInfo mi: mbrs.joined)
topo.handleResponse(mbrs.site, mi);
}
break;
}
}

protected void sendResponseFor(Address mbr, Address dest) {
SiteAddress my_addr=mbr instanceof SiteMaster? new SiteMaster(((SiteMaster)mbr).getSite())
: new SiteUUID((UUID)mbr, NameCache.get(mbr), site);
MemberInfo mi=new MemberInfo(this.site, my_addr,
(IpAddress)getPhysicalAddress(mbr),
site_masters.contains(mbr));
Message rsp=new ObjectMessage(dest, mi).putHeader(TOPO_ID, new TopoHeader(TopoHeader.RSP));
protected void sendResponseFor(List<Address> mbrs, Address dest) {
Members m=new Members(this.site);
for(Address mbr: mbrs) {
SiteAddress my_addr=mbr instanceof SiteMaster? new SiteMaster(((SiteMaster)mbr).getSite())
: new SiteUUID((UUID)mbr, NameCache.get(mbr), site);
MemberInfo mi=new MemberInfo(this.site, my_addr, (IpAddress)getPhysicalAddress(mbr),
site_masters.contains(mbr));
m.addJoined(mi);
}
Message rsp=new ObjectMessage(dest, m).putHeader(TOPO_ID, new TopoHeader(TopoHeader.RSP));
down(rsp);
}

Expand Down Expand Up @@ -846,7 +854,7 @@ protected void deliver(Address dest, Address sender, final Message msg) {
try {
TopoHeader hdr=msg.getHeader(TOPO_ID);
if(hdr != null) {
handleTopo(hdr, sender, msg, dest instanceof SiteMaster);
handleTopo(hdr, sender, msg);
return;
}
Message copy=copy(msg).setDest(dest).setSrc(sender);
Expand Down
4 changes: 2 additions & 2 deletions src/org/jgroups/protocols/relay/Relay2Header.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
}

public String toString() {
return String.format("%s [final dest=%s, original sender=%s, %s%s]",
return String.format("%s [final dest=%s, original sender=%s%s%s]",
typeToString(type), final_dest, original_sender,
sites + ", sites,",
sites == null || sites.isEmpty()? "" : String.format(", sites=%s", sites),
visited_sites == null || visited_sites.isEmpty()? "" :
String.format(", visited=%s", visited_sites));
}
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/relay/TopoHeader.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
}

public String toString() {
return type == 0? "topo req" : "topo rsp";
return type == 0? "TOPO-REQ" : "TOPO-RSP";
}
}
90 changes: 80 additions & 10 deletions src/org/jgroups/protocols/relay/Topology.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -95,7 +92,7 @@ public Topology removeAll(Collection<String> sites) {
public Topology adjust(String site, Collection<Address> mbrs) {
Set<MemberInfo> list=cache.get(site);
if(list != null && mbrs != null)
list.removeIf(mi -> !mbrs.contains(mi.address()));
list.removeIf(mi -> !mbrs.contains(mi.addr));
return this;
}

Expand Down Expand Up @@ -128,6 +125,84 @@ public interface ResponseHandler {
void handle(String site, MemberInfo rsp);
}

/** Contains information about joined and left members for a given site */
public static class Members implements SizeStreamable {
protected String site;
protected List<MemberInfo> joined;
protected List<Address> left;

public Members() {}
public Members(String site) {this.site=site;}

public Members addJoined(MemberInfo mi) {
if(this.joined == null)
this.joined=new ArrayList<>();
this.joined.add(Objects.requireNonNull(mi));
return this;
}

public Members addLeft(Address left) {
if(this.left == null)
this.left=new ArrayList<>();
this.left.add(Objects.requireNonNull(left));
return this;
}

public int serializedSize() {
int size=Util.size(site) + Short.BYTES * 2;
if(joined != null && !joined.isEmpty()) {
for(MemberInfo mi: joined)
size+=mi.serializedSize();
}
if(left != null && !left.isEmpty()) {
for(Address addr: left)
size+=Util.size(addr);
}
return size;
}

public void writeTo(DataOutput out) throws IOException {
Bits.writeString(site, out);
if(joined == null || joined.isEmpty())
out.writeShort(0);
else {
out.writeShort(joined.size());
for(MemberInfo mi: joined)
mi.writeTo(out);
}
if(left == null || left.isEmpty())
out.writeShort(0);
else {
out.writeShort(left.size());
for(Address addr: left)
Util.writeAddress(addr, out);
}
}

public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
site=Bits.readString(in);
short len=in.readShort();
if(len > 0) {
joined=new ArrayList<>(len);
for(int i=0; i < len; i++) {
MemberInfo mi=new MemberInfo();
mi.readFrom(in);
joined.add(mi);
}
}
len=in.readShort();
if(len > 0) {
left=new ArrayList<>(len);
Address addr=Util.readAddress(in);
left.add(addr);
}
}

public String toString() {
return String.format("site %s: %d members joined %d left", site, joined == null? 0 : joined.size(),
left == null? 0 : left.size());
}
}

public static class MemberInfo implements SizeStreamable, Comparable<MemberInfo> {
protected String site;
Expand All @@ -145,11 +220,6 @@ public MemberInfo(String site, Address addr, IpAddress ip_addr, boolean site_mas
this.site_master=site_master;
}

public String site() {return site;}
public Address address() {return addr;}
public IpAddress ipAddress() {return ip_addr;}
public boolean siteMaster() {return site_master;}

@Override
public int hashCode() {
return addr.hashCode();
Expand Down
20 changes: 20 additions & 0 deletions tests/junit-functional/org/jgroups/tests/SizeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
import org.jgroups.protocols.relay.Relay2Header;
import org.jgroups.protocols.relay.SiteMaster;
import org.jgroups.protocols.relay.SiteUUID;
import org.jgroups.protocols.relay.Topology;
import org.jgroups.protocols.relay.Topology.MemberInfo;
import org.jgroups.protocols.relay.Topology.Members;
import org.jgroups.stack.GossipData;
import org.jgroups.stack.GossipType;
import org.jgroups.stack.IpAddress;
Expand Down Expand Up @@ -627,6 +629,24 @@ public void testMemberInfo() throws Exception {
assert m1.equals(m2);
}

public void testMembers() throws Exception {
Members mbrs=new Members();
_testSize(mbrs);

mbrs=new Members("nyc");
_testSize(mbrs);

Address[] addrs=new Address[5];
for(int i=0; i < addrs.length; i++)
addrs[i]=Util.createRandomAddress();
mbrs.addJoined(new MemberInfo("net1", addrs[0], new IpAddress("127.0.0.1", 7000), true));
mbrs.addJoined(new MemberInfo("net2", addrs[1], new IpAddress("127.0.0.1", 8000), true));
mbrs.addJoined(new MemberInfo("hf", addrs[2], new IpAddress("127.0.0.1", 9000), false));
_testSize(mbrs);
mbrs.addLeft(addrs[3]).addLeft(addrs[4]);
_testSize(mbrs);
}


public void testEncryptHeader() throws Exception {
EncryptHeader hdr=new EncryptHeader((byte)0, new byte[]{'b','e', 'l', 'a'}, new byte[]{'b', 'a', 'n'});
Expand Down

0 comments on commit f34e0ea

Please sign in to comment.