Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[2.5 backport] stream: add CancellationStrategy attribute to c… #28008

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
/*
* Copyright (C) 2015-2019 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.scaladsl

import akka.NotUsed
import akka.stream.ActorMaterializer
import akka.stream.Attributes
import akka.stream.Attributes.CancellationStrategy
import akka.stream.Attributes.CancellationStrategy.FailStage
import akka.stream.Attributes.CancellationStrategy.SiblingDownstreamWasCancelled
import akka.stream.BidiShape
import akka.stream.ClosedShape
import akka.stream.Inlet
import akka.stream.Materializer
import akka.stream.Outlet
import akka.stream.SharedKillSwitch
import akka.stream.UniformFanOutShape
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
import akka.stream.stage.StageLogging
import akka.stream.testkit.StreamSpec
import akka.stream.testkit.TestPublisher
import akka.stream.testkit.TestSubscriber
import akka.stream.testkit.Utils.TE
import akka.testkit.WithLogCapturing
import akka.testkit._

import scala.concurrent.duration._

class CancellationStrategySpec extends StreamSpec("""akka.loglevel = DEBUG
akka.loggers = ["akka.testkit.SilenceAllTestEventListener"]""") with WithLogCapturing {
implicit val mat = ActorMaterializer()

"CancellationStrategyAttribute" should {
"support strategies" should {
"CompleteStage" should {
"complete if no failure cancellation" in new TestSetup(CancellationStrategy.CompleteStage) {
out1Probe.cancel()
inProbe.expectCancellation()
out2Probe.expectComplete()
}
"complete and propagate cause if failure cancellation" in new TestSetup(CancellationStrategy.CompleteStage) {
out1Probe.cancel()
inProbe.expectCancellation()
out2Probe.expectComplete()
}
}
"FailStage" should {
"fail if no failure cancellation" in new TestSetup(CancellationStrategy.FailStage) {
out1Probe.cancel()
inProbe.expectCancellation()
out2Probe.expectError(SiblingDownstreamWasCancelled)
}
"fail if failure cancellation" in new TestSetup(CancellationStrategy.FailStage) {
out1Probe.cancel()
inProbe.expectCancellation()
out2Probe.expectError(SiblingDownstreamWasCancelled)
}
}
/* PropagateFailure unsupported in 2.5
"PropagateFailure" should {
"complete if no failure" in new TestSetup(CancellationStrategy.PropagateFailure) {
out1Probe.cancel()
inProbe.expectCancellationWithCause(SubscriptionWithCancelException.NoMoreElementsNeeded)
out2Probe.expectComplete()
}
"propagate failure" in new TestSetup(CancellationStrategy.PropagateFailure) {
val theError = TE("This is a TestException")
out1Probe.cancel(theError)
inProbe.expectCancellationWithCause(theError)
out2Probe.expectError(theError)
}
}
*/
"AfterDelay" should {
"apply given strategy after delay" in new TestSetup(CancellationStrategy.AfterDelay(500.millis, FailStage)) {
out1Probe.cancel()
inProbe.expectNoMessage(200.millis)
out2Probe.expectNoMessage(200.millis)

inProbe.expectCancellation()
out2Probe.expectError(SiblingDownstreamWasCancelled)
}
"prevent further elements from coming through" in new TestSetup(
CancellationStrategy.AfterDelay(500.millis, FailStage)) {
out1Probe.request(1)
out2Probe.request(1)
out1Probe.cancel()
inProbe.sendNext(B(123))
inProbe.expectNoMessage(200.millis) // cancellation should not have propagated yet
out2Probe.expectNext(B(123)) // so the element still goes to out2
out1Probe.expectNoMessage(200.millis) // but not to out1 which has already cancelled

// after delay cancellation and error should have propagated
inProbe.expectCancellation()
out2Probe.expectError(SiblingDownstreamWasCancelled)
}
}
}

"cancellation races with BidiStacks" should {
"accidentally convert errors to completions when CompleteStage strategy is chosen (2.5 default)" in new RaceTestSetup(
CancellationStrategy.CompleteStage) {
val theError = TE("Duck meowed")
killSwitch.abort(theError)
toStream.expectCancellation()

// this asserts the previous broken behavior (which can still be seen with CompleteStage strategy)
fromStream.expectComplete()
}
/* PropagateFailure unsupported in 2.5
"be prevented by PropagateFailure strategy (default in 2.6)" in new RaceTestSetup(
CancellationStrategy.PropagateFailure) {
val theError = TE("Duck meowed")
killSwitch.abort(theError)
toStream.expectCancellationWithCause(theError)
fromStream.expectError(theError)
}*/
"be prevented by AfterDelay strategy" in new RaceTestSetup(
CancellationStrategy.AfterDelay(500.millis.dilated, CancellationStrategy.CompleteStage)) {
val theError = TE("Duck meowed")
killSwitch.abort(theError)
toStream.expectCancellation()
fromStream.expectError(theError)
}

class RaceTestSetup(cancellationStrategy: CancellationStrategy.Strategy) {
val toStream = TestPublisher.probe[A]()
val fromStream = TestSubscriber.probe[B]()

val bidi: BidiFlow[A, A, B, B, NotUsed] = BidiFlow.fromGraph(new NaiveBidiStage)

val killSwitch = new SharedKillSwitch("test")
def errorPropagationDelay: FiniteDuration = 200.millis.dilated

Source
.fromPublisher(toStream)
.via(
bidi
.atop(BidiFlow.fromFlows(
new DelayCompletionSignal[A](errorPropagationDelay),
new DelayCompletionSignal[B](errorPropagationDelay)))
.join(Flow[A].via(killSwitch.flow).map(_.toB)))
.to(Sink.fromSubscriber(fromStream))
.addAttributes(Attributes(CancellationStrategy(cancellationStrategy))) // fails for `CompleteStage`
.run()

fromStream.request(1)
toStream.sendNext(A("125"))
fromStream.expectNext(B(125))
}
}
}

