/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * license agreements; and to You under the Apache License, version 2.0:
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * This file is part of the Apache Pekko project, which was derived from Akka.
 */

package org.apache.pekko.cluster.persistence.cassandra

import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

import org.apache.pekko
import pekko.actor.{ Actor, ActorRef, ActorSystem, Address, RootActorPath }
import pekko.cluster.{ Cluster, ClusterReadView, MemberStatus, _ }
import pekko.event.Logging.ErrorLevel
import pekko.remote.testconductor.RoleName
import pekko.remote.testkit.{ FlightRecordingSupport, MultiNodeSpec }
import pekko.testkit.TestEvent._
import pekko.testkit._

import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.exceptions.TestCanceledException
import org.scalatest.{ Canceled, Outcome, Suite }

import scala.collection.immutable
import scala.concurrent.duration._
import scala.language.implicitConversions

object MultiNodeClusterSpec {

  def clusterConfigWithFailureDetectorPuppet: Config =
    ConfigFactory.parseString(
      "pekko.cluster.failure-detector.implementation-class = org.apache.pekko.cluster.FailureDetectorPuppet").withFallback(
      clusterConfig)

  def clusterConfig(failureDetectorPuppet: Boolean): Config =
    if (failureDetectorPuppet) clusterConfigWithFailureDetectorPuppet else clusterConfig

  def clusterConfig: Config = ConfigFactory.parseString(s"""
    pekko.actor.provider = cluster
    pekko.actor.warn-about-java-serializer-usage = off
    pekko.cluster {
      jmx.enabled = off
      gossip-interval = 200 ms
      leader-actions-interval = 200 ms
      unreachable-nodes-reaper-interval = 500 ms
      periodic-tasks-initial-delay = 300 ms
      publish-stats-interval = 0 s # always, when it happens
      failure-detector.heartbeat-interval = 500 ms
      run-coordinated-shutdown-when-down = off
      sharding {
        retry-interval = 200ms
        waiting-for-state-timeout = 200ms
      }
    }
    pekko.loglevel = INFO
    pekko.log-dead-letters = off
    pekko.log-dead-letters-during-shutdown = off
    pekko.remote {
      log-remote-lifecycle-events = off
      artery.advanced.flight-recorder {
        enabled=on
        destination=target/flight-recorder-${UUID.randomUUID().toString}.afr
      }
    }
    pekko.loggers = ["org.apache.pekko.testkit.TestEventListener"]
    pekko.test {
      single-expect-default = 5 s
    }
    """)

  // sometimes we need to coordinate test shutdown with messages instead of barriers
  object EndActor {
    case object SendEnd
    case object End
    case object EndAck
  }

  class EndActor(testActor: ActorRef, target: Option[Address]) extends Actor {
    import EndActor._
    def receive: Receive = {
      case SendEnd =>
        target.foreach { t =>
          context.actorSelection(RootActorPath(t) / self.path.elements) ! End
        }
      case End =>
        testActor.forward(End)
        sender() ! EndAck
      case EndAck =>
        testActor.forward(EndAck)
    }
  }
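
  // A sketch of the intended shutdown handshake (the probes, roles and the actor
  // name "end" are hypothetical): both nodes start an EndActor under the same name;
  // the node that sends SendEnd eventually receives EndAck on its probe, while the
  // target node receives End and can then shut down.
  //
  //   runOn(first) {
  //     val endProbe = TestProbe()
  //     val endActor = system.actorOf(
  //       Props(classOf[EndActor], endProbe.ref, Some(address(second))), "end")
  //     endActor ! EndActor.SendEnd
  //     endProbe.expectMsg(EndActor.EndAck)
  //   }
  //   runOn(second) {
  //     val endProbe = TestProbe()
  //     system.actorOf(Props(classOf[EndActor], endProbe.ref, None), "end")
  //     endProbe.expectMsg(EndActor.End)
  //   }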
}

trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordingSupport { self: MultiNodeSpec =>

  override def initialParticipants = roles.size

  private val cachedAddresses = new ConcurrentHashMap[RoleName, Address]

  override protected def atStartup(): Unit = {
    muteLog()
    self.atStartup()
  }

  override protected def afterTermination(): Unit = {
    self.afterTermination()
    if (failed || sys.props.get("pekko.remote.artery.always-dump-flight-recorder").isDefined) {
      printFlightRecording()
    }
    deleteFlightRecorderFile()
  }

  def muteLog(sys: ActorSystem = system): Unit = {
    if (!sys.log.isDebugEnabled) {
      Seq(
        ".*Cluster Node.* - registered cluster JMX MBean.*",
        ".*Cluster Node.* - is starting up.*",
        ".*Shutting down cluster Node.*",
        ".*Cluster node successfully shut down.*",
        ".*Using a dedicated scheduler for cluster.*").foreach { s =>
        sys.eventStream.publish(Mute(EventFilter.info(pattern = s)))
      }
      muteDeadLetters(
        classOf[pekko.actor.PoisonPill],
        classOf[pekko.dispatch.sysmsg.DeathWatchNotification],
        classOf[pekko.remote.transport.AssociationHandle.Disassociated],
        // pekko.remote.transport.AssociationHandle.Disassociated.getClass,
        classOf[pekko.remote.transport.ActorTransportAdapter.DisassociateUnderlying],
        // pekko.remote.transport.ActorTransportAdapter.DisassociateUnderlying.getClass,
        classOf[pekko.remote.transport.AssociationHandle.InboundPayload])(sys)
    }
  }

  def muteMarkingAsUnreachable(sys: ActorSystem = system): Unit =
    if (!sys.log.isDebugEnabled)
      sys.eventStream.publish(Mute(EventFilter.error(pattern = ".*Marking.* as UNREACHABLE.*")))

  def muteMarkingAsReachable(sys: ActorSystem = system): Unit =
    if (!sys.log.isDebugEnabled)
      sys.eventStream.publish(Mute(EventFilter.info(pattern = ".*Marking.* as REACHABLE.*")))

  override def afterAll(): Unit = {
    if (!log.isDebugEnabled) {
      muteDeadLetters()()
      system.eventStream.setLogLevel(ErrorLevel)
    }
    super.afterAll()
  }

  /**
   * Lookup the Address for the role.
   *
   * Implicit conversion from RoleName to Address.
   *
   * It is cached, which has the implication that stopping
   * and then restarting a role (JVM) with another address is not
   * supported.
   */
  implicit def address(role: RoleName): Address = {
    cachedAddresses.get(role) match {
      case null =>
        val address = node(role).address
        cachedAddresses.put(role, address)
        address
      case address => address
    }
  }

  // Cluster tests are written so that if a previous step (test method) failed,
  // it will most likely not be possible to run the next step. This ensures that
  // all steps after the first failure fail fast.
  private var failed = false
  override protected def withFixture(test: NoArgTest): Outcome =
    if (failed) {
      Canceled(new TestCanceledException("Previous step failed", 0))
    } else {
      val out = super.withFixture(test)
      if (!out.isSucceeded)
        failed = true
      out
    }

  def clusterView: ClusterReadView = cluster.readView

  /**
   * Get the cluster node to use.
   */
  def cluster: Cluster = Cluster(system)

  /**
   * Use this method for the initial startup of the cluster node.
   */
  def startClusterNode(): Unit = {
    if (clusterView.members.isEmpty) {
      cluster.join(myself)
      awaitAssert(clusterView.members.map(_.address) should contain(address(myself)))
    } else
      clusterView.self
  }

  /**
   * Initialize the cluster with the specified member
   * nodes (roles) and wait until all of them have joined and are `Up`.
   * The first node is started first and the others join it.
   */
  def awaitClusterUp(roles: RoleName*): Unit = {
    runOn(roles.head) {
      // make sure that the node-to-join is started before the others join
      startClusterNode()
    }
    enterBarrier(roles.head.name + "-started")
    if (roles.tail.contains(myself)) {
      cluster.join(roles.head)
    }
    if (roles.contains(myself)) {
      awaitMembersUp(numberOfMembers = roles.length)
    }
    enterBarrier(roles.map(_.name).mkString("-") + "-joined")
  }
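
  // A sketch of the typical first test step that forms the cluster; the role
  // names `first`, `second`, `third` are hypothetical and would come from the
  // companion MultiNodeConfig:
  //
  //   within(20.seconds) {
  //     awaitClusterUp(first, second, third)
  //     enterBarrier("cluster-formed")
  //   }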

  /**
   * Join the specified node within the given period by sending repeated join
   * requests at periodic intervals until we succeed.
   */
  def joinWithin(joinNode: RoleName, max: Duration = remainingOrDefault, interval: Duration = 1.second): Unit = {
    def memberInState(member: Address, status: Seq[MemberStatus]): Boolean =
      clusterView.members.exists { m => (m.address == member) && status.contains(m.status) }

    cluster.join(joinNode)
    awaitCond(
      {
        clusterView.refreshCurrentState()
        if (memberInState(joinNode, List(MemberStatus.up)) &&
          memberInState(myself, List(MemberStatus.Joining, MemberStatus.Up)))
          true
        else {
          cluster.join(joinNode)
          false
        }
      }, max, interval)
  }
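
  // A sketch (hypothetical `first`/`second` roles) of joining a node that may not
  // have finished starting yet, retrying the join until membership is observed:
  //
  //   runOn(second) {
  //     joinWithin(first, max = 20.seconds)
  //   }
  //   enterBarrier("second-joined")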

  /**
   * Assert that the member addresses match the expected addresses in the
   * sort order used by the cluster.
   */
  def assertMembers(gotMembers: Iterable[Member], expectedAddresses: Address*): Unit = {
    val members = gotMembers.toIndexedSeq
    members.size should ===(expectedAddresses.length)
    expectedAddresses.sorted.zipWithIndex.foreach { case (a, i) => members(i).address should ===(a) }
  }
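
  // A sketch of a typical assertion; the RoleName arguments are converted to
  // Addresses by the implicit `address` conversion above (roles are hypothetical):
  //
  //   assertMembers(clusterView.members, first, second, third)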

  /**
   * Note that this can only be used for a cluster with all members
   * in Up status, i.e. use `awaitMembersUp` before using this method.
   * The reason for that is that the cluster leader is preferably a
   * member with status Up or Leaving and that information can't
   * be determined from the `RoleName`.
   */
  def assertLeader(nodesInCluster: RoleName*): Unit =
    if (nodesInCluster.contains(myself)) assertLeaderIn(nodesInCluster.to[immutable.Seq])

  /**
   * Assert that the cluster has elected the correct leader
   * out of all nodes in the cluster. First
   * member in the cluster ring is expected leader.
   *
   * Note that this can only be used for a cluster with all members
   * in Up status, i.e. use `awaitMembersUp` before using this method.
   * The reason for that is that the cluster leader is preferably a
   * member with status Up or Leaving and that information can't
   * be determined from the `RoleName`.
   */
  def assertLeaderIn(nodesInCluster: immutable.Seq[RoleName]): Unit =
    if (nodesInCluster.contains(myself)) {
      nodesInCluster.length should not be 0
      val expectedLeader = roleOfLeader(nodesInCluster)
      val leader = clusterView.leader
      val isLeader = leader == Some(clusterView.selfAddress)
      assert(
        isLeader == isNode(expectedLeader),
        "expectedLeader [%s], got leader [%s], members [%s]".format(expectedLeader, leader, clusterView.members))
      clusterView.status should (be(MemberStatus.Up).or(be(MemberStatus.Leaving)))
    }

  /**
   * Wait until the expected number of members with status Up has been reached.
   * Also asserts that nodes in the 'canNotBePartOfMemberRing' are *not* part of the cluster ring.
   */
  def awaitMembersUp(
      numberOfMembers: Int,
      canNotBePartOfMemberRing: Set[Address] = Set.empty,
      timeout: FiniteDuration = 25.seconds): Unit = {
    within(timeout) {
      if (!canNotBePartOfMemberRing.isEmpty) // don't run this on an empty set
        awaitAssert(canNotBePartOfMemberRing.foreach(a => clusterView.members.map(_.address) should not contain a))
      awaitAssert(clusterView.members.size should ===(numberOfMembers))
      awaitAssert(clusterView.members.map(_.status) should ===(Set(MemberStatus.Up)))
      // clusterView.leader is updated by LeaderChanged, await that to be updated also
      val expectedLeader = clusterView.members.collectFirst {
        case m if m.dataCenter == cluster.settings.SelfDataCenter => m.address
      }
      awaitAssert(clusterView.leader should ===(expectedLeader))
    }
  }
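
  // A sketch (hypothetical `third` role) of asserting convergence after a node has
  // been removed, using the `canNotBePartOfMemberRing` guard to make sure the
  // removed node is not re-introduced into the ring:
  //
  //   runOn(first, second) {
  //     awaitMembersUp(
  //       numberOfMembers = roles.size - 1,
  //       canNotBePartOfMemberRing = Set(address(third)))
  //   }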

  def awaitAllReachable(): Unit =
    awaitAssert(clusterView.unreachableMembers should ===(Set.empty))

  /**
   * Wait until the specified nodes have seen the same gossip overview.
   */
  def awaitSeenSameState(addresses: Address*): Unit =
    awaitAssert((addresses.toSet.diff(clusterView.seenBy)) should ===(Set.empty))

  /**
   * Leader according to the address ordering of the roles.
   * Note that this can only be used for a cluster with all members
   * in Up status, i.e. use `awaitMembersUp` before using this method.
   * The reason for that is that the cluster leader is preferably a
   * member with status Up or Leaving and that information can't
   * be determined from the `RoleName`.
   */
  def roleOfLeader(nodesInCluster: immutable.Seq[RoleName] = roles): RoleName = {
    nodesInCluster.length should not be 0
    nodesInCluster.sorted.head
  }

  /**
   * Sort the roles in the address order used by the cluster node ring.
   */
  implicit val clusterOrdering: Ordering[RoleName] = new Ordering[RoleName] {
    import Member.addressOrdering
    def compare(x: RoleName, y: RoleName) = addressOrdering.compare(address(x), address(y))
  }

  def roleName(addr: Address): Option[RoleName] = roles.find(address(_) == addr)
}