Skip to content

Commit

Permalink
wrap up MultiNodeSpec, see #1934 and #2063
Browse files Browse the repository at this point in the history
- restructure message classes in sealed traits according to message flow
  direction and include confirmed/unconfirmed status in the type
- add GetAddress query for obtaining the remote transport address of
  another test participant
- add reconnects to Player
- add small DSL with runOn(node...), ifNode(node...)(<true>)(<false>)
  and node(<node>):ActorPath
- rewrite TestConductorSpec to use that DSL and run within a single test
  procedure instead of separate NodeX classes
- hook up that test into current multi-jvm infrastructure temporarily
  for testing (will use Björn’s new remote-multi-jvm stuff later)
  • Loading branch information
rkuhn committed May 18, 2012
1 parent 439f653 commit c860515
Show file tree
Hide file tree
Showing 13 changed files with 1,290 additions and 301 deletions.

Large diffs are not rendered by default.

Expand Up @@ -16,6 +16,7 @@ message Wrapper {
optional EnterBarrier barrier = 2;
optional InjectFailure failure = 3;
optional string done = 4;
optional AddressRequest addr = 5;
}

message Hello {
Expand All @@ -25,7 +26,12 @@ message Hello {

message EnterBarrier {
required string name = 1;
optional bool failed = 2;
optional bool status = 2;
}

message AddressRequest {
required string node = 1;
optional Address addr = 2;
}

message Address {
Expand Down
15 changes: 7 additions & 8 deletions akka-remote-tests/src/main/resources/reference.conf
Expand Up @@ -20,15 +20,14 @@ akka {
# than HashedWheelTimer resolution (would not make sense)
packet-split-threshold = 100ms

# Default port to start the conductor on; 0 means <auto>
port = 0
# amount of time for the ClientFSM to wait for the connection to the conductor
# to be successful
connect-timeout = 20s

# Hostname of the TestConductor server, used by the server to bind to the IP
# and by the client to connect to it.
host = localhost
# Number of connect attempts to be made to the conductor controller
client-reconnects = 10

# Name of the TestConductor client (for identification on the server e.g. for
# failure injection)
name = "noname"
# minimum time interval which is to be inserted between reconnect attempts
reconnect-backoff = 1s
}
}
Expand Up @@ -71,21 +71,21 @@ trait Conductor { this: TestConductorExt ⇒
* @param participants gives the number of participants which shall connect
* before any of their startClient() operations complete.
*/
def startController(participants: Int): Future[Int] = {
def startController(participants: Int, name: String, controllerPort: InetSocketAddress): Future[InetSocketAddress] = {
if (_controller ne null) throw new RuntimeException("TestConductorServer was already started")
_controller = system.actorOf(Props(new Controller(participants)), "controller")
_controller = system.actorOf(Props(new Controller(participants, controllerPort)), "controller")
import Settings.BarrierTimeout
controller ? GetPort flatMap { case port: Int startClient(port) map (_ port) }
controller ? GetSockAddr flatMap { case sockAddr: InetSocketAddress startClient(name, sockAddr) map (_ sockAddr) }
}

/**
* Obtain the port to which the controller’s socket is actually bound. This
* will deviate from the configuration in `akka.testconductor.port` in case
* that was given as zero.
*/
def port: Future[Int] = {
def sockAddr: Future[InetSocketAddress] = {
import Settings.QueryTimeout
controller ? GetPort mapTo
controller ? GetSockAddr mapTo
}

/**
Expand Down Expand Up @@ -280,7 +280,7 @@ class ServerFSM(val controller: ActorRef, val channel: Channel) extends Actor wi
log.warning("client {} sent no Hello in first message (instead {}), disconnecting", getAddrString(channel), x)
channel.close()
stop()
case Event(Send(msg), _)
case Event(ToClient(msg), _)
log.warning("cannot send {} in state Initial", msg)
stay
case Event(StateTimeout, _)
Expand All @@ -290,22 +290,22 @@ class ServerFSM(val controller: ActorRef, val channel: Channel) extends Actor wi
}

when(Ready) {
case Event(msg: EnterBarrier, _)
controller ! msg
stay
case Event(d: Done, Some(s))
s ! d
stay using None
case Event(op: ServerOp, _)
controller ! op
stay
case Event(msg: NetworkOp, _)
log.warning("client {} sent unsupported message {}", getAddrString(channel), msg)
stop()
case Event(Send(msg @ (_: EnterBarrier | _: Done)), _)
case Event(ToClient(msg: UnconfirmedClientOp), _)
channel.write(msg)
stay
case Event(Send(msg), None)
case Event(ToClient(msg), None)
channel.write(msg)
stay using Some(sender)
case Event(Send(msg), _)
case Event(ToClient(msg), _)
log.warning("cannot send {} while waiting for previous ACK", msg)
stay
}
Expand All @@ -320,7 +320,7 @@ class ServerFSM(val controller: ActorRef, val channel: Channel) extends Actor wi
object Controller {
case class ClientDisconnected(name: String)
case object GetNodes
case object GetPort
case object GetSockAddr

case class NodeInfo(name: String, addr: Address, fsm: ActorRef)
}
Expand All @@ -330,12 +330,12 @@ object Controller {
* [[akka.remote.testconductor.BarrierCoordinator]], its child) and allowing
* network and other failures to be injected at the test nodes.
*/
class Controller(private var initialParticipants: Int) extends Actor {
class Controller(private var initialParticipants: Int, controllerPort: InetSocketAddress) extends Actor {
import Controller._
import BarrierCoordinator._

val settings = TestConductor().Settings
val connection = RemoteConnection(Server, settings.host, settings.port,
val connection = RemoteConnection(Server, controllerPort,
new ConductorHandler(context.system, self, Logging(context.system, "ConductorHandler")))

/*
Expand All @@ -348,61 +348,73 @@ class Controller(private var initialParticipants: Int) extends Actor {
override def supervisorStrategy = OneForOneStrategy() {
case BarrierTimeout(data) SupervisorStrategy.Resume
case BarrierEmpty(data, msg) SupervisorStrategy.Resume
case WrongBarrier(name, client, data) client ! Send(BarrierFailed(name)); failBarrier(data)
case WrongBarrier(name, client, data) client ! ToClient(BarrierResult(name, false)); failBarrier(data)
case ClientLost(data, node) failBarrier(data)
case DuplicateNode(data, node) failBarrier(data)
}

def failBarrier(data: Data): SupervisorStrategy.Directive = {
for (c data.arrived) c ! Send(BarrierFailed(data.barrier))
for (c data.arrived) c ! ToClient(BarrierResult(data.barrier, false))
SupervisorStrategy.Restart
}

val barrier = context.actorOf(Props[BarrierCoordinator], "barriers")
var nodes = Map[String, NodeInfo]()

// map keeping unanswered queries for node addresses (enqueued upon GetAddress, serviced upon NodeInfo)
var addrInterest = Map[String, Set[ActorRef]]()

override def receive = LoggingReceive {
case c @ NodeInfo(name, addr, fsm)
barrier forward c
if (nodes contains name) {
if (initialParticipants > 0) {
for (NodeInfo(_, _, client) nodes.values) client ! Send(BarrierFailed("initial startup"))
for (NodeInfo(_, _, client) nodes.values) client ! ToClient(BarrierResult("initial startup", false))
initialParticipants = 0
}
fsm ! Send(BarrierFailed("initial startup"))
fsm ! ToClient(BarrierResult("initial startup", false))
} else {
nodes += name -> c
if (initialParticipants <= 0) fsm ! Send(Done)
if (initialParticipants <= 0) fsm ! ToClient(Done)
else if (nodes.size == initialParticipants) {
for (NodeInfo(_, _, client) nodes.values) client ! Send(Done)
for (NodeInfo(_, _, client) nodes.values) client ! ToClient(Done)
initialParticipants = 0
}
if (addrInterest contains name) {
addrInterest(name) foreach (_ ! ToClient(AddressReply(name, addr)))
addrInterest -= name
}
}
case c @ ClientDisconnected(name)
nodes -= name
barrier forward c
case e @ EnterBarrier(name)
barrier forward e
case Throttle(node, target, direction, rateMBit)
val t = nodes(target)
nodes(node).fsm forward Send(ThrottleMsg(t.addr, direction, rateMBit))
case Disconnect(node, target, abort)
val t = nodes(target)
nodes(node).fsm forward Send(DisconnectMsg(t.addr, abort))
case Terminate(node, exitValueOrKill)
if (exitValueOrKill < 0) {
// TODO: kill via SBT
} else {
nodes(node).fsm forward Send(TerminateMsg(exitValueOrKill))
case op: ServerOp
op match {
case _: EnterBarrier barrier forward op
case GetAddress(node)
if (nodes contains node) sender ! ToClient(AddressReply(node, nodes(node).addr))
else addrInterest += node -> ((addrInterest get node getOrElse Set()) + sender)
}
case op: CommandOp
op match {
case Throttle(node, target, direction, rateMBit)
val t = nodes(target)
nodes(node).fsm forward ToClient(ThrottleMsg(t.addr, direction, rateMBit))
case Disconnect(node, target, abort)
val t = nodes(target)
nodes(node).fsm forward ToClient(DisconnectMsg(t.addr, abort))
case Terminate(node, exitValueOrKill)
if (exitValueOrKill < 0) {
// TODO: kill via SBT
} else {
nodes(node).fsm forward ToClient(TerminateMsg(exitValueOrKill))
}
case Remove(node)
nodes -= node
barrier ! BarrierCoordinator.RemoveClient(node)
}
case Remove(node)
nodes -= node
barrier ! BarrierCoordinator.RemoveClient(node)
case GetNodes sender ! nodes.keys
case GetPort
sender ! (connection.getLocalAddress match {
case inet: InetSocketAddress inet.getPort
})
case GetNodes sender ! nodes.keys
case GetSockAddr sender ! connection.getLocalAddress
}
}

Expand Down Expand Up @@ -463,13 +475,13 @@ class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoordinator.State,
}

when(Idle) {
case Event(e @ EnterBarrier(name), d @ Data(clients, _, _))
case Event(EnterBarrier(name), d @ Data(clients, _, _))
if (failed)
stay replying Send(BarrierFailed(name))
stay replying ToClient(BarrierResult(name, false))
else if (clients.map(_.fsm) == Set(sender))
stay replying Send(e)
stay replying ToClient(BarrierResult(name, true))
else if (clients.find(_.fsm == sender).isEmpty)
stay replying Send(BarrierFailed(name))
stay replying ToClient(BarrierResult(name, false))
else
goto(Waiting) using d.copy(barrier = name, arrived = sender :: Nil)
case Event(RemoveClient(name), d @ Data(clients, _, _))
Expand All @@ -483,7 +495,7 @@ class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoordinator.State,
}

when(Waiting) {
case Event(e @ EnterBarrier(name), d @ Data(clients, barrier, arrived))
case Event(EnterBarrier(name), d @ Data(clients, barrier, arrived))
if (name != barrier || clients.find(_.fsm == sender).isEmpty) throw WrongBarrier(name, sender, d)
val together = sender :: arrived
handleBarrier(d.copy(arrived = together))
Expand All @@ -504,8 +516,7 @@ class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoordinator.State,
if (data.arrived.isEmpty) {
goto(Idle) using data.copy(barrier = "")
} else if ((data.clients.map(_.fsm) -- data.arrived).isEmpty) {
val e = EnterBarrier(data.barrier)
data.arrived foreach (_ ! Send(e))
data.arrived foreach (_ ! ToClient(BarrierResult(data.barrier, true)))
goto(Idle) using data.copy(barrier = "", arrived = Nil)
} else {
stay using data
Expand Down
Expand Up @@ -11,27 +11,42 @@ import com.google.protobuf.Message
import akka.actor.Address
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder

case class Send(msg: NetworkOp)
case class ToClient(msg: ClientOp with NetworkOp)
case class ToServer(msg: ServerOp with NetworkOp)

sealed trait ClientOp // messages sent to Player FSM
sealed trait ServerOp // messages sent to Conductor FSM
sealed trait ClientOp // messages sent to from Conductor to Player
sealed trait ServerOp // messages sent to from Player to Conductor
sealed trait CommandOp // messages sent from TestConductorExt to Conductor
sealed trait NetworkOp // messages sent over the wire
sealed trait UnconfirmedClientOp extends ClientOp // unconfirmed messages going to the Player
sealed trait ConfirmedClientOp extends ClientOp

/**
* First message of connection sets names straight.
*/
case class Hello(name: String, addr: Address) extends NetworkOp
case class EnterBarrier(name: String) extends ClientOp with ServerOp with NetworkOp
case class BarrierFailed(name: String) extends NetworkOp
case class Throttle(node: String, target: String, direction: Direction, rateMBit: Float) extends ServerOp
case class ThrottleMsg(target: Address, direction: Direction, rateMBit: Float) extends NetworkOp
case class Disconnect(node: String, target: String, abort: Boolean) extends ServerOp
case class DisconnectMsg(target: Address, abort: Boolean) extends NetworkOp
case class Terminate(node: String, exitValueOrKill: Int) extends ServerOp
case class TerminateMsg(exitValue: Int) extends NetworkOp
abstract class Done extends NetworkOp

case class EnterBarrier(name: String) extends ServerOp with NetworkOp
case class BarrierResult(name: String, success: Boolean) extends UnconfirmedClientOp with NetworkOp

case class Throttle(node: String, target: String, direction: Direction, rateMBit: Float) extends CommandOp
case class ThrottleMsg(target: Address, direction: Direction, rateMBit: Float) extends ConfirmedClientOp with NetworkOp

case class Disconnect(node: String, target: String, abort: Boolean) extends CommandOp
case class DisconnectMsg(target: Address, abort: Boolean) extends ConfirmedClientOp with NetworkOp

case class Terminate(node: String, exitValueOrKill: Int) extends CommandOp
case class TerminateMsg(exitValue: Int) extends ConfirmedClientOp with NetworkOp

case class GetAddress(node: String) extends ServerOp with NetworkOp
case class AddressReply(node: String, addr: Address) extends UnconfirmedClientOp with NetworkOp

abstract class Done extends ServerOp with UnconfirmedClientOp with NetworkOp
case object Done extends Done {
def getInstance: Done = this
}

case class Remove(node: String) extends ServerOp
case class Remove(node: String) extends CommandOp

class MsgEncoder extends OneToOneEncoder {
def encode(ctx: ChannelHandlerContext, ch: Channel, msg: AnyRef): AnyRef = msg match {
Expand All @@ -42,8 +57,8 @@ class MsgEncoder extends OneToOneEncoder {
w.setHello(TCP.Hello.newBuilder.setName(name).setAddress(addr))
case EnterBarrier(name)
w.setBarrier(TCP.EnterBarrier.newBuilder.setName(name))
case BarrierFailed(name)
w.setBarrier(TCP.EnterBarrier.newBuilder.setName(name).setFailed(true))
case BarrierResult(name, success)
w.setBarrier(TCP.EnterBarrier.newBuilder.setName(name).setStatus(success))
case ThrottleMsg(target, dir, rate)
w.setFailure(TCP.InjectFailure.newBuilder.setAddress(target)
.setFailure(TCP.FailType.Throttle).setDirection(dir).setRateMBit(rate))
Expand All @@ -52,6 +67,10 @@ class MsgEncoder extends OneToOneEncoder {
.setFailure(if (abort) TCP.FailType.Abort else TCP.FailType.Disconnect))
case TerminateMsg(exitValue)
w.setFailure(TCP.InjectFailure.newBuilder.setFailure(TCP.FailType.Shutdown).setExitValue(exitValue))
case GetAddress(node)
w.setAddr(TCP.AddressRequest.newBuilder.setNode(node))
case AddressReply(node, addr)
w.setAddr(TCP.AddressRequest.newBuilder.setNode(node).setAddr(addr))
case _: Done
w.setDone("")
}
Expand All @@ -68,7 +87,7 @@ class MsgDecoder extends OneToOneDecoder {
Hello(h.getName, h.getAddress)
} else if (w.hasBarrier) {
val barrier = w.getBarrier
if (barrier.hasFailed && barrier.getFailed) BarrierFailed(barrier.getName)
if (barrier.hasStatus) BarrierResult(barrier.getName, barrier.getStatus)
else EnterBarrier(w.getBarrier.getName)
} else if (w.hasFailure) {
val f = w.getFailure
Expand All @@ -79,6 +98,10 @@ class MsgDecoder extends OneToOneDecoder {
case FT.Disconnect DisconnectMsg(f.getAddress, false)
case FT.Shutdown TerminateMsg(f.getExitValue)
}
} else if (w.hasAddr) {
val a = w.getAddr
if (a.hasAddr) AddressReply(a.getNode, a.getAddr)
else GetAddress(a.getNode)
} else if (w.hasDone) {
Done
} else {
Expand Down
Expand Up @@ -38,13 +38,13 @@ class TestConductorExt(val system: ExtendedActorSystem) extends Extension with C
object Settings {
val config = system.settings.config

val ConnectTimeout = Duration(config.getMilliseconds("akka.testconductor.connect-timeout"), MILLISECONDS)
val ClientReconnects = config.getInt("akka.testconductor.client-reconnects")
val ReconnectBackoff = Duration(config.getMilliseconds("akka.testconductor.reconnect-backoff"), MILLISECONDS)

implicit val BarrierTimeout = Timeout(Duration(config.getMilliseconds("akka.testconductor.barrier-timeout"), MILLISECONDS))
implicit val QueryTimeout = Timeout(Duration(config.getMilliseconds("akka.testconductor.query-timeout"), MILLISECONDS))
val PacketSplitThreshold = Duration(config.getMilliseconds("akka.testconductor.packet-split-threshold"), MILLISECONDS)

val name = config.getString("akka.testconductor.name")
val host = config.getString("akka.testconductor.host")
val port = config.getInt("akka.testconductor.port")
}

val transport = system.provider.asInstanceOf[RemoteActorRefProvider].transport
Expand Down

0 comments on commit c860515

Please sign in to comment.