Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge pull request #992 from akka/wip-2843-port-zero-∂π

ticket 2843: make all ports dynamic in remoting tests
  • Loading branch information...
commit cca5e5a4f1b94c1e550528fb0ed15f3658c3aa53 2 parents f2aa947 + 6c31d53
@rkuhn rkuhn authored
Showing with 134 additions and 67 deletions.
  1. +1 −1  akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala
  2. +1 −1  akka-docs/rst/modules/code/docs/actor/mailbox/DurableMailboxDocSpec.scala
  3. +1 −1  akka-docs/rst/scala/code/docs/remoting/RemoteDeploymentDocSpec.scala
  4. +2 −2 ...-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/filebased/FileBasedMailboxSpec.scala
  5. +2 −2 akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala
  6. +2 −2 akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala
  7. +24 −13 akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala
  8. +6 −1 akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala
  9. +6 −1 akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala
  10. +36 −16 akka-remote/src/test/scala/akka/remote/RemotingSpec.scala
  11. +1 −1  akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala
  12. +1 −1  akka-remote/src/test/scala/akka/remote/UntrustedSpec.scala
  13. +7 −7 akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala
  14. +19 −8 akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala
  15. +20 −8 akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala
  16. +5 −2 akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala
