diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorContextPipeToSelfTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorContextPipeToSelfTest.java new file mode 100644 index 00000000000..84497e7ffe2 --- /dev/null +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorContextPipeToSelfTest.java @@ -0,0 +1,83 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.actor.typed.javadsl; + +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; +import akka.actor.testkit.typed.javadsl.TestProbe; +import akka.actor.typed.Behavior; +import akka.actor.typed.Props; +import com.typesafe.config.ConfigFactory; +import org.junit.ClassRule; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +import static org.hamcrest.CoreMatchers.startsWith; +import static org.junit.Assert.*; + +public final class ActorContextPipeToSelfTest extends JUnitSuite { + + @ClassRule + public static final TestKitJunitResource testKit = + new TestKitJunitResource(ConfigFactory.parseString( + "pipe-to-self-spec-dispatcher.executor = thread-pool-executor\n" + + "pipe-to-self-spec-dispatcher.type = PinnedDispatcher\n" + )); + + static final class Msg { + final String response; + final String selfName; + final String threadName; + + Msg(final String response, final String selfName, final String threadName) { + this.response = response; + this.selfName = selfName; + this.threadName = threadName; + } + } + + @Test public void handlesSuccess() { + assertEquals("ok: hi", responseFrom(CompletableFuture.completedFuture("hi"))); + } + + @Test public void handlesFailure() { + assertEquals("ko: boom", responseFrom(failedFuture(new RuntimeException("boom")))); + } + + private CompletableFuture failedFuture(final Throwable ex) { + final CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(ex); + return future; + } + + private String responseFrom(final CompletionStage future) { + final TestProbe probe = testKit.createTestProbe(); + final Behavior behavior = Behaviors.setup(context -> { + context.pipeToSelf(future, (string, exception) -> { + final String response; + if (string != null) response = String.format("ok: %s", string); + else if (exception != null) response = String.format("ko: %s", exception.getMessage()); + else response = "???"; + return new Msg(response, context.getSelf().path().name(), Thread.currentThread().getName()); + }); + return Behaviors.receiveMessage(msg -> { + probe.getRef().tell(msg); + return Behaviors.stopped(); + }); + }); + final String name = "pipe-to-self-spec"; + final Props props = Props.empty().withDispatcherFromConfig("pipe-to-self-spec-dispatcher"); + + testKit.spawn(behavior, name, props); + + final Msg msg = probe.expectMessageClass(Msg.class); + + assertEquals("pipe-to-self-spec", msg.selfName); + assertThat(msg.threadName, startsWith("ActorContextPipeToSelfTest-pipe-to-self-spec-dispatcher")); + return msg.response; + } +} diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextPipeToSelfSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextPipeToSelfSpec.scala new file mode 100644 index 00000000000..576c8103367 --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextPipeToSelfSpec.scala @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.actor.typed.scaladsl + +import scala.concurrent.Future +import scala.util.control.NoStackTrace +import scala.util.{ Failure, Success } + +import akka.actor.testkit.typed.scaladsl.{ ScalaTestWithActorTestKit, TestProbe } +import akka.actor.typed.Props +import com.typesafe.config.ConfigFactory +import org.scalatest.WordSpecLike + +object ActorContextPipeToSelfSpec { + val config = ConfigFactory.parseString( + """ + |pipe-to-self-spec-dispatcher { + | executor = thread-pool-executor + | type = PinnedDispatcher + |} + """.stripMargin) +} + +final class ActorContextPipeToSelfSpec extends ScalaTestWithActorTestKit(ActorContextPipeToSelfSpec.config) + with WordSpecLike { + + "The Scala DSL ActorContext pipeToSelf" must { + "handle success" in { responseFrom(Future.successful("hi")) should ===("ok: hi") } + "handle failure" in { responseFrom(Future.failed(Fail)) should ===(s"ko: $Fail") } + } + + object Fail extends NoStackTrace + + private def responseFrom(future: Future[String]) = { + final case class Msg(response: String, selfName: String, threadName: String) + + val probe = TestProbe[Msg]() + val behavior = Behaviors.setup[Msg] { context ⇒ + context.pipeToSelf(future) { + case Success(s) ⇒ Msg(s"ok: $s", context.self.path.name, Thread.currentThread().getName) + case Failure(e) ⇒ Msg(s"ko: $e", context.self.path.name, Thread.currentThread().getName) + } + Behaviors.receiveMessage { msg ⇒ + probe.ref ! msg + Behaviors.stopped + } + } + val name = "pipe-to-self-spec" + val props = Props.empty.withDispatcherFromConfig("pipe-to-self-spec-dispatcher") + + spawn(behavior, name, props) + + val msg = probe.expectMessageType[Msg] + + msg.selfName should ===("pipe-to-self-spec") + msg.threadName should startWith("ActorContextPipeToSelfSpec-pipe-to-self-spec-dispatcher") + msg.response + } + +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala index cb41376025c..132eddff8b4 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala @@ -9,13 +9,12 @@ import java.time.Duration import java.util.function.{ Function ⇒ JFunction } import java.util.ArrayList import java.util.Optional -import java.util.function +import java.util.concurrent.CompletionStage +import java.util.function.BiConsumer import java.util.function.BiFunction -import scala.concurrent.ExecutionContextExecutor +import scala.concurrent.{ ExecutionContextExecutor, Future } import scala.reflect.ClassTag -import scala.util.Failure -import scala.util.Success import scala.util.Try import akka.annotation.InternalApi import akka.util.OptionVal @@ -84,17 +83,31 @@ import akka.util.JavaDurationConverters._ // Scala API impl override def ask[Req, Res](target: RecipientRef[Req])(createRequest: ActorRef[Res] ⇒ Req)(mapResponse: Try[Res] ⇒ T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit = { import akka.actor.typed.scaladsl.AskPattern._ - (target ? createRequest)(responseTimeout, system.scheduler).onComplete(res ⇒ - self.asInstanceOf[ActorRef[AnyRef]] ! AdaptMessage(res, mapResponse) - ) + pipeToSelf((target ? createRequest)(responseTimeout, system.scheduler))(mapResponse) } // Java API impl - def ask[Req, Res](resClass: Class[Res], target: RecipientRef[Req], responseTimeout: Duration, createRequest: function.Function[ActorRef[Res], Req], applyToResponse: BiFunction[Res, Throwable, T]): Unit = { - this.ask(target)(createRequest.apply) { - case Success(message) ⇒ applyToResponse.apply(message, null) - case Failure(ex) ⇒ applyToResponse.apply(null.asInstanceOf[Res], ex) - }(responseTimeout.asScala, ClassTag[Res](resClass)) + def ask[Req, Res](resClass: Class[Res], target: RecipientRef[Req], responseTimeout: Duration, createRequest: JFunction[ActorRef[Res], Req], applyToResponse: BiFunction[Res, Throwable, T]): Unit = { + import akka.actor.typed.javadsl.AskPattern + val message = new akka.japi.function.Function[ActorRef[Res], Req] { + def apply(ref: ActorRef[Res]): Req = createRequest(ref) + } + pipeToSelf(AskPattern.ask(target, message, responseTimeout, system.scheduler), applyToResponse) + } + + // Scala API impl + def pipeToSelf[Value](future: Future[Value])(mapResult: Try[Value] ⇒ T): Unit = { + future.onComplete(value ⇒ self.unsafeUpcast ! AdaptMessage(value, mapResult)) + } + + // Java API impl + def pipeToSelf[Value](future: CompletionStage[Value], applyToResult: BiFunction[Value, Throwable, T]): Unit = { + future.whenComplete(new BiConsumer[Value, Throwable] { + def accept(value: Value, ex: Throwable): Unit = { + if (value != null) self ! applyToResult.apply(value, null) + if (ex != null) self ! applyToResult.apply(null.asInstanceOf[Value], ex) + } + }) } private[akka] override def spawnMessageAdapter[U](f: U ⇒ T, name: String): ActorRef[U] = diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InternalMessage.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InternalMessage.scala index 5a0eff2f787..3ddbbf33529 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InternalMessage.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InternalMessage.scala @@ -19,7 +19,7 @@ import akka.annotation.InternalApi /** * INTERNAL API: Wrapping of messages that should be adapted by the included - * function. Used by `ActorContext.spawnMessageAdapter` so that the function is + * function. Used by `ActorContext.spawnMessageAdapter` and `ActorContext.ask` so that the function is * applied in the "parent" actor (for better thread safety).. */ @InternalApi private[akka] final case class AdaptMessage[U, T](msg: U, adapter: U ⇒ T) extends InternalMessage { diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala index cb30a665fa5..4dafa294d79 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala @@ -11,6 +11,7 @@ import akka.annotation.DoNotInherit import akka.annotation.ApiMayChange import akka.actor.typed._ import java.util.Optional +import java.util.concurrent.CompletionStage import scala.concurrent.ExecutionContextExecutor @@ -278,4 +279,13 @@ trait ActorContext[T] extends TypedActorContext[T] { createRequest: java.util.function.Function[ActorRef[Res], Req], applyToResponse: BiFunction[Res, Throwable, T]): Unit + /** + * Sends the result of the given `CompletionStage` to this Actor (“`self`”), after adapted it with + * the given function. + * + * This method is thread-safe and can be called from other threads than the ordinary + * actor message processing thread, such as [[java.util.concurrent.CompletionStage]] callbacks. + */ + def pipeToSelf[Value](future: CompletionStage[Value], applyToResult: BiFunction[Value, Throwable, T]): Unit + } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/AskPattern.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/AskPattern.scala index c34fcb35fbf..4173de8ad24 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/AskPattern.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/AskPattern.scala @@ -30,6 +30,6 @@ import scala.compat.java8.FutureConverters._ * */ object AskPattern { - def ask[T, U](actor: ActorRef[T], message: JFunction[ActorRef[U], T], timeout: Duration, scheduler: Scheduler): CompletionStage[U] = + def ask[T, U](actor: RecipientRef[T], message: JFunction[ActorRef[U], T], timeout: Duration, scheduler: Scheduler): CompletionStage[U] = (actor.?(message.apply)(timeout.asScala, scheduler)).toJava } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala index 09c3d418a1f..ca500aff710 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala @@ -8,7 +8,7 @@ import akka.actor.typed._ import akka.annotation.{ ApiMayChange, DoNotInherit } import akka.util.Timeout -import scala.concurrent.ExecutionContextExecutor +import scala.concurrent.{ ExecutionContextExecutor, Future } import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag import scala.util.Try @@ -275,4 +275,13 @@ trait ActorContext[T] extends TypedActorContext[T] { this: akka.actor.typed.java */ def ask[Req, Res](target: RecipientRef[Req])(createRequest: ActorRef[Res] ⇒ Req)(mapResponse: Try[Res] ⇒ T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit + /** + * Sends the result of the given `Future` to this Actor (“`self`”), after adapted it with + * the given function. + * + * This method is thread-safe and can be called from other threads than the ordinary + * actor message processing thread, such as [[scala.concurrent.Future]] callbacks. + */ + def pipeToSelf[Value](future: Future[Value])(mapResult: Try[Value] ⇒ T): Unit + }