Skip to content

Commit

Permalink
Merge pull request akka#415 from amir343/master
Browse files Browse the repository at this point in the history
AccrualFD: explicit removal of connections functionality + corresponding tests
  • Loading branch information
viktorklang committed Apr 24, 2012
2 parents 1614ae3 + d3e18aa commit 1f30be1
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 7 deletions.
Expand Up @@ -42,7 +42,8 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol
version: Long = 0L,
failureStats: Map[Address, FailureStats] = Map.empty[Address, FailureStats],
intervalHistory: Map[Address, Vector[Long]] = Map.empty[Address, Vector[Long]],
timestamps: Map[Address, Long] = Map.empty[Address, Long])
timestamps: Map[Address, Long] = Map.empty[Address, Long],
explicitRemovals: Set[Address] = Set.empty[Address])

private val state = new AtomicReference[State](State())

Expand All @@ -63,6 +64,7 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol
val oldFailureStats = oldState.failureStats
val oldTimestamps = oldState.timestamps
val latestTimestamp = oldState.timestamps.get(connection)
val explicitRemovals = oldState.explicitRemovals

if (latestTimestamp.isEmpty) {

Expand All @@ -71,12 +73,14 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol
val newFailureStats = oldFailureStats + (connection -> FailureStats())
val newIntervalHistory = oldState.intervalHistory + (connection -> Vector.empty[Long])
val newTimestamps = oldTimestamps + (connection -> newTimestamp)
val newExplicitRemovals = explicitRemovals - connection

val newState = oldState copy (
version = oldState.version + 1,
failureStats = newFailureStats,
intervalHistory = newIntervalHistory,
timestamps = newTimestamps)
timestamps = newTimestamps,
explicitRemovals = newExplicitRemovals)

// if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur
Expand Down Expand Up @@ -125,10 +129,13 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol

val newIntervalHistory = oldState.intervalHistory + (connection -> newIntervalsForConnection)

val newExplicitRemovals = explicitRemovals - connection

val newState = oldState copy (version = oldState.version + 1,
failureStats = newFailureStats,
intervalHistory = newIntervalHistory,
timestamps = newTimestamps)
timestamps = newTimestamps,
explicitRemovals = newExplicitRemovals)

// if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur
Expand All @@ -150,7 +157,9 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol
val oldTimestamp = oldState.timestamps.get(connection)

val phi =
if (oldTimestamp.isEmpty) 0.0D // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
// if connection has been removed explicitly
if (oldState.explicitRemovals.contains(connection)) Double.MaxValue
else if (oldTimestamp.isEmpty) 0.0D // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
else {
val timestampDiff = newTimestamp - oldTimestamp.get

Expand Down Expand Up @@ -179,11 +188,13 @@ class AccrualFailureDetector(system: ActorSystem, address: Address, val threshol
val failureStats = oldState.failureStats - connection
val intervalHistory = oldState.intervalHistory - connection
val timestamps = oldState.timestamps - connection
val explicitRemovals = oldState.explicitRemovals + connection

val newState = oldState copy (version = oldState.version + 1,
failureStats = failureStats,
intervalHistory = intervalHistory,
timestamps = timestamps)
timestamps = timestamps,
explicitRemovals = explicitRemovals)

// if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) remove(connection) // recur
Expand Down
Expand Up @@ -36,8 +36,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
fd.isAvailable(conn) must be(true)
}

// FIXME how should we deal with explicit removal of connection? - if triggered as failure then we have a problem in boostrap - see line 142 in AccrualFailureDetector
"mark node as dead after explicit removal of connection" ignore {
"mark node as dead after explicit removal of connection" in {
val fd = new AccrualFailureDetector(system, conn)

fd.heartbeat(conn)
Expand All @@ -55,6 +54,35 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
fd.isAvailable(conn) must be(false)
}

"mark node as available after explicit removal of connection and receiving heartbeat again" in {
val fd = new AccrualFailureDetector(system, conn)

fd.heartbeat(conn)

Thread.sleep(1000)
fd.heartbeat(conn)

Thread.sleep(100)
fd.heartbeat(conn)

fd.isAvailable(conn) must be(true)

fd.remove(conn)

fd.isAvailable(conn) must be(false)

// it recieves heartbeat from an explicitly removed node
fd.heartbeat(conn)

Thread.sleep(1000)
fd.heartbeat(conn)

Thread.sleep(100)
fd.heartbeat(conn)

fd.isAvailable(conn) must be(true)
}

"mark node as dead if heartbeat are missed" in {
val fd = new AccrualFailureDetector(system, conn, threshold = 3)

Expand Down

0 comments on commit 1f30be1

Please sign in to comment.