supervision strategy not applied to flatMapMerge #23066

Open
jrduncans opened this issue May 30, 2017 · 14 comments
Labels
1 - triaged (Tickets that are safe to pick up for contributing in terms of likeliness of being accepted), help wanted (Issues that the core team will likely not have time to work on), t:stream

Comments

@jrduncans

I would expect the flatMapMerge below to behave the same as the mapAsyncUnordered:

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}

import scala.collection.immutable.Seq
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object MapMergeConcatError extends App {
  implicit val system = ActorSystem("Main")
  implicit val materializer = ActorMaterializer()
  implicit val ec = system.dispatcher

  val subFlow = {
    Flow[Int]
      .mapAsyncUnordered(5)(i => Future {
        if (i == 4) sys.error("☠")
        i * 5
      })
      .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
      .reduce(_ + _)
  }

  val subStreamFuture = Source(Seq(Seq(1, 2), Seq(3, 4, 5), Seq(6)))
    .flatMapMerge(5, m => Source.single(m).mapConcat(identity).via(subFlow))
    .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
    .runWith(Sink.seq)

  val mapAsyncFuture = Source(Seq(Seq(1, 2), Seq(3, 4, 5), Seq(6)))
    .mapAsyncUnordered(5)(m => Source.single(m).mapConcat(identity).via(subFlow).runWith(Sink.head))
    .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
    .runWith(Sink.seq)

  val f1 = Await.ready(subStreamFuture, 10.seconds)
  val f2 = Await.ready(mapAsyncFuture, 10.seconds)

  println(s"Using flatMapMerge: $f1")
  println(s"Using mapAsyncUnordered: $f2")

  system.terminate()
}

The output is:

Using flatMapMerge: Future(Failure(java.lang.RuntimeException: ☠))
Using mapAsyncUnordered: Future(Success(Vector(30, 15)))

@jrduncans
Author

The mapAsyncUnordered output is the desired output: when a sub-flow fails, the whole top-level item is dropped, while the other items whose sub-flows did not fail make it through. I also tried constructing the outer flow from the inner flows with different materializers, each with a different supervision strategy in scope, but could not get the desired behavior, though perhaps I didn't stumble on the right way to do it.

@johanandren
Member

To clarify, the current flatMapMerge implementation (akka.stream.impl.fusing.FlattenMerge) doesn't even look at the supervision strategy. There may be good technical reasons for this, I'm not sure, but a PR exploring if it can be added would definitely be welcome.

@johanandren added the labels 0 - new (Ticket is unclear on its purpose or if it is valid or not) and t:stream on Jun 5, 2017
@jrduncans
Author

In the meantime, it seems the docs should at least be updated to add this to the list. Currently the error handling page (http://doc.akka.io/docs/akka/current/scala/stream/stream-error.html) lists:

ZipWith, GraphStage junction, ActorPublisher source and ActorSubscriber sink components do not honour the supervision strategy attribute yet.

@johanandren
Member

Yep. Definitely a quite dated section of the docs. Maybe we should list the stages that do honour it rather than the ones that don't.

@agolubev
Contributor

agolubev commented Jul 2, 2017

Actually, the supervision strategy is ignored in the same way by groupBy, splitWhen, splitAfter, flatMapConcat and flatMapMerge.
We can either document this fact or add supervision support as a feature.
On the other hand, FlattenMerge has a test that fails with a small modification:

"propagate failure from map function" in assertAllStagesStopped {
  val ex = new Exception("buh")
  intercept[TestFailedException] {
    Source(1 to 3)
      // `blocked` is a source defined elsewhere in the test suite
      .flatMapMerge(10, i => if (i == 3) throw ex else blocked)
      // if we add the line below, the test will fail
      .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
      .runWith(Sink.head)
      .futureValue
  }.cause.get should ===(ex)
}

This test shows that flatMapMerge contains a map stage that honours the supervision strategy, whereas the FlattenMerge stage that follows it does not.

I can think of the following rationale: when we are dealing with a combination of substreams, we transparently pass events and failures from upstream to downstream and back. So FlattenMerge is not considered a state transformation.

So let me know which way we want to go.

@johanandren added the labels 1 - triaged (Tickets that are safe to pick up for contributing in terms of likeliness of being accepted) and help wanted (Issues that the core team will likely not have time to work on), and removed the label 0 - new (Ticket is unclear on its purpose or if it is valid or not), on Jan 2, 2018
@gyoho

gyoho commented Feb 16, 2018

👍 on this. Any update?

@ktoso
Member

ktoso commented Feb 19, 2018

If there were any updates... they'd be here ;-)
Feel free to contribute if you'd like to see a fix.

Note also that Restart stages are nowadays recommended instead of supervision: https://doc.akka.io/docs/akka/2.5/stream/stream-error.html#delayed-restarts-with-a-backoff-stage
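
For readers unfamiliar with the restart stages mentioned above, here is a minimal sketch, assuming the Akka 2.5-era `RestartSource.withBackoff(minBackoff, maxBackoff, randomFactor)` signature (later versions take a `RestartSettings` instead) and the `ActorMaterializer` used elsewhere in this thread. The object name, the attempt counter and the simulated failure are illustrative assumptions, not part of the issue.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{RestartSource, Sink, Source}

import scala.collection.immutable.Seq
import scala.concurrent.Await
import scala.concurrent.duration._

object RestartSourceSketch {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("RestartSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // Hypothetical counter, used only to make the first materialization fail.
    val attempts = new AtomicInteger(0)

    // The factory is invoked again (after a backoff delay) whenever the
    // wrapped source fails or completes.
    val restarting = RestartSource.withBackoff(
      minBackoff = 10.millis,
      maxBackoff = 100.millis,
      randomFactor = 0.2
    ) { () =>
      val attempt = attempts.incrementAndGet()
      Source(1 to 3).map { i =>
        if (attempt == 1 && i == 2) sys.error("first attempt fails") else i
      }
    }

    // take(4) bounds the otherwise endlessly restarting stream.
    try Await.result(restarting.take(4).runWith(Sink.seq), 10.seconds)
    finally system.terminate()
  }
}
```

Note that this restarts the wrapped source as a whole; it does not make flatMapMerge itself honour a supervision strategy.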

@johanandren
Member

I think a PR has been merged since this was opened that added documentation on all stages that do use the supervision strategy. That's in the Scaladoc/Javadoc though, not in the reference docs.

@politrons

Any update about this bug? Which is the best pattern right now to tackle this issue?

@johanandren
Member

Nobody is actively working on this or you'd see that here. You could give it a try fixing the issue and doing a PR if it is an important issue to you.

It might be possible to work around it with a recoverWith that just ends the substream, if resume is what you want to do on failures.
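
A minimal sketch of that workaround, assuming `recoverWithRetries` (the non-deprecated sibling of `recoverWith` in later Akka versions) and reusing the input data from the original report. The object and method names are hypothetical. The intent is that a failing substream is replaced by an empty source, so its group is dropped while the other groups still reach the sink.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.collection.immutable.Seq
import scala.concurrent.Await
import scala.concurrent.duration._

object FlatMapMergeRecoverSketch {
  // Each group becomes one substream. A failure inside it is swallowed by
  // switching to an empty source, which completes the substream normally.
  def substream(group: Seq[Int]): Source[Int, NotUsed] =
    Source(group)
      .map(i => if (i == 4) sys.error("boom") else i * 5)
      .reduce(_ + _)
      .recoverWithRetries(1, { case _: RuntimeException => Source.empty[Int] })

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("RecoverSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val result = Source(Seq(Seq(1, 2), Seq(3, 4, 5), Seq(6)))
      .flatMapMerge(5, group => substream(group))
      .runWith(Sink.seq)

    try Await.result(result, 10.seconds)
    finally system.terminate()
  }
}
```

Because flatMapMerge interleaves substreams, the order of the surviving results is not guaranteed. The recovery has to be attached to each substream; attaching it after flatMapMerge would only replace the already failed merged stream.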

@politrons

@johanandren How about a backoff strategy? https://doc.akka.io/docs/akka-stream-kafka/current/errorhandling.html#restarting-the-stream-with-a-backoff-stage

Or I could create the KafkaConsumerActor myself and then subscribe to the topic in the pipeline, as they do here: https://github.com/kciesielski/reactive-kafka/blob/7dd29719a5d1c6eb70584e67bf9f4dbd7b6d2b39/docs/src/test/scala/sample/scaladsl/ConsumerExample.scala#L328
Then maybe the supervisor decision works.

@johanandren
Member

I think the forum (https://discuss.akka.io) is better for that kind of a discussion, let's please keep this issue to be about specifically implementing supervision on the flatMapMerge operator, thanks.

@mdedetrich
Contributor

mdedetrich commented May 10, 2022

So I am experiencing this exact behavior as well. I am not sure what the exact technical reasons are that @johanandren alluded to at #23066 (comment), but it is very impractical behavior. In my specific circumstance, I have a stream that gets split up into SubFlows, each of which puts data into S3. If someone makes a silly mistake (such as incorrect credentials), then rather than adhering to a supervision strategy that tells the entire stream to cancel on such an exception, i.e.

val decider: Supervision.Decider = { e =>
  logger.error("Unhandled exception in stream", e)
  Supervision.Stop
}

it will just keep on looping forever (which in my case actually involved creating a ridiculous number of calls to S3, each of them failing due to invalid credentials).

I will attempt a pull request that explores what would happen if we just propagated the supervision strategy rather than completely ignoring it. At least for my specific use case, the expected behavior with Supervision.Stop would be to kill the parent stream (i.e. kill everything). I believe this is also the intended way to use exceptions in akka-streams: a thrown exception usually means an error that you cannot reasonably handle, and if you can handle the error you should be using values (i.e. Option/Either) along with collect/filter. It could also be documented that if you don't want the parent stream of a SubFlow to cancel on an exception, you can just catch the exception and put it in a value.
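
To illustrate the "errors as values" approach described above, here is a minimal sketch. The `risky` function is a hypothetical stand-in for a call that may throw (such as an upload), and the `ActorMaterializer` setup mirrors the Akka 2.5-era code earlier in this thread. The exception is caught into an `Either` inside the stream, and `collect` keeps only the successful values, so no failure ever reaches the stream itself.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.collection.immutable.Seq
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object ErrorsAsValuesSketch {
  // Hypothetical operation that throws for one particular input.
  def risky(i: Int): Int = if (i == 4) sys.error("boom") else i * 5

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("ValuesSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val result = Source(1 to 5)
      // Capture the exception as a Left instead of failing the stage.
      .map(i => Try(risky(i)).toEither)
      // Keep successes only; a Left could instead be logged or routed elsewhere.
      .collect { case Right(value) => value }
      .runWith(Sink.seq)

    try Await.result(result, 10.seconds)
    finally system.terminate()
  }
}
```

With this shape, Supervision.Stop remains reserved for genuinely unrecoverable errors, while expected failures are handled as ordinary data.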

@mdedetrich
Contributor

mdedetrich commented May 10, 2022

So I did a bit more digging into my issue and realized that it's actually not due to FlattenMerge (which .splitAfter also uses, though in a different context) but rather to the fact that the Split graph stage ignores the supervision strategy and instead has what seems to be a redundant SubstreamCancelStrategy parameter.

I made a new ticket with details here: #31394