View
2  akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala
@@ -100,7 +100,7 @@ abstract class LargeClusterSpec
*/
override def cluster: Cluster = Cluster(system)
- override def atTermination(): Unit = {
+ override def afterTermination(): Unit = {
systems foreach { _.shutdown }
val shutdownTimeout = 20.seconds
val deadline = Deadline.now + shutdownTimeout
View
2  akka-docs/rst/modules/code/docs/actor/mailbox/DurableMailboxDocSpec.scala
@@ -127,7 +127,7 @@ class MyMailboxSpec extends DurableMailboxSpec("MyStorage", MyMailboxSpec.config
override def atStartup() {
}
- override def atTermination() {
+ override def afterTermination() {
}
"MyMailbox" must {
View
2  akka-docs/rst/scala/code/docs/remoting/RemoteDeploymentDocSpec.scala
@@ -28,7 +28,7 @@ class RemoteDeploymentDocSpec extends AkkaSpec("""
val other = ActorSystem("remote", system.settings.config)
val address = other.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address("tcp.akka", "s", "host", 1)).get
- override def atTermination() { other.shutdown() }
+ override def afterTermination() { other.shutdown() }
"demonstrate programmatic deployment" in {
//#deploy
View
4 ...rable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/filebased/FileBasedMailboxSpec.scala
@@ -45,8 +45,8 @@ class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileBasedMailboxSp
super.atStartup()
}
- override def atTermination() {
+ override def afterTermination() {
clean()
- super.atTermination()
+ super.afterTermination()
}
}
View
4 akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala
@@ -84,14 +84,14 @@ abstract class DurableMailboxSpec(system: ActorSystem, val backendName: String)
try system.awaitTermination(5 seconds) catch {
case _: TimeoutException system.log.warning("Failed to stop [{}] within 5 seconds", system.name)
}
- atTermination()
+ afterTermination()
}
/**
* May be implemented in concrete subclass to do additional things once after all
* test cases have been run.
*/
- def atTermination() {}
+ def afterTermination() {}
protected def streamMustContain(in: InputStream, words: String): Unit = {
val output = new Array[Byte](8192)
View
4 akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala
@@ -272,7 +272,7 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
if (verifySystemShutdown) throw new RuntimeException(msg)
else system.log.warning(msg)
}
- atTermination()
+ afterTermination()
}
/**
@@ -293,7 +293,7 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
/**
* Override this method to do something when the whole test is terminating.
*/
- protected def atTermination(): Unit = {}
+ protected def afterTermination(): Unit = {}
/**
* All registered roles
View
37 akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala
@@ -41,20 +41,31 @@ akka {
remote.transport = "akka.remote.netty.NettyRemoteTransport"
remote.netty {
hostname = localhost
- port = 12345
- }
- actor.deployment {
- /blub.remote = "akka://remote-sys@localhost:12346"
- /looker/child.remote = "akka://remote-sys@localhost:12346"
- /looker/child/grandchild.remote = "akka://RemoteCommunicationSpec@localhost:12345"
+ port = 0
}
}
""") with ImplicitSender with DefaultTimeout {
import RemoteCommunicationSpec._
- val conf = ConfigFactory.parseString("akka.remote.netty.port=12346").withFallback(system.settings.config)
- val other = ActorSystem("remote-sys", conf)
+ val other = ActorSystem("remote-sys", system.settings.config)
+
+ val localAddr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+ val remoteAddr = other.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+
+ val deploys = Seq(
+ Deploy("/blub", scope = RemoteScope(remoteAddr)),
+ Deploy("/looker/child", scope = RemoteScope(remoteAddr)),
+ Deploy("/looker/child/grandchild", scope = RemoteScope(localAddr)))
+
+ def deploy(sys: ActorSystem, d: Deploy) {
+ sys.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].deployer.deploy(d)
+ }
+
+ for (d deploys) {
+ deploy(system, d)
+ deploy(other, d)
+ }
val remote = other.actorOf(Props(new Actor {
def receive = {
@@ -62,9 +73,9 @@ akka {
}
}), "echo")
- val here = system.actorFor("akka://remote-sys@localhost:12346/user/echo")
+ val here = system.actorFor(RootActorPath(remoteAddr) / "user" / "echo")
- override def atTermination() {
+ override def afterTermination() {
other.shutdown()
}
@@ -81,7 +92,7 @@ akka {
val old = other.eventStream.logLevel
other.eventStream.setLogLevel(Logging.DebugLevel)
EventFilter.debug(start = "dropping", occurrences = 1).intercept {
- system.actorFor("akka://remotesys@localhost:12346/user/echo") ! "ping"
+ system.actorFor(RootActorPath(remoteAddr.copy(system = "remotesys")) / "user" / "echo") ! "ping"
}(other)
other.eventStream.setLogLevel(old)
}
@@ -95,13 +106,13 @@ akka {
"send dead letters on remote if actor does not exist" in {
EventFilter.warning(pattern = "dead.*buh", occurrences = 1).intercept {
- system.actorFor("akka://remote-sys@localhost:12346/does/not/exist") ! "buh"
+ system.actorFor(RootActorPath(remoteAddr) / "does" / "not" / "exist") ! "buh"
}(other)
}
"create and supervise children on remote node" in {
val r = system.actorOf(Props[Echo], "blub")
- r.path.toString must be === "akka://remote-sys@localhost:12346/remote/akka/RemoteCommunicationSpec@localhost:12345/user/blub"
+ r.path.toString must be === s"akka://remote-sys@localhost:${remoteAddr.port.get}/remote/akka/RemoteCommunicationSpec@localhost:${localAddr.port.get}/user/blub"
r ! 42
expectMsg(42)
EventFilter[Exception]("crash", occurrences = 1).intercept {
View
7 akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala
@@ -26,7 +26,12 @@ akka {
val other = ActorSystem("other", ConfigFactory.parseString("akka.remoting.transports.tcp.port=2666")
.withFallback(system.settings.config))
- override def atTermination() {
+ override def beforeTermination() {
+ system.eventStream.publish(TestEvent.Mute(
+ EventFilter.warning(pattern = "received dead letter.*Disassociate")))
+ }
+
+ override def afterTermination() {
other.shutdown()
}
View
7 akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala
@@ -70,7 +70,7 @@ akka.actor.deployment {
}""").withFallback(system.settings.config)
val otherSystem = ActorSystem("remote-sys", conf)
- override def atTermination() {
+ override def afterTermination() {
otherSystem.shutdown()
}
@@ -219,4 +219,9 @@ akka.actor.deployment {
}
+ override def beforeTermination() {
+ system.eventStream.publish(TestEvent.Mute(
+ EventFilter.warning(pattern = "received dead letter.*Disassociate")))
+ }
+
}
View
52 akka-remote/src/test/scala/akka/remote/RemotingSpec.scala
@@ -53,9 +53,9 @@ object RemotingSpec {
remoting.log-remote-lifecycle-events = on
remoting.enabled-transports = [test, tcp, udp, ssl]
- remoting.transports.tcp.port = 12345
- remoting.transports.udp.port = 12345
- remoting.transports.ssl.port = 23456
+ remoting.transports.tcp.port = 0
+ remoting.transports.udp.port = 0
+ remoting.transports.ssl.port = 0
remoting.transports.ssl.ssl = ${common-ssl-settings}
remoting.transports.test {
@@ -69,9 +69,6 @@ object RemotingSpec {
actor.deployment {
/blub.remote = "test.akka://remote-sys@localhost:12346"
- /gonk.remote = "tcp.akka://remote-sys@localhost:12346"
- /zagzag.remote = "udp.akka://remote-sys@localhost:12346"
- /roghtaar.remote = "tcp.ssl.akka://remote-sys@localhost:23457"
/looker/child.remote = "test.akka://remote-sys@localhost:12346"
/looker/child/grandchild.remote = "test.akka://RemotingSpec@localhost:12345"
}
@@ -90,14 +87,25 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
val conf = ConfigFactory.parseString(
"""
akka.remoting.transports {
- tcp.port = 12346
- udp.port = 12346
- ssl.port = 23457
test.local-address = "test://remote-sys@localhost:12346"
}
""").withFallback(system.settings.config).resolve()
val other = ActorSystem("remote-sys", conf)
+ for (
+ (name, proto) Seq(
+ "/gonk" -> "tcp",
+ "/zagzag" -> "udp",
+ "/roghtaar" -> "tcp.ssl")
+ ) deploy(system, Deploy(name, scope = RemoteScope(addr(other, proto))))
+
+ def addr(sys: ActorSystem, proto: String) =
+ sys.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address(s"$proto.akka", "", "", 0)).get
+ def port(sys: ActorSystem, proto: String) = addr(sys, proto).port.get
+ def deploy(sys: ActorSystem, d: Deploy) {
+ sys.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].deployer.deploy(d)
+ }
+
val remote = other.actorOf(Props(new Actor {
def receive = {
case "ping" sender ! (("pong", sender))
@@ -106,7 +114,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
val here = system.actorFor("test.akka://remote-sys@localhost:12346/user/echo")
- override def atTermination() {
+ override def afterTermination() {
other.shutdown()
AssociationRegistry.clear()
}
@@ -119,7 +127,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
}
"send error message for wrong address" in {
- EventFilter.error(start = "AssociationError").intercept {
+ filterEvents(EventFilter[EndpointException](occurrences = 6), EventFilter.error(start = "Association", occurrences = 6)) {
system.actorFor("test.akka://nonexistingsystem@localhost:12346/user/echo") ! "ping"
}
}
@@ -183,7 +191,8 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
"be able to use multiple transports and use the appropriate one (TCP)" in {
val r = system.actorOf(Props[Echo], "gonk")
- r.path.toString must be === "tcp.akka://remote-sys@localhost:12346/remote/tcp.akka/RemotingSpec@localhost:12345/user/gonk"
+ r.path.toString must be ===
+ s"tcp.akka://remote-sys@localhost:${port(other, "tcp")}/remote/tcp.akka/RemotingSpec@localhost:${port(system, "tcp")}/user/gonk"
r ! 42
expectMsg(42)
EventFilter[Exception]("crash", occurrences = 1).intercept {
@@ -198,9 +207,10 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
"be able to use multiple transports and use the appropriate one (UDP)" in {
val r = system.actorOf(Props[Echo], "zagzag")
- r.path.toString must be === "udp.akka://remote-sys@localhost:12346/remote/udp.akka/RemotingSpec@localhost:12345/user/zagzag"
+ r.path.toString must be ===
+ s"udp.akka://remote-sys@localhost:${port(other, "udp")}/remote/udp.akka/RemotingSpec@localhost:${port(system, "udp")}/user/zagzag"
r ! 42
- expectMsg(10 seconds, 42)
+ expectMsg(10.seconds, 42)
EventFilter[Exception]("crash", occurrences = 1).intercept {
r ! new Exception("crash")
}(other)
@@ -213,9 +223,10 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
"be able to use multiple transports and use the appropriate one (SSL)" in {
val r = system.actorOf(Props[Echo], "roghtaar")
- r.path.toString must be === "tcp.ssl.akka://remote-sys@localhost:23457/remote/tcp.ssl.akka/RemotingSpec@localhost:23456/user/roghtaar"
+ r.path.toString must be ===
+ s"tcp.ssl.akka://remote-sys@localhost:${port(other, "tcp.ssl")}/remote/tcp.ssl.akka/RemotingSpec@localhost:${port(system, "tcp.ssl")}/user/roghtaar"
r ! 42
- expectMsg(10 seconds, 42)
+ expectMsg(10.seconds, 42)
EventFilter[Exception]("crash", occurrences = 1).intercept {
r ! new Exception("crash")
}(other)
@@ -228,4 +239,13 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
}
+ override def beforeTermination() {
+ system.eventStream.publish(TestEvent.Mute(
+ EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate)")))
+ other.eventStream.publish(TestEvent.Mute(
+ EventFilter[EndpointException](),
+ EventFilter.error(start = "AssociationError"),
+ EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate|HandleListener)")))
+ }
+
}
View
2  akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala
@@ -125,7 +125,7 @@ abstract class Ticket1978CommunicationSpec(val cipherConfig: CipherConfig) exten
"remote-sys",
ConfigFactory.parseString("akka.remote.netty.port=" + cipherConfig.remotePort).withFallback(system.settings.config))
- override def atTermination() {
+ override def afterTermination() {
if (cipherConfig.runTest) {
other.shutdown()
other.awaitTermination()
View
2  akka-remote/src/test/scala/akka/remote/UntrustedSpec.scala
@@ -37,7 +37,7 @@ akka.loglevel = DEBUG
val target1 = other.actorFor(RootActorPath(addr) / "remote")
val target2 = other.actorFor(RootActorPath(addr) / testActor.path.elements)
- override def atTermination() {
+ override def afterTermination() {
other.shutdown()
}
View
14 akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala
@@ -202,7 +202,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
codec,
failureDetector)))
- Await.result(statusPromise.future, 3 seconds) match {
+ Await.result(statusPromise.future, 3.seconds) match {
case h: AssociationHandle
h.remoteAddress must be === remoteAkkaAddress
h.localAddress must be === localAkkaAddress
@@ -241,7 +241,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
// finish connection by sending back a payload
reader ! testPayload
- Await.result(statusPromise.future, 3 seconds) match {
+ Await.result(statusPromise.future, 3.seconds) match {
case h: AssociationHandle
h.remoteAddress must be === remoteAkkaAddress
h.localAddress must be === localAkkaAddress
@@ -315,7 +315,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
codec,
failureDetector)))
- Await.result(statusPromise.future, 3 seconds) match {
+ Await.result(statusPromise.future, 3.seconds) match {
case h: AssociationHandle
h.remoteAddress must be === remoteAkkaAddress
h.localAddress must be === localAkkaAddress
@@ -341,7 +341,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
codec,
failureDetector)))
- val wrappedHandle = Await.result(statusPromise.future, 3 seconds) match {
+ val wrappedHandle = Await.result(statusPromise.future, 3.seconds) match {
case h: AssociationHandle
h.remoteAddress must be === remoteAkkaAddress
h.localAddress must be === localAkkaAddress
@@ -379,7 +379,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
// Finish association with a heartbeat -- pushes state out of WaitActivity
reader ! testHeartbeat
- val wrappedHandle = Await.result(statusPromise.future, 3 seconds) match {
+ val wrappedHandle = Await.result(statusPromise.future, 3.seconds) match {
case h: AssociationHandle
h.remoteAddress must be === remoteAkkaAddress
h.localAddress must be === localAkkaAddress
@@ -412,7 +412,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
codec,
failureDetector)))
- val wrappedHandle = Await.result(statusPromise.future, 3 seconds) match {
+ val wrappedHandle = Await.result(statusPromise.future, 3.seconds) match {
case h: AssociationHandle
h.remoteAddress must be === remoteAkkaAddress
h.localAddress must be === localAkkaAddress
@@ -448,7 +448,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
codec,
failureDetector)))
- val wrappedHandle = Await.result(statusPromise.future, 3 seconds) match {
+ val wrappedHandle = Await.result(statusPromise.future, 3.seconds) match {
case h: AssociationHandle
h.remoteAddress must be === remoteAkkaAddress
h.localAddress must be === localAkkaAddress
View
27 akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala
@@ -5,6 +5,9 @@ import com.typesafe.config.{ Config, ConfigFactory }
import AkkaProtocolStressTest._
import akka.actor._
import scala.concurrent.duration._
+import akka.testkit.TestEvent
+import akka.testkit.EventFilter
+import akka.remote.EndpointException
object AkkaProtocolStressTest {
val configA: Config = ConfigFactory parseString ("""
@@ -26,7 +29,7 @@ object AkkaProtocolStressTest {
remoting.transports.tcp {
applied-adapters = ["gremlin"]
- port = 12345
+ port = 0
}
}
@@ -62,29 +65,37 @@ object AkkaProtocolStressTest {
class AkkaProtocolStressTest extends AkkaSpec(configA) with ImplicitSender with DefaultTimeout {
- val configB = ConfigFactory.parseString("akka.remoting.transports.tcp.port = 12346")
- .withFallback(system.settings.config).resolve()
-
- val systemB = ActorSystem("systemB", configB)
+ val systemB = ActorSystem("systemB", system.settings.config)
val remote = systemB.actorOf(Props(new Actor {
def receive = {
case seq: Int sender ! seq
}
}), "echo")
- val here = system.actorFor("tcp.gremlin.akka://systemB@localhost:12346/user/echo")
+ val rootB = RootActorPath(systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress)
+ val here = system.actorFor(rootB / "user" / "echo")
"AkkaProtocolTransport" must {
"guarantee at-most-once delivery and message ordering despite packet loss" taggedAs TimingTest in {
val tester = system.actorOf(Props(new SequenceVerifier(here, self))) ! "start"
- expectMsgPF(30 seconds) {
+ expectMsgPF(30.seconds) {
case (received: Int, lost: Int)
log.debug(s" ######## Received ${received - lost} messages from ${received} ########")
}
}
}
- override def atTermination(): Unit = systemB.shutdown()
+ override def beforeTermination() {
+ system.eventStream.publish(TestEvent.Mute(
+ EventFilter.warning(source = "akka://AkkaProtocolStressTest/user/$a", start = "received dead letter"),
+ EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate)")))
+ systemB.eventStream.publish(TestEvent.Mute(
+ EventFilter[EndpointException](),
+ EventFilter.error(start = "AssociationError"),
+ EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate)")))
+ }
+
+ override def afterTermination(): Unit = systemB.shutdown()
}
View
28 akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala
@@ -8,6 +8,9 @@ import scala.concurrent.duration._
import scala.concurrent.Await
import akka.remote.transport.ThrottlerTransportAdapter.{ Direction, TokenBucket, SetThrottle }
import akka.remote.RemoteActorRefProvider
+import akka.testkit.TestEvent
+import akka.testkit.EventFilter
+import akka.remote.EndpointException
object ThrottlerTransportAdapterSpec {
val configA: Config = ConfigFactory parseString ("""
@@ -19,7 +22,7 @@ object ThrottlerTransportAdapterSpec {
remoting.log-remote-lifecycle-events = on
remoting.transports.tcp.applied-adapters = ["trttl"]
- remoting.transports.tcp.port = 12345
+ remoting.transports.tcp.port = 0
}
""")
@@ -56,28 +59,37 @@ object ThrottlerTransportAdapterSpec {
}
class ThrottlerTransportAdapterSpec extends AkkaSpec(configA) with ImplicitSender with DefaultTimeout {
- val configB = ConfigFactory.parseString("akka.remoting.transports.tcp.port = 12346")
- .withFallback(system.settings.config).resolve()
- val systemB = ActorSystem("systemB", configB)
+ val systemB = ActorSystem("systemB", system.settings.config)
val remote = systemB.actorOf(Props[Echo], "echo")
- val here = system.actorFor("tcp.trttl.akka://systemB@localhost:12346/user/echo")
+ val rootB = RootActorPath(systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress)
+ val here = system.actorFor(rootB / "user" / "echo")
"ThrottlerTransportAdapter" must {
"maintain average message rate" taggedAs TimingTest in {
Await.result(
system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport
- .managementCommand(SetThrottle(Address("akka", "systemB", "localhost", 12346), Direction.Send, TokenBucket(200, 500, 0, 0))), 3 seconds)
+ .managementCommand(SetThrottle(Address("akka", "systemB", "localhost", rootB.address.port.get), Direction.Send, TokenBucket(200, 500, 0, 0))), 3.seconds)
val tester = system.actorOf(Props(new ThrottlingTester(here, self))) ! "start"
- expectMsgPF((TotalTime + 3) seconds) {
+ expectMsgPF((TotalTime + 3).seconds) {
case time: Long log.warning("Total time of transmission: " + NANOSECONDS.toSeconds(time))
}
}
}
- override def atTermination(): Unit = systemB.shutdown()
+ override def beforeTermination() {
+ system.eventStream.publish(TestEvent.Mute(
+ EventFilter.warning(source = "akka://AkkaProtocolStressTest/user/$a", start = "received dead letter"),
+ EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate)")))
+ systemB.eventStream.publish(TestEvent.Mute(
+ EventFilter[EndpointException](),
+ EventFilter.error(start = "AssociationError"),
+ EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate)")))
+ }
+
+ override def afterTermination(): Unit = systemB.shutdown()
}
class ThrottlerTransportAdapterGenericSpec extends GenericTransportSpec(withAkkaProtocol = true) {
View
7 akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala
@@ -70,18 +70,21 @@ abstract class AkkaSpec(_system: ActorSystem)
}
final override def afterAll {
+ beforeTermination()
system.shutdown()
try system.awaitTermination(5 seconds) catch {
case _: TimeoutException
system.log.warning("Failed to stop [{}] within 5 seconds", system.name)
println(system.asInstanceOf[ActorSystemImpl].printTree)
}
- atTermination()
+ afterTermination()
}
protected def atStartup() {}
- protected def atTermination() {}
+ protected def beforeTermination() {}
+
+ protected def afterTermination() {}
def spawn(dispatcherId: String = Dispatchers.DefaultDispatcherId)(body: Unit): Unit =
Future(body)(system.dispatchers.lookup(dispatcherId))
Please sign in to comment.
Something went wrong with that request. Please try again.