Skip to content

Commit

Permalink
=str Avoid subMaterialization when the provided recover source is empty.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Oct 21, 2022
1 parent 2e2f0dc commit 72576b1
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 4 deletions.
Expand Up @@ -39,6 +39,19 @@ class FlowRecoverWithSpec extends StreamSpec {
.expectComplete()
}

"recover with empty source" in {
Source(1 to 4)
.map { a =>
if (a == 3) throw ex else a
}
.recoverWith { case _: Throwable => Source.empty }
.runWith(TestSink[Int]())
.request(2)
.expectNextN(1 to 2)
.request(1)
.expectComplete()
}

"cancel substream if parent is terminated when there is a handler" in {
Source(1 to 4)
.map { a =>
Expand Down
18 changes: 14 additions & 4 deletions akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
Expand Up @@ -29,6 +29,7 @@ import akka.stream.OverflowStrategies._
import akka.stream.Supervision.Decider
import akka.stream.impl.{ ContextPropagation, ReactiveStreamsCompliance, Buffer => BufferImpl }
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.TraversalBuilder
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.scaladsl.{ DelayStrategy, Source }
import akka.stream.stage._
Expand Down Expand Up @@ -2147,12 +2148,21 @@ private[akka] object TakeWithin {
override def onPush(): Unit = push(out, grab(in))
override def onUpstreamFailure(ex: Throwable): Unit = onFailure(ex)
override def onPull(): Unit = pull(in)
def onFailure(ex: Throwable): Unit =
if ((maximumRetries < 0 || attempt < maximumRetries) && pf.isDefinedAt(ex)) {
switchTo(pf(ex))
attempt += 1
def onFailure(ex: Throwable): Unit = {
import Collect.NotApplied
if (maximumRetries < 0 || attempt < maximumRetries) {
pf.applyOrElse(ex, NotApplied) match {
case NotApplied => failStage(ex)
case source: Graph[SourceShape[T] @unchecked, M @unchecked] if TraversalBuilder.isEmptySource(source) =>
completeStage()
case other: Graph[SourceShape[T] @unchecked, M @unchecked] =>
switchTo(other)
attempt += 1
case _ => throw new IllegalStateException() // won't happen, compiler exhaustiveness check pleaser
}
} else
failStage(ex)
}

def switchTo(source: Graph[SourceShape[T], M]): Unit = {
val sinkIn = new SubSinkInlet[T]("RecoverWithSink")
Expand Down

0 comments on commit 72576b1

Please sign in to comment.