Permalink
Browse files

Merge pull request #940 from akka/wip-2786-cluster-stress-patriknw

Stress / long running test of cluster, see #2786
  • Loading branch information...
2 parents 7944b45 + 46d376b commit a0cb4b378222c3b210a57c8152e280cb4e038009 @patriknw patriknw committed Jan 8, 2013
@@ -284,18 +284,24 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
// keep the latestGossip to be sent to new subscribers
latestGossip = newGossip
// first publish the diffUnreachable between the last two gossips
- diffUnreachable(oldGossip, newGossip) foreach { event
- publish(event)
- // notify DeathWatch about unreachable node
- publish(AddressTerminated(event.member.address))
- }
+ diffUnreachable(oldGossip, newGossip) foreach publish
// buffer up the MemberEvents waiting for convergence
memberEvents ++= diffMemberEvents(oldGossip, newGossip)
// if we have convergence then publish the MemberEvents and possibly a LeaderChanged
if (newGossip.convergence) {
val previousConvergedGossip = latestConvergedGossip
latestConvergedGossip = newGossip
- memberEvents foreach publish
+ memberEvents foreach { event
+ event match {
+ case m @ (MemberDowned(_) | MemberRemoved(_))
+ // TODO MemberDowned match should probably be covered by MemberRemoved, see ticket #2788
+ // but right now we don't change Downed to Removed
+ publish(event)
+ // notify DeathWatch about downed node
+ publish(AddressTerminated(m.member.address))
+ case _ publish(event)
+ }
+ }
memberEvents = immutable.Seq.empty
diffLeader(previousConvergedGossip, latestConvergedGossip) foreach publish
}
@@ -116,6 +116,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
case HeartbeatTick heartbeat()
case s: CurrentClusterState reset(s)
case UnreachableMember(m) removeMember(m)
+ case MemberDowned(m) removeMember(m)
case MemberRemoved(m) removeMember(m)
case e: MemberEvent addMember(e.member)
case JoinInProgress(a, d) addJoinInProgress(a, d)
@@ -140,7 +140,6 @@ private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) {
* Unregisters the cluster JMX MBean from MBean server.
*/
def unregisterMBean(): Unit = {
- clusterView.close()
try {
mBeanServer.unregisterMBean(clusterMBeanName)
} catch {
@@ -52,7 +52,7 @@ abstract class ClientDowningNodeThatIsUnreachableSpec(multiNodeConfig: ClientDow
cluster.down(thirdAddress)
enterBarrier("down-third-node")
- awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(thirdAddress))
+ awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Set(thirdAddress))
clusterView.members.exists(_.address == thirdAddress) must be(false)
}
@@ -63,7 +63,7 @@ abstract class ClientDowningNodeThatIsUnreachableSpec(multiNodeConfig: ClientDow
runOn(second, fourth) {
enterBarrier("down-third-node")
- awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(thirdAddress))
+ awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Set(thirdAddress))
}
enterBarrier("await-completion")
@@ -50,7 +50,7 @@ abstract class ClientDowningNodeThatIsUpSpec(multiNodeConfig: ClientDowningNodeT
markNodeAsUnavailable(thirdAddress)
- awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(thirdAddress))
+ awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Set(thirdAddress))
clusterView.members.exists(_.address == thirdAddress) must be(false)
}
@@ -61,7 +61,7 @@ abstract class ClientDowningNodeThatIsUpSpec(multiNodeConfig: ClientDowningNodeT
runOn(second, fourth) {
enterBarrier("down-third-node")
- awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(thirdAddress))
+ awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Set(thirdAddress))
}
enterBarrier("await-completion")
Oops, something went wrong.

0 comments on commit a0cb4b3

Please sign in to comment.