Skip to content

Commit

Permalink
Add pipeToSelf to typed ActorContext #26199
Browse files Browse the repository at this point in the history
Implemented in terms of AdaptMessage, which makes sure to map the values on the actor's thread, in Scala.
  • Loading branch information
dwijnand authored and johanandren committed Jan 4, 2019
1 parent f8618b2 commit 1c370c1
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 15 deletions.
@@ -0,0 +1,83 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.actor.typed.javadsl;

import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.Behavior;
import akka.actor.typed.Props;
import com.typesafe.config.ConfigFactory;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import static org.hamcrest.CoreMatchers.startsWith;
import static org.junit.Assert.*;

public final class ActorContextPipeToSelfTest extends JUnitSuite {

@ClassRule
public static final TestKitJunitResource testKit =
new TestKitJunitResource(ConfigFactory.parseString(
"pipe-to-self-spec-dispatcher.executor = thread-pool-executor\n" +
"pipe-to-self-spec-dispatcher.type = PinnedDispatcher\n"
));

static final class Msg {
final String response;
final String selfName;
final String threadName;

Msg(final String response, final String selfName, final String threadName) {
this.response = response;
this.selfName = selfName;
this.threadName = threadName;
}
}

@Test public void handlesSuccess() {
assertEquals("ok: hi", responseFrom(CompletableFuture.completedFuture("hi")));
}

@Test public void handlesFailure() {
assertEquals("ko: boom", responseFrom(failedFuture(new RuntimeException("boom"))));
}

private CompletableFuture<String> failedFuture(final Throwable ex) {
final CompletableFuture<String> future = new CompletableFuture<>();
future.completeExceptionally(ex);
return future;
}

private String responseFrom(final CompletionStage<String> future) {
final TestProbe<Msg> probe = testKit.createTestProbe();
final Behavior<Msg> behavior = Behaviors.setup(context -> {
context.pipeToSelf(future, (string, exception) -> {
final String response;
if (string != null) response = String.format("ok: %s", string);
else if (exception != null) response = String.format("ko: %s", exception.getMessage());
else response = "???";
return new Msg(response, context.getSelf().path().name(), Thread.currentThread().getName());
});
return Behaviors.receiveMessage(msg -> {
probe.getRef().tell(msg);
return Behaviors.stopped();
});
});
final String name = "pipe-to-self-spec";
final Props props = Props.empty().withDispatcherFromConfig("pipe-to-self-spec-dispatcher");

testKit.spawn(behavior, name, props);

final Msg msg = probe.expectMessageClass(Msg.class);

assertEquals("pipe-to-self-spec", msg.selfName);
assertThat(msg.threadName, startsWith("ActorContextPipeToSelfTest-pipe-to-self-spec-dispatcher"));
return msg.response;
}
}
@@ -0,0 +1,62 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.actor.typed.scaladsl

import scala.concurrent.Future
import scala.util.control.NoStackTrace
import scala.util.{ Failure, Success }

import akka.actor.testkit.typed.scaladsl.{ ScalaTestWithActorTestKit, TestProbe }
import akka.actor.typed.Props
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike

object ActorContextPipeToSelfSpec {
val config = ConfigFactory.parseString(
"""
|pipe-to-self-spec-dispatcher {
| executor = thread-pool-executor
| type = PinnedDispatcher
|}
""".stripMargin)
}

final class ActorContextPipeToSelfSpec extends ScalaTestWithActorTestKit(ActorContextPipeToSelfSpec.config)
with WordSpecLike {

"The Scala DSL ActorContext pipeToSelf" must {
"handle success" in { responseFrom(Future.successful("hi")) should ===("ok: hi") }
"handle failure" in { responseFrom(Future.failed(Fail)) should ===(s"ko: $Fail") }
}

object Fail extends NoStackTrace

private def responseFrom(future: Future[String]) = {
final case class Msg(response: String, selfName: String, threadName: String)

val probe = TestProbe[Msg]()
val behavior = Behaviors.setup[Msg] { context
context.pipeToSelf(future) {
case Success(s) Msg(s"ok: $s", context.self.path.name, Thread.currentThread().getName)
case Failure(e) Msg(s"ko: $e", context.self.path.name, Thread.currentThread().getName)
}
Behaviors.receiveMessage { msg
probe.ref ! msg
Behaviors.stopped
}
}
val name = "pipe-to-self-spec"
val props = Props.empty.withDispatcherFromConfig("pipe-to-self-spec-dispatcher")

spawn(behavior, name, props)

val msg = probe.expectMessageType[Msg]

msg.selfName should ===("pipe-to-self-spec")
msg.threadName should startWith("ActorContextPipeToSelfSpec-pipe-to-self-spec-dispatcher")
msg.response
}

}
Expand Up @@ -9,13 +9,12 @@ import java.time.Duration
import java.util.function.{ Function JFunction }
import java.util.ArrayList
import java.util.Optional
import java.util.function
import java.util.concurrent.CompletionStage
import java.util.function.BiConsumer
import java.util.function.BiFunction

