Use consistent hash to heartbeat to a few nodes instead of all, see #2284 #787

Merged
merged 7 commits into from Oct 15, 2012

Projects

None yet

3 participants

@patriknw
Member
patriknw commented Oct 8, 2012
  • Previously heartbeat messages was sent to all other members, i.e.
    each member was monitored by all other members in the cluster.
  • This was the number one know scalability bottleneck, due to the
    number of interconnections.
  • Limit sending of heartbeats to a few (5) members. Select and
    re-balance with consistent hashing algorithm when new members
    are added or removed.
  • Send a few EndHeartbeat when ending send of Heartbeat messages.
patriknw added some commits Oct 1, 2012
@patriknw patriknw Move heartbeat sending out from ClusterCoreDaemon, see #2284 cecde67
@patriknw patriknw URLEncode heartbeat sender child names
* Names can be url encoded now, instead of MD5
7557433
@patriknw patriknw Use consistent hash to heartbeat to a few nodes instead of all, see #…
…2284

* Previously heartbeat messages was sent to all other members, i.e.
  each member was monitored by all other members in the cluster.
* This was the number one know scalability bottleneck, due to the
  number of interconnections.
* Limit sending of heartbeats to a few (5) members. Select and
  re-balance with consistent hashing algorithm when new members
  are added or removed.
* Send a few EndHeartbeat when ending send of Heartbeat messages.
3f73705
@viktorklang viktorklang commented on the diff Oct 8, 2012
.gitignore
@@ -67,3 +67,4 @@ redis/
beanstalk/
.scalastyle
bin/
+.worksheet
@viktorklang
viktorklang Oct 8, 2012 Akka Project member

good one!

@viktorklang viktorklang commented on the diff Oct 8, 2012
akka-cluster/src/main/resources/reference.conf
@@ -78,6 +78,10 @@ akka {
# how often should the node send out heartbeats?
heartbeat-interval = 1s
+ # Number of member nodes that each member will send heartbeat messages to,
+ # i.e. each node will be monitored by this number of other nodes.
+ monitored-by-nr-of-members = 5
@viktorklang
viktorklang Oct 8, 2012 Akka Project member

how about a factor/percentage of nodes instead of a fixed number?

@patriknw
patriknw Oct 8, 2012 Akka Project member

why would that be better?
factor 5 % is reasonable for a 100 node cluster, but not for 1000 node cluster

@viktorklang
viktorklang Oct 8, 2012 Akka Project member

What if you do not know the grand total of nodes when you boot the app?

@patriknw
patriknw Oct 8, 2012 Akka Project member

I don't understand what you mean, or what you think is the problem. Please elaborate.

When would it not be good that each node is monitored by 5 other nodes? It will of course not be more than number of members - 1. Why should it be proportional to number of nodes in cluster? That doesn't scale.

/Patrik

8 okt 2012 kl. 18:17 skrev Viktor Klang (√) notifications@github.com:

In akka-cluster/src/main/resources/reference.conf:

@@ -78,6 +78,10 @@ akka {
# how often should the node send out heartbeats?
heartbeat-interval = 1s

  •  # Number of member nodes that each member will send heartbeat messages to,
    
  •  # i.e. each node will be monitored by this number of other nodes.
    
  •  monitored-by-nr-of-members = 5
    
    What if you do not know the grand total of nodes when you boot the app?


Reply to this email directly or view it on GitHub.

@viktorklang
viktorklang Oct 8, 2012 Akka Project member

Nevermind, I might just be overexaggerating.

@viktorklang viktorklang and 1 other commented on an outdated diff Oct 8, 2012
...er/src/main/scala/akka/cluster/ClusterHeartbeat.scala
+ import cluster.settings._
+ import context.dispatcher
+
+ val selfHeartbeat = Heartbeat(selfAddress)
+ val selfEndHeartbeat = EndHeartbeat(selfAddress)
+ val selfAddressStr = selfAddress.toString
+
+ var all = Set.empty[Address]
+ var current = Set.empty[Address]
+ var ending = Map.empty[Address, Int]
+ var joinInProgress = Map.empty[Address, Deadline]
+ var consistentHash = ConsistentHash(Seq.empty[Address], HeartbeatConsistentHashingVirtualNodesFactor)
+
+ // start periodic heartbeat to other nodes in cluster
+ val heartbeatTask =
+ FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration], HeartbeatInterval) {
@viktorklang
viktorklang Oct 8, 2012 Akka Project member

Did you need FixedRateTask after the scheduler fix?

@patriknw
patriknw Oct 8, 2012 Akka Project member

I will remove that, but that will be done as a separate ticket (today)

@viktorklang viktorklang commented on an outdated diff Oct 8, 2012
...er/src/main/scala/akka/cluster/ClusterHeartbeat.scala
+ val selfEndHeartbeat = EndHeartbeat(selfAddress)
+ val selfAddressStr = selfAddress.toString
+
+ var all = Set.empty[Address]
+ var current = Set.empty[Address]
+ var ending = Map.empty[Address, Int]
+ var joinInProgress = Map.empty[Address, Deadline]
+ var consistentHash = ConsistentHash(Seq.empty[Address], HeartbeatConsistentHashingVirtualNodesFactor)
+
+ // start periodic heartbeat to other nodes in cluster
+ val heartbeatTask =
+ FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration], HeartbeatInterval) {
+ self ! HeartbeatTick
+ }
+
+ override def preStart(): Unit = {
@viktorklang
viktorklang Oct 8, 2012 Akka Project member

one-liner :-)