case class A(str: String) {
def toB: B = B(str.toInt)
}
case class B(i: Int)

class TestSetup(cancellationStrategy: Option[CancellationStrategy.Strategy]) {
def this(strategy: CancellationStrategy.Strategy) = this(Some(strategy))

val inProbe = TestPublisher.probe[B]()
val out1Probe = TestSubscriber.probe[B]()
val out2Probe = TestSubscriber.probe[B]()

RunnableGraph
.fromGraph {
GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val fanOut = b.add(new TestFanOut)

Source.fromPublisher(inProbe) ~> fanOut.in
fanOut.out(0) ~> Sink.fromSubscriber(out1Probe)
fanOut.out(1) ~> Sink.fromSubscriber(out2Probe)

ClosedShape
}
}
.addAttributes(Attributes(cancellationStrategy.toList.map(CancellationStrategy(_))))
.run()

// some basic testing that data flow
out1Probe.request(1)
out2Probe.request(1)

inProbe.expectRequest()
inProbe.sendNext(B(42))
out1Probe.expectNext(B(42))
out2Probe.expectNext(B(42))
}

// a simple broadcast stage
class TestFanOut extends GraphStage[UniformFanOutShape[B, B]] {
val in = Inlet[B]("in")
val out1 = Outlet[B]("out1")
val out2 = Outlet[B]("out2")

val shape = UniformFanOutShape(in, out1, out2)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
setHandler(in, this)
setHandler(out1, this)
setHandler(out2, this)

var waitingForPulls = 2
override def onPush(): Unit = {
val el = grab(in)
push(out1, el)
push(out2, el)
waitingForPulls = 2
}

override def onPull(): Unit = {
waitingForPulls -= 1
require(waitingForPulls >= 0)
if (waitingForPulls == 0)
pull(in)
}
}
}
class NaiveBidiStage extends GraphStage[BidiShape[A, A, B, B]] {
val upIn = Inlet[A]("upIn")
val upOut = Outlet[A]("upOut")

val downIn = Inlet[B]("downIn")
val downOut = Outlet[B]("downOut")

val shape = BidiShape(upIn, upOut, downIn, downOut)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with StageLogging {
def connect[T](in: Inlet[T], out: Outlet[T]): Unit = {
val handler = new InHandler with OutHandler {
override def onPull(): Unit = pull(in)
override def onPush(): Unit = push(out, grab(in))
}
setHandlers(in, out, handler)
}
connect(upIn, upOut)
connect(downIn, downOut)
}
}

/** A simple stage that delays completion signals */
class DelayCompletionSignal[T](delay: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
setHandlers(in, out, this)

override def onPull(): Unit = pull(in)
override def onPush(): Unit = push(out, grab(in))

val callback = getAsyncCallback[Option[Throwable]] { signal =>
log.debug(s"Now executing delayed action $signal")
signal match {
case Some(ex) => failStage(ex)
case None => completeStage()
}
}

override def onUpstreamFinish(): Unit = {
log.debug(s"delaying completion")
materializer.scheduleOnce(delay, new Runnable { def run(): Unit = callback.invoke(None) })
}
override def onUpstreamFailure(ex: Throwable): Unit = {
log.debug(s"delaying error $ex")
materializer.scheduleOnce(delay, new Runnable { def run(): Unit = callback.invoke(Some(ex)) })
}
}
}
}
100 changes: 100 additions & 0 deletions akka-stream/src/main/scala/akka/stream/Attributes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ import scala.reflect.{ classTag, ClassTag }
import akka.japi.function
import java.net.URLEncoder

import akka.annotation.ApiMayChange
import akka.annotation.InternalApi
import akka.stream.impl.TraversalBuilder

import scala.compat.java8.OptionConverters._
import akka.util.{ ByteString, OptionVal }

import scala.concurrent.duration.FiniteDuration
import scala.util.control.NoStackTrace

