Skip to content

Commit

Permalink
- Moved Relay2Header out of RELAY2 into its own class
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Jun 12, 2023
1 parent b5070ee commit 1c3d6e1
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 196 deletions.
2 changes: 1 addition & 1 deletion conf/jg-magic-map.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
<class id="76" name="org.jgroups.protocols.RSVP$RsvpHeader"/>
<class id="77" name="org.jgroups.tests.perf.MPerf$MPerfHeader"/>
<class id="78" name="org.jgroups.protocols.pbcast.NakAckHeader2"/>
<class id="80" name="org.jgroups.protocols.relay.RELAY2$Relay2Header"/>
<class id="80" name="org.jgroups.protocols.relay.Relay2Header"/>
<class id="82" name="org.jgroups.protocols.UnicastHeader3"/>
<class id="83" name="org.jgroups.protocols.FORK$ForkHeader"/>
<class id="84" name="org.jgroups.protocols.PERF$PerfHeader"/>
Expand Down
5 changes: 3 additions & 2 deletions src/org/jgroups/demos/RelayDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void receive(Message msg) {
System.out.println("<< " + msg.getObject() + " from " + sender);
Address dst=msg.getDest();
if(dst == null) {
Message rsp=new BytesMessage(msg.getSrc(), "response");
Message rsp=new ObjectMessage(msg.getSrc(), "response");
try {
ch.send(rsp);
}
Expand Down Expand Up @@ -102,7 +102,8 @@ protected void eventLoop(JChannel ch) {
break;
if(process(line)) // see if we have a command, otherwise pass down
continue;
ch.send(null, line);
ObjectMessage msg=new ObjectMessage(null, line);
ch.send(msg);
}
catch(Throwable t) {
t.printStackTrace();
Expand Down
212 changes: 35 additions & 177 deletions src/org/jgroups/protocols/relay/RELAY2.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,15 @@
import org.jgroups.util.UUID;
import org.jgroups.util.*;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.jgroups.protocols.relay.Relay2Header.SITE_UNREACHABLE;

/**
*
* Design: ./doc/design/RELAY2.txt and at https://github.com/belaban/JGroups/blob/master/doc/design/RELAY2.txt.<p/>
Expand Down Expand Up @@ -154,7 +152,7 @@ public class RELAY2 extends Protocol {
@Deprecated(since="5.2.15")
public boolean relayMulticasts() {return true;}
@Deprecated(since="5.2.15")
public RELAY2 relayMulticasts(boolean flag) { return this;}
public RELAY2 relayMulticasts(boolean flag) {return this;}
public RELAY2 asyncRelayCreation(boolean flag) {async_relay_creation=flag; return this;}
public RELAY2 siteMasterPicker(SiteMasterPicker s) {if(s != null) this.site_master_picker=s; return this;}

Expand Down Expand Up @@ -514,12 +512,10 @@ public Object up(Message msg) {
Relay2Header hdr=msg.getHeader(id);
Address dest=msg.getDest();

if(hdr == null) {
// forward a multicast message to all bridges except myself, then pass up
if(dest == null && is_site_master && !msg.isFlagSet(Message.Flag.NO_RELAY))
sendToBridges(msg);
return up_prot.up(msg); // pass up
}
// forward a multicast message to all bridges except myself, then pass up
if(dest == null && is_site_master && !msg.isFlagSet(Message.Flag.NO_RELAY))
sendToBridges(msg);

if(handleAdminMessage(hdr, msg.src()))
return null;
if(dest != null)
Expand All @@ -536,19 +532,19 @@ public void up(MessageBatch batch) {
Relay2Header hdr=msg.getHeader(id);
Address dest=msg.getDest();

if(hdr == null) {
// forward a multicast message to all bridges except myself, then pass up
if(dest == null && is_site_master && !msg.isFlagSet(Message.Flag.NO_RELAY))
sendToBridges(msg);
// forward a multicast message to all bridges except myself, then pass up
if(dest == null && is_site_master && !msg.isFlagSet(Message.Flag.NO_RELAY))
sendToBridges(msg);

if(hdr != null && handleAdminMessage(hdr, batch.sender())) {
it.remove();
continue;
}
else { // header is not null
if(handleAdminMessage(hdr, batch.sender())) {
it.remove();
continue;
}

if(hdr != null) {
it.remove(); // message is consumed
if(dest != null) {
if(hdr.getType() == Relay2Header.SITE_UNREACHABLE) {
if(hdr.getType() == SITE_UNREACHABLE) {
SiteAddress site_addr=(SiteAddress)hdr.final_dest;
String site_name=site_addr.getSite();
if(unreachable_sites == null)
Expand Down Expand Up @@ -626,9 +622,8 @@ public void handleView(View view) {
protected void handleRelayMessage(Relay2Header hdr, Message msg) {
if(hdr.final_dest != null) {
Message message=msg;
Relay2Header header=hdr;

if(header.type == Relay2Header.DATA && can_forward_local_cluster) {
if(hdr.type == Relay2Header.DATA && can_forward_local_cluster) {
SiteUUID site_uuid=(SiteUUID)hdr.final_dest;

// If configured to do so, we want to load-balance these messages,
Expand All @@ -638,19 +633,20 @@ protected void handleRelayMessage(Relay2Header hdr, Message msg) {
// If we select a different address to handle this message, we handle it here.
if(!final_dest.equals(hdr.final_dest)) {
message=copy(msg);
header=new Relay2Header(Relay2Header.DATA, final_dest, hdr.original_sender );
message.putHeader(id, header);
byte type=hdr.type > 0? hdr.type : Relay2Header.DATA;
hdr=new Relay2Header(type, final_dest, hdr.original_sender );
message.putHeader(id, hdr);
}
}
handleMessage(header, message);
handleMessage(hdr, message);
}
else {
Message copy=copy(msg).setDest(null).setSrc(null).putHeader(id, hdr);
down_prot.down(copy); // multicast locally

// Don't forward: https://issues.redhat.com/browse/JGRP-1519
// Changed May 2023: send to all sites - local - hdr.visited_sites
sendToBridges(msg);
//sendToBridges(msg);
}
}

Expand All @@ -659,11 +655,16 @@ protected boolean handleAdminMessage(Relay2Header hdr, Address sender) {
switch(hdr.type) {
case Relay2Header.SITES_UP:
case Relay2Header.SITES_DOWN:
if(route_status_listener != null) {
Set<String> tmp_sites=hdr.getSites();
if(route_status_listener != null && tmp_sites != null) {
tmp_sites.remove(this.site);
if(tmp_sites.isEmpty())
return true;
String[] tmp=tmp_sites.toArray(new String[]{});
if(hdr.type == Relay2Header.SITES_UP)
route_status_listener.sitesUp(hdr.getSites());
route_status_listener.sitesUp(tmp);
else
route_status_listener.sitesDown(hdr.getSites());
route_status_listener.sitesDown(tmp);
}
return true;
case Relay2Header.TOPO_REQ:
Expand All @@ -672,7 +673,7 @@ protected boolean handleAdminMessage(Relay2Header hdr, Address sender) {
down_prot.down(topo_rsp);
return true;
case Relay2Header.TOPO_RSP:
String rsp=hdr.sites != null && hdr.sites.length > 0? hdr.sites[0] : null;
String rsp=hdr.sites != null && !hdr.sites.isEmpty()? hdr.sites.iterator().next() : null;
topo_collector.add(sender, rsp);
return true;
}
Expand All @@ -686,7 +687,7 @@ protected void handleMessage(Relay2Header hdr, Message msg) {
case Relay2Header.DATA:
route((SiteAddress)hdr.final_dest, (SiteAddress)hdr.original_sender, msg);
break;
case Relay2Header.SITE_UNREACHABLE:
case SITE_UNREACHABLE:
triggerSiteUnreachableEvent((SiteAddress)hdr.final_dest);
break;
case Relay2Header.HOST_UNREACHABLE:
Expand Down Expand Up @@ -796,7 +797,7 @@ protected void sendSiteUnreachableTo(Address src, String target_site) {
}
// send message back to the src node.
Message msg=new EmptyMessage(src).setFlag(Message.Flag.OOB)
.putHeader(id,new Relay2Header(Relay2Header.SITE_UNREACHABLE,new SiteMaster(target_site),null));
.putHeader(id,new Relay2Header(SITE_UNREACHABLE, new SiteMaster(target_site), null));
down_prot.down(msg);
}

Expand Down Expand Up @@ -863,7 +864,7 @@ protected void sitesChange(boolean down, String ... sites) {
return;
Relay2Header hdr=new Relay2Header(down? Relay2Header.SITES_DOWN : Relay2Header.SITES_UP, null, null)
.setSites(sites);
down_prot.down(new EmptyMessage(null).putHeader(id, hdr).setFlag(Message.Flag.NO_RELAY));
down_prot.down(new EmptyMessage(null).putHeader(id, hdr)); // .setFlag(Message.Flag.NO_RELAY));
}

/** Copies the message, but only the headers above the current protocol (RELAY) (or RpcDispatcher related headers) */
Expand Down Expand Up @@ -975,147 +976,4 @@ private void triggerSiteUnreachableEvent(SiteAddress remoteSite) {
up_prot.up(new Event(Event.SITE_UNREACHABLE, remoteSite));
}

public static class Relay2Header extends Header {
public static final byte DATA = 1;
public static final byte SITE_UNREACHABLE = 2; // final_dest is a SiteMaster
public static final byte HOST_UNREACHABLE = 3; // final_dest is a SiteUUID (not currently used)
public static final byte SITES_UP = 4;
public static final byte SITES_DOWN = 5;
public static final byte TOPO_REQ = 6;
public static final byte TOPO_RSP = 7;

protected byte type;
protected Address final_dest;
protected Address original_sender;
protected String[] sites; // used with SITES_UP/SITES_DOWN/TOPO_RSP
protected Set<String> visited_sites; // used to record sites to which this msg was already sent
// (https://issues.redhat.com/browse/JGRP-1519)


public Relay2Header() {
}

public Relay2Header(byte type) {
this.type=type;
}

public Relay2Header(byte type, Address final_dest, Address original_sender) {
this(type);
this.final_dest=final_dest;
this.original_sender=original_sender;
}
public short getMagicId() {return 80;}
public Supplier<? extends Header> create() {return Relay2Header::new;}
public byte getType() {return type;}
public Address getFinalDest() {return final_dest;}
public Address getOriginalSender() {return original_sender;}

public Relay2Header setSites(String ... s) {
sites=s;
return this;
}

public String[] getSites() {
return sites;
}

public Relay2Header addToVisitedSites(String s) {
if(visited_sites == null)
visited_sites=new HashSet<>();
visited_sites.add(s);
return this;
}

public Relay2Header addToVisitedSites(Collection<String> list) {
if(list == null || list.isEmpty())
return this;
for(String s: list)
addToVisitedSites(s);
return this;
}

public boolean hasVisitedSites() {return visited_sites != null && !visited_sites.isEmpty();}
public Set<String> getVisitedSites() {return visited_sites;}

@Override
public int serializedSize() {
return Global.BYTE_SIZE + Util.size(final_dest) + Util.size(original_sender) +
sizeOf(sites) + sizeOf(visited_sites);
}

@Override
public void writeTo(DataOutput out) throws IOException {
out.writeByte(type);
Util.writeAddress(final_dest, out);
Util.writeAddress(original_sender, out);
out.writeInt(sites == null? 0 : sites.length);
if(sites != null) {
for(String s: sites)
Bits.writeString(s, out);
}
out.writeInt(visited_sites == null? 0 : visited_sites.size());
if(visited_sites != null) {
for(String s: visited_sites)
Bits.writeString(s, out);
}
}

@Override
public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
type=in.readByte();
final_dest=Util.readAddress(in);
original_sender=Util.readAddress(in);
int num_elements=in.readInt();
if(num_elements > 0) {
sites=new String[num_elements];
for(int i=0; i < sites.length; i++)
sites[i]=Bits.readString(in);
}
num_elements=in.readInt();
if(num_elements > 0) {
visited_sites=new ConcurrentSkipListSet<>();
for(int i=0; i < num_elements; i++)
visited_sites.add(Bits.readString(in));
}
}

public String toString() {
return String.format("%s [final dest=%s, original sender=%s, %s%s%s]",
typeToString(type), final_dest, original_sender,
type == TOPO_RSP? "topos=" : "sites=", Arrays.toString(sites),
visited_sites == null || visited_sites.isEmpty()? "" :
String.format(", visited=%s", visited_sites));
}

protected static String typeToString(byte type) {
switch(type) {
case DATA: return "DATA";
case SITE_UNREACHABLE: return "SITE_UNREACHABLE";
case HOST_UNREACHABLE: return "HOST_UNREACHABLE";
case SITES_UP: return "SITES_UP";
case SITES_DOWN: return "SITES_DOWN";
case TOPO_REQ: return "TOPO_REQ";
case TOPO_RSP: return "TOPO_RSP";
default: return "<unknown>";
}
}

protected static int sizeOf(String[] arr) {
int retval=Global.INT_SIZE; // number of elements
if(arr != null) {
for(String s: arr)
retval+=Bits.sizeUTF(s) + 1; // presence bytes
}
return retval;
}

protected static int sizeOf(Collection<String> list) {
int retval=Global.INT_SIZE; // number of elements
if(list != null) {
for(String s: list)
retval+=Bits.sizeUTF(s) + 1; // presence bytes
}
return retval;
}
}
}
Loading

0 comments on commit 1c3d6e1

Please sign in to comment.