Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RestartWithBackOff delay cancel to wait for failure #24795

Merged
merged 3 commits into from Apr 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -6,10 +6,11 @@ package akka.stream.scaladsl

import java.util.concurrent.atomic.AtomicInteger

import akka.stream.testkit.StreamSpec
import akka.stream.scaladsl.RestartWithBackoffFlow.Delay
import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped }
import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import akka.stream.{ ActorMaterializer, OverflowStrategy }
import akka.stream.{ ActorMaterializer, Attributes, OverflowStrategy }
import akka.testkit.{ DefaultTimeout, TestDuration }
import akka.{ Done, NotUsed }

Expand Down Expand Up @@ -464,10 +465,12 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1

def setupFlow(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, maxRestarts: Int = -1, onlyOnFailures: Boolean = false) = {
val created = new AtomicInteger()
val (flowInSource, flowInProbe) = TestSource.probe[String]

val (flowInSource: TestPublisher.Probe[String], flowInProbe: TestSubscriber.Probe[String]) = TestSource.probe[String]
.buffer(4, OverflowStrategy.backpressure)
.toMat(TestSink.probe)(Keep.both).run()
val (flowOutProbe, flowOutSource) = TestSource.probe[String].toMat(BroadcastHub.sink)(Keep.both).run()

val (flowOutProbe: TestPublisher.Probe[String], flowOutSource: Source[String, NotUsed]) = TestSource.probe[String].toMat(BroadcastHub.sink)(Keep.both).run()

// We can't just use ordinary probes here because we're expecting them to get started/restarted. Instead, we
// simply use the probes as a message bus for feeding and capturing events.
Expand All @@ -476,6 +479,10 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
Flow.fromSinkAndSource(
Flow[String]
.takeWhile(_ != "cancel")
.map {
case "in error" ⇒ throw TE("in error")
case other ⇒ other
}
.to(Sink.foreach(flowInSource.sendNext)
.mapMaterializedValue(_.onComplete {
case Success(_) ⇒ flowInSource.sendNext("in complete")
Expand Down Expand Up @@ -736,5 +743,23 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
created.get() should ===(2)
}

"onFailuresWithBackoff, wrapped flow exception should restart configured times" in {
val flowCreations = new AtomicInteger(0)
val failsSomeTimes = Flow[Int].map { i ⇒
if (i % 3 == 0) throw TE("fail") else i
}

val restartOnFailures =
RestartFlow.onFailuresWithBackoff(1.second, 2.seconds, 0.2, 2)(() ⇒ {
flowCreations.incrementAndGet()
failsSomeTimes
}).addAttributes(Attributes(Delay(100.millis)))

val elements = Source(1 to 7)
.via(restartOnFailures)
.runWith(Sink.seq).futureValue
elements shouldEqual List(1, 2, 4, 5, 7)
flowCreations.get() shouldEqual 3
}
}
}
98 changes: 88 additions & 10 deletions akka-stream/src/main/scala/akka/stream/scaladsl/Restart.scala
Expand Up @@ -5,11 +5,15 @@
package akka.stream.scaladsl

import akka.NotUsed
import akka.annotation.ApiMayChange
import akka.pattern.BackoffSupervisor
import akka.stream.Attributes.Attribute
import akka.stream._
import akka.stream.stage.{ GraphStage, InHandler, OutHandler, TimerGraphStageLogicWithLogging }
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.scaladsl.RestartWithBackoffFlow.Delay
import akka.stream.stage._

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._

/**
* A RestartSource wraps a [[Source]] that gets restarted when it completes or fails.
Expand Down Expand Up @@ -363,17 +367,25 @@ private final class RestartWithBackoffFlow[In, Out](
val out = Outlet[Out]("RestartWithBackoffFlow.out")

override def shape = FlowShape(in, out)

override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic(
"Flow", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures, maxRestarts) {
val delay = inheritedAttributes.get[Delay](Delay(50.millis)).duration

var activeOutIn: Option[(SubSourceOutlet[In], SubSinkInlet[Out])] = None

override protected def logSource = self.getClass

override protected def startGraph() = {
val sourceOut = createSubOutlet(in)
val sinkIn = createSubInlet(out)
Source.fromGraph(sourceOut.source).via(flowFactory()).runWith(sinkIn.sink)(subFusingMaterializer)
val sourceOut: SubSourceOutlet[In] = createSubOutlet(in)
val sinkIn: SubSinkInlet[Out] = createSubInlet(out)

Source.fromGraph(sourceOut.source)
// Temp fix while waiting cause of cancellation. See #23909
.via(RestartWithBackoffFlow.delayCancellation[In](delay))
.via(flowFactory())
.runWith(sinkIn.sink)(subFusingMaterializer)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So here, the attributes will not be passed on from the RestartWithBackoffLogic stage, unless there is something I don't know about the subFusingMaterializer.

I think you'd expect that restart(inner flow).withAttributes(Greatness(max)) would provide that attribute as a default for the inner flow when materialized as well. Could be that is missing currently.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes i'd expect that, you can put them on the inner flow but not on the restart bit for the inner flow


if (isAvailable(out)) {
sinkIn.pull()
}
Expand Down Expand Up @@ -419,26 +431,35 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
maxRestarts: Int) extends TimerGraphStageLogicWithLogging(shape) {
var restartCount = 0
var resetDeadline = minBackoff.fromNow

// This is effectively only used for flows, if either the main inlet or outlet of this stage finishes, then we
// don't want to restart the sub inlet when it finishes, we just finish normally.
var finishing = false

protected def startGraph(): Unit
protected def backoff(): Unit

/**
* @param out The permanent outlet
* @return A sub sink inlet that's sink is attached to the wrapped stage
*/
protected final def createSubInlet[T](out: Outlet[T]): SubSinkInlet[T] = {
val sinkIn = new SubSinkInlet[T](s"RestartWithBackoff$name.subIn")

sinkIn.setHandler(new InHandler {
override def onPush() = push(out, sinkIn.grab())

override def onUpstreamFinish() = {
if (finishing || maxRestartsReached() || onlyOnFailures) {
complete(out)
} else {
log.debug("Restarting graph due to finished upstream")
scheduleRestartTimer()
}
}

/*
* Upstream in this context is the wrapped stage.
*/
override def onUpstreamFailure(ex: Throwable) = {
if (finishing || maxRestartsReached()) {
fail(out, ex)
Expand All @@ -456,10 +477,13 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
sinkIn.cancel()
}
})

sinkIn
}

/**
* @param in The permanent inlet for this stage
* @return Temporary SubSourceOutlet for this "restart"
*/
protected final def createSubOutlet[T](in: Inlet[T]): SubSourceOutlet[T] = {
val sourceOut = new SubSourceOutlet[T](s"RestartWithBackoff$name.subOut")

Expand All @@ -471,11 +495,17 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
pull(in)
}
}

/*
* Downstream in this context is the wrapped stage.
*
* Can either be a failure or a cancel in the wrapped state.
* onlyOnFailures is thus racy so a delay to cancellation is added in the case of a flow.
*/
override def onDownstreamFinish() = {
if (finishing || maxRestartsReached() || onlyOnFailures) {
cancel(in)
} else {
log.debug("Graph in finished")
scheduleRestartTimer()
}
}
Expand All @@ -498,7 +528,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
sourceOut
}

protected final def maxRestartsReached() = {
protected final def maxRestartsReached(): Boolean = {
// Check if the last start attempt was more than the minimum backoff
if (resetDeadline.isOverdue()) {
log.debug("Last restart attempt was more than {} ago, resetting restart count", minBackoff)
Expand All @@ -508,7 +538,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
}

// Set a timer to restart after the calculated delay
protected final def scheduleRestartTimer() = {
protected final def scheduleRestartTimer(): Unit = {
val restartDelay = BackoffSupervisor.calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor)
log.debug("Restarting graph in {}", restartDelay)
scheduleOnce("RestartTimer", restartDelay)
Expand All @@ -526,3 +556,51 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
// When the stage starts, start the source
override def preStart() = startGraph()
}

object RestartWithBackoffFlow {

/**
* Temporary attribute that can override the time a [[RestartWithBackoffFlow]] waits
* for a failure before cancelling.
*
* See https://github.com/akka/akka/issues/24529
*
* Will be removed if/when cancellation can include a cause.
*/
@ApiMayChange
case class Delay(duration: FiniteDuration) extends Attribute

/**
* Returns a flow that is almost identity but delays propagation of cancellation from downstream to upstream.
*
* Once the down stream is finish calls to onPush are ignored.
*/
private def delayCancellation[T](duration: FiniteDuration): Flow[T, T, NotUsed] =
Flow.fromGraph(new DelayCancellationStage(duration))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DelayCancellationStage should be private

private final class DelayCancellationStage[T](delay: FiniteDuration) extends SimpleLinearGraphStage[T] {

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler with StageLogging {

setHandlers(in, out, this)

def onPush(): Unit = push(out, grab(in))
def onPull(): Unit = pull(in)

override def onDownstreamFinish(): Unit = {
scheduleOnce("CompleteState", delay)
setHandler(
in,
new InHandler {
def onPush(): Unit = {}
}
)
}

override protected def onTimer(timerKey: Any): Unit = {
log.debug(s"Stage was canceled after delay of $delay")
completeStage()
}
}
}
}