@drewhk drewhk commented on an outdated diff Oct 8, 2012
...er/src/main/scala/akka/cluster/ClusterHeartbeat.scala
+ ending --= current
+ }
+
+ /**
+ * Select a few peers that heartbeats will be sent to, i.e. that will
+ * monitor this node. Try to send heartbeats to same nodes as much
+ * as possible, but re-balance with consistent hashing algorithm when
+ * new members are added or removed.
+ */
+ def selectPeers: Set[Address] = {
+ val allSize = all.size
+ val nrOfPeers = math.min(allSize, MonitoredByNrOfMembers)
+ // try more if consistentHash results in same node as already selected
+ val attemptLimit = nrOfPeers * 2
+ @tailrec def select(acc: Set[Address], n: Int): Set[Address] = {
+ if (acc.size == nrOfPeers || n == attemptLimit) acc
@drewhk
drewhk Oct 8, 2012 Akka Project member

Maybe log that fewer peers were selected because attemptLimit was reached.

@drewhk drewhk and 1 other commented on an outdated diff Oct 8, 2012
...er/src/main/scala/akka/cluster/ClusterHeartbeat.scala
- /**
- * Child name is MD5 hash of the address.
- * FIXME Change to URLEncode when ticket #2123 has been fixed
- */
- def encodeChildName(name: String): String = {
- digester update name.getBytes("UTF-8")
- digester.digest.map { h "%02x".format(0xFF & h) }.mkString
+ def init(state: CurrentClusterState): Unit = {
+ all = state.members.collect { case m if m.address != selfAddress m.address }
+ joinInProgress --= all
+ consistentHash = ConsistentHash(all, HeartbeatConsistentHashingVirtualNodesFactor)
@drewhk
drewhk Oct 8, 2012 Akka Project member

shouldn't be update() called here?

@patriknw
patriknw Oct 8, 2012 Akka Project member

it certainly should, thanks

@patriknw
patriknw Oct 9, 2012 Akka Project member

fixed

@drewhk drewhk and 1 other commented on an outdated diff Oct 8, 2012
...er/src/main/scala/akka/cluster/ClusterHeartbeat.scala
def receive = {
case SendHeartbeat(heartbeatMsg, _, deadline)
if (!deadline.isOverdue) {
+ log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef)
// the CircuitBreaker will measure elapsed time and open if too many long calls
try breaker.withSyncCircuitBreaker {
@drewhk
drewhk Oct 8, 2012 Akka Project member

This deserves a comment :)

@patriknw
patriknw Oct 9, 2012 Akka Project member

ok, added yet another comment why this workaround exists - can't wait for the new remoting :-)

@viktorklang viktorklang and 1 other commented on an outdated diff Oct 8, 2012
...ter/src/main/scala/akka/cluster/ClusterSettings.scala
@@ -24,6 +24,9 @@ class ClusterSettings(val config: Config, val systemName: String) {
final val FailureDetectorAcceptableHeartbeatPause: FiniteDuration =
Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS)
final val HeartbeatInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-interval"), MILLISECONDS)
+ final val HeartbeatConsistentHashingVirtualNodesFactor = 10 // no need for configuration
+ final val NumberOfEndHeartbeats: Int = (FailureDetectorAcceptableHeartbeatPause / HeartbeatInterval + 1).toInt
@viktorklang
viktorklang Oct 8, 2012 Akka Project member

What if heartbeatInterval is -1?

@patriknw
patriknw Oct 9, 2012 Akka Project member

alright, added some boundary checks

@viktorklang viktorklang and 1 other commented on an outdated diff Oct 10, 2012
...er/src/main/scala/akka/cluster/ClusterHeartbeat.scala
+ import InternalClusterAction.HeartbeatTick
+
+ val cluster = Cluster(context.system)
+ import cluster.{ selfAddress, scheduler }
+ import cluster.settings._
+ import context.dispatcher
+
+ val selfHeartbeat = Heartbeat(selfAddress)
+ val selfEndHeartbeat = EndHeartbeat(selfAddress)
+ val selfAddressStr = selfAddress.toString
+
+ var all = Set.empty[Address]
+ var current = Set.empty[Address]
+ var ending = Map.empty[Address, Int]
+ var joinInProgress = Map.empty[Address, Deadline]
+ var consistentHash = ConsistentHash(Seq.empty[Address], HeartbeatConsistentHashingVirtualNodesFactor)
@viktorklang
viktorklang Oct 10, 2012 Akka Project member

Instead of having a lot of vars, aren't most of the vars interconnected? Create a case class and use copy?

@patriknw
patriknw Oct 10, 2012 Akka Project member

I have moved the state and related logic to a separate immutable case class, ClusterHeartbeatSenderState. Pretty slick. Easier to unit test.

@patriknw
Member

I introduced a bug in previous refactoring, I'll update test and fix.

@patriknw
Member

bug fixed

@viktorklang
Member

👍

@patriknw patriknw merged commit 8dfb943 into master Oct 15, 2012
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment