Permalink
Browse files

Merge pull request #415 from amir343/master

AccrualFD: explicit removal of connections functionality + corresponding tests
  • Loading branch information...
2 parents 1614ae3 + d3e18aa commit 1f30be1f875b114a682b2b3246463883c0e68368 @viktorklang viktorklang committed Apr 24, 2012
@@ -42,7 +42,8 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol
version: Long = 0L,
failureStats: Map[Address, FailureStats] = Map.empty[Address, FailureStats],
intervalHistory: Map[Address, Vector[Long]] = Map.empty[Address, Vector[Long]],
- timestamps: Map[Address, Long] = Map.empty[Address, Long])
+ timestamps: Map[Address, Long] = Map.empty[Address, Long],
+ explicitRemovals: Set[Address] = Set.empty[Address])
private val state = new AtomicReference[State](State())
@@ -63,6 +64,7 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol
val oldFailureStats = oldState.failureStats
val oldTimestamps = oldState.timestamps
val latestTimestamp = oldState.timestamps.get(connection)
+ val explicitRemovals = oldState.explicitRemovals
if (latestTimestamp.isEmpty) {
@@ -71,12 +73,14 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol
val newFailureStats = oldFailureStats + (connection -> FailureStats())
val newIntervalHistory = oldState.intervalHistory + (connection -> Vector.empty[Long])
val newTimestamps = oldTimestamps + (connection -> newTimestamp)
+ val newExplicitRemovals = explicitRemovals - connection
val newState = oldState copy (
version = oldState.version + 1,
failureStats = newFailureStats,
intervalHistory = newIntervalHistory,
- timestamps = newTimestamps)
+ timestamps = newTimestamps,
+ explicitRemovals = newExplicitRemovals)
// if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur
@@ -125,10 +129,13 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol
val newIntervalHistory = oldState.intervalHistory + (connection -> newIntervalsForConnection)
+ val newExplicitRemovals = explicitRemovals - connection
+
val newState = oldState copy (version = oldState.version + 1,
failureStats = newFailureStats,
intervalHistory = newIntervalHistory,
- timestamps = newTimestamps)
+ timestamps = newTimestamps,
+ explicitRemovals = newExplicitRemovals)
// if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur
@@ -150,7 +157,9 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol
val oldTimestamp = oldState.timestamps.get(connection)
val phi =
- if (oldTimestamp.isEmpty) 0.0D // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
+ // if connection has been removed explicitly
+ if (oldState.explicitRemovals.contains(connection)) Double.MaxValue
+ else if (oldTimestamp.isEmpty) 0.0D // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
else {
val timestampDiff = newTimestamp - oldTimestamp.get
@@ -179,11 +188,13 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol
val failureStats = oldState.failureStats - connection
val intervalHistory = oldState.intervalHistory - connection
val timestamps = oldState.timestamps - connection
+ val explicitRemovals = oldState.explicitRemovals + connection
val newState = oldState copy (version = oldState.version + 1,
failureStats = failureStats,
intervalHistory = intervalHistory,
- timestamps = timestamps)
+ timestamps = timestamps,
+ explicitRemovals = explicitRemovals)
// if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) remove(connection) // recur
@@ -36,8 +36,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
fd.isAvailable(conn) must be(true)
}
- // FIXME how should we deal with explicit removal of connection? - if triggered as failure then we have a problem in boostrap - see line 142 in AccrualFailureDetector
- "mark node as dead after explicit removal of connection" ignore {
+ "mark node as dead after explicit removal of connection" in {
val fd = new AccrualFailureDetector(system, conn)
fd.heartbeat(conn)
@@ -55,6 +54,35 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
fd.isAvailable(conn) must be(false)
}
+ "mark node as available after explicit removal of connection and receiving heartbeat again" in {
+ val fd = new AccrualFailureDetector(system, conn)
+
+ fd.heartbeat(conn)
+
+ Thread.sleep(1000)
+ fd.heartbeat(conn)
+
+ Thread.sleep(100)
+ fd.heartbeat(conn)
+
+ fd.isAvailable(conn) must be(true)
+
+ fd.remove(conn)
+
+ fd.isAvailable(conn) must be(false)
+
+ // it recieves heartbeat from an explicitly removed node
+ fd.heartbeat(conn)
+
+ Thread.sleep(1000)
+ fd.heartbeat(conn)
+
+ Thread.sleep(100)
+ fd.heartbeat(conn)
+
+ fd.isAvailable(conn) must be(true)
+ }
+
"mark node as dead if heartbeat are missed" in {
val fd = new AccrualFailureDetector(system, conn, threshold = 3)

0 comments on commit 1f30be1

Please sign in to comment.