Unexpected behaviour of RestartFlow.onFailuresWithBackoff #24726

Closed
julianhowarth opened this issue Mar 14, 2018 · 3 comments

@julianhowarth julianhowarth commented Mar 14, 2018

I expected RestartFlow.onFailuresWithBackoff to behave the same as RestartFlow.withBackoff when a failure occurs in the stream, but the two behave differently. The following example, using withBackoff, works as expected:

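  import akka.actor.ActorSystem
  import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
  import akka.stream.scaladsl.{Flow, RestartFlow, Sink, Source}

  import scala.concurrent.duration._
  import scala.util.control.NonFatal
  import scala.util.{Failure, Success}

  object RestartFlowTest extends App {
    implicit val sys = ActorSystem()
    implicit val executor = sys.dispatcher
    implicit val mat = {
      // Supervisor so we can see what's going on, but use the default Stop behaviour
      val decider: Supervision.Decider = {
        case NonFatal(e) ⇒
          println(s"Supervisor got exception: ${e.getMessage}")
          Supervision.Stop
      }
      ActorMaterializer(ActorMaterializerSettings(sys).withSupervisionStrategy(decider))
    }

    // Throws on every multiple of 3
    val failing =
      Flow[Int].map(i ⇒
        if (i % 3 == 0) {
          println("About to fail")
          throw new RuntimeException("FAIL!")
        }
        else i)

    // minBackoff = 1s, maxBackoff = 2s, randomFactor = 0.2, maxRestarts = 2
    val safe =
      RestartFlow.withBackoff(1.second, 2.seconds, 0.2, 2)(() ⇒ {
        println("Creating flow")
        failing
      })

    Source(1 to 7)
      .via(safe)
      .runWith(Sink.foreach(println))
      .onComplete {
        case Success(_) ⇒
          println("Completed successfully")
        case Failure(ex) ⇒
          println(s"Failed - ${ex.getMessage}")
      }
  }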

Prints out:

Creating flow
1
2
About to fail
Supervisor got exception: FAIL!
Creating flow
4
5
About to fail
Supervisor got exception: FAIL!
Creating flow
7
Completed successfully
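To make the expected semantics concrete, here is a dependency-free model of the withBackoff run above. It is a hypothetical sketch (RestartModel and simulate are illustrative names, not Akka APIs): each failing element is dropped and the inner flow is recreated until maxRestarts is used up, after which the stream fails. It ignores backoff timing, buffering and any restart-count reset, so it describes the observed output rather than Akka's implementation.

```scala
// Hypothetical model of the observed RestartFlow.withBackoff output; not Akka code.
object RestartModel {
  sealed trait Outcome
  final case class Completed(emitted: Vector[Int], restarts: Int) extends Outcome
  final case class Failed(emitted: Vector[Int], restarts: Int) extends Outcome

  def simulate(input: Seq[Int], maxRestarts: Int)(fails: Int => Boolean): Outcome = {
    @annotation.tailrec
    def loop(rest: List[Int], emitted: Vector[Int], restarts: Int): Outcome = rest match {
      case Nil => Completed(emitted, restarts)
      // Element passes through the inner flow
      case i :: tail if !fails(i) => loop(tail, emitted :+ i, restarts)
      // Element fails: it is lost and the inner flow is recreated
      case _ :: tail if restarts < maxRestarts => loop(tail, emitted, restarts + 1)
      // Element fails with no restarts left: the whole stream fails
      case _ => Failed(emitted, restarts)
    }
    loop(input.toList, Vector.empty, 0)
  }
}
```

With simulate(1 to 7, maxRestarts = 2)(_ % 3 == 0) the model yields Completed(Vector(1, 2, 4, 5, 7), 2), matching the printed output; this is the behaviour onFailuresWithBackoff was expected to reproduce when the inner flow fails.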

However, if I change the safe val to be:

  val safe =
    RestartFlow.onFailuresWithBackoff(1.second, 2.seconds, 0.2, 2)(() ⇒ {
      println("Creating flow")
      failing
    })

I get:

Creating flow
1
2
About to fail
Supervisor got exception: FAIL!
[ERROR] [03/14/2018 08:22:42.753] [default-akka.actor.default-dispatcher-5] [RestartWithBackoffFlow(akka://default)] Restarting graph due to failure
java.lang.RuntimeException: FAIL!
	at jhowarth.RestartFlowTest$.$anonfun$failing$1(AkkaBug.scala:31)
	at scala.runtime.java8.JFunction1$mcII$sp.apply(JFunction1$mcII$sp.java:12)
	at akka.stream.impl.fusing.Map$$anon$9.onPush(Ops.scala:53)
	at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:519)
	at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:482)
	at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:378)
	at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:585)
	at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:469)
	at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:560)
	at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:742)
	at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:732)
	at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:758)
	at akka.actor.Actor.aroundReceive(Actor.scala:517)
	at akka.actor.Actor.aroundReceive$(Actor.scala:515)
	at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:667)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:590)
	at akka.actor.ActorCell.invoke(ActorCell.scala:559)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Creating flow
Failed - Cannot pull closed port (RestartWithBackoffFlow.in(1245019744))

I would have expected the same behaviour as the withBackoff version.

akka-streams: 2.5.11
scala: 2.12.4

Member

@patriknw patriknw commented Mar 14, 2018

"Cannot pull closed port" indicates that this is a bug.

btw, do you see the same thing if you remove the supervision decider?

Author

@julianhowarth julianhowarth commented Mar 14, 2018

Thanks, I thought the "cannot pull" was unlikely to be the intended behaviour.

After switching to implicit val mat = ActorMaterializer(), the behaviour is the same (minus the supervisor logging):

Creating flow
1
2
About to fail
[ERROR] [03/14/2018 09:43:07.791] [default-akka.actor.default-dispatcher-2] [RestartWithBackoffFlow(akka://default)] Restarting graph due to failure
java.lang.RuntimeException: FAIL!
	at jhowarth.RestartFlowTest$.$anonfun$failing$1(AkkaBug.scala:32)
	at scala.runtime.java8.JFunction1$mcII$sp.apply(JFunction1$mcII$sp.java:12)
	at akka.stream.impl.fusing.Map$$anon$9.onPush(Ops.scala:53)
	at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:519)
	at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:482)
	at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:378)
	at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:585)
	at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:469)
	at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:560)
	at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:742)
	at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:732)
	at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:726)
	at akka.actor.Actor.aroundPreStart(Actor.scala:528)
	at akka.actor.Actor.aroundPreStart$(Actor.scala:528)
	at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:667)
	at akka.actor.ActorCell.create(ActorCell.scala:654)
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:525)
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:547)
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
	at akka.dispatch.Mailbox.run(Mailbox.scala:223)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Creating flow
Failed - Cannot pull closed port (RestartWithBackoffFlow.in(549026988))
Author

@julianhowarth julianhowarth commented Mar 17, 2018

After a little investigation, there are a couple of things to note:

  • The message "Restarting graph due to failure" is logged by the RestartWithBackoffLogic class when its upstream fails. However, it is the downstream that we are interested in, as that is where the exception was thrown.
  • Further down, there is the onDownstreamFinish method in the subSourceOutlet, which is the route by which I'd expect the failure to be noticed and the graph to be restarted (this is the route followed by RestartFlow.withBackoff).
  • However, the if condition is:
        if (finishing || maxRestartsReached() || onlyOnFailures) {
          cancel(in)
        } else {
          log.debug("Graph in finished")
          scheduleRestartTimer()
        }

So whereas withBackoff restarts the graph, onFailuresWithBackoff does not, because onlyOnFailures is set to true. Given that both completion and an error lead to the downstream cancelling, I can't see how the onlyOnFailures condition above can work without more context about what caused the cancellation.
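The point can be shown with a small model of the quoted branch. This is a hypothetical sketch (CancelDecision, current and withCause are illustrative names, not Akka APIs): current mirrors the quoted condition, whose inputs are only flags, while withCause assumes a cancellation that carries an optional cause, along the lines of the "propagate a reason for cancel" idea referenced later in this thread as akka#23909.

```scala
// Hypothetical model of the onDownstreamFinish branch quoted above; not Akka code.
object CancelDecision {
  sealed trait Action
  case object CancelUpstream extends Action
  case object ScheduleRestart extends Action

  // Mirrors: if (finishing || maxRestartsReached() || onlyOnFailures) cancel(in) else scheduleRestartTimer()
  // Nothing here says *why* downstream finished, so onlyOnFailures always cancels.
  def current(finishing: Boolean, maxRestartsReached: Boolean, onlyOnFailures: Boolean): Action =
    if (finishing || maxRestartsReached || onlyOnFailures) CancelUpstream else ScheduleRestart

  // Assumed variant: the cancel carries an optional cause (None = ordinary cancellation),
  // so onlyOnFailures can restart exactly when a failure caused the cancel.
  def withCause(finishing: Boolean, maxRestartsReached: Boolean, onlyOnFailures: Boolean,
                cause: Option[Throwable]): Action =
    if (finishing || maxRestartsReached) CancelUpstream
    else if (onlyOnFailures && cause.isEmpty) CancelUpstream
    else ScheduleRestart
}
```

In the model, current(false, false, onlyOnFailures = true) returns CancelUpstream even when the inner flow failed, which matches the behaviour reported above; withCause restarts only when a cause is present.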

@chbatey chbatey self-assigned this Mar 26, 2018
chbatey added a commit to chbatey/akka that referenced this issue Mar 27, 2018
When a user flow wrapped with RestartFlow.onFailuresWithBackoff fails, it signals a cancel upstream and a failure downstream.

These are intercepted by a SubSource/SubSink. If the SubSource receives the cancel before the SubSink receives the failure, the real upstream is wrongly canceled, leading to an error when the SubSink restarts the flow.

This commit introduces a delay for the cancel so that the failure is more likely to win.

It would be far better to propagate a reason for the cancel so that this could be deterministic. See akka#23909

Refs akka#24528 akka#24726
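The race described in the commit message can be modelled in a few lines of plain Scala. This is a hypothetical sketch: DelayedCancelModel, resolve and the millisecond timeline are illustrative, and the actual delay chosen by the fix is not stated here. A cancel is held back for cancelDelayMillis; if the failure arrives before the cancel or within that window, the flow is restarted instead of the real upstream being cancelled.

```scala
// Hypothetical timing model of the cancel/failure race; not Akka code.
object DelayedCancelModel {
  sealed trait Outcome
  case object Restarted extends Outcome          // failure observed in time: inner flow restarted
  case object UpstreamCancelled extends Outcome  // cancel acted on: real upstream cancelled

  /** @param cancelAtMillis    when the SubSource sees the cancel
    * @param failureAtMillis   when (if ever) the SubSink sees the failure
    * @param cancelDelayMillis how long the cancel is held back before acting on it */
  def resolve(cancelAtMillis: Long, failureAtMillis: Option[Long], cancelDelayMillis: Long): Outcome =
    failureAtMillis match {
      case Some(f) if f <= cancelAtMillis + cancelDelayMillis => Restarted
      case _ => UpstreamCancelled
    }
}
```

With no delay, a failure that arrives 5 ms after the cancel loses the race, which corresponds to the "Cannot pull closed port" case above; with a 50 ms delay it wins. A failure that arrives after the window still loses, which is why the commit only makes the failure "more likely to win" and suggests propagating a cancel reason for a deterministic fix.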
chbatey added a commit that referenced this issue Apr 27, 2018
* RestartWithBackOff delay cancel to wait for failure

For wrapping a user flow with FlowRestart.onFlowWithFailures when the user
flow it fails signals a cancel upstream and a failure downstream.

These are intercepted by a SubSource/SubSink. In the case
the SubSource receives the cancel before the SubSink receives
the real upstream is wrongly canceled leading to an error
when the SubSink restarts the flow.

This commit introduces a delay for the cancel so that the failure
is more likely to win.

Would be far better to propagate a reason for cancel so this could
be deterministic. See #23909

Refs #24528 #24726
@chbatey chbatey added this to the 2.5.13 milestone Apr 27, 2018
@chbatey chbatey closed this Apr 27, 2018