Permalink
Browse files

Move state of ClusterHeartbeatSender to separate immutable class, see…

… #2284
  • Loading branch information...
1 parent 668d5a5 commit 66c81e915e818598c1b564b120fb2f6f2f990240 @patriknw patriknw committed Oct 10, 2012
@@ -18,7 +18,7 @@ import java.util.Arrays
* hash, i.e. make sure it is different for different nodes.
*
*/
-class ConsistentHash[T: ClassTag] private (nodes: SortedMap[Int, T], virtualNodesFactor: Int) {
+class ConsistentHash[T: ClassTag] private (nodes: SortedMap[Int, T], val virtualNodesFactor: Int) {
import ConsistentHash._
@@ -89,13 +89,9 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
val selfHeartbeat = Heartbeat(selfAddress)
val selfEndHeartbeat = EndHeartbeat(selfAddress)
- val selfAddressStr = selfAddress.toString
- var all = Set.empty[Address]
- var current = Set.empty[Address]
- var ending = Map.empty[Address, Int]
- var joinInProgress = Map.empty[Address, Deadline]
- var consistentHash = ConsistentHash(Seq.empty[Address], HeartbeatConsistentHashingVirtualNodesFactor)
+ var state = ClusterHeartbeatSenderState.empty(ConsistentHash(Seq.empty[Address], HeartbeatConsistentHashingVirtualNodesFactor),
+ selfAddress.toString, MonitoredByNrOfMembers)
// start periodic heartbeat to other nodes in cluster
val heartbeatTask = scheduler.schedule(PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration],
@@ -115,47 +111,31 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver")
def receive = {
- case HeartbeatTick heartbeat()
- case state: CurrentClusterState init(state)
- case MemberUnreachable(m) removeMember(m)
- case MemberRemoved(m) removeMember(m)
- case e: MemberEvent addMember(e.member)
- case JoinInProgress(a, d) addJoinInProgress(a, d)
+ case HeartbeatTick heartbeat()
+ case s: CurrentClusterState reset(s)
+ case MemberUnreachable(m) removeMember(m)
+ case MemberRemoved(m) removeMember(m)
+ case e: MemberEvent addMember(e.member)
+ case JoinInProgress(a, d) addJoinInProgress(a, d)
}
- def init(state: CurrentClusterState): Unit = {
- all = state.members.collect { case m if m.address != selfAddress m.address }
- joinInProgress --= all
- consistentHash = ConsistentHash(all, HeartbeatConsistentHashingVirtualNodesFactor)
- update()
- }
+ def reset(snapshot: CurrentClusterState): Unit =
+ state = state.reset(snapshot.members.collect { case m if m.address != selfAddress m.address })
- def addMember(m: Member): Unit = if (m.address != selfAddress) {
- all += m.address
- consistentHash = consistentHash :+ m.address
- removeJoinInProgress(m.address)
- update()
- }
+ def addMember(m: Member): Unit = if (m.address != selfAddress)
+ state = state addMember m.address
- def removeMember(m: Member): Unit = if (m.address != selfAddress) {
- all -= m.address
- consistentHash = consistentHash :- m.address
- removeJoinInProgress(m.address)
- update()
- }
+ def removeMember(m: Member): Unit = if (m.address != selfAddress)
+ state = state removeMember m.address
- def removeJoinInProgress(address: Address): Unit = if (joinInProgress contains address) {
- joinInProgress -= address
- ending += (address -> 0)
- }
+ def removeJoinInProgress(address: Address): Unit = if (address != selfAddress)
+ state = state.removeJoinInProgress(address)
- def addJoinInProgress(address: Address, deadline: Deadline): Unit = {
- if (address != selfAddress && !all.contains(address))
- joinInProgress += (address -> deadline)
- }
+ def addJoinInProgress(address: Address, deadline: Deadline): Unit = if (address != selfAddress)
+ state = state.addJoinInProgress(address, deadline)
def heartbeat(): Unit = {
- removeOverdueJoinInProgress()
+ state = state.removeOverdueJoinInProgress()
def connection(to: Address): ActorRef = {
// URL encoded target address as child actor name
@@ -168,67 +148,145 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
}
val deadline = Deadline.now + HeartbeatInterval
- (current ++ joinInProgress.keys) foreach { to connection(to) ! SendHeartbeat(selfHeartbeat, to, deadline) }
+ state.active foreach { to connection(to) ! SendHeartbeat(selfHeartbeat, to, deadline) }
// When sending heartbeats to a node is stopped a few `EndHeartbeat` messages is
// sent to notify it that no more heartbeats will be sent.
- for ((to, count) ending) {
+ for ((to, count) state.ending) {
val c = connection(to)
c ! SendEndHeartbeat(selfEndHeartbeat, to)
if (count == NumberOfEndHeartbeats) {
- ending -= to
+ state = state.removeEnding(to)
c ! PoisonPill
- } else {
- ending += (to -> (count + 1))
- }
+ } else
+ state = state.increaseEndingCount(to)
}
}
+}
+
+/**
+ * INTERNAL API
+ */
+private[cluster] object ClusterHeartbeatSenderState {
/**
- * Update current peers to send heartbeats to, and
+ * Initial, empty state
+ */
+ def empty(consistentHash: ConsistentHash[Address], selfAddressStr: String,
+ monitoredByNrOfMembers: Int): ClusterHeartbeatSenderState =
+ ClusterHeartbeatSenderState(consistentHash, selfAddressStr, monitoredByNrOfMembers)
+
+ /**
+ * Create a new state based on previous state, and
* keep track of which nodes to stop sending heartbeats to.
*/
- def update(): Unit = {
- val previous = current
- current = selectPeers
+ private def apply(
+ old: ClusterHeartbeatSenderState,
+ consistentHash: ConsistentHash[Address],
+ all: Set[Address]): ClusterHeartbeatSenderState = {
+
+ /**
+ * Select a few peers that heartbeats will be sent to, i.e. that will
+ * monitor this node. Try to send heartbeats to same nodes as much
+ * as possible, but re-balance with consistent hashing algorithm when
+ * new members are added or removed.
+ */
+ def selectPeers: Set[Address] = {
+ val allSize = all.size
+ val nrOfPeers = math.min(allSize, old.monitoredByNrOfMembers)
+ // try more if consistentHash results in same node as already selected
+ val attemptLimit = nrOfPeers * 2
+ @tailrec def select(acc: Set[Address], n: Int): Set[Address] = {
+ if (acc.size == nrOfPeers || n == attemptLimit) acc
+ else select(acc + consistentHash.nodeFor(old.selfAddressStr + n), n + 1)
+ }
+ if (nrOfPeers >= allSize) all
+ else select(Set.empty[Address], 0)
+ }
+
+ val curr = selectPeers
// start ending process for nodes not selected any more
- ending ++= (previous -- current).map(_ -> 0)
// abort ending process for nodes that have been selected again
- ending --= current
+ val end = old.ending ++ (old.current -- curr).map(_ -> 0) -- curr
+ old.copy(consistentHash = consistentHash, all = all, current = curr, ending = end,
+ joinInProgress = old.joinInProgress -- all)
}
- /**
- * Select a few peers that heartbeats will be sent to, i.e. that will
- * monitor this node. Try to send heartbeats to same nodes as much
- * as possible, but re-balance with consistent hashing algorithm when
- * new members are added or removed.
- */
- def selectPeers: Set[Address] = {
- val allSize = all.size
- val nrOfPeers = math.min(allSize, MonitoredByNrOfMembers)
- // try more if consistentHash results in same node as already selected
- val attemptLimit = nrOfPeers * 2
- @tailrec def select(acc: Set[Address], n: Int): Set[Address] = {
- if (acc.size == nrOfPeers || n == attemptLimit) acc
- else select(acc + consistentHash.nodeFor(selfAddressStr + n), n + 1)
- }
- if (nrOfPeers >= allSize) all
- else select(Set.empty[Address], 0)
+}
+
+/**
+ * INTERNAL API
+ *
+ * State used by [akka.cluster.ClusterHeartbeatSender].
+ * The initial state is created with `empty` in the of
+ * the companion object, thereafter the state is modified
+ * with the methods, such as `addMember`. It is immutable,
+ * i.e. the methods return new instances.
+ */
+private[cluster] case class ClusterHeartbeatSenderState private (
+ consistentHash: ConsistentHash[Address],
+ selfAddressStr: String,
+ monitoredByNrOfMembers: Int,
+ all: Set[Address] = Set.empty,
+ current: Set[Address] = Set.empty,
+ ending: Map[Address, Int] = Map.empty,
+ joinInProgress: Map[Address, Deadline] = Map.empty) {
+
+ // FIXME can be disabled as optimization
+ assertInvariants
+
+ private def assertInvariants: Unit = {
+ val currentAndEnding = current.intersect(ending.keySet)
+ require(currentAndEnding.isEmpty,
+ "Same nodes in current and ending not allowed, got [%s]" format currentAndEnding)
+ val joinInProgressAndAll = joinInProgress.keySet.intersect(all)
+ require(joinInProgressAndAll.isEmpty,
+ "Same nodes in joinInProgress and all not allowed, got [%s]" format joinInProgressAndAll)
+ val currentNotInAll = current -- all
+ require(currentNotInAll.isEmpty,
+ "Nodes in current but not in all not allowed, got [%s]" format currentNotInAll)
+ require(all.isEmpty == consistentHash.isEmpty, "ConsistentHash doesn't correspond to all nodes [%s]"
+ format all)
+ }
+
+ val active: Set[Address] = current ++ joinInProgress.keySet
+
+ def reset(nodes: Set[Address]): ClusterHeartbeatSenderState =
+ ClusterHeartbeatSenderState(this, consistentHash = ConsistentHash(nodes, consistentHash.virtualNodesFactor),
+ all = nodes)
+
+ def addMember(a: Address): ClusterHeartbeatSenderState =
+ ClusterHeartbeatSenderState(this, all = all + a, consistentHash = consistentHash :+ a)
+
+ def removeMember(a: Address): ClusterHeartbeatSenderState =
+ ClusterHeartbeatSenderState(this, all = all - a, consistentHash = consistentHash :- a)
+
+ def removeJoinInProgress(address: Address): ClusterHeartbeatSenderState = {
+ if (joinInProgress contains address)
+ copy(joinInProgress = joinInProgress - address, ending = ending + (address -> 0))
+ else this
+ }
+
+ def addJoinInProgress(address: Address, deadline: Deadline): ClusterHeartbeatSenderState = {
+ if (all contains address) this
+ else copy(joinInProgress = joinInProgress + (address -> deadline))
}
/**
* Cleanup overdue joinInProgress, in case a joining node never
* became member, for some reason.
*/
- def removeOverdueJoinInProgress(): Unit = {
+ def removeOverdueJoinInProgress(): ClusterHeartbeatSenderState = {
val overdue = joinInProgress collect { case (address, deadline) if deadline.isOverdue address }
- if (overdue.nonEmpty) {
- log.info("Overdue join in progress [{}]", overdue.mkString(", "))
- ending ++= overdue.map(_ -> 0)
- joinInProgress --= overdue
- }
+ if (overdue.isEmpty) this
+ else
+ copy(ending = ending ++ overdue.map(_ -> 0), joinInProgress = joinInProgress -- overdue)
}
+ def removeEnding(a: Address): ClusterHeartbeatSenderState = copy(ending = ending - a)
+
+ def increaseEndingCount(a: Address): ClusterHeartbeatSenderState = copy(ending = ending + (a -> (ending(a) + 1)))
+
}
/**
@@ -0,0 +1,88 @@
+/**
+ * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
+ */
+
+package akka.cluster
+
+import org.scalatest.WordSpec
+import org.scalatest.matchers.MustMatchers
+import akka.actor.Address
+import akka.routing.ConsistentHash
+import scala.concurrent.util.Deadline
+import scala.concurrent.util.duration._
+
+@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
+class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers {
+
+ val selfAddress = Address("akka", "sys", "myself", 2552)
+ val aa = Address("akka", "sys", "aa", 2552)
+ val bb = Address("akka", "sys", "bb", 2552)
+ val cc = Address("akka", "sys", "cc", 2552)
+ val dd = Address("akka", "sys", "dd", 2552)
+ val ee = Address("akka", "sys", "ee", 2552)
+
+ val emptyState = ClusterHeartbeatSenderState.empty(ConsistentHash(Seq.empty[Address], 10),
+ selfAddress.toString, 3)
+
+ "A ClusterHeartbeatSenderState" must {
+
+ "return empty active set when no nodes" in {
+ emptyState.active.isEmpty must be(true)
+ }
+
+ "include joinInProgress in active set" in {
+ val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds)
+ s.joinInProgress.keySet must be(Set(aa))
+ s.active must be(Set(aa))
+ }
+
+ "remove joinInProgress from active set after removeOverdueJoinInProgress" in {
+ val s = emptyState.addJoinInProgress(aa, Deadline.now - 30.seconds).removeOverdueJoinInProgress()
+ s.joinInProgress must be(Map.empty)
+ s.active must be(Set.empty)
+ }
+
+ "remove joinInProgress after reset" in {
+ val s = emptyState.addJoinInProgress(aa, Deadline.now - 30.seconds).reset(Set(aa, bb))
+ s.joinInProgress must be(Map.empty)
+ }
+
+ "include nodes from reset in active set" in {
+ val nodes = Set(aa, bb, cc)
+ val s = emptyState.reset(nodes)
+ s.all must be(nodes)
+ s.current must be(nodes)
+ s.ending must be(Map.empty)
+ s.active must be(nodes)
+ }
+
+ "limit current nodes to monitoredByNrOfMembers when adding members" in {
+ val nodes = Set(aa, bb, cc, dd)
+ val s = nodes.foldLeft(emptyState) { _ addMember _ }
+ s.all must be(nodes)
+ s.current.size must be(3)
+ s.addMember(ee).current.size must be(3)
+ }
+
+ "move meber to ending set when removing member" in {
+ val nodes = Set(aa, bb, cc, dd, ee)
+ val s = emptyState.reset(nodes)
+ s.ending must be(Map.empty)
+ val included = s.current.head
+ val s2 = s.removeMember(included)
+ s2.ending must be(Map(included -> 0))
+ s2.current must not contain (included)
+ val s3 = s2.addMember(included)
+ s3.current must contain(included)
+ s3.ending.keySet must not contain (included)
+ }
+
+ "increase ending count correctly" in {
+ val s = emptyState.reset(Set(aa)).removeMember(aa)
+ s.ending must be(Map(aa -> 0))
+ val s2 = s.increaseEndingCount(aa).increaseEndingCount(aa)
+ s2.ending must be(Map(aa -> 2))
+ }
+
+ }
+}

0 comments on commit 66c81e9

Please sign in to comment.