Skip to content

Commit

Permalink
Merge pull request #927 from akka/wip-2018-cluster-jmx-tests-patriknw
Browse files Browse the repository at this point in the history
Tests for the Cluster JMX API, see #2018
  • Loading branch information
patriknw committed Dec 6, 2012
2 parents a51ba96 + 1df787d commit cd0fa5a
Show file tree
Hide file tree
Showing 12 changed files with 256 additions and 70 deletions.
8 changes: 4 additions & 4 deletions akka-cluster/src/main/scala/akka/cluster/Cluster.scala
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
format(system, other.getClass.getName)) format(system, other.getClass.getName))
} }


private val _isRunning = new AtomicBoolean(true) private val _isTerminated = new AtomicBoolean(false)
private val log = Logging(system, "Cluster") private val log = Logging(system, "Cluster")


log.info("Cluster Node [{}] - is starting up...", selfAddress) log.info("Cluster Node [{}] - is starting up...", selfAddress)
Expand Down Expand Up @@ -169,9 +169,9 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
// ====================================================== // ======================================================


/** /**
* Returns true if the cluster node is up and running, false if it is shut down. * Returns true if this cluster instance has be shutdown.
*/ */
def isRunning: Boolean = _isRunning.get def isTerminated: Boolean = _isTerminated.get


