alsoTo silently swallows exceptions #24291

Closed
jeremystone opened this Issue Jan 11, 2018 · 36 comments

jeremystone (Contributor) commented Jan 11, 2018

The following:

  Source(1 to 5)
    .alsoTo(Sink.foreach(_ => throw new IllegalStateException()))
    .runWith(Sink.foreach(println))

Prints:

1
2
3
4
5

I would have expected the stream to fail with the exception.

This causes problems if alsoTo is used in a stream inside a RestartSource, because no automatic backoff/restart occurs and nothing appears in the log (although the main part of the flow continues to run).

In our case the side flow was writing to a Kafka topic, and only after a few hours or days did we notice that nothing new was being written. Fortunately we were able to use via instead, which does not seem to have this issue.

Please also see the unit tests at https://gist.github.com/jeremystone/ec1bf1eaaaef002e13ec4cfcc1928749
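The exact via-based workaround isn't shown in the issue; a minimal sketch of the idea, assuming the side effect can run inline in a map stage (writeToKafka is a hypothetical stand-in), could look like this:

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Flow, Sink, Source}

    implicit val system = ActorSystem("via-workaround")
    implicit val materializer = ActorMaterializer()

    // Hypothetical side effect; a thrown exception here fails the whole stream.
    def writeToKafka(i: Int): Unit =
      if (i == 3) throw new IllegalStateException("boom")

    Source(1 to 5)
      .via(Flow[Int].map { i => writeToKafka(i); i }) // side effect stays on the main path
      .runWith(Sink.foreach(println))

Because the side effect now runs on the main path, its failure propagates downstream, fails the stream, and is visible to a surrounding RestartSource, unlike the detached alsoTo sink above.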

johanandren (Member) commented Jan 12, 2018

Surprising, given that alsoTo is implemented through a Broadcast(2). Must be a bug, thanks for the reproducer! Would you be up for trying to PR a fix as well?
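For illustration, alsoTo can be thought of as roughly the following fan-out (a sketch, not the actual library source; note that Broadcast's eagerCancel defaults to false, which is what the rest of this thread turns on):

    import akka.stream.FlowShape
    import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink}

    // Sketch of what alsoTo(sink) does: fan out with a two-way Broadcast,
    // feed one outlet to the given sink, expose the other as the main flow.
    def alsoToSketch[T, M](sink: Sink[T, M]): Flow[T, T, M] =
      Flow.fromGraph(GraphDSL.create(sink) { implicit b => s =>
        import GraphDSL.Implicits._
        val bcast = b.add(Broadcast[T](2)) // eagerCancel defaults to false
        bcast.out(1) ~> s.in
        FlowShape(bcast.in, bcast.out(0))
      })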

johanandren (Member) commented Jan 12, 2018

BTW, bonus points for the reproducer file name :D

jeremystone (Contributor) commented Jan 12, 2018

Given that it's a Broadcast, am I right in understanding that a downstream failure in the side flow will not propagate up and then back down the other, main branch? I.e. is the best that can be done to cancel the stage?

As far as a PR is concerned, I'd be happy to give it a go.

johanandren (Member) commented Jan 12, 2018

Ah, that could be the reason: failures do not flow upstream, only downstream; upstream only ever sees a cancel. So my initial assertion about it being a bug was wrong.

We have a ticket about potential semantics for propagating cancellation upstream here: #23908

johanandren (Member) commented Jan 12, 2018

I wonder if the broadcast should not be eagerCancel = true though.

ktoso (Member) commented Jan 12, 2018

I don't think the "swallows exceptions" part is a bug by itself; it is another occurrence of a recurring issue we have, which should be addressed generically and not just for alsoTo.

Regarding the cancellation propagation, that's up for debate, but: it is "also to", which implies you care less about the side branch. I'd rather propose adding a broadcast method that would use eagerCancel = true.

The "swallows errors" framing is also conflating things. There is no such thing as "the stream fails with the exception": exceptions only flow downstream, and there is no downstream after the foreach sink, so the sink has simply failed.

The actual issue is that we do not automatically log such failures, which I have opened a ticket about: actively logging all failures in such sinks (call it verbose logging or something similar), which can be disabled but would be on by default. It may even have a PR already, but it was too noisy to merge as is.

Please think about that ticket, as it would solve the root cause of this and numerous other issues. I'll try to find it and add a link here, though I'm on a bus right now.

ktoso (Member) commented Jan 12, 2018

The ticket being: Proposal: Option to log errors/completion in typical "ignore" "silent" sinks #23501

jeremystone (Contributor) commented Jan 12, 2018

Would it be worth adding a note in the documentation of the RestartSource/RestartFlow that alsoTo (or presumably any explicit non-eagerCancel Broadcast branches that end with a Sink and so aren't merged back in) could cause the flow not to fully restart?

johanandren (Member) commented Jan 12, 2018

@ktoso and I answered in parallel without seeing each other's comments. I still think it would make sense for alsoTo to cancel eagerly; not sure if that would solve your problem though, @jeremystone.

Documenting that anything other than linear flows inside the restarters can be surprising sounds good to me.

jeremystone (Contributor) commented Jan 13, 2018

Yes. Automatic logging of such failures, eager cancellation and a note in the docs would be helpful to others, I think.

Let me know how I can assist in this regard.

You've probably already considered this, but using eagerCancel would also prevent the 'opposite' problem, where a cancellation (or failure) on the main branch downstream of the alsoTo can leave the side flow running.

The following:

  Source.tick(0.millis, 100.millis, NotUsed)
    .alsoTo(Sink.foreach(_ => println("Side")))
    .map(_ => println("Main"))
    .runWith(Sink.cancelled)

prints Side continually.

If used in a restarter, this presumably could result in resource leakage.

patriknw (Member) commented Jan 24, 2018

I just ran into this surprise that alsoTo is using eagerCancel = false. I think it should be true. We also have lengthy discussions about wireTap in #15077 (and PR #23824).

johanandren (Member) commented Jan 24, 2018

A PR changing it to eagerCancel = true would be good, @jeremystone; additional docs on alsoTo would also be good.

For the "always log" part, there is a log statement here:

    log.error(e, "Error in stage [{}]: {}", activeStage.originalStage.getOrElse(activeStage), e.getMessage)

and I don't quite understand why it doesn't show up.

jrudolph (Member) commented Jan 24, 2018

> which I don't quite understand why it doesn't show up.

I guess it's not a stage error in the narrow sense; those logs show up when exceptions are actually thrown out of stage implementations. The error does show up in the materialized value of the alsoTo'd Sink.foreach, it just isn't read in this example.
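Not from the thread, but a sketch of reading that materialized value, using alsoToMat to keep the side sink's Future[Done] (which is where the otherwise-silent failure surfaces):

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Keep, Sink, Source}
    import scala.util.{Failure, Success}

    implicit val system = ActorSystem("mat-value-demo")
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    // Keep both materialized Future[Done]s: the side sink's and the main sink's.
    val (sideDone, mainDone) =
      Source(1 to 5)
        .alsoToMat(Sink.foreach[Int](_ => throw new IllegalStateException("boom")))(Keep.right)
        .toMat(Sink.foreach[Int](println))(Keep.both)
        .run()

    // The exception that the original alsoTo example never surfaced shows up here.
    sideDone.onComplete {
      case Failure(e) => println(s"side sink failed: $e")
      case Success(_) => println("side sink completed")
    }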

jeremystone (Contributor) commented Jan 25, 2018

Will try to get a PR together for this, @johanandren.

patriknw (Member) commented Jan 25, 2018

@viktorklang has an open PR for something very similar for divertTo. Coordinate with him so you don't spend time on the same thing.

jeremystone (Contributor) commented Jan 25, 2018

@patriknw, @viktorklang - my changes are so far limited to setting eagerCancel = true on alsoTo, with a corresponding API doc change, plus tests that cancellation of either the side or the main branch now terminates the stream.

I am also considering adding a note to the stream-error.md page advising care when wrapping 'non-linear' graphs in the restarters.

viktorklang (Member) commented Jan 25, 2018

@jeremystone This is what I am proposing for divertTo: #24403. Having eagerCancel=true for alsoTo makes sense, but not as much as for divertTo (since for alsoTo there is no data loss if either of the sinks cancels).

patriknw (Member) commented Jan 26, 2018

I agree, but there could still be a point in making them consistent, so that the "linear" flows behave the same. It's also fine if they are different; most important is that it's documented and tested, so that it's not an accidental decision.

jeremystone added a commit to jeremystone/akka that referenced this issue Jan 26, 2018

viktorklang (Member) commented Jan 26, 2018

@patriknw Agreed. However, this means that something like wireTap wouldn't be allowed to have anything but eagerCancel=true either. I have thought a bit about it and have tentatively concluded that eagerCancellation and eagerCompletion as settings might be a core concern for GraphStage.

patriknw (Member) commented Jan 26, 2018

That was one of the major differences I saw between divertTo and wireTap. For wireTap you don't care so much about it (I guess it would also drop instead of backpressure).

viktorklang (Member) commented Jan 26, 2018

@patriknw My point is that wireTap will also be a "linear" flow.

jeremystone added a commit to jeremystone/akka that referenced this issue Jan 29, 2018

jeremystone added a commit to jeremystone/akka that referenced this issue Jan 29, 2018

capacman commented Jan 31, 2018

Hi, I don't know whether this is related, but I had similar 'swallow'-like behavior with the example below:

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Sink, Source}

    implicit val actorSystem = ActorSystem("test")
    implicit val actorMaterializer = ActorMaterializer()
    import actorSystem.dispatcher // execution context for onComplete
    val flow = Flow[Int].map { _ =>
      throw new RuntimeException("boom")
      1
    }
    val source = Source.fromIterator[Int](() => List(1, 2, 3, 4, 5, 6).iterator)
      .toMat(BroadcastHub.sink(bufferSize = 8).named("broadcast"))(Keep.right)
      .run()
    val sink = Sink.foreach[Int](println)
    source.via(flow).toMat(sink)(Keep.right).run().onComplete(println)

I expect it to print Failure(java.lang.RuntimeException: boom), but it prints Success when the buffer size is bigger than the list size. If you decrease bufferSize to 4 in the above example it prints Failure. Is there a workaround for this? Or is it a bug, and related to the above?

viktorklang (Member) commented Jan 31, 2018

capacman commented Jan 31, 2018

So what I see is correct behavior? I mean, did I make a mistake?

viktorklang (Member) commented Jan 31, 2018

capacman commented Jan 31, 2018

I expect the materialized value's .onComplete(println) to always print Failure, since the flow between the source and the sink always throws an exception and the stream is failing. So the materialized Future[Done] should always resolve to a failure. Or have I completely misunderstood the materialized-value concept?!

viktorklang (Member) commented Jan 31, 2018

capacman commented Jan 31, 2018

Hmm. So what I see is because the bufferSize is bigger than the list: the hub consumes the whole list, and when I attach a sink after the BroadcastHub the materialized value resolves to Success because the list is already finished?

viktorklang (Member) commented Jan 31, 2018

capacman commented Jan 31, 2018

I understand. Thank you for the explanation in the meantime 👍

viktorklang (Member) commented Jan 31, 2018

jeremystone added a commit to jeremystone/akka that referenced this issue Feb 18, 2018

johanandren added a commit that referenced this issue Feb 19, 2018

johanandren added this to the 2.5.10 milestone Feb 19, 2018

sarahgerweck commented Mar 5, 2018

This is a pretty major behavior change; I don't think it should have gone in a patch release, at least not without some kind of banner warning that it's a change with major potential consequences.

FWIW, I would argue that the previous behavior was more natural, as Broadcast is symmetric but alsoTo syntactically privileges one stream over the other. Ideally this really should be a parameter of alsoTo, as I think both behaviors will surprise people. Most of all, I think more attention to this change was needed in the release notes.

jypma (Member) commented Jun 21, 2018

We are actually relying on alsoTo allowing a side-flow to complete without consuming all elements. After the change, it'll make the main flow complete too early.

Although I understand the desire to propagate the cancel in certain cases, is there a way to turn this new behaviour off? What should we do instead if we want a side-flow to only consume some of the elements?

johanandren (Member) commented Jun 21, 2018

Dropping down to the GraphDSL will give you an explicit eagerCancel flag when constructing Broadcast. Another option could be wireTap (note that it will drop elements if the alternative downstream backpressures, though).
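For illustration, a sketch of the wireTap alternative mentioned above (assuming an Akka version where the wireTap operator taking a Sink is available):

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Sink, Source}

    implicit val system = ActorSystem("wiretap-demo")
    implicit val materializer = ActorMaterializer()

    // wireTap attaches the side sink without affecting the main flow's completion
    // or backpressure; elements are dropped if the tap cannot keep up.
    Source(1 to 5)
      .wireTap(Sink.foreach[Int](i => println(s"tapped: $i")))
      .runWith(Sink.foreach(println))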

jypma (Member) commented Jun 21, 2018

Indeed, we're now doing

    public static <T,M> Flow<T,T,M> alsoTo(Sink<T,M> sink) {
        return Flow.fromGraph(GraphDSL.create(sink, (builder, out) -> {
            // eagerCancel = false restores the pre-2.5.10, non-eager behaviour
            UniformFanOutShape<T, T> bcast = builder.add(Broadcast.create(2, false));
            builder.from(bcast.out(1)).to(out);
            return FlowShape.of(bcast.in(), bcast.out(0));
        }));
    }

and then just mainGraph.via(alsoTo(sideSink)). Rewriting all chained flows in the graph DSL would be a lot more work :)

But are we doing the "wrong" thing by allowing side flows to only consume part of the elements? The original issue was about errors not propagating, rather than about termination.

johanandren (Member) commented Jun 21, 2018

If you follow the conversation, it ends up in two things: error logging and the eager cancellation. Please either start a forum discussion or, if you think there is a need to make the eagerCancel flag available through the flow API, open a new ticket, instead of discussing it on this old issue (and pinging everyone who was involved with every comment).
