Use consistent hash to heartbeat to a few nodes instead of all, see #2284

* Previously heartbeat messages were sent to all other members, i.e.
  each member was monitored by every other member in the cluster.
* This was the number one known scalability bottleneck, due to the
  number of interconnections, which grows quadratically with cluster size.
* Limit sending of heartbeats to a few (5) members. Select and
  re-balance with a consistent hashing algorithm when new members
  are added or removed (see the sketch below).
* Send a few EndHeartbeat messages when the sending of Heartbeat
  messages to a node ends.
commit 3f73705abc7ca77e3f6166b588cb3ac060307c28 (parent 7557433)
@patriknw authored
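As a rough standalone illustration of the selection scheme described in the commit message, the sketch below uses akka.routing.ConsistentHash the same way the committed ClusterHeartbeatSender does (virtual nodes factor 10, probing the ring with the self address plus a counter). The object name, the string node names, and the peer count of 3 are assumptions for this example only, not part of the commit:

import scala.annotation.tailrec
import akka.routing.ConsistentHash

object SelectPeersSketch extends App {
  // ring over all *other* members, as in the committed code
  val others = Set("node1", "node2", "node3", "node4", "node5", "node6")
  val hash = ConsistentHash(others, virtualNodesFactor = 10)

  // Probe the ring with selfAddress plus a counter until enough distinct
  // peers are found; give up after nrOfPeers * 2 attempts, since the ring
  // may return the same node for different keys.
  def selectPeers(ring: ConsistentHash[String], selfAddress: String, nrOfPeers: Int): Set[String] = {
    val attemptLimit = nrOfPeers * 2
    @tailrec def select(acc: Set[String], n: Int): Set[String] =
      if (acc.size == nrOfPeers || n == attemptLimit) acc
      else select(acc + ring.nodeFor(selfAddress + n), n + 1)
    select(Set.empty, 0)
  }

  val before = selectPeers(hash, "node0", 3)
  // adding a member re-balances only part of the ring, so most of the
  // previously selected peers remain selected afterwards
  val after = selectPeers(hash :+ "node7", "node0", 3)
  println(s"before=$before after=$after overlap=${before intersect after}")
}

The committed selectPeers in ClusterHeartbeat.scala below has the same shape, operating on Address values instead of strings.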
1  .gitignore
@@ -67,3 +67,4 @@ redis/
beanstalk/
.scalastyle
bin/
+.worksheet
4 akka-cluster/src/main/resources/reference.conf
@@ -78,6 +78,10 @@ akka {
# how often should the node send out heartbeats?
heartbeat-interval = 1s
+ # Number of member nodes that each member will send heartbeat messages to,
+ # i.e. each node will be monitored by this number of other nodes.
+ monitored-by-nr-of-members = 5
+
# defines the failure detector threshold
# A low threshold is prone to generate many wrong suspicions but ensures
# a quick detection in the event of a real crash. Conversely, a high
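For illustration only (not part of the commit), the new setting can be overridden like any other cluster setting when the ActorSystem is created; the system name and values below are assumptions:

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object HeartbeatConfigExample extends App {
  // hypothetical override of the new failure-detector settings
  val config = ConfigFactory.parseString("""
    akka.cluster.failure-detector {
      heartbeat-interval = 1s
      # each node will be monitored by this many other nodes
      monitored-by-nr-of-members = 5
    }
  """).withFallback(ConfigFactory.load())

  val system = ActorSystem("ClusterSystem", config)
}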
4 akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -62,7 +62,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
val settings = new ClusterSettings(system.settings.config, system.name)
import settings._
- val selfAddress = system.provider match {
+ val selfAddress: Address = system.provider match {
case c: ClusterActorRefProvider ⇒ c.transport.address
case other ⇒ throw new ConfigurationException(
"ActorSystem [%s] needs to have a 'ClusterActorRefProvider' enabled in the configuration, currently uses [%s]".
@@ -74,7 +74,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
log.info("Cluster Node [{}] - is starting up...", selfAddress)
- val failureDetector = {
+ val failureDetector: FailureDetector = {
import settings.{ FailureDetectorImplementationClass ⇒ fqcn }
system.dynamicAccess.createInstanceFor[FailureDetector](
fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> settings)).recover({
2  akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
@@ -287,8 +287,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
// wipe the failure detector since we are starting fresh and shouldn't care about the past
failureDetector.reset()
- heartbeatSender ! JoinInProgress(address, Deadline.now + JoinTimeout)
publish(localGossip)
+ heartbeatSender ! JoinInProgress(address, Deadline.now + JoinTimeout)
context.become(initialized)
if (address == selfAddress)
210 akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala
@@ -5,18 +5,32 @@ package akka.cluster
import language.postfixOps
import scala.collection.immutable.SortedSet
-import akka.actor.{ ReceiveTimeout, ActorLogging, ActorRef, Address, Actor, RootActorPath, Props }
-import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException }
+import scala.annotation.tailrec
import scala.concurrent.util.duration._
import scala.concurrent.util.Deadline
import scala.concurrent.util.FiniteDuration
-import akka.cluster.ClusterEvent._
import java.net.URLEncoder
+import akka.actor.{ ActorLogging, ActorRef, Address, Actor, RootActorPath, PoisonPill, Props }
+import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException }
+import akka.cluster.ClusterEvent._
+import akka.routing.ConsistentHash
/**
- * Sent at regular intervals for failure detection.
+ * INTERNAL API
*/
-case class Heartbeat(from: Address) extends ClusterMessage
+private[akka] object ClusterHeartbeatReceiver {
+ /**
+ * Sent at regular intervals for failure detection.
+ */
+ case class Heartbeat(from: Address) extends ClusterMessage
+
+ /**
+ * Tell the failure detector at the receiving side that it should
+ * remove the monitoring, because heartbeats from this node
+ * will end.
+ */
+ case class EndHeartbeat(from: Address) extends ClusterMessage
+}
/**
* INTERNAL API.
@@ -26,11 +40,13 @@ case class Heartbeat(from: Address) extends ClusterMessage
* to Cluster message after message, but concurrent with other types of messages.
*/
private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLogging {
+ import ClusterHeartbeatReceiver._
val failureDetector = Cluster(context.system).failureDetector
def receive = {
- case Heartbeat(from) ⇒ failureDetector heartbeat from
+ case Heartbeat(from) ⇒ failureDetector heartbeat from
+ case EndHeartbeat(from) ⇒ failureDetector remove from
}
}
@@ -40,15 +56,10 @@ private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLo
*/
private[cluster] object ClusterHeartbeatSender {
/**
- * Command to [[akka.cluster.ClusterHeartbeatSenderWorker]], which will send [[akka.cluster.Heartbeat]]
- * to the other node.
- * Local only, no need to serialize.
- */
- case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline)
-
- /**
* Tell [[akka.cluster.ClusterHeartbeatSender]] that this node has started joining of
- * another node and heartbeats should be sent until it becomes member or deadline is overdue.
+ * another node and heartbeats should be sent unconditionally until it becomes a
+ * member or the deadline is overdue. This is done to detect immediate death
+ * of the joining node.
* Local only, no need to serialize.
*/
case class JoinInProgress(address: Address, deadline: Deadline)
@@ -58,14 +69,17 @@ private[cluster] object ClusterHeartbeatSender {
* INTERNAL API
*
* This actor is responsible for sending the heartbeat messages to
- * other nodes. Netty blocks when sending to broken connections. This actor
- * isolates sending to different nodes by using child workers for each target
+ * a few other nodes that will monitor this node.
+ *
+ * Netty blocks when sending to broken connections. This actor
+ * isolates sending to different nodes by using child actors for each target
* address and thereby reduces the risk of irregular heartbeats to healthy
* nodes due to broken connections to other nodes.
*/
private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging {
import ClusterHeartbeatSender._
- import Member.addressOrdering
+ import ClusterHeartbeatSenderConnection._
+ import ClusterHeartbeatReceiver._
import InternalClusterAction.HeartbeatTick
val cluster = Cluster(context.system)
@@ -74,9 +88,14 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
import context.dispatcher
val selfHeartbeat = Heartbeat(selfAddress)
+ val selfEndHeartbeat = EndHeartbeat(selfAddress)
+ val selfAddressStr = selfAddress.toString
- var nodes: SortedSet[Address] = SortedSet.empty
- var joinInProgress: Map[Address, Deadline] = Map.empty
+ var all = Set.empty[Address]
+ var current = Set.empty[Address]
+ var ending = Map.empty[Address, Int]
+ var joinInProgress = Map.empty[Address, Deadline]
+ var consistentHash = ConsistentHash(Seq.empty[Address], HeartbeatConsistentHashingVirtualNodesFactor)
// start periodic heartbeat to other nodes in cluster
val heartbeatTask =
@@ -99,63 +118,146 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
def clusterHeartbeatConnectionFor(address: Address): ActorRef =
context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver")
- /**
- * Child name URL encoded target address.
- */
- def encodeChildName(name: String): String = URLEncoder.encode(name, "UTF-8")
-
def receive = {
+ case HeartbeatTick ⇒ heartbeat()
case state: CurrentClusterState ⇒ init(state)
case MemberUnreachable(m) ⇒ removeMember(m)
case MemberRemoved(m) ⇒ removeMember(m)
case e: MemberEvent ⇒ addMember(e.member)
- case JoinInProgress(a, d) ⇒ joinInProgress += (a -> d)
- case HeartbeatTick ⇒ heartbeat()
+ case JoinInProgress(a, d) ⇒ addJoinInProgress(a, d)
}
def init(state: CurrentClusterState): Unit = {
- nodes = state.members.map(_.address)
- joinInProgress --= nodes
+ all = state.members.collect { case m if m.address != selfAddress ⇒ m.address }
+ joinInProgress --= all
+ consistentHash = ConsistentHash(all, HeartbeatConsistentHashingVirtualNodesFactor)
+ }
+
+ def addMember(m: Member): Unit = if (m.address != selfAddress) {
+ all += m.address
+ consistentHash = consistentHash :+ m.address
+ removeJoinInProgress(m.address)
+ update()
}
- def addMember(m: Member): Unit = {
- nodes += m.address
- joinInProgress -= m.address
+ def removeMember(m: Member): Unit = if (m.address != selfAddress) {
+ all -= m.address
+ consistentHash = consistentHash :- m.address
+ removeJoinInProgress(m.address)
+ update()
}
- def removeMember(m: Member): Unit = {
- nodes -= m.address
- joinInProgress -= m.address
+ def removeJoinInProgress(address: Address): Unit = if (joinInProgress contains address) {
+ joinInProgress -= address
+ ending += (address -> 0)
+ }
+
+ def addJoinInProgress(address: Address, deadline: Deadline): Unit = {
+ if (address != selfAddress && !all.contains(address))
+ joinInProgress += (address -> deadline)
}
def heartbeat(): Unit = {
removeOverdueJoinInProgress()
- val beatTo = nodes ++ joinInProgress.keys
-
- val deadline = Deadline.now + HeartbeatInterval
- for (to ← beatTo; if to != selfAddress) {
- val workerName = encodeChildName(to.toString)
- val worker = context.actorFor(workerName) match {
+ def connection(to: Address): ActorRef = {
+ // URL encoded target address as child actor name
+ val connectionName = URLEncoder.encode(to.toString, "UTF-8")
+ context.actorFor(connectionName) match {
case notFound if notFound.isTerminated ⇒
- context.actorOf(Props(new ClusterHeartbeatSenderWorker(clusterHeartbeatConnectionFor(to))), workerName)
+ context.actorOf(Props(new ClusterHeartbeatSenderConnection(clusterHeartbeatConnectionFor(to))), connectionName)
case child ⇒ child
}
- worker ! SendHeartbeat(selfHeartbeat, to, deadline)
}
+
+ val deadline = Deadline.now + HeartbeatInterval
+ (current ++ joinInProgress.keys) foreach { to ⇒ connection(to) ! SendHeartbeat(selfHeartbeat, to, deadline) }
+
+ // When sending heartbeats to a node is stopped, a few `EndHeartbeat` messages are
+ // sent to notify it that no more heartbeats will be sent.
+ for ((to, count) ← ending) {
+ val c = connection(to)
+ c ! SendEndHeartbeat(selfEndHeartbeat, to)
+ if (count == NumberOfEndHeartbeats) {
+ ending -= to
+ c ! PoisonPill
+ } else {
+ ending += (to -> (count + 1))
+ }
+ }
+ }
+
+ /**
+ * Update current peers to send heartbeats to, and
+ * keep track of which nodes to stop sending heartbeats to.
+ */
+ def update(): Unit = {
+ val previous = current
+ current = selectPeers
+ // start ending process for nodes not selected any more
+ ending ++= (previous -- current).map(_ -> 0)
+ // abort ending process for nodes that have been selected again
+ ending --= current
}
/**
- * Removes overdue joinInProgress from State.
+ * Select a few peers that heartbeats will be sent to, i.e. that will
+ * monitor this node. Try to send heartbeats to the same nodes as much
+ * as possible, but re-balance with a consistent hashing algorithm when
+ * new members are added or removed.
+ */
+ def selectPeers: Set[Address] = {
+ val allSize = all.size
+ val nrOfPeers = math.min(allSize, MonitoredByNrOfMembers)
+ // try more if consistentHash results in same node as already selected
+ val attemptLimit = nrOfPeers * 2
+ @tailrec def select(acc: Set[Address], n: Int): Set[Address] = {
+ if (acc.size == nrOfPeers || n == attemptLimit) acc
+ else select(acc + consistentHash.nodeFor(selfAddressStr + n), n + 1)
+ }
+ if (nrOfPeers >= allSize) all
+ else select(Set.empty[Address], 0)
+ }
+
+ /**
+ * Clean up overdue joinInProgress entries, in case a joining node never
+ * became a member for some reason.
*/
def removeOverdueJoinInProgress(): Unit = {
- joinInProgress --= joinInProgress collect { case (address, deadline) if (nodes contains address) || deadline.isOverdue ⇒ address }
+ val overdue = joinInProgress collect { case (address, deadline) if deadline.isOverdue ⇒ address }
+ if (overdue.nonEmpty) {
+ log.info("Overdue join in progress [{}]", overdue.mkString(", "))
+ ending ++= overdue.map(_ -> 0)
+ joinInProgress --= overdue
+ }
}
}
/**
- * Responsible for sending [[akka.cluster.Heartbeat]] to one specific address.
+ * INTERNAL API
+ */
+private[cluster] object ClusterHeartbeatSenderConnection {
+ import ClusterHeartbeatReceiver._
+
+ /**
+ * Command to [[akka.cluster.ClusterHeartbeatSenderConnection]], which will send
+ * [[akka.cluster.ClusterHeartbeatReceiver.Heartbeat]] to the other node.
+ * Local only, no need to serialize.
+ */
+ case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline)
+
+ /**
+ * Command to [[akka.cluster.ClusterHeartbeatSenderConnection]], which will send
+ * [[akka.cluster.ClusterHeartbeatReceiver.EndHeartbeat]] to the other node.
+ * Local only, no need to serialize.
+ */
+ case class SendEndHeartbeat(endHeartbeatMsg: EndHeartbeat, to: Address)
+}
+
+/**
+ * Responsible for sending [[akka.cluster.ClusterHeartbeatReceiver.Heartbeat]]
+ * and [[akka.cluster.ClusterHeartbeatReceiver.EndHeartbeat]] to one specific address.
*
* Netty blocks when sending to broken connections, and this actor uses
* a configurable circuit breaker to reduce connect attempts to broken
@@ -163,10 +265,10 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
*
* @see ClusterHeartbeatSender
*/
-private[cluster] final class ClusterHeartbeatSenderWorker(toRef: ActorRef)
+private[cluster] final class ClusterHeartbeatSenderConnection(toRef: ActorRef)
extends Actor with ActorLogging {
- import ClusterHeartbeatSender._
+ import ClusterHeartbeatSenderConnection._
val breaker = {
val cbSettings = Cluster(context.system).settings.SendCircuitBreakerSettings
@@ -177,21 +279,19 @@ private[cluster] final class ClusterHeartbeatSenderWorker(toRef: ActorRef)
onClose(log.debug("CircuitBreaker Closed for [{}]", toRef))
}
- // make sure it will cleanup when not used any more
- context.setReceiveTimeout(30 seconds)
-
def receive = {
case SendHeartbeat(heartbeatMsg, _, deadline) ⇒
if (!deadline.isOverdue) {
+ log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef)
// the CircuitBreaker will measure elapsed time and open if too many long calls
try breaker.withSyncCircuitBreaker {
- log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef)
toRef ! heartbeatMsg
- if (deadline.isOverdue) log.debug("Sending heartbeat to [{}] took longer than expected", toRef)
} catch { case e: CircuitBreakerOpenException ⇒ /* skip sending heartbeat to broken connection */ }
}
-
- case ReceiveTimeout ⇒ context.stop(self) // cleanup when not used
-
+ if (deadline.isOverdue) log.debug("Sending heartbeat to [{}] took longer than expected", toRef)
+ case SendEndHeartbeat(endHeartbeatMsg, _) ⇒
+ log.debug("Cluster Node [{}] - EndHeartbeat to [{}]", endHeartbeatMsg.from, toRef)
+ toRef ! endHeartbeatMsg
}
+
}
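To make the update()/ending bookkeeping above concrete, here is a simplified standalone model (plain strings instead of Address; the names and peer sets are assumed) of how deselected peers enter the ending phase and reselected peers abort it:

object EndingSketch extends App {
  var current = Set("a", "b", "c")
  var ending = Map.empty[String, Int]

  // mirrors the shape of ClusterHeartbeatSender.update(), with the
  // newly selected peer set injected as a parameter
  def update(selected: Set[String]): Unit = {
    val previous = current
    current = selected
    ending ++= (previous -- current).map(_ -> 0) // start ending deselected peers
    ending --= current // abort ending for peers that were selected again
  }

  update(Set("b", "c", "d"))
  println(ending) // Map(a -> 0): "a" will receive a few EndHeartbeat messages

  update(Set("a", "b", "c"))
  println(ending) // Map(d -> 0): "a" was reselected, "d" is now ending
}

In the committed heartbeat() above, each counter is incremented on every HeartbeatTick until it reaches NumberOfEndHeartbeats, at which point the connection actor is stopped with a PoisonPill.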
3  akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
@@ -24,6 +24,9 @@ class ClusterSettings(val config: Config, val systemName: String) {
final val FailureDetectorAcceptableHeartbeatPause: FiniteDuration =
Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS)
final val HeartbeatInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-interval"), MILLISECONDS)
+ final val HeartbeatConsistentHashingVirtualNodesFactor = 10 // no need for configuration
+ final val NumberOfEndHeartbeats: Int = (FailureDetectorAcceptableHeartbeatPause / HeartbeatInterval + 1).toInt
+ final val MonitoredByNrOfMembers = getInt("akka.cluster.failure-detector.monitored-by-nr-of-members")
final val SeedNodes: IndexedSeq[Address] = getStringList("akka.cluster.seed-nodes").asScala.map {
case AddressFromURIString(addr) ⇒ addr
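As a worked example of the NumberOfEndHeartbeats formula, using the default values implied by ClusterConfigSpec below (acceptable-heartbeat-pause = 3s, heartbeat-interval = 1s) and the modern scala.concurrent.duration package name (the commit-era code used scala.concurrent.util):

import scala.concurrent.duration._

object EndHeartbeatCountExample extends App {
  val acceptablePause = 3.seconds  // akka.cluster.failure-detector.acceptable-heartbeat-pause
  val heartbeatInterval = 1.second // akka.cluster.failure-detector.heartbeat-interval
  val numberOfEndHeartbeats = (acceptablePause / heartbeatInterval + 1).toInt
  println(numberOfEndHeartbeats) // 4, matching NumberOfEndHeartbeats in the spec below
}

Repeating EndHeartbeat this many times lets the ending signal outlast the pause the failure detector tolerates, so the receiving side reliably removes the monitoring.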
6 akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala
@@ -42,7 +42,7 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig {
gossip-interval = 500 ms
auto-join = off
auto-down = on
- failure-detector.acceptable-heartbeat-pause = 10s
+ failure-detector.acceptable-heartbeat-pause = 5s
publish-stats-interval = 0 s # always, when it happens
}
akka.event-handlers = ["akka.testkit.TestEventListener"]
@@ -57,7 +57,9 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig {
akka.scheduler.tick-duration = 33 ms
akka.remote.log-remote-lifecycle-events = off
akka.remote.netty.execution-pool-size = 4
- #akka.remote.netty.reconnection-time-window = 1s
+ #akka.remote.netty.reconnection-time-window = 10s
+ akka.remote.netty.read-timeout = 5s
+ akka.remote.netty.write-timeout = 5s
akka.remote.netty.backoff-timeout = 500ms
akka.remote.netty.connection-timeout = 500ms
2  akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
@@ -29,6 +29,8 @@ class ClusterConfigSpec extends AkkaSpec {
PeriodicTasksInitialDelay must be(1 seconds)
GossipInterval must be(1 second)
HeartbeatInterval must be(1 second)
+ NumberOfEndHeartbeats must be(4)
+ MonitoredByNrOfMembers must be(5)
LeaderActionsInterval must be(1 second)
UnreachableNodesReaperInterval must be(1 second)
PublishStatsInterval must be(10 second)