import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.{ ExecutionContextExecutor, Future }
import scala.reflect.ClassTag
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import akka.annotation.InternalApi
import akka.util.OptionVal
Expand Down Expand Up @@ -84,17 +83,31 @@ import akka.util.JavaDurationConverters._
// Scala API impl
override def ask[Req, Res](target: RecipientRef[Req])(createRequest: ActorRef[Res] Req)(mapResponse: Try[Res] T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit = {
import akka.actor.typed.scaladsl.AskPattern._
(target ? createRequest)(responseTimeout, system.scheduler).onComplete(res
self.asInstanceOf[ActorRef[AnyRef]] ! AdaptMessage(res, mapResponse)
)
pipeToSelf((target ? createRequest)(responseTimeout, system.scheduler))(mapResponse)
}

// Java API impl
def ask[Req, Res](resClass: Class[Res], target: RecipientRef[Req], responseTimeout: Duration, createRequest: function.Function[ActorRef[Res], Req], applyToResponse: BiFunction[Res, Throwable, T]): Unit = {
this.ask(target)(createRequest.apply) {
case Success(message) applyToResponse.apply(message, null)
case Failure(ex) applyToResponse.apply(null.asInstanceOf[Res], ex)
}(responseTimeout.asScala, ClassTag[Res](resClass))
def ask[Req, Res](resClass: Class[Res], target: RecipientRef[Req], responseTimeout: Duration, createRequest: JFunction[ActorRef[Res], Req], applyToResponse: BiFunction[Res, Throwable, T]): Unit = {
import akka.actor.typed.javadsl.AskPattern
val message = new akka.japi.function.Function[ActorRef[Res], Req] {
def apply(ref: ActorRef[Res]): Req = createRequest(ref)
}
pipeToSelf(AskPattern.ask(target, message, responseTimeout, system.scheduler), applyToResponse)
}

// Scala API impl
def pipeToSelf[Value](future: Future[Value])(mapResult: Try[Value] T): Unit = {
future.onComplete(value self.unsafeUpcast ! AdaptMessage(value, mapResult))
}

// Java API impl
def pipeToSelf[Value](future: CompletionStage[Value], applyToResult: BiFunction[Value, Throwable, T]): Unit = {
future.whenComplete(new BiConsumer[Value, Throwable] {
def accept(value: Value, ex: Throwable): Unit = {
if (value != null) self ! applyToResult.apply(value, null)
if (ex != null) self ! applyToResult.apply(null.asInstanceOf[Value], ex)
}
})
}

private[akka] override def spawnMessageAdapter[U](f: U T, name: String): ActorRef[U] =
Expand Down
Expand Up @@ -19,7 +19,7 @@ import akka.annotation.InternalApi

/**
* INTERNAL API: Wrapping of messages that should be adapted by the included
* function. Used by `ActorContext.spawnMessageAdapter` so that the function is
* function. Used by `ActorContext.spawnMessageAdapter` and `ActorContext.ask` so that the function is
* applied in the "parent" actor (for better thread safety)..
*/
@InternalApi private[akka] final case class AdaptMessage[U, T](msg: U, adapter: U T) extends InternalMessage {
Expand Down
Expand Up @@ -11,6 +11,7 @@ import akka.annotation.DoNotInherit
import akka.annotation.ApiMayChange
import akka.actor.typed._
import java.util.Optional
import java.util.concurrent.CompletionStage

import scala.concurrent.ExecutionContextExecutor

Expand Down Expand Up @@ -278,4 +279,13 @@ trait ActorContext[T] extends TypedActorContext[T] {
createRequest: java.util.function.Function[ActorRef[Res], Req],
applyToResponse: BiFunction[Res, Throwable, T]): Unit

/**
* Sends the result of the given `CompletionStage` to this Actor (“`self`”), after adapted it with
* the given function.
*
* This method is thread-safe and can be called from other threads than the ordinary
* actor message processing thread, such as [[java.util.concurrent.CompletionStage]] callbacks.
*/
def pipeToSelf[Value](future: CompletionStage[Value], applyToResult: BiFunction[Value, Throwable, T]): Unit

}
Expand Up @@ -30,6 +30,6 @@ import scala.compat.java8.FutureConverters._
*
*/
object AskPattern {
def ask[T, U](actor: ActorRef[T], message: JFunction[ActorRef[U], T], timeout: Duration, scheduler: Scheduler): CompletionStage[U] =
def ask[T, U](actor: RecipientRef[T], message: JFunction[ActorRef[U], T], timeout: Duration, scheduler: Scheduler): CompletionStage[U] =
(actor.?(message.apply)(timeout.asScala, scheduler)).toJava
}
Expand Up @@ -8,7 +8,7 @@ import akka.actor.typed._
import akka.annotation.{ ApiMayChange, DoNotInherit }
import akka.util.Timeout

import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.{ ExecutionContextExecutor, Future }
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
import scala.util.Try
Expand Down Expand Up @@ -275,4 +275,13 @@ trait ActorContext[T] extends TypedActorContext[T] { this: akka.actor.typed.java
*/
def ask[Req, Res](target: RecipientRef[Req])(createRequest: ActorRef[Res] Req)(mapResponse: Try[Res] T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit

/**
* Sends the result of the given `Future` to this Actor (“`self`”), after adapted it with
* the given function.
*
* This method is thread-safe and can be called from other threads than the ordinary
* actor message processing thread, such as [[scala.concurrent.Future]] callbacks.
*/
def pipeToSelf[Value](future: Future[Value])(mapResult: Try[Value] T): Unit

}

0 comments on commit 1c370c1

Please sign in to comment.