From 5b20e8d79528bbd10ddfba2c32264bb1f13c20a4 Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Thu, 1 Nov 2012 09:49:41 +0100 Subject: [PATCH] First commit of batching for RELAY2 (https://issues.jboss.org/browse/JGRP-1528); Relay2Test doesn't pass yet --- src/org/jgroups/protocols/relay/RELAY2.java | 121 ++++++----- src/org/jgroups/protocols/relay/Relayer.java | 202 ++++++++++++++---- .../protocols/relay/config/RelayConfig.java | 2 +- .../org/jgroups/tests/Relay2Test.java | 195 ++++++++++++++++- 4 files changed, 413 insertions(+), 107 deletions(-) diff --git a/src/org/jgroups/protocols/relay/RELAY2.java b/src/org/jgroups/protocols/relay/RELAY2.java index b468aab79b3..3ed5bfff790 100644 --- a/src/org/jgroups/protocols/relay/RELAY2.java +++ b/src/org/jgroups/protocols/relay/RELAY2.java @@ -10,6 +10,7 @@ import org.jgroups.protocols.relay.config.RelayConfig; import org.jgroups.stack.AddressGenerator; import org.jgroups.stack.Protocol; +import org.jgroups.util.TimeScheduler; import org.jgroups.util.TopologyUUID; import org.jgroups.util.UUID; import org.jgroups.util.Util; @@ -42,17 +43,29 @@ public class RELAY2 extends Protocol { @Property(description="Whether or not we generate our own addresses in which we use can_become_site_master. 
" + "If this property is false, can_become_site_master is ignored") - protected boolean enable_address_tagging=true; + protected boolean enable_address_tagging=false; @Property(description="Whether or not to relay multicast (dest=null) messages") protected boolean relay_multicasts=true; - @Property(description="The number of tries to forward a message to a remote site") + @Property(description="The number of tries to forward a message to a remote site", + deprecatedMessage="not used anymore, will be ignored") protected int max_forward_attempts=5; - @Property(description="The time (in milliseconds) to sleep between forward attempts") + @Property(description="The time (in milliseconds) to sleep between forward attempts", + deprecatedMessage="not used anymore, will be ignored") protected long forward_sleep=1000; + @Property(description="Max number of messages in the foward queue. Messages are added to the forward queue " + + "when the status of a route went from UP to UNKNOWN and the queue is flushed when the status goes to " + + "UP (resending all queued messages) or DOWN (sending SITE-UNREACHABLE messages to the senders)") + protected int fwd_queue_max_size=2000; + + @Property(description="Number of millisconds to wait when the status for a site changed from UP to UNKNOWN " + + "before that site is declared DOWN. 
A site that's DOWN triggers immediate sending of a SITE-UNREACHABLE message " + + "back to the sender of a message to that site") + protected long site_down_timeout=8000; + /* --------------------------------------------- Fields ------------------------------------------------ */ @ManagedAttribute(description="My site-ID") @@ -70,6 +83,8 @@ public class RELAY2 extends Protocol { protected Relayer relayer; + protected TimeScheduler timer; + protected volatile Address local_addr; /** Whether or not FORWARD_TO_COORD is on the stack */ @@ -77,27 +92,48 @@ public class RELAY2 extends Protocol { protected boolean forwarding_protocol_present; + // Fluent configuration public RELAY2 site(String site_name) {site=site_name; return this;} public RELAY2 config(String cfg) {config=cfg; return this;} public RELAY2 canBecomeSiteMaster(boolean flag) {can_become_site_master=flag; return this;} public RELAY2 enableAddressTagging(boolean flag) {enable_address_tagging=flag; return this;} public RELAY2 relayMulticasts(boolean flag) {relay_multicasts=flag; return this;} - public RELAY2 maxForwardAttempts(int num) {max_forward_attempts=num; return this;} - public RELAY2 forwardSleep(long time) {forward_sleep=time; return this;} + @Deprecated + public RELAY2 maxForwardAttempts(int num) { return this;} + @Deprecated + public RELAY2 forwardSleep(long time) { return this;} + public RELAY2 forwardQueueMaxSize(int size) {fwd_queue_max_size=size; return this;} + public RELAY2 siteDownTimeout(long timeout) {site_down_timeout=timeout; return this;} - public RELAY2 addSite(String site_name, RelayConfig.SiteConfig cfg) { - sites.put(site_name, cfg); - return this; - } + public String site() {return site;} + public String config() {return config;} + public boolean canBecomeSiteMaster() {return can_become_site_master;} + public boolean enableAddressTagging() {return enable_address_tagging;} + public boolean relayMulticasts() {return relay_multicasts;} + public int forwardQueueMaxSize() {return 
fwd_queue_max_size;} + public long siteDownTimeout() {return site_down_timeout;} + + + public Address getLocalAddress() {return local_addr;} + public TimeScheduler getTimer() {return timer;} public View getBridgeView(String cluster_name) { Relayer tmp=relayer; return tmp != null? tmp.getBridgeView(cluster_name) : null; } + + + public RELAY2 addSite(String site_name, RelayConfig.SiteConfig cfg) { + sites.put(site_name, cfg); + return this; + } + + public void init() throws Exception { super.init(); + timer=getTransport().getTimer(); if(site == null) throw new IllegalArgumentException("site cannot be null"); if(config != null) @@ -352,48 +388,32 @@ protected void route(SiteAddress dest, SiteAddress sender, byte[] buf) { return; } - Relayer.Route route=null; - int num_forward_attempts=0; - - while((route=tmp.getRoute(target_site)) == null && !Thread.currentThread().isInterrupted()) { - if(++num_forward_attempts >= max_forward_attempts) - break; - Util.sleep(forward_sleep); - } - - if(Thread.currentThread().isInterrupted()) - return; - - if(route == null) { - log.warn("route for " + SiteUUID.getSiteName(target_site) + " (" + target_site + - ") not found, dropping message to " + dest + " from " + sender); - sendSiteUnreachableTo(sender, target_site); - } + Relayer.Route route=tmp.getRoute(target_site); + if(route == null) + log.error("no route to " + SiteUUID.getSiteName(target_site) + ": dropping message"); else - _route(dest, sender, route, buf); + route.send(target_site, dest, sender, buf); } /** Sends the message via all bridges excluding the excluded_sites bridges */ protected void sendToBridges(Address sender, byte[] buf, short ... excluded_sites) { - List routes=relayer != null? relayer.getRoutes(excluded_sites) : null; + Relayer tmp=relayer; + List routes=tmp != null? 
tmp.getRoutes(excluded_sites) : null; if(routes == null) return; for(Relayer.Route route: routes) { if(log.isTraceEnabled()) - log.trace("relaying multicast message from " + sender + " over bridge " + route.site_master.toString()); - Relay2Header hdr=new Relay2Header(Relay2Header.DATA, null, sender); - Message msg=new Message(route.site_master, buf); - msg.putHeader(id, hdr); + log.trace("relaying multicast message from " + sender + " via route " + route); try { - route.bridge.send(msg); + route.send(((SiteAddress)route.getSiteMaster()).getSite(), null, sender, buf); } - catch(Exception e) { - log.error("failed relaying message from " + sender + " over bridge " + route.site_master.toString()); + catch(Exception ex) { + log.error("failed relaying message from " + sender + " via route " + route, ex); } } } - protected void sendSiteUnreachableTo(SiteAddress dest, short target_site) { + protected void sendSiteUnreachableTo(Address dest, short target_site) { Message msg=new Message(dest); msg.setSrc(new SiteUUID((UUID)local_addr, UUID.get(local_addr), site_id)); Relay2Header hdr=new Relay2Header(Relay2Header.SITE_UNREACHABLE, new SiteMaster(target_site), null); @@ -434,7 +454,7 @@ protected void deliverLocally(SiteAddress dest, SiteAddress sender, byte[] buf) if(log.isTraceEnabled()) log.trace("delivering message to " + dest + " in local cluster"); - forwardTo(local_dest,dest,sender,buf,send_to_coord); + forwardTo(local_dest, dest, sender, buf, send_to_coord); } protected void deliver(Address dest, Address sender, byte[] buf) { @@ -451,23 +471,6 @@ protected void deliver(Address dest, Address sender, byte[] buf) { } } - protected void _route(SiteAddress to, SiteAddress from, Relayer.Route route, byte[] buf) { - if(route == null) { - log.warn("route for site" + to.getSite() + " not found; dropping message"); - return; - } - if(log.isTraceEnabled()) - log.trace("routing message to " + to + " via " + route.site_master); - RELAY2.Relay2Header hdr=new 
RELAY2.Relay2Header(RELAY2.Relay2Header.DATA, to, from); - Message msg=new Message(route.site_master, buf); - msg.putHeader(id,hdr); - try { - route.bridge.send(msg); - } - catch(Exception e) { - log.error("failure relaying message", e); - } - } protected byte[] marshal(Message msg) { @@ -502,7 +505,7 @@ protected void handleView(View view) { try { if(log.isTraceEnabled()) log.trace("I became site master; starting bridges"); - relayer.start(site_config.getBridges(), bridge_name, site_id); + relayer.start(sites.size(), site_config.getBridges(), bridge_name, site_id); } catch(Throwable t) { log.error("failed starting relayer", t); @@ -535,6 +538,14 @@ protected static Address determineSiteMaster(View view) { } + + public static enum RouteStatus { + UP, // The route is up and messages can be sent + UNKNOWN, // Initial status, plus when the view excludes the route. Queues all messages in this state + DOWN // The route is down; send SITE-UNREACHABLE messages back to the senders + } + + public static class Relay2Header extends Header { public static final byte DATA = 1; public static final byte SITE_UNREACHABLE = 2; // final_dest is a SiteMaster diff --git a/src/org/jgroups/protocols/relay/Relayer.java b/src/org/jgroups/protocols/relay/Relayer.java index 40cc9e3e8a0..4d6fe4ef73d 100644 --- a/src/org/jgroups/protocols/relay/Relayer.java +++ b/src/org/jgroups/protocols/relay/Relayer.java @@ -8,7 +8,10 @@ import org.jgroups.util.Util; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.concurrent.*; /** * Maintains bridges and routing table. Does the routing of outgoing messages and dispatches incoming messages to @@ -27,6 +30,11 @@ public class Relayer { protected final RELAY2 relay; + // Used to store messages for a site with status UNKNOWN. 
Messages will be flushed when the status changes to UP, or + // a SITE-UNREACHABLE message will be sent to each member *once* when the status changes to DOWN + protected final ConcurrentMap> fwd_queue=new ConcurrentHashMap>(); + + public Relayer(RELAY2 relay, Log log) { this.relay=relay; @@ -39,11 +47,14 @@ public Relayer(RELAY2 relay, Log log) { * @param bridge_configs A list of bridge configurations * @param bridge_name The name of the local bridge channel, prefixed with '_'. * @param my_site_id The ID of this site + * @param num_sites The number of sites * @throws Throwable */ - public void start(List bridge_configs, String bridge_name, final short my_site_id) + public void start(int num_sites, List bridge_configs, String bridge_name, final short my_site_id) throws Throwable { - routes=new Route[bridge_configs.size()]; + routes=new Route[num_sites]; + for(short i=0; i < num_sites; i++) + routes[i]=new Route(null, null, RELAY2.RouteStatus.DOWN); bridges=new ArrayList(bridge_configs.size()); try { for(RelayConfig.BridgeConfig bridge_config: bridge_configs) { @@ -78,6 +89,10 @@ public Address generateAddress() { public void stop() { for(Bridge bridge: bridges) bridge.stop(); + bridges.clear(); + fwd_queue.clear(); + for(int i=0; i < routes.length; i++) + routes[i]=null; } @@ -97,31 +112,14 @@ public synchronized String printRoutes() { } - protected synchronized void addRoute(short site, Route route) { - Route old_route; - ensureCapacity(site); - if((old_route=routes[site]) == null) - _addRoute(site, route); - else if(!old_route.site_master.equals(route.site_master) || old_route.bridge != route.bridge) - _addRoute(site, route); + protected synchronized void setRoute(short site, JChannel bridge, SiteMaster site_master, RELAY2.RouteStatus status) { + Route existing_route=routes[site]; + existing_route.setBridge(bridge); + existing_route.setSiteMaster(site_master); + existing_route.setStatus(status); } - protected void _addRoute(short site, Route route) { - 
if(log.isTraceEnabled()) - log.trace("added route " + SiteUUID.getSiteName(site)); - routes[site]=route; - } - protected synchronized Route removeRoute(short site) { - if(site <= routes.length -1) { - Route route=routes[site]; - routes[site]=null; - if(log.isTraceEnabled()) - log.trace("removed route " + SiteUUID.getSiteName(site)); - return route; - } - return null; - } protected synchronized Route getRoute(short site) { if(site <= routes.length -1) @@ -133,7 +131,7 @@ protected synchronized List getRoutes(short ... excluded_sites) { List retval=new ArrayList(routes.length); for(short i=0; i < routes.length; i++) { Route tmp=routes[i]; - if(tmp != null) { + if(tmp != null && tmp.getStatus() != RELAY2.RouteStatus.DOWN) { if(!isExcluded(tmp, excluded_sites)) retval.add(tmp); } @@ -162,34 +160,78 @@ protected static boolean isExcluded(Route route, short... excluded_sites) { return false; } - protected synchronized void ensureCapacity(short site) { - if(site >= routes.length) { - Route[] tmp_routes=new Route[Math.max(site+1, routes.length * 2)]; - System.arraycopy(routes, 0, tmp_routes, 0, routes.length); - routes=tmp_routes; - } - } /** * Includes information about the site master of the route and the channel to be used */ - public static class Route { - protected final Address site_master; - protected final JChannel bridge; + public class Route { + private volatile Address site_master; + private volatile JChannel bridge; + private volatile RELAY2.RouteStatus status; public Route(Address site_master, JChannel bridge) { + this(site_master, bridge, RELAY2.RouteStatus.UP); + } + + public Route(Address site_master, JChannel bridge, RELAY2.RouteStatus status) { this.site_master=site_master; this.bridge=bridge; + this.status=status; } - public JChannel getBridge() {return bridge;} + public JChannel getBridge() {return bridge;} + public void setBridge(JChannel new_bridge) {bridge=new_bridge;} + public Address getSiteMaster() {return site_master;} + public void 
setSiteMaster(Address new_site_master) {site_master=new_site_master;} + public RELAY2.RouteStatus getStatus() {return status;} + public void setStatus(RELAY2.RouteStatus new_status) {status=new_status;} + + + public void send(short target_site, Address final_destination, Address original_sender, byte[] buf) { + switch(status) { + case DOWN: // send SITE-UNREACHABLE message back to sender + relay.sendSiteUnreachableTo(original_sender, target_site); + return; + case UNKNOWN: // queue message + BlockingQueue queue=fwd_queue.get(target_site); + if(queue == null) { + queue=new LinkedBlockingQueue(relay.forwardQueueMaxSize()); + BlockingQueue existing=fwd_queue.putIfAbsent(target_site, queue); + if(existing != null) + queue=existing; + } + try { + queue.put(createMessage(new SiteMaster(target_site), final_destination, original_sender, buf)); + } + catch(InterruptedException e) { + } + return; + } + + // at this point status is RUNNING + if(log.isTraceEnabled()) + log.trace("routing message to " + final_destination + " via " + site_master); + try { + Message msg=createMessage(site_master, final_destination, original_sender, buf); + bridge.send(msg); + } + catch(Exception e) { + log.error("failure relaying message", e); + } + } - public Address getSiteMaster() {return site_master;} public String toString() { - return site_master.toString(); + return (site_master != null? 
site_master + " " : "") + "[" + status + "]"; + } + + protected Message createMessage(Address target, Address final_destination, Address original_sender, byte[] buf) { + Message msg=new Message(target, buf); + RELAY2.Relay2Header hdr=new RELAY2.Relay2Header(RELAY2.Relay2Header.DATA, final_destination, original_sender); + msg.putHeader(relay.getId(), hdr); + return msg; } } @@ -235,8 +277,15 @@ public void viewAccepted(View new_view) { for(Address addr: left_mbrs) { if(addr instanceof SiteUUID) { SiteUUID site_uuid=(SiteUUID)addr; - short site=site_uuid.getSite(); - removeRoute(site); + final short site=site_uuid.getSite(); + changeStatusToUnknown(site); + relay.getTimer().schedule(new Runnable() { + public void run() { + Route route=routes[site]; + if(route.getStatus() == RELAY2.RouteStatus.UNKNOWN) + changeStatusToDown(site); + } + }, relay.siteDownTimeout(), TimeUnit.MILLISECONDS); } } } @@ -245,10 +294,79 @@ public void viewAccepted(View new_view) { if(addr instanceof SiteUUID) { SiteUUID site_uuid=(SiteUUID)addr; short site=site_uuid.getSite(); - Route route=new Route(site_uuid, channel); - addRoute(site, route); + changeStatusToUp(site, channel, site_uuid); + } + } + } + + + protected void changeStatusToUnknown(short id) { + Route route=routes[id]; + route.setStatus(RELAY2.RouteStatus.UNKNOWN); // messages are queued from now on + } + + protected void changeStatusToDown(short id) { + Route route=routes[id]; + route.setStatus(RELAY2.RouteStatus.DOWN); // SITE-UNREACHABLE responses are sent in this state + BlockingQueue msgs=fwd_queue.remove(id); + if(msgs != null && !msgs.isEmpty()) { + Set
<Address> targets=new HashSet<Address>
(); // we need to send a SITE-UNREACHABLE only *once* to every sender + for(Message msg: msgs) { + RELAY2.Relay2Header hdr=(RELAY2.Relay2Header)msg.getHeader(relay.getId()); + targets.add(hdr.original_sender); + } + for(Address target: targets) { + if(route.getStatus() != RELAY2.RouteStatus.UP) + relay.sendSiteUnreachableTo(target, id); + } + } + } + + protected void changeStatusToUp(final short id, JChannel bridge, Address site_master) { + final Route route=routes[id]; + if(route.getBridge() == null || !route.getBridge().equals(bridge)) + route.setBridge(bridge); + if(route.getSiteMaster() == null || !route.getSiteMaster().equals(site_master)) + route.setSiteMaster(site_master); + + RELAY2.RouteStatus old_status=route.getStatus(); + route.setStatus(RELAY2.RouteStatus.UP); + + switch(old_status) { + case UNKNOWN: + case DOWN: // queue should be empty, but anyway... + relay.getTimer().execute(new Runnable() { + public void run() { + flushQueue(id, route); + } + }); + break; + } + } + + // Resends all messages in the queue, then clears the queue + protected void flushQueue(short id, Route route) { + BlockingQueue msgs=fwd_queue.get(id); + if(msgs == null || msgs.isEmpty()) + return; + Message msg; + JChannel bridge=route.getBridge(); + if(log.isTraceEnabled()) + log.trace(relay.getLocalAddress() + ": forwarding " + msgs.size() + " queued messages"); + System.out.println(relay.getLocalAddress() + ": forwarding " + msgs.size() + " queued messages"); + while((msg=msgs.poll()) != null && route.getStatus() == RELAY2.RouteStatus.UP) { + try { + Message copy=msg.copy(); // need the copy to change the destination to the site master + copy.setDest(route.getSiteMaster()); + bridge.send(copy); + System.out.println("--> " + copy); + } + catch(Throwable ex) { + log.error("failed forwarding queued message to " + SiteUUID.getSiteName(id), ex); } } + fwd_queue.remove(id); } + } } diff --git a/src/org/jgroups/protocols/relay/config/RelayConfig.java 
b/src/org/jgroups/protocols/relay/config/RelayConfig.java index 808d73fcfb3..3fa6ef5e432 100644 --- a/src/org/jgroups/protocols/relay/config/RelayConfig.java +++ b/src/org/jgroups/protocols/relay/config/RelayConfig.java @@ -206,7 +206,7 @@ public abstract static class BridgeConfig { protected BridgeConfig(String cluster_name) {this.cluster_name=cluster_name;} public String getClusterName() {return cluster_name;} - public abstract JChannel createChannel() throws Exception; + public abstract JChannel createChannel() throws Exception; public String toString() {return "cluster=" + cluster_name;} } diff --git a/tests/junit-functional/org/jgroups/tests/Relay2Test.java b/tests/junit-functional/org/jgroups/tests/Relay2Test.java index 84b5688e741..a65ca0fd3f4 100644 --- a/tests/junit-functional/org/jgroups/tests/Relay2Test.java +++ b/tests/junit-functional/org/jgroups/tests/Relay2Test.java @@ -1,9 +1,6 @@ package org.jgroups.tests; -import org.jgroups.Global; -import org.jgroups.JChannel; -import org.jgroups.MergeView; -import org.jgroups.View; +import org.jgroups.*; import org.jgroups.protocols.FORWARD_TO_COORD; import org.jgroups.protocols.PING; import org.jgroups.protocols.SHARED_LOOPBACK; @@ -12,13 +9,16 @@ import org.jgroups.protocols.pbcast.NAKACK2; import org.jgroups.protocols.relay.RELAY2; import org.jgroups.protocols.relay.Relayer; +import org.jgroups.protocols.relay.SiteMaster; import org.jgroups.protocols.relay.config.RelayConfig; import org.jgroups.stack.Protocol; import org.jgroups.util.Util; import org.testng.annotations.AfterMethod; import org.testng.annotations.Test; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; /** * Various RELAY2-related tests @@ -31,6 +31,8 @@ public class Relay2Test { protected JChannel x, y, z; // members in site "sfo" protected static final String BRIDGE_CLUSTER="global"; + protected static final String LON_CLUSTER="lon-cluster"; + protected static final String SFO_CLUSTER="sfo-cluster"; @AfterMethod 
protected void destroy() {Util.close(z,y,x,c,b,a);} @@ -40,11 +42,11 @@ public class Relay2Test { * (https://issues.jboss.org/browse/JGRP-1524) */ public void testMissingRouteAfterMerge() throws Exception { - a=createNode("lon", "A", "london-cluster"); - b=createNode("lon", "B", "london-cluster"); + a=createNode("lon", "A", LON_CLUSTER); + b=createNode("lon", "B", LON_CLUSTER); Util.waitUntilAllChannelsHaveSameSize(30000, 500, a,b); - x=createNode("sfo", "X", "sfo-cluster"); + x=createNode("sfo", "X", SFO_CLUSTER); assert x.getView().size() == 1; RELAY2 ar=(RELAY2)a.getProtocolStack().findProtocol(RELAY2.class), @@ -110,6 +112,135 @@ public void testMissingRouteAfterMerge() throws Exception { } + /** + * Tests whether the bridge channel connects and disconnects ok. + * @throws Exception + */ + public void testConnectAndReconnectOfBridgeStack() throws Exception { + a=new JChannel(createBridgeStack()); + a.setName("A"); + b=new JChannel(createBridgeStack()); + b.setName("B"); + + a.connect(BRIDGE_CLUSTER); + b.connect(BRIDGE_CLUSTER); + Util.waitUntilAllChannelsHaveSameSize(10000, 500, a, b); + + b.disconnect(); + Util.waitUntilAllChannelsHaveSameSize(10000, 500, a); + + b.connect(BRIDGE_CLUSTER); + Util.waitUntilAllChannelsHaveSameSize(10000, 500, a, b); + } + + + + /** + * Tests sites LON and SFO, with SFO disconnecting (bridge view on LON should be 1) and reconnecting (bridge view on + * LON and SFO should be 2) + * @throws Exception + */ + public void testDisconnectAndReconnect() throws Exception { + a=createNode("lon", "A", LON_CLUSTER); + x=createNode("sfo", "X", SFO_CLUSTER); + + System.out.println("Started A and X; waiting for bridge view of 2 on A and X"); + waitForBridgeView(2, 20000, 500, a, x); + + System.out.println("Disconnecting X; waiting for a bridge view on 1 on A"); + x.disconnect(); + waitForBridgeView(1, 20000, 500, a); + + System.out.println("Reconnecting X again; waiting for a bridge view of 2 on A and X"); + x.connect(SFO_CLUSTER); + 
waitForBridgeView(2, 20000, 500, a, x); + } + + + + + + /** + * Tests that queued messages are forwarded successfully. The scenario is: + *
<ul>
+ * <li>Node A in site LON, node X in site SFO</li>
+ * <li>Node X is brought down (gracefully)</li>
+ * <li>Node A sends a few unicast messages to the site master of SFO (queued)</li>
+ * <li>Node X is started again</li>
+ * <li>The queued messages on A should be forwarded to the site master of SFO</li>
+ * </ul>
+ * https://issues.jboss.org/browse/JGRP-1528 + */ + public void testQueueingAndForwarding() throws Exception { + a=createNode("lon", "A", LON_CLUSTER); + x=createNode("sfo", "X", null); // don't connect yet + MyReceiver rx=new MyReceiver(); + x.setReceiver(rx); + x.connect(SFO_CLUSTER); + + System.out.println("Waiting for site SFO to be UP"); + RELAY2 relay_a=(RELAY2)a.getProtocolStack().findProtocol(RELAY2.class); + Relayer.Route sfo_route=relay_a.getRoute("sfo"); + + for(int i=0; i < 20; i++) { + if(sfo_route.getStatus() == RELAY2.RouteStatus.UP) + break; + Util.sleep(500); + } + System.out.println("Route to SFO: " + sfo_route); + assert sfo_route.getStatus() == RELAY2.RouteStatus.UP; + + Address sm_sfo=new SiteMaster("sfo"); + System.out.println("Sending message 0 to the site master of SFO"); + a.send(sm_sfo, 0); + + List list=rx.getList(); + for(int i=0; i < 20; i++) { + if(!list.isEmpty()) + break; + Util.sleep(500); + } + System.out.println("list = " + list); + assert list.size() == 1 && list.get(0) == 0; + rx.clear(); + + x.disconnect(); + System.out.println("Waiting for site SFO to be UNKNOWN"); + + for(int i=0; i < 20; i++) { + if(sfo_route.getStatus() == RELAY2.RouteStatus.UNKNOWN) + break; + Util.sleep(500); + } + System.out.println("Route to SFO: " + sfo_route); + assert sfo_route.getStatus() == RELAY2.RouteStatus.UNKNOWN; + + + System.out.println("sending 5 messages from A to site master SFO - they should all get queued"); + for(int i=1; i <= 5; i++) + a.send(sm_sfo, i); + + System.out.println("Starting X again; the queued messages should now get re-sent"); + x.connect(SFO_CLUSTER); + + /*x=createNode("sfo", "X", null); // don't connect yet + x.setReceiver(rx); + x.connect(SFO_CLUSTER);*/ + + + + for(int i=0; i < 20; i++) { + if(list.size() == 5) + break; + Util.sleep(500); + } + System.out.println("list = " + list); + assert list.size() == 5; + for(int i=1; i <= 5; i++) + assert list.contains(i); + } + + protected JChannel createNode(String 
site_name, String node_name, String cluster_name) throws Exception { JChannel ch=new JChannel(new SHARED_LOOPBACK(), @@ -117,10 +248,11 @@ protected JChannel createNode(String site_name, String node_name, String cluster new NAKACK2(), new UNICAST2(), new GMS(), - new FORWARD_TO_COORD().setValue("resend_delay", 500), + new FORWARD_TO_COORD(), createRELAY2(site_name)); ch.setName(node_name); - ch.connect(cluster_name); + if(cluster_name != null) + ch.connect(cluster_name); return ch; } @@ -155,4 +287,49 @@ protected static void createPartition(JChannel ... channels) { } } + protected void waitForBridgeView(int expected_size, long timeout, long interval, JChannel ... channels) { + long deadline=System.currentTimeMillis() + timeout; + + while(System.currentTimeMillis() < deadline) { + boolean views_correct=true; + for(JChannel ch: channels) { + RELAY2 relay=(RELAY2)ch.getProtocolStack().findProtocol(RELAY2.class); + View bridge_view=relay.getBridgeView(BRIDGE_CLUSTER); + if(bridge_view.size() != expected_size) { + views_correct=false; + break; + } + } + if(views_correct) + break; + Util.sleep(interval); + } + + System.out.println("Bridge views:\n"); + for(JChannel ch: channels) { + RELAY2 relay=(RELAY2)ch.getProtocolStack().findProtocol(RELAY2.class); + View bridge_view=relay.getBridgeView(BRIDGE_CLUSTER); + System.out.println(ch.getAddress() + ": " + bridge_view); + } + + for(JChannel ch: channels) { + RELAY2 relay=(RELAY2)ch.getProtocolStack().findProtocol(RELAY2.class); + View bridge_view=relay.getBridgeView(BRIDGE_CLUSTER); + assert bridge_view.size() == expected_size : ch.getAddress() + ": bridge view=" + bridge_view + ", expected=" + expected_size; + } + } + + + protected static class MyReceiver extends ReceiverAdapter { + protected final List list=new ArrayList(5); + + public List getList() {return list;} + public void clear() {list.clear();} + + public void receive(Message msg) { + list.add((Integer)msg.getObject()); + System.out.println("<-- " + 
msg.getObject()); + } + } + }