Use consistent hash to heartbeat to a few nodes instead of all, see #2284 #787

Merged
merged 7 commits

3 participants

@patriknw
Owner
  • Previously, heartbeat messages were sent to all other members, i.e. each member was monitored by all other members in the cluster.
  • This was the number one known scalability bottleneck, due to the number of interconnections.
  • Limit sending of heartbeats to a few (5) members. Select and re-balance with a consistent hashing algorithm when new members are added or removed (see the sketch below).
  • Send a few EndHeartbeat messages when heartbeats to a node stop.
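For illustration, here is a minimal sketch of the peer-selection idea using akka.routing.ConsistentHash. It mirrors the selectPeers logic in the diff further down, but the object and parameter names are made up for this example:

    import akka.actor.Address
    import akka.routing.ConsistentHash
    import scala.annotation.tailrec

    // Sketch only: pick a handful of peers to heartbeat to, re-balanced by
    // consistent hashing as members are added or removed.
    object SelectPeersSketch {
      def selectPeers(all: Set[Address], selfKey: String,
                      monitoredByNrOfMembers: Int = 5,
                      virtualNodesFactor: Int = 10): Set[Address] = {
        val hash = ConsistentHash(all, virtualNodesFactor)
        val nrOfPeers = math.min(all.size, monitoredByNrOfMembers)
        // the hash may return an already selected node, so allow extra attempts
        val attemptLimit = nrOfPeers * 2
        @tailrec def select(acc: Set[Address], n: Int): Set[Address] =
          if (acc.size == nrOfPeers || n == attemptLimit) acc
          else select(acc + hash.nodeFor(selfKey + n), n + 1)
        if (nrOfPeers >= all.size) all
        else select(Set.empty[Address], 0)
      }
    }

Because the selection is driven by a consistent hash over the member addresses, adding or removing one member only moves a small fraction of the monitoring relationships, which is the property this change relies on.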
patriknw added some commits
@patriknw patriknw Move heartbeat sending out from ClusterCoreDaemon, see #2284
cecde67
@patriknw patriknw URLEncode heartbeat sender child names
* Names can be url encoded now, instead of MD5
7557433
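For illustration, this is the kind of child name URL encoding produces (the address below is made up; URLEncoder is the JDK class the commit switches to):

    import java.net.URLEncoder
    import akka.actor.Address

    // An Address string such as akka://sys@somehost:2552 contains ':', '/' and '@',
    // which are not valid in actor names, hence the encoding (previously an MD5 hash).
    val address = Address("akka", "sys", "somehost", 2552)
    val childName = URLEncoder.encode(address.toString, "UTF-8")
    // childName == "akka%3A%2F%2Fsys%40somehost%3A2552"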
@patriknw patriknw Use consistent hash to heartbeat to a few nodes instead of all, see #2284

* Previously, heartbeat messages were sent to all other members, i.e.
  each member was monitored by all other members in the cluster.
* This was the number one known scalability bottleneck, due to the
  number of interconnections.
* Limit sending of heartbeats to a few (5) members. Select and
  re-balance with a consistent hashing algorithm when new members
  are added or removed.
* Send a few EndHeartbeat messages when heartbeats to a node stop.
3f73705
@viktorklang viktorklang commented on the diff
.gitignore
@@ -67,3 +67,4 @@ redis/
beanstalk/
.scalastyle
bin/
+.worksheet
@viktorklang Owner

good one!

@viktorklang viktorklang commented on the diff
akka-cluster/src/main/resources/reference.conf
@@ -78,6 +78,10 @@ akka {
# how often should the node send out heartbeats?
heartbeat-interval = 1s
+ # Number of member nodes that each member will send heartbeat messages to,
+ # i.e. each node will be monitored by this number of other nodes.
+ monitored-by-nr-of-members = 5
@viktorklang Owner

how about a factor/percentage of nodes instead of a fixed number?

@patriknw Owner
patriknw added a note

Why would that be better?
A factor of 5% is reasonable for a 100-node cluster, but not for a 1000-node cluster.

@viktorklang Owner

What if you do not know the grand total of nodes when you boot the app?

@patriknw Owner
patriknw added a note
@viktorklang Owner

Nevermind, I might just be overexaggerating.
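For context, the arithmetic behind the fixed-number choice (illustrative figures only, not code from the PR):

    // Monitoring links per node under the two schemes discussed above.
    val clusterSizes = Seq(100, 1000)
    for (n <- clusterSizes) {
      val fixed = 5                                // monitored-by-nr-of-members
      val byPercentage = math.max(1, n * 5 / 100)  // hypothetical 5% factor
      println(s"$n nodes: fixed -> $fixed monitors per node, 5% -> $byPercentage monitors per node")
    }
    // 100 nodes:  fixed -> 5,  5% -> 5
    // 1000 nodes: fixed -> 5,  5% -> 50

With a fixed number the per-node monitoring load stays constant as the cluster grows, which is the scalability argument made above.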

...er/src/main/scala/akka/cluster/ClusterHeartbeat.scala
((38 lines not shown))
+ import cluster.settings._
+ import context.dispatcher
+
+ val selfHeartbeat = Heartbeat(selfAddress)
+ val selfEndHeartbeat = EndHeartbeat(selfAddress)
+ val selfAddressStr = selfAddress.toString
+
+ var all = Set.empty[Address]
+ var current = Set.empty[Address]
+ var ending = Map.empty[Address, Int]
+ var joinInProgress = Map.empty[Address, Deadline]
+ var consistentHash = ConsistentHash(Seq.empty[Address], HeartbeatConsistentHashingVirtualNodesFactor)
+
+ // start periodic heartbeat to other nodes in cluster
+ val heartbeatTask =
+ FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration], HeartbeatInterval) {
@viktorklang Owner

Did you need FixedRateTask after the scheduler fix?

@patriknw Owner
patriknw added a note

I will remove that, but that will be done as a separate ticket (today)

...er/src/main/scala/akka/cluster/ClusterHeartbeat.scala
((42 lines not shown))
+ val selfEndHeartbeat = EndHeartbeat(selfAddress)
+ val selfAddressStr = selfAddress.toString
+
+ var all = Set.empty[Address]
+ var current = Set.empty[Address]
+ var ending = Map.empty[Address, Int]
+ var joinInProgress = Map.empty[Address, Deadline]
+ var consistentHash = ConsistentHash(Seq.empty[Address], HeartbeatConsistentHashingVirtualNodesFactor)
+
+ // start periodic heartbeat to other nodes in cluster
+ val heartbeatTask =
+ FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration], HeartbeatInterval) {
+ self ! HeartbeatTick
+ }
+
+ override def preStart(): Unit = {
@viktorklang Owner

one-liner :-)

...er/src/main/scala/akka/cluster/ClusterHeartbeat.scala
((166 lines not shown))
+ ending --= current
+ }
+
+ /**
+ * Select a few peers that heartbeats will be sent to, i.e. that will
+ * monitor this node. Try to send heartbeats to same nodes as much
+ * as possible, but re-balance with consistent hashing algorithm when
+ * new members are added or removed.
+ */
+ def selectPeers: Set[Address] = {
+ val allSize = all.size
+ val nrOfPeers = math.min(allSize, MonitoredByNrOfMembers)
+ // try more if consistentHash results in same node as already selected
+ val attemptLimit = nrOfPeers * 2
+ @tailrec def select(acc: Set[Address], n: Int): Set[Address] = {
+ if (acc.size == nrOfPeers || n == attemptLimit) acc
@drewhk Owner
drewhk added a note

Maybe log that fewer peers were selected because attemptLimit was reached.

...er/src/main/scala/akka/cluster/ClusterHeartbeat.scala
((82 lines not shown))
- /**
- * Child name is MD5 hash of the address.
- * FIXME Change to URLEncode when ticket #2123 has been fixed
- */
- def encodeChildName(name: String): String = {
- digester update name.getBytes("UTF-8")
- digester.digest.map { h ⇒ "%02x".format(0xFF & h) }.mkString
+ def init(state: CurrentClusterState): Unit = {
+ all = state.members.collect { case m if m.address != selfAddress ⇒ m.address }
+ joinInProgress --= all
+ consistentHash = ConsistentHash(all, HeartbeatConsistentHashingVirtualNodesFactor)
@drewhk Owner
drewhk added a note

shouldn't update() be called here?

@patriknw Owner
patriknw added a note

it certainly should, thanks

@patriknw Owner
patriknw added a note

fixed

...er/src/main/scala/akka/cluster/ClusterHeartbeat.scala
((7 lines not shown))
def receive = {
case SendHeartbeat(heartbeatMsg, _, deadline) ⇒
if (!deadline.isOverdue) {
+ log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef)
// the CircuitBreaker will measure elapsed time and open if too many long calls
try breaker.withSyncCircuitBreaker {
@drewhk Owner
drewhk added a note

This deserves a comment :)

@patriknw Owner
patriknw added a note

ok, added yet another comment why this workaround exists - can't wait for the new remoting :-)

...ter/src/main/scala/akka/cluster/ClusterSettings.scala
@@ -24,6 +24,9 @@ class ClusterSettings(val config: Config, val systemName: String) {
final val FailureDetectorAcceptableHeartbeatPause: FiniteDuration =
Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS)
final val HeartbeatInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-interval"), MILLISECONDS)
+ final val HeartbeatConsistentHashingVirtualNodesFactor = 10 // no need for configuration
+ final val NumberOfEndHeartbeats: Int = (FailureDetectorAcceptableHeartbeatPause / HeartbeatInterval + 1).toInt
@viktorklang Owner

What if heartbeatInterval is -1?

@patriknw Owner
patriknw added a note

alright, added some boundary checks
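Worked out with the defaults implied by ClusterConfigSpec further down (acceptable-heartbeat-pause = 3 s, heartbeat-interval = 1 s), as a standalone illustration rather than the settings code itself:

    import scala.concurrent.util.duration._   // duration DSL used by this code base

    val acceptableHeartbeatPause = 3.seconds   // default implied by "NumberOfEndHeartbeats must be(4)"
    val heartbeatInterval = 1.second
    // FiniteDuration / FiniteDuration yields a Double, so this is (3 / 1) + 1 = 4
    val numberOfEndHeartbeats = (acceptableHeartbeatPause / heartbeatInterval + 1).toInt
    // The boundary checks added in ClusterSettings require heartbeat-interval > 0,
    // which rules out the -1 case raised above.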

...er/src/main/scala/akka/cluster/ClusterHeartbeat.scala
((34 lines not shown))
+ import InternalClusterAction.HeartbeatTick
+
+ val cluster = Cluster(context.system)
+ import cluster.{ selfAddress, scheduler }
+ import cluster.settings._
+ import context.dispatcher
+
+ val selfHeartbeat = Heartbeat(selfAddress)
+ val selfEndHeartbeat = EndHeartbeat(selfAddress)
+ val selfAddressStr = selfAddress.toString
+
+ var all = Set.empty[Address]
+ var current = Set.empty[Address]
+ var ending = Map.empty[Address, Int]
+ var joinInProgress = Map.empty[Address, Deadline]
+ var consistentHash = ConsistentHash(Seq.empty[Address], HeartbeatConsistentHashingVirtualNodesFactor)
@viktorklang Owner

Instead of having a lot of vars, aren't most of the vars interconnected? Create a case class and use copy?

@patriknw Owner

I have moved the state and related logic to a separate immutable case class, ClusterHeartbeatSenderState. Pretty slick. Easier to unit test.
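The shape of that refactoring, roughly — a simplified sketch, not the full ClusterHeartbeatSenderState shown in the diff below:

    import akka.actor.Address

    // Every transition returns a new immutable value instead of mutating vars.
    case class HeartbeatState(all: Set[Address] = Set.empty,
                              current: Set[Address] = Set.empty) {
      def addMember(a: Address): HeartbeatState = copy(all = all + a)
      def removeMember(a: Address): HeartbeatState = copy(all = all - a, current = current - a)
    }

    // In the actor a single var holds the state and each message replaces it:
    //   var state = HeartbeatState()
    //   state = state.addMember(address)
    // and in a unit test the transitions can be exercised without any actor:
    //   HeartbeatState().addMember(a).removeMember(a).all == Set.empty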

@patriknw
Owner

I introduced a bug in the previous refactoring; I'll update the test and fix it.

@patriknw
Owner

bug fixed

@viktorklang
Owner

:+1:

@patriknw patriknw merged commit 8dfb943
Commits on Oct 8, 2012
  1. @patriknw

    Move heartbeat sending out from ClusterCoreDaemon, see #2284

    patriknw authored
  2. @patriknw

    URLEncode heartbeat sender child names

    patriknw authored
    * Names can be url encoded now, instead of MD5
  3. @patriknw

    Use consistent hash to heartbeat to a few nodes instead of all, see #2284

    patriknw authored
    
    * Previously, heartbeat messages were sent to all other members, i.e.
      each member was monitored by all other members in the cluster.
    * This was the number one known scalability bottleneck, due to the
      number of interconnections.
    * Limit sending of heartbeats to a few (5) members. Select and
      re-balance with a consistent hashing algorithm when new members
      are added or removed.
    * Send a few EndHeartbeat messages when heartbeats to a node stop.
Commits on Oct 9, 2012
  1. @patriknw
  2. @patriknw

    Merge branch 'master' into wip-2284-heartbeat-scalability-patriknw

    patriknw authored
    Conflicts:
    	akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
Commits on Oct 10, 2012
  1. @patriknw
  2. @patriknw
1  .gitignore
@@ -67,3 +67,4 @@ redis/
beanstalk/
.scalastyle
bin/
+.worksheet
2  akka-actor/src/main/scala/akka/routing/ConsistentHash.scala
@@ -18,7 +18,7 @@ import java.util.Arrays
* hash, i.e. make sure it is different for different nodes.
*
*/
-class ConsistentHash[T: ClassTag] private (nodes: SortedMap[Int, T], virtualNodesFactor: Int) {
+class ConsistentHash[T: ClassTag] private (nodes: SortedMap[Int, T], val virtualNodesFactor: Int) {
import ConsistentHash._
4 akka-cluster/src/main/resources/reference.conf
@@ -78,6 +78,10 @@ akka {
# how often should the node send out heartbeats?
heartbeat-interval = 1s
+ # Number of member nodes that each member will send heartbeat messages to,
+ # i.e. each node will be monitored by this number of other nodes.
+ monitored-by-nr-of-members = 5
+
# defines the failure detector threshold
# A low threshold is prone to generate many wrong suspicions but ensures
# a quick detection in the event of a real crash. Conversely, a high
4 akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -62,7 +62,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
val settings = new ClusterSettings(system.settings.config, system.name)
import settings._
- val selfAddress = system.provider match {
+ val selfAddress: Address = system.provider match {
case c: ClusterActorRefProvider ⇒ c.transport.address
case other ⇒ throw new ConfigurationException(
"ActorSystem [%s] needs to have a 'ClusterActorRefProvider' enabled in the configuration, currently uses [%s]".
@@ -74,7 +74,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
log.info("Cluster Node [{}] - is starting up...", selfAddress)
- val failureDetector = {
+ val failureDetector: FailureDetector = {
import settings.{ FailureDetectorImplementationClass ⇒ fqcn }
system.dynamicAccess.createInstanceFor[FailureDetector](
fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> settings)).recover({
42 akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
@@ -155,8 +155,8 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
withDispatcher(context.props.dispatcher), name = "publisher")
val core = context.actorOf(Props(new ClusterCoreDaemon(publisher)).
withDispatcher(context.props.dispatcher), name = "core")
- context.actorOf(Props[ClusterHeartbeatDaemon].
- withDispatcher(context.props.dispatcher), name = "heartbeat")
+ context.actorOf(Props[ClusterHeartbeatReceiver].
+ withDispatcher(context.props.dispatcher), name = "heartbeatReceiver")
if (settings.MetricsEnabled) context.actorOf(Props(new ClusterMetricsCollector(publisher)).
withDispatcher(context.props.dispatcher), name = "metrics")
@@ -172,26 +172,24 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging {
import ClusterLeaderAction._
import InternalClusterAction._
- import ClusterHeartbeatSender._
+ import ClusterHeartbeatSender.JoinInProgress
val cluster = Cluster(context.system)
import cluster.{ selfAddress, scheduler, failureDetector }
import cluster.settings._
val vclockNode = VectorClock.Node(selfAddress.toString)
- val selfHeartbeat = Heartbeat(selfAddress)
// note that self is not initially member,
// and the Gossip is not versioned for this 'Node' yet
var latestGossip: Gossip = Gossip()
- var joinInProgress: Map[Address, Deadline] = Map.empty
var stats = ClusterStats()
- val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender].
- withDispatcher(UseDispatcher), name = "heartbeatSender")
val coreSender = context.actorOf(Props[ClusterCoreSender].
withDispatcher(UseDispatcher), name = "coreSender")
+ val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender].
+ withDispatcher(UseDispatcher), name = "heartbeatSender")
import context.dispatcher
@@ -199,10 +197,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val gossipTask = scheduler.schedule(PeriodicTasksInitialDelay.max(GossipInterval).asInstanceOf[FiniteDuration],
GossipInterval, self, GossipTick)
- // start periodic heartbeat to all nodes in cluster
- val heartbeatTask = scheduler.schedule(PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration],
- HeartbeatInterval, self, HeartbeatTick)
-
// start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
val failureDetectorReaperTask = scheduler.schedule(PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval).asInstanceOf[FiniteDuration],
UnreachableNodesReaperInterval, self, ReapUnreachableTick)
@@ -223,7 +217,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
override def postStop(): Unit = {
gossipTask.cancel()
- heartbeatTask.cancel()
failureDetectorReaperTask.cancel()
leaderActionsTask.cancel()
publishStatsTask foreach { _.cancel() }
@@ -241,7 +234,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
case msg: GossipEnvelope ⇒ receiveGossip(msg)
case msg: GossipMergeConflict ⇒ receiveGossipMerge(msg)
case GossipTick ⇒ gossip()
- case HeartbeatTick ⇒ heartbeat()
case ReapUnreachableTick ⇒ reapUnreachableMembers()
case LeaderActionsTick ⇒ leaderActions()
case PublishStatsTick ⇒ publishInternalStats()
@@ -284,12 +276,12 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val localGossip = latestGossip
// wipe our state since a node that joins a cluster must be empty
latestGossip = Gossip()
- joinInProgress = Map(address -> (Deadline.now + JoinTimeout))
// wipe the failure detector since we are starting fresh and shouldn't care about the past
failureDetector.reset()
publish(localGossip)
+ heartbeatSender ! JoinInProgress(address, Deadline.now + JoinTimeout)
context.become(initialized)
if (address == selfAddress)
@@ -508,12 +500,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
else if (remoteGossip.version < localGossip.version) localGossip // local gossip is newer
else remoteGossip // remote gossip is newer
- val newJoinInProgress =
- if (joinInProgress.isEmpty) joinInProgress
- else joinInProgress -- winningGossip.members.map(_.address) -- winningGossip.overview.unreachable.map(_.address)
-
latestGossip = winningGossip seen selfAddress
- joinInProgress = newJoinInProgress
// for all new joining nodes we remove them from the failure detector
(latestGossip.members -- localGossip.members).foreach {
@@ -735,27 +722,10 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
}
}
- def heartbeat(): Unit = {
- removeOverdueJoinInProgress()
-
- val beatTo = latestGossip.members.toSeq.map(_.address) ++ joinInProgress.keys
-
- val deadline = Deadline.now + HeartbeatInterval
- beatTo.foreach { address ⇒ if (address != selfAddress) heartbeatSender ! SendHeartbeat(selfHeartbeat, address, deadline) }
- }
-
- /**
- * Removes overdue joinInProgress from State.
- */
- def removeOverdueJoinInProgress(): Unit = {
- joinInProgress --= joinInProgress collect { case (address, deadline) if deadline.isOverdue ⇒ address }
- }
-
/**
* Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict.
*/
def reapUnreachableMembers(): Unit = {
-
if (!isSingletonCluster && isAvailable) {
// only scrutinize if we are a non-singleton cluster and available
311 akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala
@@ -4,31 +4,49 @@
package akka.cluster
import language.postfixOps
-
-import akka.actor.{ ReceiveTimeout, ActorLogging, ActorRef, Address, Actor, RootActorPath, Props }
-import java.security.MessageDigest
-import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException }
+import scala.collection.immutable.SortedSet
+import scala.annotation.tailrec
import scala.concurrent.util.duration._
import scala.concurrent.util.Deadline
+import scala.concurrent.util.FiniteDuration
+import java.net.URLEncoder
+import akka.actor.{ ActorLogging, ActorRef, Address, Actor, RootActorPath, PoisonPill, Props }
+import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException }
+import akka.cluster.ClusterEvent._
+import akka.routing.ConsistentHash
/**
- * Sent at regular intervals for failure detection.
+ * INTERNAL API
*/
-case class Heartbeat(from: Address) extends ClusterMessage
+private[akka] object ClusterHeartbeatReceiver {
+ /**
+ * Sent at regular intervals for failure detection.
+ */
+ case class Heartbeat(from: Address) extends ClusterMessage
+
+ /**
+ * Tell failure detector at receiving side that it should
+ * remove the monitoring, because heartbeats will end from
+ * this node.
+ */
+ case class EndHeartbeat(from: Address) extends ClusterMessage
+}
/**
* INTERNAL API.
*
- * Receives Heartbeat messages and delegates to Cluster.
+ * Receives Heartbeat messages and updates failure detector.
* Instantiated as a single instance for each Cluster - e.g. heartbeats are serialized
* to Cluster message after message, but concurrent with other types of messages.
*/
-private[cluster] final class ClusterHeartbeatDaemon extends Actor with ActorLogging {
+private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLogging {
+ import ClusterHeartbeatReceiver._
val failureDetector = Cluster(context.system).failureDetector
def receive = {
- case Heartbeat(from) ⇒ failureDetector heartbeat from
+ case Heartbeat(from) ⇒ failureDetector heartbeat from
+ case EndHeartbeat(from) ⇒ failureDetector remove from
}
}
@@ -38,69 +56,271 @@ private[cluster] final class ClusterHeartbeatDaemon extends Actor with ActorLogg
*/
private[cluster] object ClusterHeartbeatSender {
/**
- *
- * Command to [akka.cluster.ClusterHeartbeatSender]], which will send [[akka.cluster.Heartbeat]]
- * to the other node.
+ * Tell [akka.cluster.ClusterHeartbeatSender]] that this node has started joining of
+ * another node and heartbeats should be sent unconditionally until it becomes
+ * member or deadline is overdue. This is done to be able to detect immediate death
+ * of the joining node.
* Local only, no need to serialize.
*/
- case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline)
+ case class JoinInProgress(address: Address, deadline: Deadline)
}
/*
* INTERNAL API
*
* This actor is responsible for sending the heartbeat messages to
- * other nodes. Netty blocks when sending to broken connections. This actor
- * isolates sending to different nodes by using child workers for each target
+ * a few other nodes that will monitor this node.
+ *
+ * Netty blocks when sending to broken connections. This actor
+ * isolates sending to different nodes by using child actors for each target
+ * address and thereby reduce the risk of irregular heartbeats to healthy
* nodes due to broken connections to other nodes.
*/
private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging {
import ClusterHeartbeatSender._
+ import ClusterHeartbeatSenderConnection._
+ import ClusterHeartbeatReceiver._
+ import InternalClusterAction.HeartbeatTick
+
+ val cluster = Cluster(context.system)
+ import cluster.{ selfAddress, scheduler }
+ import cluster.settings._
+ import context.dispatcher
+
+ val selfHeartbeat = Heartbeat(selfAddress)
+ val selfEndHeartbeat = EndHeartbeat(selfAddress)
+
+ var state = ClusterHeartbeatSenderState.empty(ConsistentHash(Seq.empty[Address], HeartbeatConsistentHashingVirtualNodesFactor),
+ selfAddress.toString, MonitoredByNrOfMembers)
+
+ // start periodic heartbeat to other nodes in cluster
+ val heartbeatTask = scheduler.schedule(PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration],
+ HeartbeatInterval, self, HeartbeatTick)
+
+ override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent])
+
+ override def postStop(): Unit = {
+ heartbeatTask.cancel()
+ cluster.unsubscribe(self)
+ }
/**
* Looks up and returns the remote cluster heartbeat connection for the specific address.
*/
def clusterHeartbeatConnectionFor(address: Address): ActorRef =
- context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeat")
-
- val digester = MessageDigest.getInstance("MD5")
+ context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver")
- /**
- * Child name is MD5 hash of the address.
- * FIXME Change to URLEncode when ticket #2123 has been fixed
- */
- def encodeChildName(name: String): String = {
- digester update name.getBytes("UTF-8")
- digester.digest.map { h ⇒ "%02x".format(0xFF & h) }.mkString
+ def receive = {
+ case HeartbeatTick ⇒ heartbeat()
+ case s: CurrentClusterState ⇒ reset(s)
+ case MemberUnreachable(m) ⇒ removeMember(m)
+ case MemberRemoved(m) ⇒ removeMember(m)
+ case e: MemberEvent ⇒ addMember(e.member)
+ case JoinInProgress(a, d) ⇒ addJoinInProgress(a, d)
}
- def receive = {
- case msg @ SendHeartbeat(from, to, deadline) ⇒
- val workerName = encodeChildName(to.toString)
- val worker = context.actorFor(workerName) match {
+ def reset(snapshot: CurrentClusterState): Unit =
+ state = state.reset(snapshot.members.collect { case m if m.address != selfAddress ⇒ m.address })
+
+ def addMember(m: Member): Unit = if (m.address != selfAddress)
+ state = state addMember m.address
+
+ def removeMember(m: Member): Unit = if (m.address != selfAddress)
+ state = state removeMember m.address
+
+ def addJoinInProgress(address: Address, deadline: Deadline): Unit = if (address != selfAddress)
+ state = state.addJoinInProgress(address, deadline)
+
+ def heartbeat(): Unit = {
+ state = state.removeOverdueJoinInProgress()
+
+ def connection(to: Address): ActorRef = {
+ // URL encoded target address as child actor name
+ val connectionName = URLEncoder.encode(to.toString, "UTF-8")
+ context.actorFor(connectionName) match {
case notFound if notFound.isTerminated ⇒
- context.actorOf(Props(new ClusterHeartbeatSenderWorker(clusterHeartbeatConnectionFor(to))), workerName)
+ context.actorOf(Props(new ClusterHeartbeatSenderConnection(clusterHeartbeatConnectionFor(to))), connectionName)
case child ⇒ child
}
- worker ! msg
+ }
+
+ val deadline = Deadline.now + HeartbeatInterval
+ state.active foreach { to ⇒ connection(to) ! SendHeartbeat(selfHeartbeat, to, deadline) }
+
+ // When sending heartbeats to a node is stopped, a few `EndHeartbeat` messages are
+ // sent to notify it that no more heartbeats will be sent.
+ for ((to, count) ← state.ending) {
+ val c = connection(to)
+ c ! SendEndHeartbeat(selfEndHeartbeat, to)
+ if (count == NumberOfEndHeartbeats) {
+ state = state.removeEnding(to)
+ c ! PoisonPill
+ } else
+ state = state.increaseEndingCount(to)
+ }
}
}
/**
- * Responsible for sending [[akka.cluster.Heartbeat]] to one specific address.
+ * INTERNAL API
+ */
+private[cluster] object ClusterHeartbeatSenderState {
+ /**
+ * Initial, empty state
+ */
+ def empty(consistentHash: ConsistentHash[Address], selfAddressStr: String,
+ monitoredByNrOfMembers: Int): ClusterHeartbeatSenderState =
+ ClusterHeartbeatSenderState(consistentHash, selfAddressStr, monitoredByNrOfMembers)
+
+ /**
+ * Create a new state based on previous state, and
+ * keep track of which nodes to stop sending heartbeats to.
+ */
+ private def apply(
+ old: ClusterHeartbeatSenderState,
+ consistentHash: ConsistentHash[Address],
+ all: Set[Address]): ClusterHeartbeatSenderState = {
+
+ /**
+ * Select a few peers that heartbeats will be sent to, i.e. that will
+ * monitor this node. Try to send heartbeats to same nodes as much
+ * as possible, but re-balance with consistent hashing algorithm when
+ * new members are added or removed.
+ */
+ def selectPeers: Set[Address] = {
+ val allSize = all.size
+ val nrOfPeers = math.min(allSize, old.monitoredByNrOfMembers)
+ // try more if consistentHash results in same node as already selected
+ val attemptLimit = nrOfPeers * 2
+ @tailrec def select(acc: Set[Address], n: Int): Set[Address] = {
+ if (acc.size == nrOfPeers || n == attemptLimit) acc
+ else select(acc + consistentHash.nodeFor(old.selfAddressStr + n), n + 1)
+ }
+ if (nrOfPeers >= allSize) all
+ else select(Set.empty[Address], 0)
+ }
+
+ val curr = selectPeers
+ // start ending process for nodes not selected any more
+ // abort ending process for nodes that have been selected again
+ val end = old.ending ++ (old.current -- curr).map(_ -> 0) -- curr
+ old.copy(consistentHash = consistentHash, all = all, current = curr, ending = end)
+ }
+
+}
+
+/**
+ * INTERNAL API
+ *
+ * State used by [akka.cluster.ClusterHeartbeatSender].
+ * The initial state is created with `empty` in the
+ * companion object; thereafter the state is modified
+ * with the methods, such as `addMember`. It is immutable,
+ * i.e. the methods return new instances.
+ */
+private[cluster] case class ClusterHeartbeatSenderState private (
+ consistentHash: ConsistentHash[Address],
+ selfAddressStr: String,
+ monitoredByNrOfMembers: Int,
+ all: Set[Address] = Set.empty,
+ current: Set[Address] = Set.empty,
+ ending: Map[Address, Int] = Map.empty,
+ joinInProgress: Map[Address, Deadline] = Map.empty) {
+
+ // FIXME can be disabled as optimization
+ assertInvariants
+
+ private def assertInvariants: Unit = {
+ val currentAndEnding = current.intersect(ending.keySet)
+ require(currentAndEnding.isEmpty,
+ "Same nodes in current and ending not allowed, got [%s]" format currentAndEnding)
+ val joinInProgressAndAll = joinInProgress.keySet.intersect(all)
+ require(joinInProgressAndAll.isEmpty,
+ "Same nodes in joinInProgress and all not allowed, got [%s]" format joinInProgressAndAll)
+ val currentNotInAll = current -- all
+ require(currentNotInAll.isEmpty,
+ "Nodes in current but not in all not allowed, got [%s]" format currentNotInAll)
+ require(all.isEmpty == consistentHash.isEmpty, "ConsistentHash doesn't correspond to all nodes [%s]"
+ format all)
+ }
+
+ val active: Set[Address] = current ++ joinInProgress.keySet
+
+ def reset(nodes: Set[Address]): ClusterHeartbeatSenderState =
+ ClusterHeartbeatSenderState(nodes.foldLeft(this) { _ removeJoinInProgress _ },
+ consistentHash = ConsistentHash(nodes, consistentHash.virtualNodesFactor),
+ all = nodes)
+
+ def addMember(a: Address): ClusterHeartbeatSenderState =
+ ClusterHeartbeatSenderState(removeJoinInProgress(a), all = all + a, consistentHash = consistentHash :+ a)
+
+ def removeMember(a: Address): ClusterHeartbeatSenderState =
+ ClusterHeartbeatSenderState(removeJoinInProgress(a), all = all - a, consistentHash = consistentHash :- a)
+
+ private def removeJoinInProgress(address: Address): ClusterHeartbeatSenderState = {
+ if (joinInProgress contains address)
+ copy(joinInProgress = joinInProgress - address, ending = ending + (address -> 0))
+ else this
+ }
+
+ def addJoinInProgress(address: Address, deadline: Deadline): ClusterHeartbeatSenderState = {
+ if (all contains address) this
+ else copy(joinInProgress = joinInProgress + (address -> deadline), ending = ending - address)
+ }
+
+ /**
+ * Cleanup overdue joinInProgress, in case a joining node never
+ * became member, for some reason.
+ */
+ def removeOverdueJoinInProgress(): ClusterHeartbeatSenderState = {
+ val overdue = joinInProgress collect { case (address, deadline) if deadline.isOverdue ⇒ address }
+ if (overdue.isEmpty) this
+ else
+ copy(ending = ending ++ overdue.map(_ -> 0), joinInProgress = joinInProgress -- overdue)
+ }
+
+ def removeEnding(a: Address): ClusterHeartbeatSenderState = copy(ending = ending - a)
+
+ def increaseEndingCount(a: Address): ClusterHeartbeatSenderState = copy(ending = ending + (a -> (ending(a) + 1)))
+
+}
+
+/**
+ * INTERNAL API
+ */
+private[cluster] object ClusterHeartbeatSenderConnection {
+ import ClusterHeartbeatReceiver._
+
+ /**
+ * Command to [akka.cluster.ClusterHeartbeatSenderConnection]], which will send
+ * [[akka.cluster.ClusterHeartbeatReceiver.Heartbeat]] to the other node.
+ * Local only, no need to serialize.
+ */
+ case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline)
+
+ /**
+ * Command to [akka.cluster.ClusterHeartbeatSenderConnection]], which will send
+ * [[akka.cluster.ClusterHeartbeatReceiver.EndHeartbeat]] to the other node.
+ * Local only, no need to serialize.
+ */
+ case class SendEndHeartbeat(endHeartbeatMsg: EndHeartbeat, to: Address)
+}
+
+/**
+ * Responsible for sending [[akka.cluster.ClusterHeartbeatReceiver.Heartbeat]]
+ * and [[akka.cluster.ClusterHeartbeatReceiver.EndHeartbeat]] to one specific address.
*
- * Netty blocks when sending to broken connections, and this actor uses
- * a configurable circuit breaker to reduce connect attempts to broken
+ * This actor exists only because Netty blocks when sending to broken connections,
+ * and this actor uses a configurable circuit breaker to reduce connect attempts to broken
* connections.
*
- * @see ClusterHeartbeatSender
+ * @see akka.cluster.ClusterHeartbeatSender
*/
-private[cluster] final class ClusterHeartbeatSenderWorker(toRef: ActorRef)
+private[cluster] final class ClusterHeartbeatSenderConnection(toRef: ActorRef)
extends Actor with ActorLogging {
- import ClusterHeartbeatSender._
+ import ClusterHeartbeatSenderConnection._
val breaker = {
val cbSettings = Cluster(context.system).settings.SendCircuitBreakerSettings
@@ -111,21 +331,20 @@ private[cluster] final class ClusterHeartbeatSenderWorker(toRef: ActorRef)
onClose(log.debug("CircuitBreaker Closed for [{}]", toRef))
}
- // make sure it will cleanup when not used any more
- context.setReceiveTimeout(30 seconds)
-
def receive = {
case SendHeartbeat(heartbeatMsg, _, deadline) ⇒
if (!deadline.isOverdue) {
- // the CircuitBreaker will measure elapsed time and open if too many long calls
+ log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef)
+ // Netty blocks when sending to broken connections, the CircuitBreaker will
+ // measure elapsed time and open if too many long calls
try breaker.withSyncCircuitBreaker {
- log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef)
toRef ! heartbeatMsg
- if (deadline.isOverdue) log.debug("Sending heartbeat to [{}] took longer than expected", toRef)
} catch { case e: CircuitBreakerOpenException ⇒ /* skip sending heartbeat to broken connection */ }
}
-
- case ReceiveTimeout ⇒ context.stop(self) // cleanup when not used
-
+ if (deadline.isOverdue) log.debug("Sending heartbeat to [{}] took longer than expected", toRef)
+ case SendEndHeartbeat(endHeartbeatMsg, _) ⇒
+ log.debug("Cluster Node [{}] - EndHeartbeat to [{}]", endHeartbeatMsg.from, toRef)
+ toRef ! endHeartbeatMsg
}
+
}
36 akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
@@ -16,14 +16,34 @@ import scala.concurrent.util.FiniteDuration
class ClusterSettings(val config: Config, val systemName: String) {
import config._
- final val FailureDetectorThreshold = getDouble("akka.cluster.failure-detector.threshold")
- final val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size")
- final val FailureDetectorImplementationClass = getString("akka.cluster.failure-detector.implementation-class")
- final val FailureDetectorMinStdDeviation: FiniteDuration =
- Duration(getMilliseconds("akka.cluster.failure-detector.min-std-deviation"), MILLISECONDS)
- final val FailureDetectorAcceptableHeartbeatPause: FiniteDuration =
- Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS)
- final val HeartbeatInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-interval"), MILLISECONDS)
+ final val FailureDetectorThreshold: Double = {
+ val x = getDouble("akka.cluster.failure-detector.threshold")
+ require(x > 0.0, "failure-detector.threshold must be > 0")
+ x
+ }
+ final val FailureDetectorMaxSampleSize: Int = {
+ val n = getInt("akka.cluster.failure-detector.max-sample-size")
+ require(n > 0, "failure-detector.max-sample-size must be > 0"); n
+ }
+ final val FailureDetectorImplementationClass: String = getString("akka.cluster.failure-detector.implementation-class")
+ final val FailureDetectorMinStdDeviation: FiniteDuration = {
+ val d = Duration(getMilliseconds("akka.cluster.failure-detector.min-std-deviation"), MILLISECONDS)
+ require(d > Duration.Zero, "failure-detector.min-std-deviation must be > 0"); d
+ }
+ final val FailureDetectorAcceptableHeartbeatPause: FiniteDuration = {
+ val d = Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS)
+ require(d >= Duration.Zero, "failure-detector.acceptable-heartbeat-pause must be >= 0"); d
+ }
+ final val HeartbeatInterval: FiniteDuration = {
+ val d = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-interval"), MILLISECONDS)
+ require(d > Duration.Zero, "failure-detector.heartbeat-interval must be > 0"); d
+ }
+ final val HeartbeatConsistentHashingVirtualNodesFactor = 10 // no need for configuration
+ final val NumberOfEndHeartbeats: Int = (FailureDetectorAcceptableHeartbeatPause / HeartbeatInterval + 1).toInt
+ final val MonitoredByNrOfMembers: Int = {
+ val n = getInt("akka.cluster.failure-detector.monitored-by-nr-of-members")
+ require(n > 0, "failure-detector.monitored-by-nr-of-members must be > 0"); n
+ }
final val SeedNodes: IndexedSeq[Address] = getStringList("akka.cluster.seed-nodes").asScala.map {
case AddressFromURIString(addr) ⇒ addr
6 akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala
@@ -42,7 +42,7 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig {
gossip-interval = 500 ms
auto-join = off
auto-down = on
- failure-detector.acceptable-heartbeat-pause = 10s
+ failure-detector.acceptable-heartbeat-pause = 5s
publish-stats-interval = 0 s # always, when it happens
}
akka.event-handlers = ["akka.testkit.TestEventListener"]
@@ -57,7 +57,9 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig {
akka.scheduler.tick-duration = 33 ms
akka.remote.log-remote-lifecycle-events = off
akka.remote.netty.execution-pool-size = 4
- #akka.remote.netty.reconnection-time-window = 1s
+ #akka.remote.netty.reconnection-time-window = 10s
+ akka.remote.netty.read-timeout = 5s
+ akka.remote.netty.write-timeout = 5s
akka.remote.netty.backoff-timeout = 500ms
akka.remote.netty.connection-timeout = 500ms
2  akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
@@ -29,6 +29,8 @@ class ClusterConfigSpec extends AkkaSpec {
PeriodicTasksInitialDelay must be(1 seconds)
GossipInterval must be(1 second)
HeartbeatInterval must be(1 second)
+ NumberOfEndHeartbeats must be(4)
+ MonitoredByNrOfMembers must be(5)
LeaderActionsInterval must be(1 second)
UnreachableNodesReaperInterval must be(1 second)
PublishStatsInterval must be(10 second)
108 akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala
@@ -0,0 +1,108 @@
+/**
+ * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
+ */
+
+package akka.cluster
+
+import org.scalatest.WordSpec
+import org.scalatest.matchers.MustMatchers
+import akka.actor.Address
+import akka.routing.ConsistentHash
+import scala.concurrent.util.Deadline
+import scala.concurrent.util.duration._
+
+@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
+class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers {
+
+ val selfAddress = Address("akka", "sys", "myself", 2552)
+ val aa = Address("akka", "sys", "aa", 2552)
+ val bb = Address("akka", "sys", "bb", 2552)
+ val cc = Address("akka", "sys", "cc", 2552)
+ val dd = Address("akka", "sys", "dd", 2552)
+ val ee = Address("akka", "sys", "ee", 2552)
+
+ val emptyState = ClusterHeartbeatSenderState.empty(ConsistentHash(Seq.empty[Address], 10),
+ selfAddress.toString, 3)
+
+ "A ClusterHeartbeatSenderState" must {
+
+ "return empty active set when no nodes" in {
+ emptyState.active.isEmpty must be(true)
+ }
+
+ "include joinInProgress in active set" in {
+ val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds)
+ s.joinInProgress.keySet must be(Set(aa))
+ s.active must be(Set(aa))
+ }
+
+ "remove joinInProgress from active set after removeOverdueJoinInProgress" in {
+ val s = emptyState.addJoinInProgress(aa, Deadline.now - 30.seconds).removeOverdueJoinInProgress()
+ s.joinInProgress must be(Map.empty)
+ s.active must be(Set.empty)
+ s.ending must be(Map(aa -> 0))
+ }
+
+ "remove joinInProgress after reset" in {
+ val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).reset(Set(aa, bb))
+ s.joinInProgress must be(Map.empty)
+ }
+
+ "remove joinInProgress after addMember" in {
+ val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).addMember(aa)
+ s.joinInProgress must be(Map.empty)
+ }
+
+ "remove joinInProgress after removeMember" in {
+ val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).reset(Set(aa, bb)).removeMember(aa)
+ s.joinInProgress must be(Map.empty)
+ s.ending must be(Map(aa -> 0))
+ }
+
+ "remove from ending after addJoinInProgress" in {
+ val s = emptyState.reset(Set(aa, bb)).removeMember(aa)
+ s.ending must be(Map(aa -> 0))
+ val s2 = s.addJoinInProgress(aa, Deadline.now + 30.seconds)
+ s2.joinInProgress.keySet must be(Set(aa))
+ s2.ending must be(Map.empty)
+ }
+
+ "include nodes from reset in active set" in {
+ val nodes = Set(aa, bb, cc)
+ val s = emptyState.reset(nodes)
+ s.all must be(nodes)
+ s.current must be(nodes)
+ s.ending must be(Map.empty)
+ s.active must be(nodes)
+ }
+
+ "limit current nodes to monitoredByNrOfMembers when adding members" in {
+ val nodes = Set(aa, bb, cc, dd)
+ val s = nodes.foldLeft(emptyState) { _ addMember _ }
+ s.all must be(nodes)
+ s.current.size must be(3)
+ s.addMember(ee).current.size must be(3)
+ }
+
+ "move meber to ending set when removing member" in {
+ val nodes = Set(aa, bb, cc, dd, ee)
+ val s = emptyState.reset(nodes)
+ s.ending must be(Map.empty)
+ val included = s.current.head
+ val s2 = s.removeMember(included)
+ s2.ending must be(Map(included -> 0))
+ s2.current must not contain (included)
+ val s3 = s2.addMember(included)
+ s3.current must contain(included)
+ s3.ending.keySet must not contain (included)
+ }
+
+ "increase ending count correctly" in {
+ val s = emptyState.reset(Set(aa)).removeMember(aa)
+ s.ending must be(Map(aa -> 0))
+ val s2 = s.increaseEndingCount(aa).increaseEndingCount(aa)
+ s2.ending must be(Map(aa -> 2))
+ }
+
+ }
+}