Permalink
Browse files

Fix bug introduced in refactoring, see #2284

  • Loading branch information...
1 parent 66c81e9 commit 279fd2b6ef4a5d8aef16f7f0dd2d188b9b378a1c @patriknw patriknw committed Oct 10, 2012
View
17 akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala
@@ -128,9 +128,6 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
def removeMember(m: Member): Unit = if (m.address != selfAddress)
state = state removeMember m.address
- def removeJoinInProgress(address: Address): Unit = if (address != selfAddress)
- state = state.removeJoinInProgress(address)
-
def addJoinInProgress(address: Address, deadline: Deadline): Unit = if (address != selfAddress)
state = state.addJoinInProgress(address, deadline)
@@ -208,8 +205,7 @@ private[cluster] object ClusterHeartbeatSenderState {
// start ending process for nodes not selected any more
// abort ending process for nodes that have been selected again
val end = old.ending ++ (old.current -- curr).map(_ -> 0) -- curr
- old.copy(consistentHash = consistentHash, all = all, current = curr, ending = end,
- joinInProgress = old.joinInProgress -- all)
+ old.copy(consistentHash = consistentHash, all = all, current = curr, ending = end)
}
}
@@ -252,24 +248,25 @@ private[cluster] case class ClusterHeartbeatSenderState private (
val active: Set[Address] = current ++ joinInProgress.keySet
def reset(nodes: Set[Address]): ClusterHeartbeatSenderState =
- ClusterHeartbeatSenderState(this, consistentHash = ConsistentHash(nodes, consistentHash.virtualNodesFactor),
+ ClusterHeartbeatSenderState(nodes.foldLeft(this) { _ removeJoinInProgress _ },
+ consistentHash = ConsistentHash(nodes, consistentHash.virtualNodesFactor),
all = nodes)
def addMember(a: Address): ClusterHeartbeatSenderState =
- ClusterHeartbeatSenderState(this, all = all + a, consistentHash = consistentHash :+ a)
+ ClusterHeartbeatSenderState(removeJoinInProgress(a), all = all + a, consistentHash = consistentHash :+ a)
def removeMember(a: Address): ClusterHeartbeatSenderState =
- ClusterHeartbeatSenderState(this, all = all - a, consistentHash = consistentHash :- a)
+ ClusterHeartbeatSenderState(removeJoinInProgress(a), all = all - a, consistentHash = consistentHash :- a)
- def removeJoinInProgress(address: Address): ClusterHeartbeatSenderState = {
+ private def removeJoinInProgress(address: Address): ClusterHeartbeatSenderState = {
if (joinInProgress contains address)
copy(joinInProgress = joinInProgress - address, ending = ending + (address -> 0))
else this
}
def addJoinInProgress(address: Address, deadline: Deadline): ClusterHeartbeatSenderState = {
if (all contains address) this
- else copy(joinInProgress = joinInProgress + (address -> deadline))
+ else copy(joinInProgress = joinInProgress + (address -> deadline), ending = ending - address)
}
/**
View
22 akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala
@@ -40,13 +40,33 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers {
val s = emptyState.addJoinInProgress(aa, Deadline.now - 30.seconds).removeOverdueJoinInProgress()
s.joinInProgress must be(Map.empty)
s.active must be(Set.empty)
+ s.ending must be(Map(aa -> 0))
}
"remove joinInProgress after reset" in {
- val s = emptyState.addJoinInProgress(aa, Deadline.now - 30.seconds).reset(Set(aa, bb))
+ val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).reset(Set(aa, bb))
+ s.joinInProgress must be(Map.empty)
+ }
+
+ "remove joinInProgress after addMember" in {
+ val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).addMember(aa)
s.joinInProgress must be(Map.empty)
}
+ "remove joinInProgress after removeMember" in {
+ val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).reset(Set(aa, bb)).removeMember(aa)
+ s.joinInProgress must be(Map.empty)
+ s.ending must be(Map(aa -> 0))
+ }
+
+ "remove from ending after addJoinInProgress" in {
+ val s = emptyState.reset(Set(aa, bb)).removeMember(aa)
+ s.ending must be(Map(aa -> 0))
+ val s2 = s.addJoinInProgress(aa, Deadline.now + 30.seconds)
+ s2.joinInProgress.keySet must be(Set(aa))
+ s2.ending must be(Map.empty)
+ }
+
"include nodes from reset in active set" in {
val nodes = Set(aa, bb, cc)
val s = emptyState.reset(nodes)

0 comments on commit 279fd2b

Please sign in to comment.