Permalink
Browse files

Merge pull request #768 from akka/wip-2587-pub-metrics-patriknw

Publish cluster metrics through the publisher actor.
  • Loading branch information...
2 parents 5ca4bdb + 49b9ec6 commit 2a7b76c0740304f7fc3bcd3115146c150b77e673 @viktorklang viktorklang committed Oct 2, 2012
@@ -116,6 +116,7 @@ private[cluster] object InternalClusterAction {
case class PublishCurrentClusterState(receiver: Option[ActorRef]) extends SubscriptionMessage
case class PublishChanges(oldGossip: Gossip, newGossip: Gossip)
+ case class PublishEvent(event: ClusterDomainEvent)
case object PublishDone
}
@@ -150,11 +151,13 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
// cause deadlock. The Cluster extension is currently being created and is waiting
// for response from GetClusterCoreRef in its constructor.
- val core = context.actorOf(Props[ClusterCoreDaemon].
+ val publisher = context.actorOf(Props[ClusterDomainEventPublisher].
+ withDispatcher(context.props.dispatcher), name = "publisher")
+ val core = context.actorOf(Props(new ClusterCoreDaemon(publisher)).
withDispatcher(context.props.dispatcher), name = "core")
- val heartbeat = context.actorOf(Props[ClusterHeartbeatDaemon].
+ context.actorOf(Props[ClusterHeartbeatDaemon].
withDispatcher(context.props.dispatcher), name = "heartbeat")
- if (settings.MetricsEnabled) context.actorOf(Props[ClusterMetricsCollector].
+ if (settings.MetricsEnabled) context.actorOf(Props(new ClusterMetricsCollector(publisher)).
withDispatcher(context.props.dispatcher), name = "metrics")
def receive = {
@@ -166,7 +169,7 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
/**
* INTERNAL API.
*/
-private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging {
+private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging {
import ClusterLeaderAction._
import InternalClusterAction._
import ClusterHeartbeatSender._
@@ -189,8 +192,6 @@ private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging {
withDispatcher(UseDispatcher), name = "heartbeatSender")
val coreSender = context.actorOf(Props[ClusterCoreSender].
withDispatcher(UseDispatcher), name = "coreSender")
- val publisher = context.actorOf(Props[ClusterDomainEventPublisher].
- withDispatcher(UseDispatcher), name = "publisher")
import context.dispatcher
@@ -193,6 +193,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
case PublishCurrentClusterState(receiver) publishCurrentClusterState(receiver)
case Subscribe(subscriber, to) subscribe(subscriber, to)
case Unsubscribe(subscriber, to) unsubscribe(subscriber, to)
+ case PublishEvent(event) publish(event)
case PublishDone sender ! PublishDone
}
@@ -207,7 +208,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
leader = latestGossip.leader)
receiver match {
case Some(ref) ref ! state
- case None eventStream publish state
+ case None publish(state)
}
}
@@ -232,7 +233,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
case x @ LeaderChanged(_) if oldGossip.convergence && newGossip.convergence
// leader changed and immediate convergence
leaderChangedState = Some(Right(x))
- eventStream publish x
+ publish(x)
case x: LeaderChanged
// publish later, when convergence
@@ -243,25 +244,25 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
leaderChangedState match {
case Some(Left(x))
leaderChangedState = Some(Right(x))
- eventStream publish x
+ publish(x)
case _ // nothing stashed
}
- eventStream publish event
+ publish(event)
case MemberUnreachable(m)
- eventStream publish event
+ publish(event)
// notify DeathWatch about unreachable node
- eventStream publish AddressTerminated(m.address)
+ publish(AddressTerminated(m.address))
case _
// all other events
- eventStream publish event
+ publish(event)
}
}
}
- def publishInternalStats(currentStats: CurrentInternalStats): Unit = {
- eventStream publish currentStats
- }
+ def publishInternalStats(currentStats: CurrentInternalStats): Unit = publish(currentStats)
+
+ def publish(event: AnyRef): Unit = eventStream publish event
}
@@ -36,7 +36,7 @@ import java.lang.System.{ currentTimeMillis ⇒ newTimestamp }
*
* @author Helena Edelson
*/
-private[cluster] class ClusterMetricsCollector extends Actor with ActorLogging {
+private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Actor with ActorLogging {
import InternalClusterAction._
import ClusterEvent._
@@ -107,6 +107,7 @@ private[cluster] class ClusterMetricsCollector extends Actor with ActorLogging {
def removeMember(event: MemberEvent): Unit = {
nodes -= event.member.address
latestGossip = latestGossip remove event.member.address
+ publish()
}
/**
@@ -155,7 +156,7 @@ private[cluster] class ClusterMetricsCollector extends Actor with ActorLogging {
/**
* Publishes to the event stream.
*/
- def publish(): Unit = context.system.eventStream publish ClusterMetricsChanged(latestGossip.nodes)
+ def publish(): Unit = publisher ! PublishEvent(ClusterMetricsChanged(latestGossip.nodes))
}

0 comments on commit 2a7b76c

Please sign in to comment.