Skip to content

Commit

Permalink
First commit of batching for RELAY2 (https://issues.jboss.org/browse/…
Browse files Browse the repository at this point in the history
…JGRP-1528); Relay2Test doesn't pass yet
  • Loading branch information
belaban committed Nov 13, 2012
1 parent 708ffcf commit 5b20e8d
Show file tree
Hide file tree
Showing 4 changed files with 413 additions and 107 deletions.
121 changes: 66 additions & 55 deletions src/org/jgroups/protocols/relay/RELAY2.java
Expand Up @@ -10,6 +10,7 @@
import org.jgroups.protocols.relay.config.RelayConfig;
import org.jgroups.stack.AddressGenerator;
import org.jgroups.stack.Protocol;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.TopologyUUID;
import org.jgroups.util.UUID;
import org.jgroups.util.Util;
Expand Down Expand Up @@ -42,17 +43,29 @@ public class RELAY2 extends Protocol {

@Property(description="Whether or not we generate our own addresses in which we use can_become_site_master. " +
"If this property is false, can_become_site_master is ignored")
protected boolean enable_address_tagging=true;
protected boolean enable_address_tagging=false;

@Property(description="Whether or not to relay multicast (dest=null) messages")
protected boolean relay_multicasts=true;

@Property(description="The number of tries to forward a message to a remote site")
@Property(description="The number of tries to forward a message to a remote site",
deprecatedMessage="not used anymore, will be ignored")
protected int max_forward_attempts=5;

@Property(description="The time (in milliseconds) to sleep between forward attempts")
@Property(description="The time (in milliseconds) to sleep between forward attempts",
deprecatedMessage="not used anymore, will be ignored")
protected long forward_sleep=1000;

@Property(description="Max number of messages in the foward queue. Messages are added to the forward queue " +
"when the status of a route went from UP to UNKNOWN and the queue is flushed when the status goes to " +
"UP (resending all queued messages) or DOWN (sending SITE-UNREACHABLE messages to the senders)")
protected int fwd_queue_max_size=2000;

@Property(description="Number of millisconds to wait when the status for a site changed from UP to UNKNOWN " +
"before that site is declared DOWN. A site that's DOWN triggers immediate sending of a SITE-UNREACHABLE message " +
"back to the sender of a message to that site")
protected long site_down_timeout=8000;


/* --------------------------------------------- Fields ------------------------------------------------ */
@ManagedAttribute(description="My site-ID")
Expand All @@ -70,34 +83,57 @@ public class RELAY2 extends Protocol {

protected Relayer relayer;

protected TimeScheduler timer;

protected volatile Address local_addr;

/** Whether or not FORWARD_TO_COORD is on the stack */
@ManagedAttribute(description="FORWARD_TO_COORD protocol is present below the current protocol")
protected boolean forwarding_protocol_present;



// Fluent configuration
public RELAY2 site(String site_name) {site=site_name; return this;}
public RELAY2 config(String cfg) {config=cfg; return this;}
public RELAY2 canBecomeSiteMaster(boolean flag) {can_become_site_master=flag; return this;}
public RELAY2 enableAddressTagging(boolean flag) {enable_address_tagging=flag; return this;}
public RELAY2 relayMulticasts(boolean flag) {relay_multicasts=flag; return this;}
public RELAY2 maxForwardAttempts(int num) {max_forward_attempts=num; return this;}
public RELAY2 forwardSleep(long time) {forward_sleep=time; return this;}
@Deprecated
public RELAY2 maxForwardAttempts(int num) { return this;}
@Deprecated
public RELAY2 forwardSleep(long time) { return this;}
public RELAY2 forwardQueueMaxSize(int size) {fwd_queue_max_size=size; return this;}
public RELAY2 siteDownTimeout(long timeout) {site_down_timeout=timeout; return this;}

public RELAY2 addSite(String site_name, RelayConfig.SiteConfig cfg) {
sites.put(site_name, cfg);
return this;
}
public String site() {return site;}
public String config() {return config;}
public boolean canBecomeSiteMaster() {return can_become_site_master;}
public boolean enableAddressTagging() {return enable_address_tagging;}
public boolean relayMulticasts() {return relay_multicasts;}
public int forwardQueueMaxSize() {return fwd_queue_max_size;}
public long siteDownTimeout() {return site_down_timeout;}


public Address getLocalAddress() {return local_addr;}
public TimeScheduler getTimer() {return timer;}

public View getBridgeView(String cluster_name) {
Relayer tmp=relayer;
return tmp != null? tmp.getBridgeView(cluster_name) : null;
}


public RELAY2 addSite(String site_name, RelayConfig.SiteConfig cfg) {
sites.put(site_name, cfg);
return this;
}



public void init() throws Exception {
super.init();
timer=getTransport().getTimer();
if(site == null)
throw new IllegalArgumentException("site cannot be null");
if(config != null)
Expand Down Expand Up @@ -352,48 +388,32 @@ protected void route(SiteAddress dest, SiteAddress sender, byte[] buf) {
return;
}

Relayer.Route route=null;
int num_forward_attempts=0;

while((route=tmp.getRoute(target_site)) == null && !Thread.currentThread().isInterrupted()) {
if(++num_forward_attempts >= max_forward_attempts)
break;
Util.sleep(forward_sleep);
}

if(Thread.currentThread().isInterrupted())
return;

if(route == null) {
log.warn("route for " + SiteUUID.getSiteName(target_site) + " (" + target_site +
") not found, dropping message to " + dest + " from " + sender);
sendSiteUnreachableTo(sender, target_site);
}
Relayer.Route route=tmp.getRoute(target_site);
if(route == null)
log.error("no route to " + SiteUUID.getSiteName(target_site) + ": dropping message");
else
_route(dest, sender, route, buf);
route.send(target_site, dest, sender, buf);
}

/** Sends the message via all bridges excluding the excluded_sites bridges */
protected void sendToBridges(Address sender, byte[] buf, short ... excluded_sites) {
List<Relayer.Route> routes=relayer != null? relayer.getRoutes(excluded_sites) : null;
Relayer tmp=relayer;
List<Relayer.Route> routes=tmp != null? tmp.getRoutes(excluded_sites) : null;
if(routes == null)
return;
for(Relayer.Route route: routes) {
if(log.isTraceEnabled())
log.trace("relaying multicast message from " + sender + " over bridge " + route.site_master.toString());
Relay2Header hdr=new Relay2Header(Relay2Header.DATA, null, sender);
Message msg=new Message(route.site_master, buf);
msg.putHeader(id, hdr);
log.trace("relaying multicast message from " + sender + " via route " + route);
try {
route.bridge.send(msg);
route.send(((SiteAddress)route.getSiteMaster()).getSite(), null, sender, buf);
}
catch(Exception e) {
log.error("failed relaying message from " + sender + " over bridge " + route.site_master.toString());
catch(Exception ex) {
log.error("failed relaying message from " + sender + " via route " + route, ex);
}
}
}

protected void sendSiteUnreachableTo(SiteAddress dest, short target_site) {
protected void sendSiteUnreachableTo(Address dest, short target_site) {
Message msg=new Message(dest);
msg.setSrc(new SiteUUID((UUID)local_addr, UUID.get(local_addr), site_id));
Relay2Header hdr=new Relay2Header(Relay2Header.SITE_UNREACHABLE, new SiteMaster(target_site), null);
Expand Down Expand Up @@ -434,7 +454,7 @@ protected void deliverLocally(SiteAddress dest, SiteAddress sender, byte[] buf)

if(log.isTraceEnabled())
log.trace("delivering message to " + dest + " in local cluster");
forwardTo(local_dest,dest,sender,buf,send_to_coord);
forwardTo(local_dest, dest, sender, buf, send_to_coord);
}

protected void deliver(Address dest, Address sender, byte[] buf) {
Expand All @@ -451,23 +471,6 @@ protected void deliver(Address dest, Address sender, byte[] buf) {
}
}

protected void _route(SiteAddress to, SiteAddress from, Relayer.Route route, byte[] buf) {
if(route == null) {
log.warn("route for site" + to.getSite() + " not found; dropping message");
return;
}
if(log.isTraceEnabled())
log.trace("routing message to " + to + " via " + route.site_master);
RELAY2.Relay2Header hdr=new RELAY2.Relay2Header(RELAY2.Relay2Header.DATA, to, from);
Message msg=new Message(route.site_master, buf);
msg.putHeader(id,hdr);
try {
route.bridge.send(msg);
}
catch(Exception e) {
log.error("failure relaying message", e);
}
}


protected byte[] marshal(Message msg) {
Expand Down Expand Up @@ -502,7 +505,7 @@ protected void handleView(View view) {
try {
if(log.isTraceEnabled())
log.trace("I became site master; starting bridges");
relayer.start(site_config.getBridges(), bridge_name, site_id);
relayer.start(sites.size(), site_config.getBridges(), bridge_name, site_id);
}
catch(Throwable t) {
log.error("failed starting relayer", t);
Expand Down Expand Up @@ -535,6 +538,14 @@ protected static Address determineSiteMaster(View view) {
}



public static enum RouteStatus {
UP, // The route is up and messages can be sent
UNKNOWN, // Initial status, plus when the view excludes the route. Queues all messages in this state
DOWN // The route is down; send SITE-UNREACHABLE messages back to the senders
}


public static class Relay2Header extends Header {
public static final byte DATA = 1;
public static final byte SITE_UNREACHABLE = 2; // final_dest is a SiteMaster
Expand Down

0 comments on commit 5b20e8d

Please sign in to comment.