diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala index d2dce19a807..ab83ab803bb 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala @@ -42,7 +42,8 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol version: Long = 0L, failureStats: Map[Address, FailureStats] = Map.empty[Address, FailureStats], intervalHistory: Map[Address, Vector[Long]] = Map.empty[Address, Vector[Long]], - timestamps: Map[Address, Long] = Map.empty[Address, Long]) + timestamps: Map[Address, Long] = Map.empty[Address, Long], + explicitRemovals: Set[Address] = Set.empty[Address]) private val state = new AtomicReference[State](State()) @@ -63,6 +64,7 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol val oldFailureStats = oldState.failureStats val oldTimestamps = oldState.timestamps val latestTimestamp = oldState.timestamps.get(connection) + val explicitRemovals = oldState.explicitRemovals if (latestTimestamp.isEmpty) { @@ -71,12 +73,14 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol val newFailureStats = oldFailureStats + (connection -> FailureStats()) val newIntervalHistory = oldState.intervalHistory + (connection -> Vector.empty[Long]) val newTimestamps = oldTimestamps + (connection -> newTimestamp) + val newExplicitRemovals = explicitRemovals - connection val newState = oldState copy ( version = oldState.version + 1, failureStats = newFailureStats, intervalHistory = newIntervalHistory, - timestamps = newTimestamps) + timestamps = newTimestamps, + explicitRemovals = newExplicitRemovals) // if we won the race then update else try again if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur @@ -125,10 +129,13 @@ class AccrualFailureDetector(system: ActorSystem, 
address: Address, val threshol val newIntervalHistory = oldState.intervalHistory + (connection -> newIntervalsForConnection) + val newExplicitRemovals = explicitRemovals - connection + val newState = oldState copy (version = oldState.version + 1, failureStats = newFailureStats, intervalHistory = newIntervalHistory, - timestamps = newTimestamps) + timestamps = newTimestamps, + explicitRemovals = newExplicitRemovals) // if we won the race then update else try again if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur @@ -150,7 +157,9 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol val oldTimestamp = oldState.timestamps.get(connection) val phi = - if (oldTimestamp.isEmpty) 0.0D // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections + // if connection has been removed explicitly + if (oldState.explicitRemovals.contains(connection)) Double.MaxValue + else if (oldTimestamp.isEmpty) 0.0D // treat unmanaged connections, e.g. 
with zero heartbeats, as healthy connections else { val timestampDiff = newTimestamp - oldTimestamp.get @@ -179,11 +188,13 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol val failureStats = oldState.failureStats - connection val intervalHistory = oldState.intervalHistory - connection val timestamps = oldState.timestamps - connection + val explicitRemovals = oldState.explicitRemovals + connection val newState = oldState copy (version = oldState.version + 1, failureStats = failureStats, intervalHistory = intervalHistory, - timestamps = timestamps) + timestamps = timestamps, + explicitRemovals = explicitRemovals) // if we won the race then update else try again if (!state.compareAndSet(oldState, newState)) remove(connection) // recur diff --git a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala index 275cd32c754..1da43156d34 100644 --- a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala @@ -36,8 +36,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" fd.isAvailable(conn) must be(true) } - // FIXME how should we deal with explicit removal of connection? 
- if triggered as failure then we have a problem in boostrap - see line 142 in AccrualFailureDetector - "mark node as dead after explicit removal of connection" ignore { + "mark node as dead after explicit removal of connection" in { val fd = new AccrualFailureDetector(system, conn) fd.heartbeat(conn) @@ -55,6 +54,35 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" fd.isAvailable(conn) must be(false) } + "mark node as available after explicit removal of connection and receiving heartbeat again" in { + val fd = new AccrualFailureDetector(system, conn) + + fd.heartbeat(conn) + + Thread.sleep(1000) + fd.heartbeat(conn) + + Thread.sleep(100) + fd.heartbeat(conn) + + fd.isAvailable(conn) must be(true) + + fd.remove(conn) + + fd.isAvailable(conn) must be(false) + + // it receives heartbeat from an explicitly removed node + fd.heartbeat(conn) + + Thread.sleep(1000) + fd.heartbeat(conn) + + Thread.sleep(100) + fd.heartbeat(conn) + + fd.isAvailable(conn) must be(true) + } + "mark node as dead if heartbeat are missed" in { val fd = new AccrualFailureDetector(system, conn, threshold = 3)