
DOC: Document feed-forward deadlock scenarios #17435

Open
lancearlaus opened this Issue May 9, 2015 · 7 comments


lancearlaus commented May 9, 2015

Issue
A stream that fans out via Broadcast and fans in via Zip fails to complete when one of the intermediate branches contains a drop.

Example
The following graph fails to complete:

    val source = b.add(Source(1 to 10))
    val bcast = b.add(Broadcast[Int](2))
    val drop = b.add(Flow[Int].drop(5))
    val zip = b.add(Zip[Int, Int])

    source ~> bcast ~>         zip.in0
              bcast ~> drop ~> zip.in1

Additional Detail
Here's a snippet from a more complete test case covering different variations in an attempt to isolate the problem.

  // PASS
  def zipSource(num: Int, diff: Int) = Source() { implicit b =>
    import akka.stream.scaladsl.FlowGraph.Implicits._

    val source0 = b.add(Source(1 to num))
    val source1 = b.add(Source(1 to (num + diff)))
    val zip = b.add(Zip[Int, Int])

    source0 ~> zip.in0
    source1 ~> zip.in1

    (zip.out)
  }
  // PASS
  def dropSinkSource(num: Int, diff: Int) = Source() {  implicit b =>
    import akka.stream.scaladsl.FlowGraph.Implicits._

    val source = b.add(Source(1 to (num + diff)))
    val bcast = b.add(Broadcast[Int](2))
    val drop = b.add(Flow[Int].drop(diff))
    val sink0 = b.add(Sink.ignore)

    source ~> bcast ~> sink0
              bcast ~> drop

    (drop.outlet)
  }
  // FAIL for diff > 0
  def zipDropSource(num: Int, diff: Int) = Source() {  implicit b =>
    import akka.stream.scaladsl.FlowGraph.Implicits._

    val source = b.add(Source(1 to (num + diff)))
    val bcast = b.add(Broadcast[Int](2))
    val drop = b.add(Flow[Int].drop(diff))
    val zip = b.add(Zip[Int, Int])

    source ~> bcast ~>         zip.in0
              bcast ~> drop ~> zip.in1

    (zip.out)
  }

  // PASS
  "Zip" should "complete with same length streams" in {
    val future: Future[Int] = zipSource(10, 10).runWith(Sink.fold(0)((s, i) => s + 1))
    whenReady(future)(_ shouldBe 10)
  }

  // PASS
  it should "complete with different length streams" in {
    val future: Future[Int] = zipSource(10, 20).runWith(Sink.fold(0)((s, i) => s + 1))
    whenReady(future)(_ shouldBe 10)
  }

  // PASS
  "Sink with drop" should "complete with different length streams" in {
    val future: Future[Int] = dropSinkSource(10, 10).runWith(Sink.fold(0)((s, i) => s + 1))
    whenReady(future)(_ shouldBe 10)
  }

  // PASS
  "Zip with drop" should "complete with same length streams" in {
    val future: Future[Int] = zipDropSource(10, 0).runWith(Sink.fold(0)((s, i) => s + 1))
    whenReady(future)(_ shouldBe 10)
  }

  // FAIL
  it should "complete with different length streams" in {
    val future: Future[Int] = zipDropSource(10, 10).runWith(Sink.fold(0)((s, i) => s + 1))
    whenReady(future)(_ shouldBe 10)
  }

drewhk added this to the http-1.0-RC3 milestone May 11, 2015

drewhk self-assigned this May 11, 2015

drewhk added the "3 - in progress" label and removed the "1 - triaged" label May 11, 2015

drewhk modified the milestones: streams-1.0-RC3, http-1.0-RC3 May 11, 2015


drewhk commented May 11, 2015

No, there is no bug. This is a typical deadlock case that can be tricky to figure out.

Scenario description:

  • broadcast can only emit if all downstreams have demand
  • initially, broadcast has demand from both zip and drop
  • after enough elements have been dropped by drop, zip still has not progressed (it needs an element on its other input to progress, but drop is not yet letting elements through), and there is no more buffer space available in zip
  • broadcast waits for new demand from zip
  • zip waits for drop to finally emit something, so that it can emit a new pair and free up buffer space
  • drop waits for a new element from broadcast
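The cycle of waits listed above can be reproduced without Akka in a small, single-threaded toy model. This is a sketch under simplifying assumptions, not Akka's actual implementation: each zip input is modelled as a bounded queue of capacity cap (a stand-in for whatever buffering exists between broadcast and zip), broadcast emits only when both branches can accept an element, and drop discards its first diff elements. The names FeedForwardDeadlock, run, Completed and Deadlocked are hypothetical.

```scala
import scala.collection.mutable

// Toy, single-threaded model (NOT Akka's implementation) of the graph
//   source ~> bcast ~>         zip.in0
//             bcast ~> drop ~> zip.in1
// Each zip input is a bounded queue of capacity `cap`.
object FeedForwardDeadlock {
  sealed trait Outcome
  final case class Completed(pairs: Int) extends Outcome
  final case class Deadlocked(pairs: Int, in0Buffered: Int, stillToDrop: Int) extends Outcome

  def run(num: Int, diff: Int, cap: Int): Outcome = {
    val source = Iterator.range(1, num + diff + 1) // elements 1 to (num + diff)
    val in0 = mutable.Queue.empty[Int]             // buffered input of zip.in0
    val in1 = mutable.Queue.empty[Int]             // buffered input of zip.in1
    var toDrop = diff                              // elements drop still has to discard
    var pairs = 0
    var progress = true

    while (progress) {
      progress = false
      // zip emits a pair whenever both inputs hold an element
      while (in0.nonEmpty && in1.nonEmpty) {
        in0.dequeue(); in1.dequeue(); pairs += 1; progress = true
      }
      // broadcast emits only when BOTH branches signal demand:
      // zip.in0 needs free space; drop accepts while still dropping, or if zip.in1 has space
      val in0Demand = in0.size < cap
      val dropDemand = toDrop > 0 || in1.size < cap
      if (source.hasNext && in0Demand && dropDemand) {
        val elem = source.next()
        in0.enqueue(elem)
        if (toDrop > 0) toDrop -= 1 else in1.enqueue(elem)
        progress = true
      }
    }
    // No step can make progress: either everything was emitted, or the graph is stuck
    if (source.hasNext) Deadlocked(pairs, in0.size, toDrop) else Completed(pairs)
  }

  def main(args: Array[String]): Unit = {
    println(run(num = 10, diff = 3, cap = 4)) // Completed(10)
    println(run(num = 10, diff = 5, cap = 4)) // Deadlocked(0,4,1)
  }
}
```

In this model the second run stalls with zip.in0 full, zip.in1 empty and one element still to be dropped, which is exactly the broadcast, zip, drop wait cycle. The stall appears as soon as diff reaches cap, which is why small diff values do not trigger it here.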

If you add a buffer stage on the non-dropping path between broadcast and zip and set its overflow strategy to OverflowStrategy.fail, this stream will properly fail with a BufferOverflowException once the gap between the two branches (the number of dropped elements) becomes large enough.
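Extending the same kind of toy model (again a hypothetical sketch, not Akka code) with an explicit buffer stage on the non-dropping branch shows how the two overflow strategies differ. With Backpressure the buffer signals demand only while it has room, so an undersized buffer still stalls; with Fail it keeps signalling demand and throws as soon as an element arrives while it is full, mirroring how OverflowStrategy.fail is described to fail the stream. BufferedBranch, BufferOverflow, zipCap and bufSize are illustrative names.

```scala
import scala.collection.mutable

// Toy model (NOT Akka code) with an explicit buffer on the non-dropping branch:
//   source ~> bcast ~> buffer(bufSize, strategy) ~> zip.in0
//             bcast ~> drop(diff)                ~> zip.in1
object BufferedBranch {
  sealed trait Strategy
  case object Backpressure extends Strategy
  case object Fail extends Strategy

  final class BufferOverflow(msg: String) extends RuntimeException(msg)

  /** Some(pairs) if the stream completes, None if it stalls; throws BufferOverflow under Fail. */
  def run(num: Int, diff: Int, zipCap: Int, bufSize: Int, strategy: Strategy): Option[Int] = {
    val source = Iterator.range(1, num + diff + 1)
    val buf = mutable.Queue.empty[Int] // the explicit buffer stage
    val in0 = mutable.Queue.empty[Int] // zip.in0, capacity zipCap
    val in1 = mutable.Queue.empty[Int] // zip.in1, capacity zipCap
    var toDrop = diff
    var pairs = 0
    var progress = true

    while (progress) {
      progress = false
      // zip pairs up elements; the buffer then forwards into zip.in0 while there is room
      while (in0.nonEmpty && in1.nonEmpty) { in0.dequeue(); in1.dequeue(); pairs += 1; progress = true }
      while (buf.nonEmpty && in0.size < zipCap) { in0.enqueue(buf.dequeue()); progress = true }

      // Backpressure: the buffer signals demand only while it has room.
      // Fail: the buffer always signals demand and fails if an element arrives while it is full.
      val bufDemand = strategy == Fail || buf.size < bufSize
      val dropDemand = toDrop > 0 || in1.size < zipCap
      if (source.hasNext && bufDemand && dropDemand) {
        val elem = source.next()
        if (buf.size >= bufSize)
          throw new BufferOverflow(s"buffer of size $bufSize overflowed at element $elem")
        buf.enqueue(elem)
        if (toDrop > 0) toDrop -= 1 else in1.enqueue(elem)
        progress = true
      }
    }
    if (source.hasNext) None else Some(pairs)
  }

  def main(args: Array[String]): Unit = {
    println(run(10, 10, 4, 10, Backpressure))                       // Some(10)
    println(run(10, 10, 4, 3, Backpressure))                        // None (stalls)
    println(scala.util.Try(run(10, 10, 4, 3, Fail)).isFailure)      // true
  }
}
```

In this model the non-dropping branch needs room for at least diff + 1 elements in total (zipCap + bufSize), because zip cannot pair anything until the first non-dropped element reaches its other input. Below that, Backpressure turns the problem into a silent stall, while Fail turns it into an exception, which is easier to diagnose.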

This does not fail for the smaller diff cases because the deadlock only occurs once enough elements are dropped that the buffers of zip (and of any stage between broadcast and zip) are no longer enough to bridge the gap.

drewhk modified the milestones: invalid, streams-1.0-RC3 May 11, 2015

drewhk closed this May 11, 2015


patriknw commented May 11, 2015

Excellent description, @drewhk. Please add to docs.

drewhk modified the milestones: streams-1.0-RC3, invalid May 11, 2015

drewhk changed the title from "Broadcast with zip fails to complete for different length streams (Akka Streams 1.0-RC2)" to "DOC: Document feed-forward deadlock scenarios" May 11, 2015


drewhk commented May 11, 2015

Changed to a documentation ticket.

2m reopened this May 11, 2015


lancearlaus commented May 11, 2015

Thank you for the prompt, clear response on this issue.
Based on @drewhk's comments, I updated the test case to the following and can confirm that it works as expected:

    val source = b.add(Source(1 to (num + diff)))
    val bcast = b.add(Broadcast[Int](2))
    val buffer = b.add(Flow[Int].buffer(Math.max(diff, 1), OverflowStrategy.backpressure))
    val drop = b.add(Flow[Int].drop(diff))
    val zip = b.add(Zip[Int, Int])

    source ~> bcast ~> buffer ~> zip.in0
              bcast ~> drop   ~> zip.in1

drewhk commented May 11, 2015

Btw, in these cases I recommend using the buffer in fail mode (OverflowStrategy.fail), so that if a deadlock would happen for some reason, the stream fails instead.


lancearlaus commented May 12, 2015

That's what I did initially, based on your comments, and it certainly worked for the simple test case.
However, when I used it on real (but still simple) flows with more data, I encountered buffer overflows. After some experimentation and re-reading the docs about internal buffers, here's what I surmise. Please correct my assumptions.

When using the fail overflow strategy, the buffer size must account for:

  • stream diff - the actual difference in stream lengths, for example due to elements being dropped on one branch but not the other
  • stage diff - the difference in the number of stages between branches, for example due to additional processing stages in one branch vs. the other
  • internal buffer size - all stages have internal buffers with a default size of 16, according to the documentation

I arrived at the following formula through a little reasoning and experimentation:

  • buffer size = stream diff + stage diff + internal buffer size
  • Example (for my real flow): buffer size = 20 + 2 + 16 = 38
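The heuristic above can be written down as a one-line helper. This only restates the empirical formula from this comment (it is not a documented Akka sizing rule), and the names BufferSizing and failModeBufferSize are hypothetical; the default of 16 is the internal buffer size quoted above.

```scala
object BufferSizing {
  // Empirical heuristic from this comment, not an official Akka rule:
  // buffer size = stream diff + stage diff + internal buffer size
  def failModeBufferSize(streamDiff: Int, stageDiff: Int, internalBufferSize: Int = 16): Int =
    streamDiff + stageDiff + internalBufferSize

  def main(args: Array[String]): Unit = {
    // The example flow: 20 dropped elements, 2 extra stages, default internal buffer of 16
    println(failModeBufferSize(streamDiff = 20, stageDiff = 2)) // 38
  }
}
```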

Once I reach this threshold, it works fine. Alternatively, I can size the buffer to the stream diff alone and use the backpressure overflow strategy.

Of course, if this is right (or reasonably right), sizing buffers like this doesn't seem appropriate as a general practice. I'd like an approach that relies solely on the stream difference, which is known and intrinsic to the flow.

What are the performance or other implications of sizing the buffer according to the stream diff and using the backpressure overflow strategy? And for what reasons would it deadlock, as you mentioned?

drewhk modified the milestones: streams-1.0-RC4, streams-1.x May 26, 2015


lancearlaus commented Jun 1, 2015

Circling back on this, I wrote a blog post that explains the issue I encountered, along with the solution of using a balancing buffer:

http://blog.lancearlaus.com/akka/streams/scala/2015/05/27/Akka-Streams-Balancing-Buffer/

I hope it helps those who encounter the same issue.

ktoso modified the milestones: streams-2.0, streams-backlog Dec 21, 2015

drewhk removed their assignment Jan 21, 2016

rkuhn removed the "1 - triaged" label Mar 9, 2016

rkuhn added the "1 - triaged" label Mar 23, 2016
