Skip to content

Commit

Permalink
Move heartbeat sending out from ClusterCoreDaemon, see #2284
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Oct 8, 2012
1 parent 5b0a2ec commit cecde67
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 46 deletions.
44 changes: 6 additions & 38 deletions akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
Expand Up @@ -155,8 +155,8 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
withDispatcher(context.props.dispatcher), name = "publisher")
val core = context.actorOf(Props(new ClusterCoreDaemon(publisher)).
withDispatcher(context.props.dispatcher), name = "core")
context.actorOf(Props[ClusterHeartbeatDaemon].
withDispatcher(context.props.dispatcher), name = "heartbeat")
context.actorOf(Props[ClusterHeartbeatReceiver].
withDispatcher(context.props.dispatcher), name = "heartbeatReceiver")
if (settings.MetricsEnabled) context.actorOf(Props(new ClusterMetricsCollector(publisher)).
withDispatcher(context.props.dispatcher), name = "metrics")

Expand All @@ -172,26 +172,24 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging {
import ClusterLeaderAction._
import InternalClusterAction._
import ClusterHeartbeatSender._
import ClusterHeartbeatSender.JoinInProgress

val cluster = Cluster(context.system)
import cluster.{ selfAddress, scheduler, failureDetector }
import cluster.settings._

val vclockNode = VectorClock.Node(selfAddress.toString)
val selfHeartbeat = Heartbeat(selfAddress)

// note that self is not initially member,
// and the Gossip is not versioned for this 'Node' yet
var latestGossip: Gossip = Gossip()
var joinInProgress: Map[Address, Deadline] = Map.empty

var stats = ClusterStats()

val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender].
withDispatcher(UseDispatcher), name = "heartbeatSender")
val coreSender = context.actorOf(Props[ClusterCoreSender].
withDispatcher(UseDispatcher), name = "coreSender")
val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender].
withDispatcher(UseDispatcher), name = "heartbeatSender")

import context.dispatcher

Expand All @@ -201,12 +199,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
self ! GossipTick
}

// start periodic heartbeat to all nodes in cluster
val heartbeatTask =
FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration], HeartbeatInterval) {
self ! HeartbeatTick
}

