Skip to content

Commit

Permalink
ns
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Aug 6, 2012
1 parent f685f7a commit 55f9305
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 4 deletions.
99 changes: 97 additions & 2 deletions src/org/jgroups/protocols/relay/RELAY2.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import java.io.DataOutput;
import java.io.InputStream;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

Expand Down Expand Up @@ -126,6 +125,23 @@ public String printRoutes() {

public Object down(Event evt) {
switch(evt.getType()) {
case Event.MSG:
Message msg=(Message)evt.getArg();
Address dest=msg.getDest();
if(dest == null || !(dest instanceof SiteAddress))
break; // todo: handle multicasts
SiteAddress target=(SiteAddress)dest;
// if(addr.getSite() == site_id) // same site: local; pass down
// break;
byte[] buf=marshal(msg);
if(buf == null)
return null; // don't pass down
SiteUUID sender=new SiteUUID((UUID)local_addr, site_id);
if(local_addr.equals(coord))
route(target, sender, buf);
else
forwardToCoordinator(target, sender, buf);
return null;
case Event.SET_LOCAL_ADDRESS:
local_addr=(Address)evt.getArg();
break;
Expand All @@ -139,13 +155,92 @@ public Object down(Event evt) {

public Object up(Event evt) {
switch(evt.getType()) {
case Event.MSG:
Message msg=(Message)evt.getArg();
Relay2Header hdr=(Relay2Header)msg.getHeader(id);
if(hdr == null)
break;

System.out.println("[" + local_addr + "] received message with Relay2Header " + hdr);
handleMessage(hdr, msg);
return null;
case Event.VIEW_CHANGE:
handleView((View)evt.getArg());
break;
}
return up_prot.up(evt);
}

/** Called to handle a received relay message */
protected void handleMessage(Relay2Header hdr, Message msg) {
System.out.println("**** handleMessage()");
}


protected void forwardToCoordinator(SiteAddress dest, Address sender, byte[] buf) {
Message msg=new Message(coord, buf);
Relay2Header hdr=new Relay2Header(Relay2Header.DATA, dest, sender);
msg.putHeader(id, hdr);
down_prot.down(new Event(Event.MSG, msg));
}


/**
* Routes the message to the target destination, used by a site master (coordinator)
* @param dest
* @param buf
*/
protected void route(SiteAddress dest, SiteUUID sender, byte[] buf) {
short target_site=dest.getSite();
if(target_site == site_id) {
deliverLocally(dest, sender, buf);
return;
}
Relayer tmp=relayer;
if(tmp == null) {
log.warn("not site master; dropping message");
return;
}
Relayer.Route route=relayer.getRoute(target_site);
relay(dest, sender, route, buf);
}

protected void deliverLocally(SiteAddress dest, SiteUUID sender, byte[] buf) {

}

protected void relay(SiteAddress to, SiteAddress from, Relayer.Route route, byte[] buf) {
if(route == null) {
log.warn("route for site" + to.getSite() + " not found; dropping message");
return;
}
RELAY2.Relay2Header hdr=new RELAY2.Relay2Header(RELAY2.Relay2Header.DATA, to, from);
Message msg=new Message(route.site_master, buf);
msg.putHeader(id, hdr);
try {
route.bridge.send(msg);
}
catch(Exception e) {
log.error("failure relaying message", e);
}
}


protected byte[] marshal(Message msg) {
Message tmp=msg.copy(true, Global.BLOCKS_START_ID); // // we only copy headers from building blocks
// setting dest and src to null reduces the serialized size of the message; we'll set dest/src from the header later
tmp.setDest(null);
tmp.setSrc(null);

try {
return Util.streamableToByteBuffer(tmp);
}
catch(Exception e) {
log.error("marshalling failure", e);
return null;
}
}



protected void handleView(View view) {
Expand All @@ -159,7 +254,7 @@ protected void handleView(View view) {
if(become_coord) {
is_coord=true;
String bridge_name="_" + UUID.get(local_addr);
relayer=new Relayer(site_config, bridge_name, log);
relayer=new Relayer(site_config, bridge_name, log, this);
try {
if(log.isTraceEnabled())
log.trace("I became site master; starting bridges");
Expand Down
19 changes: 17 additions & 2 deletions src/org/jgroups/protocols/relay/Relayer.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.jgroups.*;
import org.jgroups.logging.Log;
import org.jgroups.protocols.RELAY;
import org.jgroups.protocols.relay.config.RelayConfig;
import org.jgroups.stack.AddressGenerator;
import org.jgroups.util.UUID;
Expand Down Expand Up @@ -32,9 +33,12 @@ public class Relayer {

protected final Log log;

protected final RELAY2 relay;

public Relayer(RelayConfig.SiteConfig site_config, String local_name, Log log) {

public Relayer(RelayConfig.SiteConfig site_config, String local_name, Log log, RELAY2 relay) {
this.site_config=site_config;
this.relay=relay;
int num_routes=site_config.getBridges().size();
my_site_id=site_config.getId();
routes=new Route[num_routes];
Expand Down Expand Up @@ -118,6 +122,12 @@ protected Route removeRoute(short site) {
return null;
}

protected Route getRoute(short site) {
if(site <= routes.length -1)
return routes[site];
return null;
}

protected void ensureCapacity(short site) {
if(site >= routes.length-1) {
Route[] tmp_routes=new Route[routes.length * 2];
Expand Down Expand Up @@ -168,7 +178,12 @@ protected void stop() {
}

public void receive(Message msg) {

RELAY2.Relay2Header hdr=(RELAY2.Relay2Header)msg.getHeader(relay.getId());
if(hdr == null) {
log.warn("received a message without a relay header; discarding it");
return;
}
relay.handleMessage(hdr, msg);
}

public void viewAccepted(View view) {
Expand Down
1 change: 1 addition & 0 deletions src/org/jgroups/protocols/relay/SiteMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public int compareTo(Address other) {
if(other instanceof UUID)
return super.compareTo(other);
SiteMaster tmp=(SiteMaster)other;
if(tmp == null) return 1;
return site == tmp.site? 0 : site < tmp.site? -1 : 1;
}

Expand Down

0 comments on commit 55f9305

Please sign in to comment.