Skip to content

Commit

Permalink
First successful sending of a unicast to a remote SiteMaster and rece…
Browse files Browse the repository at this point in the history
…ption of the unicast response
  • Loading branch information
belaban committed Aug 21, 2012
1 parent 8a1d3a0 commit eca5198
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 46 deletions.
106 changes: 69 additions & 37 deletions src/org/jgroups/protocols/relay/RELAY2.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class RELAY2 extends Protocol {


/* --------------------------------------------- Fields ------------------------------------------------ */
@ManagedAttribute(description="The site-ID")
@ManagedAttribute(description="My site-ID")
protected short site_id=-1;

protected Map<String,RelayConfig.SiteConfig> sites;
Expand Down Expand Up @@ -129,25 +129,30 @@ public Object down(Event evt) {
Message msg=(Message)evt.getArg();
Address dest=msg.getDest();
if(dest == null || !(dest instanceof SiteAddress))
break; // todo: handle multicasts
break;
SiteAddress target=(SiteAddress)dest;
// if(addr.getSite() == site_id) // same site: local; pass down
// break;
SiteUUID sender=null;
if(msg.getSrc() == null) {
sender=new SiteUUID((UUID)local_addr, site_id);
msg.setSrc(sender);
}

byte[] buf=marshal(msg);
if(buf == null)
return null; // don't pass down
if(sender == null)
sender=new SiteUUID((UUID)local_addr, site_id);
if(local_addr.equals(coord))
route(target, sender, buf);
SiteAddress sender=new SiteUUID((UUID)local_addr, UUID.get(local_addr), site_id);

// target is in the same site; we can deliver the message locally
if(target.getSite() == site_id) {
if(local_addr.equals(target))
deliver(target, sender, buf);
else
deliverLocally(target, sender, buf);
return null;
}

// forward to the coordinator unless we're the coord (then route the message directly)
if(!is_coord)
forwardTo(coord, target, sender, buf, null);
else
forwardToCoordinator(target, sender, buf);
route(target, sender, buf);
return null;

case Event.SET_LOCAL_ADDRESS:
local_addr=(Address)evt.getArg();
break;
Expand Down Expand Up @@ -179,29 +184,27 @@ public Object up(Event evt) {

/** Called to handle a received relay message */
protected void handleMessage(Relay2Header hdr, Message msg) {
System.out.println("**** handleMessage(): to=" + hdr.dest + ", from=" + hdr.sender + ", " + msg.getLength() + " bytes " +
System.out.println("**** handleMessage(): to=" + hdr.final_dest + ", from=" + hdr.original_sender + ", " + msg.getLength() + " bytes " +
"(forwarder=" + msg.getSrc() + ")");

}


protected void forwardToCoordinator(SiteAddress dest, Address sender, byte[] buf) {
Message msg=new Message(coord, buf);
Relay2Header hdr=new Relay2Header(Relay2Header.DATA, dest, sender);
msg.putHeader(id, hdr);
down_prot.down(new Event(Event.MSG, msg));
route((SiteAddress)hdr.final_dest, (SiteAddress)hdr.original_sender, msg.getBuffer());
}



/**
* Routes the message to the target destination, used by a site master (coordinator)
* @param dest
* @param buf
*/
protected void route(SiteAddress dest, SiteUUID sender, byte[] buf) {
protected void route(SiteAddress dest, SiteAddress sender, byte[] buf) {
short target_site=dest.getSite();
if(target_site == site_id) {
deliverLocally(dest, sender, buf);
if(local_addr.equals(dest) || ((dest instanceof SiteMaster) && is_coord)) {
deliver(dest, sender, buf);
}
else
deliverLocally(dest, sender, buf); // send to member in same local site
return;
}
Relayer tmp=relayer;
Expand All @@ -213,10 +216,39 @@ protected void route(SiteAddress dest, SiteUUID sender, byte[] buf) {
relay(dest, sender, route, buf);
}

protected void deliverLocally(SiteAddress dest, SiteUUID sender, byte[] buf) {
protected void forwardTo(Address next_dest, SiteAddress final_dest, Address original_sender, byte[] buf, JChannel ch) {
Message msg=new Message(next_dest, buf);
Relay2Header hdr=new Relay2Header(Relay2Header.DATA, final_dest, original_sender);
msg.putHeader(id, hdr);
if(ch != null) {
try {
ch.send(msg);
}
catch(Exception e) {
log.error("failed forwarding message to " + final_dest, e);
}
}
else
down_prot.down(new Event(Event.MSG, msg));
}


protected void deliverLocally(SiteAddress dest, SiteAddress sender, byte[] buf) {

}

protected void deliver(Address dest, Address sender, byte[] buf) {
try {
Message original_msg=(Message)Util.streamableFromByteBuffer(Message.class, buf);
original_msg.setSrc(sender);
original_msg.setDest(dest);
up_prot.up(new Event(Event.MSG, original_msg));
}
catch(Exception e) {
log.error("failed unmarshalling message", e);
}
}

protected void relay(SiteAddress to, SiteAddress from, Relayer.Route route, byte[] buf) {
if(route == null) {
log.warn("route for site" + to.getSite() + " not found; dropping message");
Expand Down Expand Up @@ -288,37 +320,37 @@ public static class Relay2Header extends Header {
public static final byte DATA = 1;

protected byte type;
protected Address dest;
protected Address sender;
protected Address final_dest;
protected Address original_sender;


public Relay2Header() {
}

public Relay2Header(byte type, Address dest, Address sender) {
public Relay2Header(byte type, Address final_dest, Address original_sender) {
this.type=type;
this.dest=dest;
this.sender=sender;
this.final_dest=final_dest;
this.original_sender=original_sender;
}

public int size() {
return Global.BYTE_SIZE + Util.size(dest) + Util.size(sender);
return Global.BYTE_SIZE + Util.size(final_dest) + Util.size(original_sender);
}

public void writeTo(DataOutput out) throws Exception {
out.writeByte(type);
Util.writeAddress(dest, out);
Util.writeAddress(sender, out);
Util.writeAddress(final_dest, out);
Util.writeAddress(original_sender, out);
}

public void readFrom(DataInput in) throws Exception {
type=in.readByte();
dest=Util.readAddress(in);
sender=Util.readAddress(in);
final_dest=Util.readAddress(in);
original_sender=Util.readAddress(in);
}

public String toString() {
return typeToString(type) + " [dest=" + dest + ", sender=" + sender + "]";
return typeToString(type) + " [dest=" + final_dest + ", sender=" + original_sender + "]";
}

protected static String typeToString(byte type) {
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/relay/Relayer.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void start() throws Throwable {
new AddressGenerator() {
public Address generateAddress() {
UUID uuid=UUID.randomUUID();
return new SiteUUID(uuid, my_site_id);
return new SiteUUID(uuid, null, my_site_id);
}
});
bridges.add(bridge);
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/relay/SiteMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public SiteMaster() {
}

public SiteMaster(short site) {
super(0, 0, site);
super(0, 0, null, site);
}

public SiteMaster(String site) {
Expand Down
22 changes: 16 additions & 6 deletions src/org/jgroups/protocols/relay/SiteUUID.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.jgroups.Global;
import org.jgroups.util.UUID;
import org.jgroups.util.Util;

import java.io.*;
import java.util.Map;
Expand All @@ -14,8 +15,11 @@
* @since 3.2
*/
public class SiteUUID extends UUID implements SiteAddress {
private static final long serialVersionUID=-575018477146695139L;
private static final long serialVersionUID=3748908939644729773L;
protected String name; // logical name, can be null

protected short site;

// Maps between site-IDs (shorts) and site names
protected static final ConcurrentMap<Short,String> site_cache=new ConcurrentHashMap<Short,String>(10);

Expand All @@ -24,13 +28,15 @@ public SiteUUID() {
}


public SiteUUID(long mostSigBits, long leastSigBits, short site) {
public SiteUUID(long mostSigBits, long leastSigBits, String name, short site) {
super(mostSigBits,leastSigBits);
this.name=name;
this.site=site;
}

public SiteUUID(UUID uuid, short site) {
public SiteUUID(UUID uuid, String name, short site) {
super(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
this.name=name;
this.site=site;
}

Expand Down Expand Up @@ -63,7 +69,7 @@ public short getSite() {
// }

public UUID copy() {
return new SiteUUID(mostSigBits, leastSigBits, site);
return new SiteUUID(mostSigBits, leastSigBits, name, site);
}

// public boolean equals(Object obj) {
Expand All @@ -76,31 +82,35 @@ public UUID copy() {

public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
name=in.readUTF();
site=in.readShort();
}

public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
out.writeUTF(name);
out.writeShort(site);
}

public void readFrom(DataInput in) throws Exception {
super.readFrom(in);
name=Util.readString(in);
site=in.readShort();
}

public void writeTo(DataOutput out) throws Exception {
super.writeTo(out);
Util.writeString(name, out);
out.writeShort(site);
}

public int size() {
return super.size() + Global.SHORT_SIZE;
return super.size() + Util.size(name) + Global.SHORT_SIZE;
}


public String toString() {
String retval=super.toString();
String retval=name != null? name : super.toString();
String suffix=site_cache.get(site);
return retval + ":" + (suffix != null? suffix : String.valueOf(site));
}
Expand Down
2 changes: 1 addition & 1 deletion tests/junit-functional/org/jgroups/tests/SizeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ public static void testRelay2Header() throws Exception {
Address dest=new SiteMaster((short)0);
RELAY2.Relay2Header hdr=new RELAY2.Relay2Header(RELAY2.Relay2Header.DATA, dest, null);
_testSize(hdr);
Address sender=new SiteUUID(UUID.randomUUID(), (short)1);
Address sender=new SiteUUID(UUID.randomUUID(), "dummy", (short)1);
hdr=new RELAY2.Relay2Header(RELAY2.Relay2Header.DATA, dest, sender);
_testSize(hdr);
}
Expand Down

0 comments on commit eca5198

Please sign in to comment.