// start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
val failureDetectorReaperTask =
FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval).asInstanceOf[FiniteDuration], UnreachableNodesReaperInterval) {
Expand All @@ -232,7 +224,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto

override def postStop(): Unit = {
gossipTask.cancel()
heartbeatTask.cancel()
failureDetectorReaperTask.cancel()
leaderActionsTask.cancel()
publishStatsTask foreach { _.cancel() }
Expand All @@ -250,7 +241,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
case msg: GossipEnvelope receiveGossip(msg)
case msg: GossipMergeConflict receiveGossipMerge(msg)
case GossipTick gossip()
case HeartbeatTick heartbeat()
case ReapUnreachableTick reapUnreachableMembers()
case LeaderActionsTick leaderActions()
case PublishStatsTick publishInternalStats()
Expand Down Expand Up @@ -293,11 +283,11 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val localGossip = latestGossip
// wipe our state since a node that joins a cluster must be empty
latestGossip = Gossip()
joinInProgress = Map(address -> (Deadline.now + JoinTimeout))

// wipe the failure detector since we are starting fresh and shouldn't care about the past
failureDetector.reset()

heartbeatSender ! JoinInProgress(address, Deadline.now + JoinTimeout)
publish(localGossip)

context.become(initialized)
Expand Down Expand Up @@ -517,12 +507,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
else if (remoteGossip.version < localGossip.version) localGossip // local gossip is newer
else remoteGossip // remote gossip is newer

val newJoinInProgress =
if (joinInProgress.isEmpty) joinInProgress
else joinInProgress -- winningGossip.members.map(_.address) -- winningGossip.overview.unreachable.map(_.address)

latestGossip = winningGossip seen selfAddress
joinInProgress = newJoinInProgress

// for all new joining nodes we remove them from the failure detector
(latestGossip.members -- localGossip.members).foreach {
Expand Down Expand Up @@ -744,27 +729,10 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
}
}

def heartbeat(): Unit = {
removeOverdueJoinInProgress()

val beatTo = latestGossip.members.toSeq.map(_.address) ++ joinInProgress.keys

val deadline = Deadline.now + HeartbeatInterval
beatTo.foreach { address if (address != selfAddress) heartbeatSender ! SendHeartbeat(selfHeartbeat, address, deadline) }
}

/**
* Removes overdue joinInProgress from State.
*/
def removeOverdueJoinInProgress(): Unit = {
joinInProgress --= joinInProgress collect { case (address, deadline) if deadline.isOverdue address }
}

/**
* Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict.
*/
def reapUnreachableMembers(): Unit = {

if (!isSingletonCluster && isAvailable) {
// only scrutinize if we are a non-singleton cluster and available

Expand Down
88 changes: 80 additions & 8 deletions akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala
Expand Up @@ -4,12 +4,14 @@
package akka.cluster

import language.postfixOps

import scala.collection.immutable.SortedSet
import akka.actor.{ ReceiveTimeout, ActorLogging, ActorRef, Address, Actor, RootActorPath, Props }
import java.security.MessageDigest
import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException }
import scala.concurrent.util.duration._
import scala.concurrent.util.Deadline
import scala.concurrent.util.FiniteDuration
import akka.cluster.ClusterEvent._

/**
* Sent at regular intervals for failure detection.
Expand All @@ -19,11 +21,11 @@ case class Heartbeat(from: Address) extends ClusterMessage
/**
* INTERNAL API.
*
* Receives Heartbeat messages and delegates to Cluster.
* Receives Heartbeat messages and updates failure detector.
* Instantiated as a single instance for each Cluster - e.g. heartbeats are serialized
* to Cluster message after message, but concurrent with other types of messages.
*/
private[cluster] final class ClusterHeartbeatDaemon extends Actor with ActorLogging {
private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLogging {

val failureDetector = Cluster(context.system).failureDetector

Expand All @@ -38,12 +40,18 @@ private[cluster] final class ClusterHeartbeatDaemon extends Actor with ActorLogg
*/
private[cluster] object ClusterHeartbeatSender {
/**
*
* Command to [akka.cluster.ClusterHeartbeatSender]], which will send [[akka.cluster.Heartbeat]]
* Command to [akka.cluster.ClusterHeartbeatSenderWorker]], which will send [[akka.cluster.Heartbeat]]
* to the other node.
* Local only, no need to serialize.
*/
case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline)

/**
* Tell [akka.cluster.ClusterHeartbeatSender]] that this node has started joining of
* another node and heartbeats should be sent until it becomes member or deadline is overdue.
* Local only, no need to serialize.
*/
case class JoinInProgress(address: Address, deadline: Deadline)
}

/*
Expand All @@ -57,12 +65,39 @@ private[cluster] object ClusterHeartbeatSender {
*/
private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging {
import ClusterHeartbeatSender._
import Member.addressOrdering
import InternalClusterAction.HeartbeatTick

val cluster = Cluster(context.system)
import cluster.{ selfAddress, scheduler }
import cluster.settings._
import context.dispatcher

val selfHeartbeat = Heartbeat(selfAddress)

var nodes: SortedSet[Address] = SortedSet.empty
var joinInProgress: Map[Address, Deadline] = Map.empty

// start periodic heartbeat to other nodes in cluster
val heartbeatTask =
FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration], HeartbeatInterval) {
self ! HeartbeatTick
}

override def preStart(): Unit = {
cluster.subscribe(self, classOf[MemberEvent])
}

override def postStop(): Unit = {
heartbeatTask.cancel()
cluster.unsubscribe(self)
}

/**
* Looks up and returns the remote cluster heartbeat connection for the specific address.
*/
def clusterHeartbeatConnectionFor(address: Address): ActorRef =
context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeat")
context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver")

val digester = MessageDigest.getInstance("MD5")

Expand All @@ -76,14 +111,51 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
}

def receive = {
case msg @ SendHeartbeat(from, to, deadline)
case state: CurrentClusterState init(state)
case MemberUnreachable(m) removeMember(m)
case MemberRemoved(m) removeMember(m)
case e: MemberEvent addMember(e.member)
case JoinInProgress(a, d) joinInProgress += (a -> d)
case HeartbeatTick heartbeat()
}

def init(state: CurrentClusterState): Unit = {
nodes = state.members.map(_.address)
joinInProgress --= nodes
}

def addMember(m: Member): Unit = {
nodes += m.address
joinInProgress -= m.address
}

def removeMember(m: Member): Unit = {
nodes -= m.address
joinInProgress -= m.address
}

def heartbeat(): Unit = {
removeOverdueJoinInProgress()

val beatTo = nodes ++ joinInProgress.keys

val deadline = Deadline.now + HeartbeatInterval
for (to beatTo; if to != selfAddress) {
val workerName = encodeChildName(to.toString)
val worker = context.actorFor(workerName) match {
case notFound if notFound.isTerminated
context.actorOf(Props(new ClusterHeartbeatSenderWorker(clusterHeartbeatConnectionFor(to))), workerName)
case child child
}
worker ! msg
worker ! SendHeartbeat(selfHeartbeat, to, deadline)
}
}

/**
* Removes overdue joinInProgress from State.
*/
def removeOverdueJoinInProgress(): Unit = {
joinInProgress --= joinInProgress collect { case (address, deadline) if (nodes contains address) || deadline.isOverdue address }
}

}
Expand Down

0 comments on commit cecde67

Please sign in to comment.