Permalink
Browse files

Stress / long running test of cluster, see #2786

* akka.cluster.StressSpec
* Configurable number of nodes and duration for each step
* Report metrics and phi periodically to see progress
* Configurable payload size
* Test of various join and remove scenarios
* Test of watch
* Exercise supervision
* Report cluster stats
* Test with many actors in tree structure

Apart from the test this commit also solves some issues:

* Avoid adding back members when downed in ClusterHeartbeatSender
* Avoid duplicate close of ClusterReadView
* Add back the publish of AddressTerminated when MemberDowned/Removed
  it was lost in merge of "publish on convergence", see #2779
  • Loading branch information...
1 parent 7944b45 commit f147f4d3d27ce626cec21d879666212ec5a1ed79 @patriknw patriknw committed Dec 12, 2012
View
18 akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
@@ -284,18 +284,24 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
// keep the latestGossip to be sent to new subscribers
latestGossip = newGossip
// first publish the diffUnreachable between the last two gossips
- diffUnreachable(oldGossip, newGossip) foreach { event ⇒
- publish(event)
- // notify DeathWatch about unreachable node
- publish(AddressTerminated(event.member.address))
- }
+ diffUnreachable(oldGossip, newGossip) foreach publish
// buffer up the MemberEvents waiting for convergence
memberEvents ++= diffMemberEvents(oldGossip, newGossip)
// if we have convergence then publish the MemberEvents and possibly a LeaderChanged
if (newGossip.convergence) {
val previousConvergedGossip = latestConvergedGossip
latestConvergedGossip = newGossip
- memberEvents foreach publish
+ memberEvents foreach { event ⇒
+ event match {
+ case m @ (MemberDowned(_) | MemberRemoved(_)) ⇒
+ // TODO MemberDowned match should probably be covered by MemberRemoved, see ticket #2788
+ // but right now we don't change Downed to Removed
+ publish(event)
+ // notify DeathWatch about downed node
+ publish(AddressTerminated(m.member.address))
+ case _ ⇒ publish(event)
+ }
+ }
memberEvents = immutable.Seq.empty
diffLeader(previousConvergedGossip, latestConvergedGossip) foreach publish
}
View
1 akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala
@@ -116,6 +116,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
case HeartbeatTick ⇒ heartbeat()
case s: CurrentClusterState ⇒ reset(s)
case UnreachableMember(m) ⇒ removeMember(m)
+ case MemberDowned(m) ⇒ removeMember(m)
case MemberRemoved(m) ⇒ removeMember(m)
case e: MemberEvent ⇒ addMember(e.member)
case JoinInProgress(a, d) ⇒ addJoinInProgress(a, d)
View
1 akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala
@@ -140,7 +140,6 @@ private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) {
* Unregisters the cluster JMX MBean from MBean server.
*/
def unregisterMBean(): Unit = {
- clusterView.close()
try {
mBeanServer.unregisterMBean(clusterMBeanName)
} catch {
View
4 akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala
@@ -52,7 +52,7 @@ abstract class ClientDowningNodeThatIsUnreachableSpec(multiNodeConfig: ClientDow
cluster.down(thirdAddress)
enterBarrier("down-third-node")
- awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(thirdAddress))
+ awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Set(thirdAddress))
clusterView.members.exists(_.address == thirdAddress) must be(false)
}
@@ -63,7 +63,7 @@ abstract class ClientDowningNodeThatIsUnreachableSpec(multiNodeConfig: ClientDow
runOn(second, fourth) {
enterBarrier("down-third-node")
- awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(thirdAddress))
+ awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Set(thirdAddress))
}
enterBarrier("await-completion")
View
4 akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala
@@ -50,7 +50,7 @@ abstract class ClientDowningNodeThatIsUpSpec(multiNodeConfig: ClientDowningNodeT
markNodeAsUnavailable(thirdAddress)
- awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(thirdAddress))
+ awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Set(thirdAddress))
clusterView.members.exists(_.address == thirdAddress) must be(false)
}
@@ -61,7 +61,7 @@ abstract class ClientDowningNodeThatIsUpSpec(multiNodeConfig: ClientDowningNodeT
runOn(second, fourth) {
enterBarrier("down-third-node")
- awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(thirdAddress))
+ awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Set(thirdAddress))
}
enterBarrier("await-completion")
View
8 akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala
@@ -60,7 +60,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow
// --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE ---
- awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(fourthAddress), 30.seconds)
+ awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Set(fourthAddress), 30.seconds)
}
runOn(fourth) {
@@ -70,7 +70,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow
runOn(second, third) {
enterBarrier("down-fourth-node")
- awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(fourthAddress), 30.seconds)
+ awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Set(fourthAddress), 30.seconds)
}
enterBarrier("await-completion-1")
@@ -90,7 +90,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow
// --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE ---
- awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = List(secondAddress), 30.seconds)
+ awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = Set(secondAddress), 30.seconds)
}
runOn(second) {
@@ -100,7 +100,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow
runOn(third) {
enterBarrier("down-second-node")
- awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = List(secondAddress), 30 seconds)
+ awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = Set(secondAddress), 30 seconds)
}
enterBarrier("await-completion-2")
View
2 akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala
@@ -115,7 +115,7 @@ abstract class MBeanSpec
enterBarrier("fourth-down")
runOn(first, second, third) {
- awaitUpConvergence(3, canNotBePartOfMemberRing = List(fourthAddress))
+ awaitUpConvergence(3, canNotBePartOfMemberRing = Set(fourthAddress))
assertMembers(clusterView.members, first, second, third)
}
View
8 akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
@@ -218,17 +218,17 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS
*/
def awaitUpConvergence(
numberOfMembers: Int,
- canNotBePartOfMemberRing: immutable.Seq[Address] = Nil,
+ canNotBePartOfMemberRing: Set[Address] = Set.empty,
timeout: FiniteDuration = 20.seconds): Unit = {
within(timeout) {
+ if (!canNotBePartOfMemberRing.isEmpty) // don't run this on an empty set
+ awaitCond(
+ canNotBePartOfMemberRing forall (address ⇒ !(clusterView.members exists (_.address == address))))
awaitCond(clusterView.members.size == numberOfMembers)
awaitCond(clusterView.members.forall(_.status == MemberStatus.Up))
// clusterView.leader is updated by LeaderChanged, await that to be updated also
val expectedLeader = clusterView.members.headOption.map(_.address)
awaitCond(clusterView.leader == expectedLeader)
- if (!canNotBePartOfMemberRing.isEmpty) // don't run this on an empty set
- awaitCond(
- canNotBePartOfMemberRing forall (address ⇒ !(clusterView.members exists (_.address == address))))
}
}
View
2 akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala
@@ -66,7 +66,7 @@ abstract class SingletonClusterSpec(multiNodeConfig: SingletonClusterMultiNodeCo
markNodeAsUnavailable(secondAddress)
- awaitUpConvergence(numberOfMembers = 1, canNotBePartOfMemberRing = List(secondAddress), 30.seconds)
+ awaitUpConvergence(numberOfMembers = 1, canNotBePartOfMemberRing = Set(secondAddress), 30.seconds)
clusterView.isSingletonCluster must be(true)
awaitCond(clusterView.isLeader)
}
View
4 akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala
@@ -102,15 +102,15 @@ abstract class SplitBrainSpec(multiNodeConfig: SplitBrainMultiNodeConfig)
// auto-down = on
awaitCond(clusterView.unreachableMembers.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds)
clusterView.unreachableMembers.map(_.address) must be(side2.toSet map address)
- awaitUpConvergence(side1.size, side2 map address)
+ awaitUpConvergence(side1.size, side2.toSet map address)
assertLeader(side1: _*)
}
runOn(side2: _*) {
// auto-down = on
awaitCond(clusterView.unreachableMembers.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds)
clusterView.unreachableMembers.map(_.address) must be(side1.toSet map address)
- awaitUpConvergence(side2.size, side1 map address)
+ awaitUpConvergence(side2.size, side1.toSet map address)
assertLeader(side2: _*)
}
View
1,082 akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala
@@ -0,0 +1,1082 @@
+/**
+ * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
+ */
+package akka.cluster
+
+import language.postfixOps
+import scala.annotation.tailrec
+import scala.collection.immutable
+import scala.concurrent.duration._
+import scala.concurrent.forkjoin.ThreadLocalRandom
+import org.scalatest.BeforeAndAfterEach
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import akka.actor.Actor
+import akka.actor.ActorLogging
+import akka.actor.ActorRef
+import akka.actor.ActorSystem
+import akka.actor.Address
+import akka.actor.Deploy
+import akka.actor.OneForOneStrategy
+import akka.actor.Props
+import akka.actor.RootActorPath
+import akka.actor.SupervisorStrategy._
+import akka.actor.Terminated
+import akka.cluster.ClusterEvent.ClusterMetricsChanged
+import akka.cluster.ClusterEvent.CurrentClusterState
+import akka.cluster.ClusterEvent.MemberEvent
+import akka.cluster.StandardMetrics.Cpu
+import akka.cluster.StandardMetrics.HeapMemory
+import akka.remote.RemoteScope
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.routing.FromConfig
+import akka.testkit._
+import akka.testkit.TestEvent._
+
+/**
+ * This test is intended to be used as long running stress test
+ * of cluster related features. Number of nodes and duration of
+ * the test steps can be configured. The test scenario is organized as
+ * follows:
+ * 1. join nodes in various ways up to the configured total number of nodes
+ * 2. while nodes are joining a few cluster aware routers are also working
+ * 3. exercise concurrent joining and shutdown of nodes repeatedly
+ * 4. exercise cluster aware routers, including high throughput
+ * 5. exercise many actors in a tree structure
+ * 6. exercise remote supervision
+ * 7. leave and shutdown nodes in various ways
+ * 8. while nodes are removed remote death watch is also exercised
+ * 9. while nodes are removed a few cluster aware routers are also working
+ */
+object StressMultiJvmSpec extends MultiNodeConfig {
+
+ // Note that this test uses default configuration,
+ // not MultiNodeClusterSpec.clusterConfig
+ commonConfig(ConfigFactory.parseString("""
+ akka.test.cluster-stress-spec {
+ # scale the nr-of-nodes* settings with this factor
+ nr-of-nodes-factor = 1
+ nr-of-nodes = 13
+ # not scaled
+ nr-of-seed-nodes = 3
+ nr-of-nodes-joining-to-seed-initally = 2
+ nr-of-nodes-joining-one-by-one-small = 2
+ nr-of-nodes-joining-one-by-one-large = 2
+ nr-of-nodes-joining-to-one = 2
+ nr-of-nodes-joining-to-seed = 2
+ nr-of-nodes-leaving-one-by-one-small = 1
+ nr-of-nodes-leaving-one-by-one-large = 2
+ nr-of-nodes-leaving = 2
+ nr-of-nodes-shutdown-one-by-one-small = 1
+ nr-of-nodes-shutdown-one-by-one-large = 2
+ nr-of-nodes-shutdown = 2
+ nr-of-nodes-join-remove = 2
+ # not scaled
+ join-remove-duration = 90s
+ work-batch-size = 100
+ work-batch-interval = 2s
+ payload-size = 1000
+ # scale the *-duration settings with this factor
+ duration-factor = 1
+ normal-throughput-duration = 30s
+ high-throughput-duration = 10s
+ supervision-duration = 10s
+ # actors are created in a tree structure defined
+ # by tree-width (number of children for each actor) and
+ # tree-levels, total number of actors can be calculated by
+ # (width * math.pow(width, levels) - 1) / (width - 1)
+ tree-width = 5
+ tree-levels = 4
+ report-metrics-interval = 10s
+ # scale convergence within timeouts with this factor
+ convergence-within-factor = 1.0
+ }
+
+ akka.actor.provider = akka.cluster.ClusterActorRefProvider
+ akka.cluster {
+ auto-join = off
+ auto-down = on
+ publish-stats-interval = 0 s # always, when it happens
+ }
+ akka.event-handlers = ["akka.testkit.TestEventListener"]
+ akka.loglevel = INFO
+ akka.remote.log-remote-lifecycle-events = off
+
+ akka.actor.deployment {
+ /master-node-1/workers {
+ router = round-robin
+ nr-of-instances = 100
+ cluster {
+ enabled = on
+ max-nr-of-instances-per-node = 1
+ allow-local-routees = off
+ }
+ }
+ /master-node-2/workers {
+ router = round-robin
+ nr-of-instances = 100
+ cluster {
+ enabled = on
+ routees-path = "/user/worker"
+ allow-local-routees = off
+ }
+ }
+ /master-node-3/workers = {
+ router = adaptive
+ nr-of-instances = 100
+ cluster {
+ enabled = on
+ max-nr-of-instances-per-node = 1
+ allow-local-routees = off
+ }
+ }
+ }
+ """))
+
+ class Settings(conf: Config) {
+ private val testConfig = conf.getConfig("akka.test.cluster-stress-spec")
+ import testConfig._
+
+ private def getDuration(name: String): FiniteDuration = Duration(getMilliseconds(name), MILLISECONDS)
+
+ val nFactor = getInt("nr-of-nodes-factor")
+ val totalNumberOfNodes = getInt("nr-of-nodes") * nFactor ensuring (
+ _ >= 10, "nr-of-nodes must be >= 10")
+ val numberOfSeedNodes = getInt("nr-of-seed-nodes") // not scaled by nodes factor
+ val numberOfNodesJoiningToSeedNodesInitially = getInt("nr-of-nodes-joining-to-seed-initally") * nFactor
+ val numberOfNodesJoiningOneByOneSmall = getInt("nr-of-nodes-joining-one-by-one-small") * nFactor
+ val numberOfNodesJoiningOneByOneLarge = getInt("nr-of-nodes-joining-one-by-one-large") * nFactor
+ val numberOfNodesJoiningToOneNode = getInt("nr-of-nodes-joining-to-one") * nFactor
+ val numberOfNodesJoiningToSeedNodes = getInt("nr-of-nodes-joining-to-seed") * nFactor
+ val numberOfNodesLeavingOneByOneSmall = getInt("nr-of-nodes-leaving-one-by-one-small") * nFactor
+ val numberOfNodesLeavingOneByOneLarge = getInt("nr-of-nodes-leaving-one-by-one-large") * nFactor
+ val numberOfNodesLeaving = getInt("nr-of-nodes-leaving") * nFactor
+ val numberOfNodesShutdownOneByOneSmall = getInt("nr-of-nodes-shutdown-one-by-one-small") * nFactor
+ val numberOfNodesShutdownOneByOneLarge = getInt("nr-of-nodes-shutdown-one-by-one-large") * nFactor
+ val numberOfNodesShutdown = getInt("nr-of-nodes-shutdown") * nFactor
+ val numberOfNodesJoinRemove = getInt("nr-of-nodes-join-remove") // not scaled by nodes factor
+
+ val workBatchSize = getInt("work-batch-size")
+ val workBatchInterval = Duration(getMilliseconds("work-batch-interval"), MILLISECONDS)
+ val payloadSize = getInt("payload-size")
+ val dFactor = getInt("duration-factor")
+ val joinRemoveDuration = getDuration("join-remove-duration") * dFactor
+ val normalThroughputDuration = getDuration("normal-throughput-duration") * dFactor
+ val highThroughputDuration = getDuration("high-throughput-duration") * dFactor
+ val supervisionDuration = getDuration("supervision-duration") * dFactor
+ val treeWidth = getInt("tree-width")
+ val treeLevels = getInt("tree-levels")
+ val reportMetricsInterval = getDuration("report-metrics-interval")
+ val convergenceWithinFactor = getDouble("convergence-within-factor")
+
+ require(numberOfSeedNodes + numberOfNodesJoiningToSeedNodesInitially + numberOfNodesJoiningOneByOneSmall +
+ numberOfNodesJoiningOneByOneLarge + numberOfNodesJoiningToOneNode + numberOfNodesJoiningToSeedNodes <= totalNumberOfNodes,
+ s"specified number of joining nodes <= ${totalNumberOfNodes}")
+
+ // don't shutdown the 3 nodes hosting the master actors
+ require(numberOfNodesLeavingOneByOneSmall + numberOfNodesLeavingOneByOneLarge + numberOfNodesLeaving +
+ numberOfNodesShutdownOneByOneSmall + numberOfNodesShutdownOneByOneLarge + numberOfNodesShutdown <= totalNumberOfNodes - 3,
+ s"specified number of leaving/shutdown nodes <= ${totalNumberOfNodes - 3}")
+
+ require(numberOfNodesJoinRemove <= totalNumberOfNodes, s"nr-of-nodes-join-remove must be <= ${totalNumberOfNodes}")
+ }
+
+ // FIXME configurable number of nodes
+ for (n ← 1 to 13) role("node-" + n)
+
+ implicit class FormattedDouble(val d: Double) extends AnyVal {
+ def form: String = d.formatted("%.2f")
+ }
+
+ case class ClusterResult(
+ address: Address,
+ duration: Duration,
+ clusterStats: ClusterStats)
+
+ case class AggregatedClusterResult(title: String, duration: Duration, clusterStats: ClusterStats)
+
+ /**
+ * Central aggregator of cluster statistics and metrics.
+ * Reports the result via log periodically and when all
+ * expected results has been collected. It shuts down
+ * itself when expected results has been collected.
+ */
+ class ClusterResultAggregator(title: String, expectedResults: Int, reportMetricsInterval: FiniteDuration) extends Actor with ActorLogging {
+ val cluster = Cluster(context.system)
+ var reportTo: Option[ActorRef] = None
+ var results = Vector.empty[ClusterResult]
+ var nodeMetrics = Set.empty[NodeMetrics]
+ var phiValuesObservedByNode = {
+ import akka.cluster.Member.addressOrdering
+ immutable.SortedMap.empty[Address, Set[PhiValue]]
+ }
+
+ import context.dispatcher
+ val reportMetricsTask = context.system.scheduler.schedule(
+ reportMetricsInterval, reportMetricsInterval, self, ReportTick)
+
+ // subscribe to ClusterMetricsChanged, re-subscribe when restart
+ override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterMetricsChanged])
+ override def postStop(): Unit = {
+ cluster.unsubscribe(self)
+ reportMetricsTask.cancel()
+ super.postStop()
+ }
+
+ def receive = {
+ case ClusterMetricsChanged(clusterMetrics) ⇒ nodeMetrics = clusterMetrics
+ case PhiResult(from, phiValues) ⇒ phiValuesObservedByNode += from -> phiValues
+ case ReportTick ⇒
+ log.info(s"[${title}] in progress\n${formatMetrics}\n\n${formatPhi}")
+ case r: ClusterResult ⇒
+ results :+= r
+ if (results.size == expectedResults) {
+ val aggregated = AggregatedClusterResult(title, maxDuration, totalClusterStats)
+ log.info(s"[${title}] completed in [${aggregated.duration.toMillis}] ms\n${aggregated.clusterStats}\n${formatMetrics}\n\n${formatPhi}")
+ reportTo foreach { _ ! aggregated }
+ context stop self
+ }
+ case _: CurrentClusterState ⇒
+ case ReportTo(ref) ⇒ reportTo = ref
+ }
+
+ def maxDuration = results.map(_.duration).max
+
+ def totalClusterStats = results.map(_.clusterStats).foldLeft(ClusterStats()) { (acc, s) ⇒
+ ClusterStats(
+ receivedGossipCount = acc.receivedGossipCount + s.receivedGossipCount,
+ mergeConflictCount = acc.mergeConflictCount + s.mergeConflictCount,
+ mergeCount = acc.mergeCount + s.mergeCount,
+ mergeDetectedCount = acc.mergeDetectedCount + s.mergeDetectedCount)
+ }
+
+ def formatMetrics: String = {
+ import akka.cluster.Member.addressOrdering
+ (formatMetricsHeader +: (nodeMetrics.toSeq.sortBy(_.address) map formatMetricsLine)).mkString("\n")
+ }
+
+ def formatMetricsHeader: String = "Node\tHeap (MB)\tCPU (%)\tLoad"
+
+ def formatMetricsLine(nodeMetrics: NodeMetrics): String = {
+ val heap = nodeMetrics match {
+ case HeapMemory(address, timestamp, used, committed, max) ⇒
+ (used.doubleValue / 1024 / 1024).form
+ case _ ⇒ ""
+ }
+ val cpuAndLoad = nodeMetrics match {
+ case Cpu(address, timestamp, loadOption, cpuOption, processors) ⇒
+ format(cpuOption) + "\t" + format(loadOption)
+ case _ ⇒ "N/A\tN/A"
+ }
+ s"${nodeMetrics.address}\t${heap}\t${cpuAndLoad}"
+ }
+
+ def format(opt: Option[Double]) = opt match {
+ case None ⇒ "N/A"
+ case Some(x) ⇒ x.form
+ }
+
+ def formatPhi: String = {
+ if (phiValuesObservedByNode.isEmpty) ""
+ else {
+ import akka.cluster.Member.addressOrdering
+ val lines =
+ for {
+ (monitor, phiValues) ← phiValuesObservedByNode.toSeq
+ phi ← phiValues.toSeq.sortBy(_.address)
+ } yield formatPhiLine(monitor, phi.address, phi)
+
+ (formatPhiHeader +: lines).mkString("\n")
+ }
+ }
+
+ def formatPhiHeader: String = "Monitor\tSubject\tcount\tcount phi > 1.0\tmax phi"
+
+ def formatPhiLine(monitor: Address, subject: Address, phi: PhiValue): String =
+ s"${monitor}\t${subject}\t${phi.count}\t${phi.countAboveOne}\t${phi.max.form}"
+
+ }
+
+ /**
+ * Keeps cluster statistics and metrics reported by
+ * ClusterResultAggregator. Logs the list of historical
+ * results when a new AggregatedClusterResult is received.
+ */
+ class ClusterResultHistory extends Actor with ActorLogging {
+ var history = Vector.empty[AggregatedClusterResult]
+
+ def receive = {
+ case result: AggregatedClusterResult ⇒
+ history :+= result
+ log.info("Cluster result history\n" + formatHistory)
+ }
+
+ def formatHistory: String =
+ (formatHistoryHeader +: (history map formatHistoryLine)).mkString("\n")
+
+ def formatHistoryHeader: String = "title\tduration (ms)\tgossip count\tmerge count"
+
+ def formatHistoryLine(result: AggregatedClusterResult): String =
+ s"${result.title}\t${result.duration.toMillis}\t${result.clusterStats.receivedGossipCount}\t${result.clusterStats.mergeCount}"
+
+ }
+
+ /**
+ * Collect phi values of the failure detector and report to the
+ * central ClusterResultAggregator.
+ */
+ class PhiObserver extends Actor with ActorLogging {
+ val cluster = Cluster(context.system)
+ val fd = cluster.failureDetector.asInstanceOf[AccrualFailureDetector]
+ var reportTo: Option[ActorRef] = None
+ val emptyPhiByNode = Map.empty[Address, PhiValue].withDefault(address ⇒ PhiValue(address, 0, 0, 0.0))
+ var phiByNode = emptyPhiByNode
+ var nodes = Set.empty[Address]
+
+ import context.dispatcher
+ val checkPhiTask = context.system.scheduler.schedule(
+ 1.second, 1.second, self, PhiTick)
+
+ // subscribe to MemberEvent, re-subscribe when restart
+ override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent])
+ override def postStop(): Unit = {
+ cluster.unsubscribe(self)
+ checkPhiTask.cancel()
+ super.postStop()
+ }
+
+ def receive = {
+ case PhiTick ⇒
+ nodes foreach { node ⇒
+ val previous = phiByNode(node)
+ val φ = fd.phi(node)
+ if (φ > 0) {
+ val aboveOne = if (!φ.isInfinite && φ > 1.0) 1 else 0
+ phiByNode += node -> PhiValue(node, previous.countAboveOne + aboveOne, previous.count + 1,
+ math.max(previous.max, φ))
+ }
+ }
+ reportTo foreach { _ ! PhiResult(cluster.selfAddress, phiByNode.values.toSet) }
+ case state: CurrentClusterState ⇒ nodes = state.members.map(_.address)
+ case memberEvent: MemberEvent ⇒ nodes += memberEvent.member.address
+ case ReportTo(ref) ⇒ reportTo = ref
+ case Reset ⇒
+ phiByNode = emptyPhiByNode
+ nodes = Set.empty[Address]
+ cluster.unsubscribe(self)
+ cluster.subscribe(self, classOf[MemberEvent])
+
+ }
+ }
+
+ /**
+ * Master of routers
+ *
+ * Flow control, to not flood the consumers, is handled by scheduling a
+ * batch of messages to be sent to the router when half of the number
+ * of outstanding messages remains.
+ *
+ * It uses a simple message retry mechanism. If an ack of a sent message
+ * is not received within a timeout, that message will be resent to the router,
+ * infinite number of times.
+ *
+ * When it receives the `End` command it will stop sending messages to the router,
+ * resends continuous, until all outstanding acks have been received, and then
+ * finally it replies with `WorkResult` to the sender of the `End` command, and stops
+ * itself.
+ */
+ class Master(settings: StressMultiJvmSpec.Settings, batchInterval: FiniteDuration, tree: Boolean) extends Actor {
+ val workers = context.actorOf(Props[Worker].withRouter(FromConfig), "workers")
+ val payload = Array.fill(settings.payloadSize)(ThreadLocalRandom.current.nextInt(127).toByte)
+ val retryTimeout = 5.seconds.dilated(context.system)
+ val idCounter = Iterator from 0
+ var sendCounter = 0L
+ var ackCounter = 0L
+ var outstanding = Map.empty[JobId, JobState]
+ var startTime = 0L
+
+ import context.dispatcher
+ val resendTask = context.system.scheduler.schedule(3.seconds, 3.seconds, self, RetryTick)
+
+ override def postStop(): Unit = {
+ resendTask.cancel()
+ super.postStop()
+ }
+
+ def receive = {
+ case Begin ⇒
+ startTime = System.nanoTime
+ self ! SendBatch
+ context.become(working)
+ case RetryTick ⇒
+ }
+
+ def working: Receive = {
+ case Ack(id) ⇒
+ outstanding -= id
+ ackCounter += 1
+ if (outstanding.size == settings.workBatchSize / 2)
+ if (batchInterval == Duration.Zero) self ! SendBatch
+ else context.system.scheduler.scheduleOnce(batchInterval, self, SendBatch)
+ case SendBatch ⇒ sendJobs()
+ case RetryTick ⇒ resend()
+ case End ⇒
+ done(sender)
+ context.become(ending(sender))
+ }
+
+ def ending(replyTo: ActorRef): Receive = {
+ case Ack(id) ⇒
+ outstanding -= id
+ ackCounter += 1
+ done(replyTo)
+ case SendBatch ⇒
+ case RetryTick ⇒ resend()
+ }
+
+ def done(replyTo: ActorRef): Unit = if (outstanding.isEmpty) {
+ val duration = (System.nanoTime - startTime).nanos
+ replyTo ! WorkResult(duration, sendCounter, ackCounter)
+ context stop self
+ }
+
+ def sendJobs(): Unit = {
+ 0 until settings.workBatchSize foreach { _ ⇒
+ send(createJob())
+ }
+ }
+
+ def createJob(): Job = {
+ if (tree) TreeJob(idCounter.next(), payload, ThreadLocalRandom.current.nextInt(settings.treeWidth),
+ settings.treeLevels, settings.treeWidth)
+ else SimpleJob(idCounter.next(), payload)
+ }
+
+ def resend(): Unit = {
+ outstanding.values foreach { jobState ⇒
+ if (jobState.deadline.isOverdue)
+ send(jobState.job)
+ }
+ }
+
+ def send(job: Job): Unit = {
+ outstanding += job.id -> JobState(Deadline.now + retryTimeout, job)
+ sendCounter += 1
+ workers ! job
+ }
+ }
+
+ /**
+ * Used by Master as routee
+ */
+ class Worker extends Actor with ActorLogging {
+ def receive = {
+ case SimpleJob(id, payload) ⇒ sender ! Ack(id)
+ case TreeJob(id, payload, idx, levels, width) ⇒
+ // create the actors when first TreeJob message is received
+ val totalActors = ((width * math.pow(width, levels) - 1) / (width - 1)).toInt
+ log.info("Creating [{}] actors in a tree structure of [{}] levels and each actor has [{}] children",
+ totalActors, levels, width)
+ val tree = context.actorOf(Props(new TreeNode(levels, width)), "tree")
+ tree forward (idx, SimpleJob(id, payload))
+ context.become(treeWorker(tree))
+ }
+
+ def treeWorker(tree: ActorRef): Receive = {
+ case SimpleJob(id, payload) ⇒ sender ! Ack(id)
+ case TreeJob(id, payload, idx, _, _) ⇒
+ tree forward (idx, SimpleJob(id, payload))
+ }
+ }
+
+ class TreeNode(level: Int, width: Int) extends Actor {
+ require(level >= 1)
+ def createChild(): Actor = if (level == 1) new Leaf else new TreeNode(level - 1, width)
+ val indexedChildren =
+ 0 until width map { i ⇒ context.actorOf(Props(createChild()), name = i.toString) } toVector
+
+ def receive = {
+ case (idx: Int, job: SimpleJob) if idx < width ⇒ indexedChildren(idx) forward (idx, job)
+ }
+ }
+
+ class Leaf extends Actor {
+ def receive = {
+ case (_: Int, job: SimpleJob) ⇒ sender ! Ack(job.id)
+ }
+ }
+
+ /**
+ * Used for remote death watch testing
+ */
+ class Watchee extends Actor {
+ def receive = Actor.emptyBehavior
+ }
+
+ /**
+ * Used for remote supervision testing
+ */
+ class Supervisor extends Actor {
+
+ var restartCount = 0
+
+ override val supervisorStrategy =
+ OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 1 minute) {
+ case _: Exception ⇒
+ restartCount += 1
+ Restart
+ }
+
+ def receive = {
+ case props: Props ⇒ context.actorOf(props)
+ case e: Exception ⇒ context.children foreach { _ ! e }
+ case GetChildrenCount ⇒ sender ! ChildrenCount(context.children.size, restartCount)
+ case Reset ⇒
+ require(context.children.isEmpty,
+ s"ResetChildrenCount not allowed when children exists, [${context.children.size}]")
+ restartCount = 0
+ }
+ }
+
+ /**
+ * Child of Supervisor for remote supervision testing
+ */
+ class RemoteChild extends Actor {
+ def receive = {
+ case e: Exception ⇒ throw e
+ }
+ }
+
+ case object Begin
+ case object End
+ case object RetryTick
+ case object ReportTick
+ case object PhiTick
+ case class PhiResult(from: Address, phiValues: Set[PhiValue])
+ case class PhiValue(address: Address, countAboveOne: Int, count: Int, max: Double)
+ case class ReportTo(ref: Option[ActorRef])
+
+ type JobId = Int
+ trait Job { def id: JobId }
+ case class SimpleJob(id: JobId, payload: Any) extends Job
+ case class TreeJob(id: JobId, payload: Any, idx: Int, levels: Int, width: Int) extends Job
+ case class Ack(id: JobId)
+ case class JobState(deadline: Deadline, job: Job)
+ case class WorkResult(duration: Duration, sendCount: Long, ackCount: Long) {
+ def retryCount: Long = sendCount - ackCount
+ def jobsPerSecond: Double = ackCount * 1000.0 / duration.toMillis
+ }
+ case object SendBatch
+ case class CreateTree(levels: Int, width: Int)
+
+ case object GetChildrenCount
+ case class ChildrenCount(numberOfChildren: Int, numberOfChildRestarts: Int)
+ case object Reset
+
+}
+
+class StressMultiJvmNode1 extends StressSpec
+class StressMultiJvmNode2 extends StressSpec
+class StressMultiJvmNode3 extends StressSpec
+class StressMultiJvmNode4 extends StressSpec
+class StressMultiJvmNode5 extends StressSpec
+class StressMultiJvmNode6 extends StressSpec
+class StressMultiJvmNode7 extends StressSpec
+class StressMultiJvmNode8 extends StressSpec
+class StressMultiJvmNode9 extends StressSpec
+class StressMultiJvmNode10 extends StressSpec
+class StressMultiJvmNode11 extends StressSpec
+class StressMultiJvmNode12 extends StressSpec
+class StressMultiJvmNode13 extends StressSpec
+
+abstract class StressSpec
+ extends MultiNodeSpec(StressMultiJvmSpec)
+ with MultiNodeClusterSpec with BeforeAndAfterEach with ImplicitSender {
+
+ import StressMultiJvmSpec._
+ import ClusterEvent._
+
+ val settings = new Settings(system.settings.config)
+ import settings._
+
+ var step = 0
+ var nbrUsedRoles = 0
+
+ override def beforeEach(): Unit = { step += 1 }
+
+ override def muteLog(sys: ActorSystem = system): Unit = {
+ super.muteLog(sys)
+ sys.eventStream.publish(Mute(EventFilter[RuntimeException](pattern = ".*Simulated exception.*")))
+ sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*PhiResult.*")))
+ sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*SendBatch.*")))
+ }
+
+ val seedNodes = roles.take(numberOfSeedNodes)
+
+ override def cluster: Cluster = {
+ createWorker
+ super.cluster
+ }
+
+ // always create one worker when the cluster is started
+ lazy val createWorker: Unit =
+ system.actorOf(Props[Worker], "worker")
+
+ def createResultAggregator(title: String, expectedResults: Int, includeInHistory: Boolean): Unit = {
+ runOn(roles.head) {
+ val aggregator = system.actorOf(Props(new ClusterResultAggregator(title, expectedResults, reportMetricsInterval)),
+ name = "result" + step)
+ if (includeInHistory) aggregator ! ReportTo(Some(clusterResultHistory))
+ else aggregator ! ReportTo(None)
+ }
+ enterBarrier("result-aggregator-created-" + step)
+ runOn(roles.take(nbrUsedRoles): _*) {
+ phiObserver ! ReportTo(Some(clusterResultAggregator))
+ }
+ }
+
+ def clusterResultAggregator: ActorRef = system.actorFor(node(roles.head) / "user" / ("result" + step))
+
+ lazy val clusterResultHistory = system.actorOf(Props[ClusterResultHistory], "resultHistory")
+
+ lazy val phiObserver = system.actorOf(Props[PhiObserver], "phiObserver")
+
+ def awaitClusterResult: Unit = {
+ runOn(roles.head) {
+ val r = clusterResultAggregator
+ watch(r)
+ expectMsgPF(remaining) { case Terminated(`r`) ⇒ true }
+ }
+ enterBarrier("cluster-result-done-" + step)
+ }
+
+ def joinOneByOne(numberOfNodes: Int): Unit = {
+ 0 until numberOfNodes foreach { _ ⇒
+ joinOne()
+ nbrUsedRoles += 1
+ step += 1
+ }
+ }
+
/** Scales the expected convergence time with the configured factor and cluster size. */
def convergenceWithin(base: FiniteDuration, nodes: Int): FiniteDuration = {
  val scaledMillis = base.toMillis * convergenceWithinFactor * nodes
  scaledMillis.millis
}
+
+ /** Joins one additional node and awaits convergence on all used roles. */
+ def joinOne(): Unit = within(5.seconds + convergenceWithin(2.seconds, nbrUsedRoles + 1)) {
+ val currentRoles = roles.take(nbrUsedRoles + 1)
+ val title = s"join one to ${nbrUsedRoles} nodes cluster"
+ createResultAggregator(title, expectedResults = currentRoles.size, includeInHistory = true)
+ runOn(currentRoles: _*) {
+ reportResult {
+ // only the newly added role actually joins; everyone awaits convergence
+ runOn(currentRoles.last) {
+ cluster.join(roles.head)
+ }
+ awaitUpConvergence(currentRoles.size, timeout = remaining)
+ }
+
+ }
+ awaitClusterResult
+ enterBarrier("join-one-" + step)
+ }
+
+ /**
+  * Joins `numberOfNodes` new nodes at the same time, either via the seed
+  * nodes or via the first node, and awaits convergence on all members.
+  */
+ def joinSeveral(numberOfNodes: Int, toSeedNodes: Boolean): Unit =
+ within(10.seconds + convergenceWithin(3.seconds, nbrUsedRoles + numberOfNodes)) {
+ val currentRoles = roles.take(nbrUsedRoles + numberOfNodes)
+ val joiningRoles = currentRoles.takeRight(numberOfNodes)
+ val title = s"join ${numberOfNodes} to ${if (toSeedNodes) "seed nodes" else "one node"}, in ${nbrUsedRoles} nodes cluster"
+ createResultAggregator(title, expectedResults = currentRoles.size, includeInHistory = true)
+ runOn(currentRoles: _*) {
+ reportResult {
+ runOn(joiningRoles: _*) {
+ if (toSeedNodes) cluster.joinSeedNodes(seedNodes.toIndexedSeq map address)
+ else cluster.join(roles.head)
+ }
+ awaitUpConvergence(currentRoles.size, timeout = remaining)
+ }
+
+ }
+ awaitClusterResult
+ enterBarrier("join-several-" + step)
+ }
+
/** Shrinks the cluster by `numberOfNodes`, removing one node per step. */
def removeOneByOne(numberOfNodes: Int, shutdown: Boolean): Unit = {
  var remainingRemovals = numberOfNodes
  while (remainingRemovals > 0) {
    removeOne(shutdown)
    // bookkeeping: one member less, next step number
    nbrUsedRoles -= 1
    step += 1
    remainingRemovals -= 1
  }
}
+
+ /**
+  * Removes one node — by graceful leave or by abrupt shutdown — and verifies
+  * both convergence of the remaining nodes and that DeathWatch publishes
+  * Terminated for an actor that lived on the removed node.
+  */
+ def removeOne(shutdown: Boolean): Unit = within(10.seconds + convergenceWithin(3.seconds, nbrUsedRoles - 1)) {
+ val currentRoles = roles.take(nbrUsedRoles - 1)
+ val title = s"${if (shutdown) "shutdown" else "remove"} one from ${nbrUsedRoles} nodes cluster"
+ createResultAggregator(title, expectedResults = currentRoles.size, includeInHistory = true)
+ val removeRole = roles(nbrUsedRoles - 1)
+ val removeAddress = address(removeRole)
+ runOn(removeRole) {
+ system.actorOf(Props[Watchee], "watchee")
+ if (!shutdown) cluster.leave(myself)
+ }
+ enterBarrier("watchee-created-" + step)
+ runOn(roles.head) {
+ watch(system.actorFor(node(removeRole) / "user" / "watchee"))
+ }
+ // NOTE: "estabilished" typo kept intentionally — barrier names must match on all nodes
+ enterBarrier("watch-estabilished-" + step)
+
+ runOn(currentRoles: _*) {
+ reportResult {
+ runOn(roles.head) {
+ if (shutdown) {
+ log.info("Shutting down [{}]", removeAddress)
+ testConductor.shutdown(removeRole, 0).await
+ }
+ }
+ awaitUpConvergence(currentRoles.size, timeout = remaining)
+ }
+ }
+
+ // DeathWatch should be notified when the watchee's node leaves the cluster
+ runOn(roles.head) {
+ val expectedRef = system.actorFor(RootActorPath(removeAddress) / "user" / "watchee")
+ expectMsgPF(remaining) {
+ case Terminated(`expectedRef`) true
+ }
+ }
+ enterBarrier("watch-verified-" + step)
+
+ awaitClusterResult
+ enterBarrier("remove-one-" + step)
+ }
+
+ /**
+  * Removes several nodes at the same time, by leave or shutdown, and
+  * awaits convergence of the remaining nodes.
+  */
+ def removeSeveral(numberOfNodes: Int, shutdown: Boolean): Unit =
+ within(10.seconds + convergenceWithin(5.seconds, nbrUsedRoles - numberOfNodes)) {
+ val currentRoles = roles.take(nbrUsedRoles - numberOfNodes)
+ val removeRoles = roles.slice(currentRoles.size, nbrUsedRoles)
+ val title = s"${if (shutdown) "shutdown" else "leave"} ${numberOfNodes} in ${nbrUsedRoles} nodes cluster"
+ createResultAggregator(title, expectedResults = currentRoles.size, includeInHistory = true)
+ runOn(removeRoles: _*) {
+ if (!shutdown) cluster.leave(myself)
+ }
+ runOn(currentRoles: _*) {
+ reportResult {
+ // abrupt shutdown is orchestrated from the first node via the test conductor
+ runOn(roles.head) {
+ if (shutdown) removeRoles.foreach { r
+ log.info("Shutting down [{}]", address(r))
+ testConductor.shutdown(r, 0).await
+ }
+ }
+ awaitUpConvergence(currentRoles.size, timeout = remaining)
+ }
+ }
+ awaitClusterResult
+ enterBarrier("remove-several-" + step)
+ }
+
/**
 * Measures the wall-clock duration and the delta of cluster gossip/merge
 * statistics around the evaluation of `thunk`, and reports the outcome to
 * the current step's ClusterResultAggregator.
 *
 * `thunk` must be a by-name parameter: with a by-value parameter the work
 * would already have been evaluated at the call site, before `startTime`
 * and `startStats` are captured, making both the measured duration and the
 * stats delta meaningless.
 *
 * @param thunk the work to measure (evaluated exactly once)
 * @return the value produced by `thunk`
 */
def reportResult[T](thunk: => T): T = {
  val startTime = System.nanoTime
  val startStats = clusterView.latestStats

  val returnValue = thunk

  val duration = (System.nanoTime - startTime).nanos
  val latestStats = clusterView.latestStats
  // report only the stats accumulated during this measurement window
  val clusterStats = ClusterStats(
    receivedGossipCount = latestStats.receivedGossipCount - startStats.receivedGossipCount,
    mergeConflictCount = latestStats.mergeConflictCount - startStats.mergeConflictCount,
    mergeCount = latestStats.mergeCount - startStats.mergeCount,
    mergeDetectedCount = latestStats.mergeDetectedCount - startStats.mergeDetectedCount)
  clusterResultAggregator ! ClusterResult(cluster.selfAddress, duration, clusterStats)
  returnValue
}
+
+ /**
+  * Repeatedly starts fresh ActorSystems on the first numberOfNodesJoinRemove
+  * roles, joins them via the seed nodes, awaits convergence, and shuts the
+  * previous incarnations down — exercising join/remove churn for `duration`.
+  */
+ def exerciseJoinRemove(title: String, duration: FiniteDuration): Unit = {
+ val activeRoles = roles.take(numberOfNodesJoinRemove)
+ val loopDuration = 10.seconds + convergenceWithin(4.seconds, nbrUsedRoles + activeRoles.size)
+ val deadline = Deadline.now + duration - loopDuration
+ val usedRoles = roles.take(nbrUsedRoles)
+ val usedAddresses = usedRoles.map(address(_)).toSet
+
+ @tailrec def loop(counter: Int, previousAS: Option[ActorSystem], allPreviousAddresses: Set[Address]): Option[ActorSystem] = {
+ if (deadline.isOverdue) previousAS
+ else {
+ val t = title + " round " + counter
+ runOn(usedRoles: _*) {
+ phiObserver ! Reset
+ }
+ createResultAggregator(t, expectedResults = nbrUsedRoles, includeInHistory = true)
+ val (nextAS, nextAddresses) = within(loopDuration) {
+ reportResult {
+ val nextAS =
+ if (activeRoles contains myself) {
+ // replace the previous system with a brand new one joining via the seed nodes
+ previousAS foreach { _.shutdown() }
+ val sys = ActorSystem(system.name, system.settings.config)
+ muteLog(sys)
+ Cluster(sys).joinSeedNodes(seedNodes.toIndexedSeq map address)
+ Some(sys)
+ } else previousAS
+ runOn(usedRoles: _*) {
+ // the previous incarnations must not come back as members
+ awaitUpConvergence(
+ nbrUsedRoles + activeRoles.size,
+ canNotBePartOfMemberRing = allPreviousAddresses,
+ timeout = remaining)
+ }
+ val nextAddresses = clusterView.members.map(_.address) -- usedAddresses
+ runOn(usedRoles: _*) {
+ nextAddresses.size must be(numberOfNodesJoinRemove)
+ }
+
+ enterBarrier("join-remove-" + step)
+ (nextAS, nextAddresses)
+ }
+ }
+ awaitClusterResult
+
+ step += 1
+ loop(counter + 1, nextAS, nextAddresses)
+
+ }
+ }
+
+ // shut down the final round's extra systems and settle back to nbrUsedRoles members
+ loop(1, None, Set.empty) foreach { _.shutdown }
+ within(loopDuration) {
+ runOn(usedRoles: _*) {
+ awaitUpConvergence(nbrUsedRoles, timeout = remaining)
+ phiObserver ! Reset
+ }
+ }
+ enterBarrier("join-remove-shutdown-" + step)
+
+ }
+
+ def master: ActorRef = system.actorFor("/user/master-" + myself.name)
+
/**
 * Runs Master routers on the first three used nodes for approximately
 * `duration`, with the remaining nodes participating as workers, and
 * verifies the work results.
 *
 * @param duration              how long the routers run before End is sent
 * @param batchInterval         interval between work batches sent by the Master
 * @param expectDroppedMessages when false, no message retries are expected
 * @param tree                  when true, work is routed to a tree of many actors
 */
def exerciseRouters(title: String, duration: FiniteDuration, batchInterval: FiniteDuration,
                    expectDroppedMessages: Boolean, tree: Boolean): Unit =
  within(duration + 10.seconds) {
    createResultAggregator(title, expectedResults = nbrUsedRoles, includeInHistory = false)

    val (masterRoles, otherRoles) = roles.take(nbrUsedRoles).splitAt(3)
    runOn(masterRoles: _*) {
      reportResult {
        val m = system.actorOf(Props(new Master(settings, batchInterval, tree)),
          name = "master-" + myself.name)
        m ! Begin
        import system.dispatcher
        // FIX: was scheduleOnce(highThroughputDuration), which ignored the
        // `duration` parameter; for runs shorter than highThroughputDuration
        // the End message would arrive after the `within(duration + 10.seconds)`
        // deadline above had already expired
        system.scheduler.scheduleOnce(duration) {
          m.tell(End, testActor)
        }
        val workResult = awaitWorkResult
        workResult.sendCount must be > (0L)
        workResult.ackCount must be > (0L)
        if (!expectDroppedMessages)
          workResult.retryCount must be(0)

        enterBarrier("routers-done-" + step)
      }
    }
    runOn(otherRoles: _*) {
      reportResult {
        enterBarrier("routers-done-" + step)
      }
    }

    awaitClusterResult
  }
+
+ /**
+  * Expects the WorkResult from this node's Master (End must already have
+  * been sent with `testActor` as sender), logs it, and waits until the
+  * Master has terminated before returning the result.
+  */
+ def awaitWorkResult: WorkResult = {
+ val m = master
+ val workResult = expectMsgType[WorkResult]
+ // NOTE(review): `.form` presumably comes from a number-formatting helper
+ // defined elsewhere in this spec — confirm
+ log.info("{} result, [{}] jobs/s, retried [{}] of [{}] msg", m.path.name,
+ workResult.jobsPerSecond.form,
+ workResult.retryCount, workResult.sendCount)
+ watch(m)
+ expectMsgPF(remaining) { case Terminated(`m`) true }
+ workResult
+ }
+
+ /**
+  * Repeatedly deploys one RemoteChild per used node under a local Supervisor
+  * and triggers simulated failures, verifying the restart counting and that
+  * the children are stopped once the restart limit is exceeded.
+  */
+ def exerciseSupervision(title: String, duration: FiniteDuration): Unit =
+ within(duration + 10.seconds) {
+ val supervisor = system.actorOf(Props[Supervisor], "supervisor")
+ while (remaining > 10.seconds) {
+ createResultAggregator(title, expectedResults = nbrUsedRoles, includeInHistory = false)
+
+ reportResult {
+ // one remote-deployed child per used node
+ roles.take(nbrUsedRoles) foreach { r
+ supervisor ! Props[RemoteChild].withDeploy(Deploy(scope = RemoteScope(address(r))))
+ }
+ supervisor ! GetChildrenCount
+ expectMsgType[ChildrenCount] must be(ChildrenCount(nbrUsedRoles, 0))
+
+ // each exception restarts every child once
+ 1 to 5 foreach { _ supervisor ! new RuntimeException("Simulated exception") }
+ awaitCond {
+ supervisor ! GetChildrenCount
+ val c = expectMsgType[ChildrenCount]
+ c == ChildrenCount(nbrUsedRoles, 5 * nbrUsedRoles)
+ }
+
+ // after 5 restart attempts the children should be stopped
+ supervisor ! new RuntimeException("Simulated exception")
+ awaitCond {
+ supervisor ! GetChildrenCount
+ val c = expectMsgType[ChildrenCount]
+ // zero children
+ c == ChildrenCount(0, 6 * nbrUsedRoles)
+ }
+ supervisor ! Reset
+
+ }
+
+ awaitClusterResult
+ }
+ }
+
+ "A cluster under stress" must {
+
+ "join seed nodes" taggedAs LongRunningTest in {
+
+ // initial cluster: the seed nodes plus a few extra nodes joining via them
+ val otherNodesJoiningSeedNodes = roles.slice(numberOfSeedNodes, numberOfSeedNodes + numberOfNodesJoiningToSeedNodesInitially)
+ val size = seedNodes.size + otherNodesJoiningSeedNodes.size
+
+ createResultAggregator("join seed nodes", expectedResults = size, includeInHistory = true)
+
+ runOn((seedNodes ++ otherNodesJoiningSeedNodes): _*) {
+ reportResult {
+ cluster.joinSeedNodes(seedNodes.toIndexedSeq map address)
+ awaitUpConvergence(size)
+ }
+ }
+
+ awaitClusterResult
+
+ nbrUsedRoles += size
+ enterBarrier("after-" + step)
+ }
+
+ // routers keep running through the join phase to exercise message
+ // delivery while membership is changing
+ "start routers that are running while nodes are joining" taggedAs LongRunningTest in {
+ runOn(roles.take(3): _*) {
+ system.actorOf(Props(new Master(settings, settings.workBatchInterval, tree = false)),
+ name = "master-" + myself.name) ! Begin
+ }
+ enterBarrier("after-" + step)
+ }
+
+ "join nodes one-by-one to small cluster" taggedAs LongRunningTest in {
+ joinOneByOne(numberOfNodesJoiningOneByOneSmall)
+ enterBarrier("after-" + step)
+ }
+
+ "join several nodes to one node" taggedAs LongRunningTest in {
+ joinSeveral(numberOfNodesJoiningToOneNode, toSeedNodes = false)
+ nbrUsedRoles += numberOfNodesJoiningToOneNode
+ enterBarrier("after-" + step)
+ }
+
"join several nodes to seed nodes" taggedAs LongRunningTest in {
  // FIX: was joinSeveral(numberOfNodesJoiningToOneNode, ...), which did not
  // match the nbrUsedRoles bookkeeping below; when the two settings differ,
  // every subsequent step would operate on the wrong set of roles
  joinSeveral(numberOfNodesJoiningToSeedNodes, toSeedNodes = true)
  nbrUsedRoles += numberOfNodesJoiningToSeedNodes
  enterBarrier("after-" + step)
}
+
+ "join nodes one-by-one to large cluster" taggedAs LongRunningTest in {
+ joinOneByOne(numberOfNodesJoiningOneByOneLarge)
+ enterBarrier("after-" + step)
+ }
+
+ // stop the routers started before the join phase and verify no messages were lost
+ "end routers that are running while nodes are joining" taggedAs LongRunningTest in within(30.seconds) {
+ runOn(roles.take(3): _*) {
+ val m = master
+ m.tell(End, testActor)
+ val workResult = awaitWorkResult
+ workResult.retryCount must be(0)
+ workResult.sendCount must be > (0L)
+ workResult.ackCount must be > (0L)
+ }
+ enterBarrier("after-" + step)
+ }
+
+ "use routers with normal throughput" taggedAs LongRunningTest in {
+ exerciseRouters("use routers with normal throughput", normalThroughputDuration,
+ batchInterval = workBatchInterval, expectDroppedMessages = false, tree = false)
+ enterBarrier("after-" + step)
+ }
+
+ "use routers with high throughput" taggedAs LongRunningTest in {
+ exerciseRouters("use routers with high throughput", highThroughputDuration,
+ batchInterval = Duration.Zero, expectDroppedMessages = false, tree = false)
+ enterBarrier("after-" + step)
+ }
+
+ "use many actors with normal throughput" taggedAs LongRunningTest in {
+ exerciseRouters("use many actors with normal throughput", normalThroughputDuration,
+ batchInterval = workBatchInterval, expectDroppedMessages = false, tree = true)
+ enterBarrier("after-" + step)
+ }
+
+ "use many actors with high throughput" taggedAs LongRunningTest in {
+ exerciseRouters("use many actors with high throughput", highThroughputDuration,
+ batchInterval = Duration.Zero, expectDroppedMessages = false, tree = true)
+ enterBarrier("after-" + step)
+ }
+
+ "excercise join/remove/join/remove" taggedAs LongRunningTest in {
+ exerciseJoinRemove("excercise join/remove", joinRemoveDuration)
+ enterBarrier("after-" + step)
+ }
+
+ "excercise supervision" taggedAs LongRunningTest in {
+ exerciseSupervision("excercise supervision", supervisionDuration)
+ enterBarrier("after-" + step)
+ }
+
+ // routers keep running through the removal phase below
+ "start routers that are running while nodes are removed" taggedAs LongRunningTest in {
+ runOn(roles.take(3): _*) {
+ system.actorOf(Props(new Master(settings, settings.workBatchInterval, tree = false)),
+ name = "master-" + myself.name) ! Begin
+ }
+ enterBarrier("after-" + step)
+ }
+
+ "leave nodes one-by-one from large cluster" taggedAs LongRunningTest in {
+ removeOneByOne(numberOfNodesLeavingOneByOneLarge, shutdown = false)
+ enterBarrier("after-" + step)
+ }
+
+ "shutdown nodes one-by-one from large cluster" taggedAs LongRunningTest in {
+ removeOneByOne(numberOfNodesShutdownOneByOneLarge, shutdown = true)
+ enterBarrier("after-" + step)
+ }
+
+ "leave several nodes" taggedAs LongRunningTest in {
+ removeSeveral(numberOfNodesLeaving, shutdown = false)
+ nbrUsedRoles -= numberOfNodesLeaving
+ enterBarrier("after-" + step)
+ }
+
+ "shutdown several nodes" taggedAs LongRunningTest in {
+ removeSeveral(numberOfNodesShutdown, shutdown = true)
+ nbrUsedRoles -= numberOfNodesShutdown
+ enterBarrier("after-" + step)
+ }
+
+ "leave nodes one-by-one from small cluster" taggedAs LongRunningTest in {
+ removeOneByOne(numberOfNodesLeavingOneByOneSmall, shutdown = false)
+ enterBarrier("after-" + step)
+ }
+
+ "shutdown nodes one-by-one from small cluster" taggedAs LongRunningTest in {
+ removeOneByOne(numberOfNodesShutdownOneByOneSmall, shutdown = true)
+ enterBarrier("after-" + step)
+ }
+
+ // retries are allowed here since messages may be dropped while nodes are removed
+ "end routers that are running while nodes are removed" taggedAs LongRunningTest in within(30.seconds) {
+ runOn(roles.take(3): _*) {
+ val m = master
+ m.tell(End, testActor)
+ val workResult = awaitWorkResult
+ workResult.sendCount must be > (0L)
+ workResult.ackCount must be > (0L)
+ }
+ enterBarrier("after-" + step)
+ }
+
+ }
+}
View
2 akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala
@@ -130,7 +130,7 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod
}
runOn(allBut(victim): _*) {
- awaitUpConvergence(roles.size - 1, List(victim))
+ awaitUpConvergence(roles.size - 1, Set(victim))
}
endBarrier

0 comments on commit f147f4d

Please sign in to comment.