Permalink
Browse files

Merge pull request #22094 from akka/wip-22093-ClusterClient-ask-patriknw

stop ClusterClient ResponseTunnel after first reply when ask is used, #22093
  • Loading branch information...
2 parents 1803618 + b213a7a commit 1eba8656cd0cce7c8e3a821a14b819a5e7be4731 @patriknw patriknw committed on GitHub Jan 10, 2017
@@ -812,12 +812,21 @@ object ClusterReceptionist {
*/
class ClientResponseTunnel(client: ActorRef, timeout: FiniteDuration) extends Actor with ActorLogging {
context.setReceiveTimeout(timeout)
+
+ private val isAsk = {
+ val pathElements = client.path.elements
+ pathElements.size == 2 && pathElements.head == "temp" && pathElements.tail.head.startsWith("$")
+ }
+
def receive = {
case Ping // keep alive from client
case ReceiveTimeout
log.debug("ClientResponseTunnel for client [{}] stopped due to inactivity", client.path)
context stop self
- case msg client.tell(msg, Actor.noSender)
+ case msg
+ client.tell(msg, Actor.noSender)
+ if (isAsk)
+ context stop self
}
}
}
@@ -213,6 +213,24 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
enterBarrier("after-2")
}
+ "work with ask" in within(10 seconds) {
+ runOn(client) {
+ import akka.pattern.ask
+ import system.dispatcher
+ val c = system.actorOf(ClusterClient.props(
+ ClusterClientSettings(system).withInitialContacts(initialContacts)), "ask-client")
+ implicit val timeout = Timeout(remaining)
+ val reply = c ? ClusterClient.Send("/user/testService", "hello-request", localAffinity = true)
+ Await.result(reply.mapTo[Reply], remaining).msg should be("hello-request-ack")
+ system.stop(c)
+ }
+ runOn(fourth) {
+ expectMsg("hello-request")
+ }
+
+ enterBarrier("after-3")
+ }
+
"demonstrate usage" in within(15 seconds) {
def host1 = first
def host2 = second
@@ -261,7 +279,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
// strange, barriers fail without this sleep
Thread.sleep(1000)
- enterBarrier("after-3")
+ enterBarrier("after-4")
}
"report events" in within(15 seconds) {
@@ -305,7 +323,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
}
}
- enterBarrier("after-6")
+ enterBarrier("after-5")
}
"re-establish connection to another receptionist when server is shutdown" in within(30 seconds) {
@@ -356,7 +374,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
}
}
}
- enterBarrier("after-4")
+ enterBarrier("after-6")
}
"re-establish connection to receptionist after partition" in within(30 seconds) {
@@ -397,7 +415,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
system.stop(c)
}
- enterBarrier("after-5")
+ enterBarrier("after-7")
}
"re-establish connection to receptionist after server restart" in within(30 seconds) {
@@ -436,8 +454,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
system.name,
ConfigFactory.parseString(
if (RARP(system).provider.remoteSettings.Artery.Enabled) s"akka.remote.artery.canonical.port=$port"
- else s"akka.remote.netty.tcp.port=$port"
- ).withFallback(system.settings.config))
+ else s"akka.remote.netty.tcp.port=$port").withFallback(system.settings.config))
Cluster(sys2).join(Cluster(sys2).selfAddress)
val service2 = sys2.actorOf(Props(classOf[TestService], testActor), "service2")
ClusterClientReceptionist(sys2).registerService(service2)

0 comments on commit 1eba865

Please sign in to comment.