Skip to content

Commit

Permalink
MapAsync and already failed futures #24117
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Dec 7, 2017
1 parent 2fd9bb7 commit af8a81f
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 3 deletions.
Expand Up @@ -80,6 +80,23 @@ class FlowMapAsyncSpec extends StreamSpec {
c.expectNoMsg(200.millis)
}

"signal future already failed" in assertAllStagesStopped {
val latch = TestLatch(1)
val c = TestSubscriber.manualProbe[Int]()
implicit val ec = system.dispatcher
val p = Source(1 to 5).mapAsync(4)(n
if (n == 3) Future.failed[Int](new TE("err1"))
else Future {
Await.ready(latch, 10.seconds)
n
}
).to(Sink.fromSubscriber(c)).run()
val sub = c.expectSubscription()
sub.request(10)
c.expectError().getMessage should be("err1")
latch.countDown()
}

"signal future failure" in assertAllStagesStopped {
val latch = TestLatch(1)
val c = TestSubscriber.manualProbe[Int]()
Expand Down Expand Up @@ -229,6 +246,22 @@ class FlowMapAsyncSpec extends StreamSpec {
c.expectComplete()
}

"resume after already failed future" in assertAllStagesStopped {
val c = TestSubscriber.manualProbe[Int]()
implicit val ec = system.dispatcher
val p = Source(1 to 5)
.mapAsync(4)(n
if (n == 3) Future.failed(new TE("err3"))
else Future.successful(n)
)
.withAttributes(supervisionStrategy(resumingDecider))
.to(Sink.fromSubscriber(c)).run()
val sub = c.expectSubscription()
sub.request(10)
for (n List(1, 2, 4, 5)) c.expectNext(n)
c.expectComplete()
}

"resume after multiple failures" in assertAllStagesStopped {
val futures: List[Future[String]] = List(
Future.failed(Utils.TE("failure1")),
Expand Down Expand Up @@ -371,6 +404,25 @@ class FlowMapAsyncSpec extends StreamSpec {
failCount.get() should ===(1)
}

"not invoke the decider twice for the same already failed future" in {
import system.dispatcher
val failCount = new AtomicInteger(0)
val result = Source(List(true, false))
.mapAsync(1)(elem
if (elem) Future.failed(TE("this has gone too far"))
else Future.successful(elem)
).addAttributes(supervisionStrategy {
case TE("this has gone too far")
failCount.incrementAndGet()
Supervision.resume
case _ Supervision.stop
})
.runWith(Sink.seq)

result.futureValue should ===(Seq(false))
failCount.get() should ===(1)
}

"not invoke the decider twice for the same failure to produce a future" in {
import system.dispatcher
val failCount = new AtomicInteger(0)
Expand Down
13 changes: 10 additions & 3 deletions akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
Expand Up @@ -1172,7 +1172,7 @@ private[stream] object Collect {
private val futureCB = getAsyncCallback[Holder[Out]](holder
holder.elem match {
case Success(_) pushNextIfPossible()
case Failure(NonFatal(ex))
case Failure(ex)
holder.supervisionDirectiveFor(decider, ex) match {
// fail fast as if supervision says so
case Supervision.Stop failStage(ex)
Expand All @@ -1195,9 +1195,14 @@ private[stream] object Collect {
future.value match {
case None future.onComplete(holder)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
case Some(v)
// #20217 the future is already here, avoid scheduling it on the dispatcher
// #20217 the future is already here, optimization: avoid scheduling it on the dispatcher and
// run the logic directly on this thread
holder.setElem(v)
pushNextIfPossible()
v match {
// this optimization also requires us to stop the stage to fail fast if the decider says so:
case Failure(ex) if holder.supervisionDirectiveFor(decider, ex) == Supervision.Stop failStage(ex)
case _ pushNextIfPossible()
}
}

} catch {
Expand Down Expand Up @@ -1225,6 +1230,8 @@ private[stream] object Collect {

case Failure(NonFatal(ex))
holder.supervisionDirectiveFor(decider, ex) match {
// this could happen if we are looping in pushNextIfPossible and end up on a failed future before the
// onComplete callback has run
case Supervision.Stop failStage(ex)
case _
// try next element
Expand Down

0 comments on commit af8a81f

Please sign in to comment.