/**
* Holds attributes which can be used to alter [[akka.stream.scaladsl.Flow]] / [[akka.stream.javadsl.Flow]]
Expand Down Expand Up @@ -293,6 +295,104 @@ object Attributes {
extends Attribute
final case object AsyncBoundary extends Attribute

/**
* Cancellation strategies provide a way to configure the behavior of a stage when `cancelStage` is called.
*
* It is only relevant for stream components that have more than one output and do not define a custom cancellation
* behavior by overriding `onDownstreamFinish`. In those cases, if the first output is cancelled, the default behavior
* is to call `cancelStage` which shuts down the stage completely. The given strategy will allow customization of how
* the shutdown procedure should be done precisely.
*/
@ApiMayChange
final case class CancellationStrategy(strategy: CancellationStrategy.Strategy) extends MandatoryAttribute
@ApiMayChange
object CancellationStrategy {
private[stream] val Default: CancellationStrategy = CancellationStrategy(CompleteStage)
@ApiMayChange
object SiblingDownstreamWasCancelled extends RuntimeException("Sibling downstream was cancelled") with NoStackTrace
jrudolph marked this conversation as resolved.
Show resolved Hide resolved

sealed trait Strategy

/**
* Strategy that treats `cancelStage` the same as `completeStage`, i.e. all inlets are cancelled
* and all outlets are regularly completed.
*
* This used to be the default behavior before Akka 2.6.
*
* This behavior can be problematic in stacks of BidiFlows where different layers of the stack are both connected
* through inputs and outputs. In this case, an error in a doubly connected component triggers both a cancellation
* going upstream and an error going downstream. Since the stack might be connected to those components with inlets and
* outlets, a race starts whether the cancellation or the error arrives first. If the error arrives first, that's usually
* good because then the error can be propagated both on inlets and outlets. However, if the cancellation arrives first,
* the previous default behavior to complete the stage will lead other outputs to be completed regularly. The error
* which arrive late at the other hand will just be ignored (that connection will have been cancelled already and also
* the paths through which the error could propagates are already shut down).
*/
@ApiMayChange
case object CompleteStage extends Strategy

/**
* Strategy that treats `cancelStage` the same as `failStage`, i.e. all inlets are cancelled (propagating the
* cancellation cause) and all outlets are failed with an SiblingDownstreamWasCancelled exception.
*/
@ApiMayChange
case object FailStage extends Strategy

/**
* Strategy that allows to delay any action when `cancelStage` is invoked.
*
* The idea of this strategy is to delay any action on cancellation because it is expected that the stage is completed
* through another path in the meantime. The downside is that a stage and a stream may live longer than expected if no
* such signal is received and cancellation is invoked later on. In streams with many stages that all apply this strategy,
* this strategy might significantly delay the propagation of a cancellation signal because each upstream stage might impose
* such a delay. During this time, the stream will be mostly "silent", i.e. it cannot make progress because of backpressure,
* but you might still be able observe a long delay at the ultimate source.
*/
@ApiMayChange
final case class AfterDelay(delay: FiniteDuration, strategy: Strategy) extends Strategy
}

/**
* Strategy that treats `cancelStage` the same as `completeStage`, i.e. all inlets are cancelled
* and all outlets are regularly completed.
*
* This used to be the default behavior before Akka 2.6.
*
* This behavior can be problematic in stacks of BidiFlows where different layers of the stack are both connected
* through inputs and outputs. In this case, an error in a doubly connected component triggers both a cancellation
* going upstream and an error going downstream. Since the stack might be connected to those components with inlets and
* outlets, a race starts whether the cancellation or the error arrives first. If the error arrives first, that's usually
* good because then the error can be propagated both on inlets and outlets. However, if the cancellation arrives first,
* the previous default behavior to complete the stage will lead other outputs to be completed regularly. The error
* which arrive late at the other hand will just be ignored (that connection will have been cancelled already and also
* the paths through which the error could propagates are already shut down).
*/
@ApiMayChange
def cancellationStrategyCompleteState: CancellationStrategy.Strategy = CancellationStrategy.CompleteStage

/**
* Strategy that treats `cancelStage` the same as `failStage`, i.e. all inlets are cancelled (propagating the
* cancellation cause) and all outlets are failed with an SiblingDownstreamWasCancelled exception.
*/
@ApiMayChange
def cancellationStrategyFailStage: CancellationStrategy.Strategy = CancellationStrategy.FailStage

/**
* Strategy that allows to delay any action when `cancelStage` is invoked.
*
* The idea of this strategy is to delay any action on cancellation because it is expected that the stage is completed
* through another path in the meantime. The downside is that a stage and a stream may live longer than expected if no
* such signal is received and cancellation is invoked later on. In streams with many stages that all apply this strategy,
* this strategy might significantly delay the propagation of a cancellation signal because each upstream stage might impose
* such a delay. During this time, the stream will be mostly "silent", i.e. it cannot make progress because of backpressure,
* but you might still be able observe a long delay at the ultimate source.
*/
@ApiMayChange
def cancellationStrategyAfterDelay(
delay: FiniteDuration,
strategy: CancellationStrategy.Strategy): CancellationStrategy.Strategy =
CancellationStrategy.AfterDelay(delay, strategy)

object LogLevels {

/** Use to disable logging on certain operations when configuring [[Attributes#logLevels]] */
Expand Down
Loading