Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use parasitic context by default in Classic pipe pattern #31917

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 27 additions & 9 deletions akka-actor-tests/src/test/scala/akka/pattern/PipeToSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,22 @@ import akka.testkit.TestProbe

class PipeToSpec extends AkkaSpec {

import system.dispatcher
// Do not try this at home, kids. In real code, this should be a `Future.successful`
def future42(): Future[Int] = Future(42)(system.dispatcher)

"PipeTo" must {

"work" in {
val p = TestProbe()
Future(42).pipeTo(p.ref)
future42().pipeTo(p.ref)
p.expectMsg(42)
}

"work with an implicit ExecutionContext" in {
import system.dispatcher // installs an EC in implicit scope

val p = TestProbe()
future42().pipeTo(p.ref)
p.expectMsg(42)
}

Expand All @@ -31,20 +40,20 @@ class PipeToSpec extends AkkaSpec {
"pick up an implicit sender()" in {
val p = TestProbe()
implicit val s = testActor
Future(42).pipeTo(p.ref)
future42().pipeTo(p.ref)
p.expectMsg(42)
p.lastSender should ===(s)
}

"work in Java form" in {
val p = TestProbe()
pipe(Future(42)) to p.ref
pipe(future42()) to p.ref
p.expectMsg(42)
}

"work in Java form with sender()" in {
val p = TestProbe()
pipe(Future(42)).to(p.ref, testActor)
pipe(future42()).to(p.ref, testActor)
p.expectMsg(42)
p.lastSender should ===(testActor)
}
Expand All @@ -56,7 +65,16 @@ class PipeToSpec extends AkkaSpec {
"work" in {
val p = TestProbe()
val sel = system.actorSelection(p.ref.path)
Future(42).pipeToSelection(sel)
future42().pipeToSelection(sel)
p.expectMsg(42)
}

"work with an implicit ExecutionContext" in {
import system.dispatcher

val p = TestProbe()
val sel = system.actorSelection(p.ref.path)
future42().pipeToSelection(sel)
p.expectMsg(42)
}

Expand All @@ -71,22 +89,22 @@ class PipeToSpec extends AkkaSpec {
val p = TestProbe()
val sel = system.actorSelection(p.ref.path)
implicit val s = testActor
Future(42).pipeToSelection(sel)
future42().pipeToSelection(sel)
p.expectMsg(42)
p.lastSender should ===(s)
}

"work in Java form" in {
val p = TestProbe()
val sel = system.actorSelection(p.ref.path)
pipe(Future(42)) to sel
pipe(future42()) to sel
p.expectMsg(42)
}

"work in Java form with sender()" in {
val p = TestProbe()
val sel = system.actorSelection(p.ref.path)
pipe(Future(42)).to(sel, testActor)
pipe(future42()).to(sel, testActor)
p.expectMsg(42)
p.lastSender should ===(testActor)
}
Expand Down
19 changes: 14 additions & 5 deletions akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import language.implicitConversions

import akka.actor.{ Actor, ActorRef, Status }
import akka.actor.ActorSelection
import akka.dispatch.ExecutionContexts
import akka.util.unused

trait PipeToSupport {
Expand Down Expand Up @@ -78,7 +79,6 @@ trait PipeToSupport {
*
* {{{
* import akka.pattern.pipe
* // requires implicit ExecutionContext, e.g. by importing `context.dispatcher` inside an Actor
*
* Future { doExpensiveCalc() } pipeTo nextActor
*
Expand All @@ -90,16 +90,20 @@ trait PipeToSupport {
*
* The successful result of the future is sent as a message to the recipient, or
* the failure is sent in a [[akka.actor.Status.Failure]] to the recipient.
*
* By default this uses a [[scala.concurrent.ExecutionContext]] which sends the message on the
* calling thread if the future has already completed, or on the thread which completes the future
* if the future has not yet completed.
*/
implicit def pipe[T](future: Future[T])(implicit executionContext: ExecutionContext): PipeableFuture[T] =
implicit def pipe[T](future: Future[T])(
implicit executionContext: ExecutionContext = ExecutionContexts.parasitic): PipeableFuture[T] =
new PipeableFuture(future)

/**
* Import this implicit conversion to gain the `pipeTo` method on [[scala.concurrent.Future]]:
* Import this implicit conversion to gain the `pipeTo` method on [[java.util.concurrent.CompletionStage]]:
*
* {{{
* import akka.pattern.pipe
* // requires implicit ExecutionContext, e.g. by importing `context.dispatcher` inside an Actor
*
* Future { doExpensiveCalc() } pipeTo nextActor
*
Expand All @@ -111,7 +115,12 @@ trait PipeToSupport {
*
* The successful result of the future is sent as a message to the recipient, or
* the failure is sent in a [[akka.actor.Status.Failure]] to the recipient.
*
* Regardless of the passed [[scala.concurrent.ExecutionContext]], the message will be
* sent from the calling thread if the future has already completed, or on the thread which
* completes the future if the future has not yet completed.
*/
implicit def pipeCompletionStage[T](future: CompletionStage[T])(
implicit executionContext: ExecutionContext): PipeableCompletionStage[T] = new PipeableCompletionStage(future)
implicit executionContext: ExecutionContext = ExecutionContexts.parasitic): PipeableCompletionStage[T] =
new PipeableCompletionStage(future)
}