Skip to content

Commit

Permalink
Monitor sets ping_dest=null on stop(), reconnects correctly now (http…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Dec 13, 2012
1 parent 0817aea commit 19404ee
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 55 deletions.
98 changes: 43 additions & 55 deletions src/org/jgroups/protocols/FD.java
Expand Up @@ -42,12 +42,16 @@ public class FD extends Protocol {

/* ----------------------------------------- Properties -------------------------------------------------- */

@Property(description="Timeout to suspect a node P if neither a heartbeat nor data were received from P. Default is 3000 msec")
long timeout=3000;
@Property(description="Timeout to suspect a node P if neither a heartbeat nor data were received from P.")
protected long timeout=3000;

@Property(description="Number of times to send an are-you-alive message")
int max_tries=2;

protected int max_tries=5;


@Property(description="Treat messages received from members as heartbeats. Note that this means we're updating " +
"a value in a hashmap every time a message is passing up the stack through FD, which is costly.")
boolean msg_counts_as_heartbeat=true;


/* --------------------------------------------- JMX ------------------------------------------------------ */
Expand All @@ -66,7 +70,7 @@ public class FD extends Protocol {

protected Address local_addr=null;

private long last_ack=System.currentTimeMillis();
protected long last_ack=System.currentTimeMillis();

protected int num_tries=0;

Expand All @@ -82,10 +86,10 @@ public class FD extends Protocol {
@GuardedBy("lock")
protected final List<Address> pingable_mbrs=new ArrayList<Address>();

private TimeScheduler timer=null;
protected TimeScheduler timer=null;

@GuardedBy("lock")
private Future<?> monitor_future=null; // task that performs the actual monitoring for failure detection
protected Future<?> monitor_future=null; // task that performs the actual monitoring for failure detection

/** Transmits SUSPECT message until view change or UNSUSPECT is received */
protected final Broadcaster bcast_task=new Broadcaster();
Expand Down Expand Up @@ -133,14 +137,15 @@ public void stop() {
lock.lock();
try {
stopMonitor();
ping_dest=null;
}
finally {
lock.unlock();
}
}


private Address getPingDest(List<Address> mbrs) {
protected Address getPingDest(List<Address> mbrs) {
Address tmp, retval=null;

if(mbrs == null || mbrs.size() < 2 || local_addr == null)
Expand All @@ -159,9 +164,6 @@ private Address getPingDest(List<Address> mbrs) {
}


protected Monitor createMonitor() {
return new Monitor();
}

@ManagedOperation(description="Stops checking for crashed members")
public void stopFailureDetection() {
Expand All @@ -175,17 +177,17 @@ public void startFailureDetection() {

/** Requires lock to be held by caller */
@GuardedBy("lock")
private void startMonitor() {
protected void startMonitor() {
if(monitor_future == null || monitor_future.isDone()) {
last_ack=System.currentTimeMillis(); // start from scratch
monitor_future=timer.scheduleWithFixedDelay(createMonitor(), timeout, timeout, TimeUnit.MILLISECONDS);
monitor_future=timer.scheduleWithFixedDelay(new Monitor(), timeout, timeout, TimeUnit.MILLISECONDS);
num_tries=0;
}
}

/** Requires lock to be held by caller */
@GuardedBy("lock")
private void stopMonitor() {
protected void stopMonitor() {
if(monitor_future != null) {
monitor_future.cancel(true);
monitor_future=null;
Expand All @@ -198,7 +200,7 @@ private void stopMonitor() {
/** Restarts the monitor if the ping destination has changed. If not, this is a no-op.
* Requires lock to be held by the caller */
@GuardedBy("lock")
private void restartMonitor() {
protected void restartMonitor() {
Address tmp_dest=getPingDest(pingable_mbrs);
boolean restart_monitor=tmp_dest == null ||
ping_dest == null || // tmp_dest != null && ping_dest == null
Expand All @@ -225,7 +227,8 @@ public Object up(Event evt) {
Message msg=(Message)evt.getArg();
FdHeader hdr=(FdHeader)msg.getHeader(this.id);
if(hdr == null) {
updateTimestamp(msg.getSrc());
if(msg_counts_as_heartbeat)
updateTimestamp(msg.getSrc());
break; // message did not originate from FD layer, just pass up
}

Expand All @@ -252,15 +255,13 @@ public Object up(Event evt) {
sendHeartbeatResponse(msg.getSrc());
continue;
}
else {
lock.lock();
try {
pingable_mbrs.remove(mbr);
restartMonitor();
}
finally {
lock.unlock();
}
lock.lock();
try {
pingable_mbrs.remove(mbr);
restartMonitor();
}
finally {
lock.unlock();
}
up_prot.up(new Event(Event.SUSPECT, mbr));
down_prot.down(new Event(Event.SUSPECT, mbr));
Expand Down Expand Up @@ -309,17 +310,16 @@ public Object down(Event evt) {
}


private void sendHeartbeatResponse(Address dest) {
Message hb_ack=new Message(dest, null, null);
hb_ack.setFlag(Message.OOB);
protected void sendHeartbeatResponse(Address dest) {
Message hb_ack=new Message(dest).setFlag(Message.OOB);
FdHeader tmp_hdr=new FdHeader(FdHeader.HEARTBEAT_ACK);
tmp_hdr.from=local_addr;
hb_ack.putHeader(this.id, tmp_hdr);
down_prot.down(new Event(Event.MSG, hb_ack));
}

@GuardedBy("lock")
private void unsuspect(Address mbr) {
protected void unsuspect(Address mbr) {
lock.lock();
try {
bcast_task.removeSuspectedMember(mbr);
Expand All @@ -333,8 +333,7 @@ private void unsuspect(Address mbr) {
}
}

@GuardedBy("lock")
private void updateTimestamp(Address sender) {
protected void updateTimestamp(Address sender) {
if(ping_dest != null && sender != null && ping_dest.equals(sender)) {
long tmp=System.currentTimeMillis();
lock.lock();
Expand Down Expand Up @@ -417,10 +416,7 @@ public void readFrom(DataInput in) throws Exception {
protected class Monitor implements Runnable {

public void run() {
Message hb_req;
long not_heard_from; // time in msecs we haven't heard from ping_dest
Address dest=null;

lock.lock();
try {
if(ping_dest == null) {
Expand All @@ -429,18 +425,15 @@ public void run() {
pingable_mbrs + ", local_addr=" + local_addr);
return;
}
else
dest=ping_dest;
dest=ping_dest;
}
finally {
lock.unlock();
}


// 1. send heartbeat request
hb_req=new Message(dest, null, null);
hb_req.setFlag(Message.OOB);
hb_req.putHeader(id, new FdHeader(FdHeader.HEARTBEAT)); // send heartbeat request
Message hb_req=new Message(dest).setFlag(Message.OOB).putHeader(id, new FdHeader(FdHeader.HEARTBEAT));
if(log.isDebugEnabled())
log.debug(local_addr + ": sending are-you-alive msg to " + dest + " (own address=" + local_addr + ')');
down_prot.down(new Event(Event.MSG, hb_req));
Expand All @@ -449,15 +442,14 @@ public void run() {
// 2. If the time of the last heartbeat is > timeout and max_tries heartbeat messages have not been
// received, then broadcast a SUSPECT message. Will be handled by coordinator, which may install
// a new view
not_heard_from=System.currentTimeMillis() - last_ack;
long not_heard_from=System.currentTimeMillis() - last_ack; // time in msecs we haven't heard from ping_dest
// quick & dirty fix: increase timeout by 500msecs to allow for latency (bela June 27 2003)
if(not_heard_from > timeout + 500) { // no heartbeat ack for more than timeout msecs
if(num_tries >= max_tries) {
if(log.isDebugEnabled())
log.debug(local_addr + ": received no heartbeat ack from " + dest + " for " + (num_tries +1) +
" times (" + ((num_tries+1) * timeout) + " milliseconds), suspecting it");
// broadcast a SUSPECT message to all members - loop until
// unsuspect or view change is received
// broadcast a SUSPECT message to all members - loop until unsuspect or view change is received
bcast_task.addSuspectedMember(dest);
num_tries=0;
if(stats) {
Expand All @@ -482,23 +474,23 @@ public void run() {
* any longer. Then the task terminates.
*/
protected final class Broadcaster {
final List<Address> suspected_mbrs=new ArrayList<Address>(7);
final Lock bcast_lock=new ReentrantLock();
protected final List<Address> suspected_mbrs=new ArrayList<Address>(7);
protected final Lock bcast_lock=new ReentrantLock();
@GuardedBy("bcast_lock")
Future<?> bcast_future=null;
protected Future<?> bcast_future=null;
@GuardedBy("bcast_lock")
BroadcastTask task;
protected BroadcastTask task;


List<Address> getSuspectedMembers() {
protected List<Address> getSuspectedMembers() {
return suspected_mbrs;
}

/**
* Starts a new task, or - if already running - adds the argument to the running task.
* @param suspect
*/
private void startBroadcastTask(Address suspect) {
protected void startBroadcastTask(Address suspect) {
bcast_lock.lock();
try {
if(bcast_future == null || bcast_future.isDone()) {
Expand All @@ -520,7 +512,7 @@ private void startBroadcastTask(Address suspect) {
}
}

private void stopBroadcastTask() {
protected void stopBroadcastTask() {
bcast_lock.lock();
try {
if(bcast_future != null) {
Expand Down Expand Up @@ -570,7 +562,7 @@ void adjustSuspectedMembers(List<Address> new_mbrship) {


protected final class BroadcastTask implements Runnable {
private final List<Address> suspected_members=new ArrayList<Address>();
protected final List<Address> suspected_members=new ArrayList<Address>();


BroadcastTask(List<Address> suspected_members) {
Expand All @@ -585,9 +577,7 @@ public void stop() {


public void run() {
Message suspect_msg;
FD.FdHeader hdr;

synchronized(suspected_members) {
if(suspected_members.isEmpty()) {
stop();
Expand All @@ -598,9 +588,7 @@ public void run() {
hdr.mbrs=new ArrayList<Address>(suspected_members);
hdr.from=local_addr;
}
suspect_msg=new Message(); // mcast SUSPECT to all members
suspect_msg.setFlag(Message.OOB);
suspect_msg.putHeader(id, hdr);
Message suspect_msg=new Message().setFlag(Message.OOB).putHeader(id, hdr);
if(log.isDebugEnabled())
log.debug(local_addr + ": broadcasting SUSPECT message [suspected_mbrs=" + suspected_members + "] to group");
down_prot.down(new Event(Event.MSG, suspect_msg));
Expand Down
76 changes: 76 additions & 0 deletions tests/junit-functional/org/jgroups/tests/FdMonitorTest.java
@@ -0,0 +1,76 @@
package org.jgroups.tests;

import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.protocols.FD;
import org.jgroups.protocols.PING;
import org.jgroups.protocols.SHARED_LOOPBACK;
import org.jgroups.protocols.UNICAST2;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;



/**
 * Checks that the {@link FD} monitor task is running on both members of a two-node cluster,
 * and that it is running again on both members after one of them has disconnected and
 * reconnected.
 */
@Test(groups=Global.FUNCTIONAL)
public class FdMonitorTest {
    protected static final String CLUSTER_NAME = "FdMonitorTest";
    protected JChannel a, b;


    /** Creates the two channels (named "A" and "B") used by the test; they are not connected yet. */
    @BeforeMethod
    public void setup() throws Exception {
        a=createChannel("A");
        b=createChannel("B");
    }

    /** Closes both channels after each test method. */
    @AfterMethod
    public void destroy() {
        Util.close(b,a);
    }

    /**
     * Connects A, then B, and verifies the FD monitor state before and after B leaves and
     * rejoins the cluster.
     */
    public void testFdMonitorActivation() throws Exception {
        a.connect(CLUSTER_NAME);
        // A is the only member at this point, so its monitor must not be running
        assert !FD(a).isMonitorRunning();

        b.connect(CLUSTER_NAME);
        Util.waitUntilAllChannelsHaveSameSize(10000,500,a,b);

        validateFdMonitor("");

        reconnect(b);
        Util.waitUntilAllChannelsHaveSameSize(10000, 500,a,b);

        validateFdMonitor("after B reconnect");
    }


    /**
     * Asserts that the FD monitor is running on both A and B.
     * @param msg Suffix appended to the assertion message to identify the phase of the test
     */
    protected void validateFdMonitor(String msg) {
        assert FD(a).isMonitorRunning() : "A.FD.monitor "+msg;
        assert FD(b).isMonitorRunning() : "B.FD.monitor "+msg;
    }

    /** Disconnects the given channel and connects it to {@link #CLUSTER_NAME} again. */
    protected static void reconnect(JChannel node) throws Exception {
        node.disconnect();
        node.connect(CLUSTER_NAME);
    }

    /** Returns the {@link FD} protocol instance from the channel's protocol stack. */
    protected static FD FD(JChannel ch) {
        return (FD) ch.getProtocolStack().findProtocol(FD.class);
    }

    /**
     * Creates an unconnected channel over SHARED_LOOPBACK with a short FD timeout.
     * @param name The logical name given to the channel
     */
    protected JChannel createChannel(String name) throws Exception {
        // "max_tries" is FD's configurable retry limit; "num_tries" is FD's internal retry
        // counter and must not be set from configuration
        JChannel ch=Util.createChannel(new SHARED_LOOPBACK(),
                                       new PING().setValue("timeout",500).setValue("num_initial_members",2),
                                       new FD().setValue("timeout", 1000).setValue("max_tries", 3),
                                       new NAKACK2(),
                                       new UNICAST2(),
                                       new GMS().setValue("print_local_addr",false));
        ch.setName(name);
        return ch;
    }
}

0 comments on commit 19404ee

Please sign in to comment.