diff --git a/src/org/jgroups/protocols/FD.java b/src/org/jgroups/protocols/FD.java index b521ce854a1..3eb26e17e88 100644 --- a/src/org/jgroups/protocols/FD.java +++ b/src/org/jgroups/protocols/FD.java @@ -42,12 +42,16 @@ public class FD extends Protocol { /* ----------------------------------------- Properties -------------------------------------------------- */ - @Property(description="Timeout to suspect a node P if neither a heartbeat nor data were received from P. Default is 3000 msec") - long timeout=3000; + @Property(description="Timeout to suspect a node P if neither a heartbeat nor data were received from P.") + protected long timeout=3000; @Property(description="Number of times to send an are-you-alive message") - int max_tries=2; - + protected int max_tries=5; + + + @Property(description="Treat messages received from members as heartbeats. Note that this means we're updating " + + "a value in a hashmap every time a message is passing up the stack through FD, which is costly.") + boolean msg_counts_as_heartbeat=true; /* --------------------------------------------- JMX ------------------------------------------------------ */ @@ -66,7 +70,7 @@ public class FD extends Protocol { protected Address local_addr=null; - private long last_ack=System.currentTimeMillis(); + protected long last_ack=System.currentTimeMillis(); protected int num_tries=0; @@ -82,10 +86,10 @@ public class FD extends Protocol { @GuardedBy("lock") protected final List
pingable_mbrs=new ArrayList
(); - private TimeScheduler timer=null; + protected TimeScheduler timer=null; @GuardedBy("lock") - private Future monitor_future=null; // task that performs the actual monitoring for failure detection + protected Future monitor_future=null; // task that performs the actual monitoring for failure detection /** Transmits SUSPECT message until view change or UNSUSPECT is received */ protected final Broadcaster bcast_task=new Broadcaster(); @@ -133,6 +137,7 @@ public void stop() { lock.lock(); try { stopMonitor(); + ping_dest=null; } finally { lock.unlock(); @@ -140,7 +145,7 @@ public void stop() { } - private Address getPingDest(List
mbrs) { + protected Address getPingDest(List
mbrs) { Address tmp, retval=null; if(mbrs == null || mbrs.size() < 2 || local_addr == null) @@ -159,9 +164,6 @@ private Address getPingDest(List
mbrs) { } - protected Monitor createMonitor() { - return new Monitor(); - } @ManagedOperation(description="Stops checking for crashed members") public void stopFailureDetection() { @@ -175,17 +177,17 @@ public void startFailureDetection() { /** Requires lock to held by caller */ @GuardedBy("lock") - private void startMonitor() { + protected void startMonitor() { if(monitor_future == null || monitor_future.isDone()) { last_ack=System.currentTimeMillis(); // start from scratch - monitor_future=timer.scheduleWithFixedDelay(createMonitor(), timeout, timeout, TimeUnit.MILLISECONDS); + monitor_future=timer.scheduleWithFixedDelay(new Monitor(), timeout, timeout, TimeUnit.MILLISECONDS); num_tries=0; } } /** Requires lock to be held by caller */ @GuardedBy("lock") - private void stopMonitor() { + protected void stopMonitor() { if(monitor_future != null) { monitor_future.cancel(true); monitor_future=null; @@ -198,7 +200,7 @@ private void stopMonitor() { /** Restarts the monitor if the ping destination has changed. If not, this is a no-op. * Requires lock to be held by the caller */ @GuardedBy("lock") - private void restartMonitor() { + protected void restartMonitor() { Address tmp_dest=getPingDest(pingable_mbrs); boolean restart_monitor=tmp_dest == null || ping_dest == null || // tmp_dest != null && ping_dest == null @@ -225,7 +227,8 @@ public Object up(Event evt) { Message msg=(Message)evt.getArg(); FdHeader hdr=(FdHeader)msg.getHeader(this.id); if(hdr == null) { - updateTimestamp(msg.getSrc()); + if(msg_counts_as_heartbeat) + updateTimestamp(msg.getSrc()); break; // message did not originate from FD layer, just pass up } @@ -252,15 +255,13 @@ public Object up(Event evt) { sendHeartbeatResponse(msg.getSrc()); continue; } - else { - lock.lock(); - try { - pingable_mbrs.remove(mbr); - restartMonitor(); - } - finally { - lock.unlock(); - } + lock.lock(); + try { + pingable_mbrs.remove(mbr); + restartMonitor(); + } + finally { + lock.unlock(); } up_prot.up(new Event(Event.SUSPECT, mbr)); down_prot.down(new Event(Event.SUSPECT, mbr)); @@ -309,9 +310,8 @@ public Object down(Event evt) { } - private void sendHeartbeatResponse(Address dest) { - Message hb_ack=new Message(dest, null, null); - hb_ack.setFlag(Message.OOB); + protected void sendHeartbeatResponse(Address dest) { + Message hb_ack=new Message(dest).setFlag(Message.OOB); FdHeader tmp_hdr=new FdHeader(FdHeader.HEARTBEAT_ACK); tmp_hdr.from=local_addr; hb_ack.putHeader(this.id, tmp_hdr); @@ -319,7 +319,7 @@ private void sendHeartbeatResponse(Address dest) { } @GuardedBy("lock") - private void unsuspect(Address mbr) { + protected void unsuspect(Address mbr) { lock.lock(); try { bcast_task.removeSuspectedMember(mbr); @@ -333,8 +333,7 @@ private void unsuspect(Address mbr) { } } - @GuardedBy("lock") - private void updateTimestamp(Address sender) { + protected void updateTimestamp(Address sender) { if(ping_dest != null && sender != null && ping_dest.equals(sender)) { long tmp=System.currentTimeMillis(); lock.lock(); @@ -417,10 +416,7 @@ public void readFrom(DataInput in) throws Exception { protected class Monitor implements Runnable { public void run() { - Message hb_req; - long not_heard_from; // time in msecs we haven't heard from ping_dest Address dest=null; - lock.lock(); try { if(ping_dest == null) { @@ -429,8 +425,7 @@ public void run() { pingable_mbrs + ", local_addr=" + local_addr); return; } - else - dest=ping_dest; + dest=ping_dest; } finally { lock.unlock(); @@ -438,9 +433,7 @@ public void run() { // 1. send heartbeat request - hb_req=new Message(dest, null, null); - hb_req.setFlag(Message.OOB); - hb_req.putHeader(id, new FdHeader(FdHeader.HEARTBEAT)); // send heartbeat request + Message hb_req=new Message(dest).setFlag(Message.OOB).putHeader(id, new FdHeader(FdHeader.HEARTBEAT)); if(log.isDebugEnabled()) log.debug(local_addr + ": sending are-you-alive msg to " + dest + " (own address=" + local_addr + ')'); down_prot.down(new Event(Event.MSG, hb_req)); @@ -449,15 +442,14 @@ public void run() { // 2. If the time of the last heartbeat is > timeout and max_tries heartbeat messages have not been // received, then broadcast a SUSPECT message. Will be handled by coordinator, which may install // a new view - not_heard_from=System.currentTimeMillis() - last_ack; + long not_heard_from=System.currentTimeMillis() - last_ack; // time in msecs we haven't heard from ping_dest // quick & dirty fix: increase timeout by 500msecs to allow for latency (bela June 27 2003) if(not_heard_from > timeout + 500) { // no heartbeat ack for more than timeout msecs if(num_tries >= max_tries) { if(log.isDebugEnabled()) log.debug(local_addr + ": received no heartbeat ack from " + dest + " for " + (num_tries +1) + " times (" + ((num_tries+1) * timeout) + " milliseconds), suspecting it"); - // broadcast a SUSPECT message to all members - loop until - // unsuspect or view change is received + // broadcast a SUSPECT message to all members - loop until unsuspect or view change is received bcast_task.addSuspectedMember(dest); num_tries=0; if(stats) { @@ -482,15 +474,15 @@ public void run() { * any longer. Then the task terminates. */ protected final class Broadcaster { - final List
suspected_mbrs=new ArrayList
(7); - final Lock bcast_lock=new ReentrantLock(); + protected final List
suspected_mbrs=new ArrayList
(7); + protected final Lock bcast_lock=new ReentrantLock(); @GuardedBy("bcast_lock") - Future bcast_future=null; + protected Future bcast_future=null; @GuardedBy("bcast_lock") - BroadcastTask task; + protected BroadcastTask task; - List
getSuspectedMembers() { + protected List
getSuspectedMembers() { return suspected_mbrs; } @@ -498,7 +490,7 @@ List
getSuspectedMembers() { * Starts a new task, or - if already running - adds the argument to the running task. * @param suspect */ - private void startBroadcastTask(Address suspect) { + protected void startBroadcastTask(Address suspect) { bcast_lock.lock(); try { if(bcast_future == null || bcast_future.isDone()) { @@ -520,7 +512,7 @@ private void startBroadcastTask(Address suspect) { } } - private void stopBroadcastTask() { + protected void stopBroadcastTask() { bcast_lock.lock(); try { if(bcast_future != null) { @@ -570,7 +562,7 @@ void adjustSuspectedMembers(List
new_mbrship) { protected final class BroadcastTask implements Runnable { - private final List
suspected_members=new ArrayList
(); + protected final List
suspected_members=new ArrayList
(); BroadcastTask(List
suspected_members) { @@ -585,9 +577,7 @@ public void stop() { public void run() { - Message suspect_msg; FD.FdHeader hdr; - synchronized(suspected_members) { if(suspected_members.isEmpty()) { stop(); @@ -598,9 +588,7 @@ public void run() { hdr.mbrs=new ArrayList
(suspected_members); hdr.from=local_addr; } - suspect_msg=new Message(); // mcast SUSPECT to all members - suspect_msg.setFlag(Message.OOB); - suspect_msg.putHeader(id, hdr); + Message suspect_msg=new Message().setFlag(Message.OOB).putHeader(id, hdr); if(log.isDebugEnabled()) log.debug(local_addr + ": broadcasting SUSPECT message [suspected_mbrs=" + suspected_members + "] to group"); down_prot.down(new Event(Event.MSG, suspect_msg)); diff --git a/tests/junit-functional/org/jgroups/tests/FdMonitorTest.java b/tests/junit-functional/org/jgroups/tests/FdMonitorTest.java new file mode 100644 index 00000000000..25900a49045 --- /dev/null +++ b/tests/junit-functional/org/jgroups/tests/FdMonitorTest.java @@ -0,0 +1,76 @@ +package org.jgroups.tests; + +import org.jgroups.Global; +import org.jgroups.JChannel; +import org.jgroups.protocols.FD; +import org.jgroups.protocols.PING; +import org.jgroups.protocols.SHARED_LOOPBACK; +import org.jgroups.protocols.UNICAST2; +import org.jgroups.protocols.pbcast.GMS; +import org.jgroups.protocols.pbcast.NAKACK2; +import org.jgroups.util.Util; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + + +@Test(groups=Global.FUNCTIONAL) +public class FdMonitorTest { + protected static final String CLUSTER_NAME = "FdMonitorTest"; + protected JChannel a, b; + + + @BeforeMethod + public void setup() throws Exception { + a=createChannel("A"); + b=createChannel("B"); + } + + @AfterMethod + public void destroy() { + Util.close(b,a); + } + + public void testFdMonitorActivation() throws Exception { + a.connect(CLUSTER_NAME); + assert !FD(a).isMonitorRunning(); + + b.connect(CLUSTER_NAME); + Util.waitUntilAllChannelsHaveSameSize(10000,500,a,b); + + validateFdMonitor(""); + + reconnect(b); + Util.waitUntilAllChannelsHaveSameSize(10000, 500,a,b); + + // Util.sleep(60000); + validateFdMonitor("after B reconnect"); + } + + + protected void validateFdMonitor(String msg) { + assert FD(a).isMonitorRunning() : "A.FD.monitor "+msg; + assert FD(b).isMonitorRunning() : "B.FD.monitor "+msg; + } + + protected static void reconnect(JChannel node) throws Exception { + node.disconnect(); + node.connect(CLUSTER_NAME); + } + + protected static FD FD(JChannel ch) { + return (FD) ch.getProtocolStack().findProtocol(FD.class); + } + + protected JChannel createChannel(String name) throws Exception { + JChannel ch=Util.createChannel(new SHARED_LOOPBACK(), + new PING().setValue("timeout",500).setValue("num_initial_members",2), + new FD().setValue("timeout", 1000).setValue("num_tries", 3), + new NAKACK2(), + new UNICAST2(), + new GMS().setValue("print_local_addr",false)); + ch.setName(name); + return ch; + } +}