/** /**
* Subscribe to cluster domain events. * Subscribe to cluster domain events.
Expand Down Expand Up @@ -253,7 +253,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
* to go through graceful handoff process `LEAVE -> EXITING -> REMOVED -> SHUTDOWN`. * to go through graceful handoff process `LEAVE -> EXITING -> REMOVED -> SHUTDOWN`.
*/ */
private[cluster] def shutdown(): Unit = { private[cluster] def shutdown(): Unit = {
if (_isRunning.compareAndSet(true, false)) { if (_isTerminated.compareAndSet(false, true)) {
log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress) log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress)


system.stop(clusterDaemons) system.stop(clusterDaemons)
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto


def isSingletonCluster: Boolean = latestGossip.isSingletonCluster def isSingletonCluster: Boolean = latestGossip.isSingletonCluster


def isAvailable: Boolean = latestGossip.isAvailable(selfAddress) def isAvailable: Boolean = !latestGossip.isUnreachable(selfAddress)


/** /**
* Gossips latest gossip to a random member in the set of members passed in as argument. * Gossips latest gossip to a random member in the set of members passed in as argument.
Expand Down
79 changes: 62 additions & 17 deletions akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -16,17 +16,70 @@ import javax.management.InstanceNotFoundException
* Interface for the cluster JMX MBean. * Interface for the cluster JMX MBean.
*/ */
trait ClusterNodeMBean { trait ClusterNodeMBean {

/**
* Member status for this node.
*/
def getMemberStatus: String def getMemberStatus: String

/**
* Comma separated addresses of member nodes, sorted in the cluster ring order.
* The address format is `akka://actor-system-name@hostname:port`
*/
def getMembers: String

/**
* Comma separated addresses of unreachable member nodes.
* The address format is `akka://actor-system-name@hostname:port`
*/
def getUnreachable: String

/*
* String that will list all nodes in the node ring as follows:
* {{{
* Members:
* Member(address = akka://system0@localhost:5550, status = Up)
* Member(address = akka://system1@localhost:5551, status = Up)
* Unreachable:
* Member(address = akka://system2@localhost:5553, status = Down)
* }}}
*/
def getClusterStatus: String def getClusterStatus: String

/**
* Get the address of the current leader.
* The address format is `akka://actor-system-name@hostname:port`
*/
def getLeader: String def getLeader: String


/**
* Does the cluster consist of only one member?
*/
def isSingleton: Boolean def isSingleton: Boolean
def isConvergence: Boolean
/**
* Returns true if the node is not unreachable and not `Down`
* and not `Removed`.
*/
def isAvailable: Boolean def isAvailable: Boolean
def isRunning: Boolean


/**
* Try to join this cluster node with the node specified by 'address'.
* The address format is `akka://actor-system-name@hostname:port`.
* A 'Join(thisNodeAddress)' command is sent to the node to join.
*/
def join(address: String) def join(address: String)

/**
* Send command to issue state transition to LEAVING for the node specified by 'address'.
* The address format is `akka://actor-system-name@hostname:port`
*/
def leave(address: String) def leave(address: String)

/**
* Send command to DOWN the node specified by 'address'.
* The address format is `akka://actor-system-name@hostname:port`
*/
def down(address: String) def down(address: String)
} }


Expand All @@ -47,34 +100,26 @@ private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) {


// JMX attributes (bean-style) // JMX attributes (bean-style)


/*
* Sends a string to the JMX client that will list all nodes in the node ring as follows:
* {{{
* Members:
* Member(address = akka://system0@localhost:5550, status = Up)
* Member(address = akka://system1@localhost:5551, status = Up)
* Unreachable:
* Member(address = akka://system2@localhost:5553, status = Down)
* }}}
*/
def getClusterStatus: String = { def getClusterStatus: String = {
val unreachable = clusterView.unreachableMembers val unreachable = clusterView.unreachableMembers
"\nMembers:\n\t" + clusterView.members.mkString("\n\t") + "\nMembers:\n\t" + clusterView.members.mkString("\n\t") +
{ if (unreachable.nonEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" } { if (unreachable.nonEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" }
} }


def getMembers: String =
clusterView.members.toSeq.map(_.address).mkString(",")

def getUnreachable: String =
clusterView.unreachableMembers.map(_.address).mkString(",")

def getMemberStatus: String = clusterView.status.toString def getMemberStatus: String = clusterView.status.toString


def getLeader: String = clusterView.leader.toString def getLeader: String = clusterView.leader.fold("")(_.toString)


def isSingleton: Boolean = clusterView.isSingletonCluster def isSingleton: Boolean = clusterView.isSingletonCluster


def isConvergence: Boolean = clusterView.convergence

def isAvailable: Boolean = clusterView.isAvailable def isAvailable: Boolean = clusterView.isAvailable


def isRunning: Boolean = clusterView.isRunning

// JMX commands // JMX commands


def join(address: String) = cluster.join(AddressFromURIString(address)) def join(address: String) = cluster.join(AddressFromURIString(address))
Expand Down
13 changes: 8 additions & 5 deletions akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
} }


/** /**
* Returns true if the cluster node is up and running, false if it is shut down. * Returns true if this cluster instance has be shutdown.
*/ */
def isRunning: Boolean = cluster.isRunning def isTerminated: Boolean = cluster.isTerminated


/** /**
* Current cluster members, sorted by address. * Current cluster members, sorted by address.
Expand Down Expand Up @@ -108,7 +108,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
def leader: Option[Address] = state.leader def leader: Option[Address] = state.leader


/** /**
* Is this node a singleton cluster? * Does the cluster consist of only one member?
*/ */
def isSingletonCluster: Boolean = members.size == 1 def isSingletonCluster: Boolean = members.size == 1


Expand All @@ -118,11 +118,14 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
def convergence: Boolean = state.convergence def convergence: Boolean = state.convergence


/** /**
* Returns true if the node is UP or JOINING. * Returns true if the node is not unreachable and not `Down`
* and not `Removed`.
*/ */
def isAvailable: Boolean = { def isAvailable: Boolean = {
val myself = self val myself = self
!unreachableMembers.contains(myself) && !myself.status.isUnavailable !unreachableMembers.contains(myself) &&
myself.status != MemberStatus.Down &&
myself.status != MemberStatus.Removed
} }


/** /**
Expand Down
11 changes: 3 additions & 8 deletions akka-cluster/src/main/scala/akka/cluster/Gossip.scala
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -168,15 +168,10 @@ private[cluster] case class Gossip(
def isSingletonCluster: Boolean = members.size == 1 def isSingletonCluster: Boolean = members.size == 1


/** /**
* Returns true if the node is UP or JOINING. * Returns true if the node is in the unreachable set
*/ */
def isAvailable(address: Address): Boolean = !isUnavailable(address) def isUnreachable(address: Address): Boolean =

overview.unreachable exists { _.address == address }
def isUnavailable(address: Address): Boolean = {
val isUnreachable = overview.unreachable exists { _.address == address }
val hasUnavailableMemberStatus = members exists { m m.status.isUnavailable && m.address == address }
isUnreachable || hasUnavailableMemberStatus
}


def member(address: Address): Member = { def member(address: Address): Member = {
members.find(_.address == address).orElse(overview.unreachable.find(_.address == address)). members.find(_.address == address).orElse(overview.unreachable.find(_.address == address)).
Expand Down
8 changes: 1 addition & 7 deletions akka-cluster/src/main/scala/akka/cluster/Member.scala
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -87,13 +87,7 @@ object Member {
* *
* Can be one of: Joining, Up, Leaving, Exiting and Down. * Can be one of: Joining, Up, Leaving, Exiting and Down.
*/ */
abstract class MemberStatus extends ClusterMessage { abstract class MemberStatus extends ClusterMessage

/**
* Using the same notion for 'unavailable' as 'non-convergence': DOWN
*/
def isUnavailable: Boolean = this == Down
}


object MemberStatus { object MemberStatus {
case object Joining extends MemberStatus case object Joining extends MemberStatus
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ abstract class LeaderLeavingSpec
enterBarrier("leader-left") enterBarrier("leader-left")


// verify that the LEADER is shut down // verify that the LEADER is shut down
awaitCond(!cluster.isRunning) awaitCond(cluster.isTerminated)


// verify that the LEADER is REMOVED // verify that the LEADER is REMOVED
awaitCond(clusterView.status == Removed) awaitCond(clusterView.status == Removed)
Expand Down
149 changes: 149 additions & 0 deletions akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,149 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster

import language.postfixOps
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import java.lang.management.ManagementFactory
import javax.management.InstanceNotFoundException
import javax.management.ObjectName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import scala.util.Try

object MBeanMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")

commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString("""
akka.cluster.jmx.enabled = on
""")).withFallback(MultiNodeClusterSpec.clusterConfig))

}

class MBeanMultiJvmNode1 extends MBeanSpec
class MBeanMultiJvmNode2 extends MBeanSpec
class MBeanMultiJvmNode3 extends MBeanSpec
class MBeanMultiJvmNode4 extends MBeanSpec

abstract class MBeanSpec
extends MultiNodeSpec(MBeanMultiJvmSpec)
with MultiNodeClusterSpec {

import MBeanMultiJvmSpec._
import ClusterEvent._

val mbeanName = new ObjectName("akka:type=Cluster")
lazy val mbeanServer = ManagementFactory.getPlatformMBeanServer

"Cluster MBean" must {
"expose attributes" taggedAs LongRunningTest in {
val info = mbeanServer.getMBeanInfo(mbeanName)
info.getAttributes.map(_.getName).toSet must be(Set(
"ClusterStatus", "Members", "Unreachable", "MemberStatus", "Leader", "Singleton", "Available"))
enterBarrier("after-1")
}

"expose operations" taggedAs LongRunningTest in {
val info = mbeanServer.getMBeanInfo(mbeanName)
info.getOperations.map(_.getName).toSet must be(Set(
"join", "leave", "down"))
enterBarrier("after-2")
}

"change attributes after startup" taggedAs LongRunningTest in {
runOn(first) {
mbeanServer.getAttribute(mbeanName, "Available").asInstanceOf[Boolean] must be(false)
mbeanServer.getAttribute(mbeanName, "Singleton").asInstanceOf[Boolean] must be(false)
mbeanServer.getAttribute(mbeanName, "Leader") must be("")
mbeanServer.getAttribute(mbeanName, "Members") must be("")
mbeanServer.getAttribute(mbeanName, "Unreachable") must be("")
mbeanServer.getAttribute(mbeanName, "MemberStatus") must be("Removed")
}
awaitClusterUp(first)
runOn(first) {
awaitCond(mbeanServer.getAttribute(mbeanName, "MemberStatus") == "Up")
awaitCond(mbeanServer.getAttribute(mbeanName, "Leader") == address(first).toString)
mbeanServer.getAttribute(mbeanName, "Singleton").asInstanceOf[Boolean] must be(true)
mbeanServer.getAttribute(mbeanName, "Members") must be(address(first).toString)
mbeanServer.getAttribute(mbeanName, "Unreachable") must be("")
mbeanServer.getAttribute(mbeanName, "Available").asInstanceOf[Boolean] must be(true)
}
enterBarrier("after-3")
}

"support join" taggedAs LongRunningTest in {
runOn(second, third, fourth) {
mbeanServer.invoke(mbeanName, "join", Array(address(first).toString), Array("java.lang.String"))
}
enterBarrier("joined")

awaitUpConvergence(4)
assertMembers(clusterView.members, roles.map(address(_)): _*)
awaitCond(mbeanServer.getAttribute(mbeanName, "MemberStatus") == "Up")
val expectedMembers = roles.sorted.map(address(_)).mkString(",")
awaitCond(mbeanServer.getAttribute(mbeanName, "Members") == expectedMembers)
val expectedLeader = address(roleOfLeader())
awaitCond(mbeanServer.getAttribute(mbeanName, "Leader") == expectedLeader.toString)
mbeanServer.getAttribute(mbeanName, "Singleton").asInstanceOf[Boolean] must be(false)

enterBarrier("after-4")
}

"support down" taggedAs LongRunningTest in {
val fourthAddress = address(fourth)
runOn(first) {
testConductor.shutdown(fourth, 0).await
}
enterBarrier("fourth-shutdown")

runOn(first, second, third) {
awaitCond(mbeanServer.getAttribute(mbeanName, "Unreachable") == fourthAddress.toString)
val expectedMembers = Seq(first, second, third).sorted.map(address(_)).mkString(",")
awaitCond(mbeanServer.getAttribute(mbeanName, "Members") == expectedMembers)
}
enterBarrier("fourth-unreachable")

runOn(second) {
mbeanServer.invoke(mbeanName, "down", Array(fourthAddress.toString), Array("java.lang.String"))
}
enterBarrier("fourth-down")

runOn(first, second, third) {
awaitUpConvergence(3, canNotBePartOfMemberRing = List(fourthAddress))
assertMembers(clusterView.members, first, second, third)
}

enterBarrier("after-5")
}

"support leave" taggedAs LongRunningTest in within(20 seconds) {
runOn(second) {
mbeanServer.invoke(mbeanName, "leave", Array(address(third).toString), Array("java.lang.String"))
}
enterBarrier("third-left")
runOn(first, second) {
awaitUpConvergence(2)
assertMembers(clusterView.members, first, second)
val expectedMembers = Seq(first, second).sorted.map(address(_)).mkString(",")
awaitCond(mbeanServer.getAttribute(mbeanName, "Members") == expectedMembers)
}
runOn(third) {
awaitCond(cluster.isTerminated)
// mbean should be unregistered, i.e. throw InstanceNotFoundException
awaitCond(Try { mbeanServer.getMBeanInfo(mbeanName); false } recover {
case e: InstanceNotFoundException true
case _ false
} get)
}

enterBarrier("after-6")
}

}
}
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec


runOn(second) { runOn(second) {
// verify that the second node is shut down and has status REMOVED // verify that the second node is shut down and has status REMOVED
awaitCond(!cluster.isRunning, reaperWaitingTime) awaitCond(cluster.isTerminated, reaperWaitingTime)
awaitCond(clusterView.status == MemberStatus.Removed, reaperWaitingTime) awaitCond(clusterView.status == MemberStatus.Removed, reaperWaitingTime)
} }


Expand Down
Loading

0 comments on commit cd0fa5a

Please sign in to comment.