ZipLatestWith backpressures if no elements are available from zipped sources #27416

Open
marekscholle opened this issue Jul 25, 2019 · 1 comment
Labels
1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted t:stream

marekscholle commented Jul 25, 2019

The zipLatestWith operator promises to backpressure only when downstream backpressures. According to my observations this is true, but only after initial elements have become available in both sources. This is really bad: say you have a real data source S1 and a user-input source S2. You want tuples (data, user input) on output, so you use zipLatestWith. But this will backpressure the real data until the first user input arrives (someone may want this, but they should be warned).

The documentation's statement "Backpressures when downstream backpressures" implies that zipLatestWith should drop the latest elements from one stream if no elements are available in the second one.

I understand this is possibly a breaking change, but the documentation should at least be explicit about the operator's behavior in its initial state, and possibly mention the recommended way to achieve drop-the-latest behavior in the initial state.

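test("zipWithLatest backpressures") {
  // Two manually driven sources, so the test controls when each side emits.
  val (probe1, source1) = TestSource.probe[Int].preMaterialize()
  val (probe2, source2) = TestSource.probe[Int].preMaterialize()
  val done = source1
    .zipLatestWith(source2)((_, _))
    .toMat(Sink.foreach(println))(Keep.right)
    .run()

  // Push 1000 elements into the first source before the second has emitted anything.
  (1 to 1000).foreach(probe1.sendNext)

  Thread.sleep(1000)
  // Only now does the second source emit its first element.
  probe2.sendNext(1)

  probe1.sendComplete()
  Await.result(done, 2.seconds)
}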

This test fails. If I send 5 elements instead of 1000, I see (1, 1), ..., (5, 1) instead of the expected (5, 1) (but that may be by design, as discussed above).
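
To make the expected output concrete, here is a small pure-Scala model of the semantics asked for above: keep only the latest element from each side, and emit a tuple whenever an element arrives and both sides have a value. This is not Akka code. The object name `ZipLatestModel`, the function `expectedOutputs`, and the encoding of arrivals as `Either` (`Left` for the first source, `Right` for the second) are assumptions made purely for illustration.

```scala
object ZipLatestModel {
  // Hypothetical model of the expected "drop until both sides have a value" semantics.
  // Left(a) is an element arriving from source 1, Right(b) one arriving from source 2.
  def expectedOutputs[A, B](events: Seq[Either[A, B]]): Vector[(A, B)] = {
    val init = (Option.empty[A], Option.empty[B], Vector.empty[(A, B)])
    val (_, _, out) = events.foldLeft(init) { case ((latestA, latestB, acc), event) =>
      // Replace the remembered value on whichever side just received an element.
      val (a, b) = event match {
        case Left(x)  => (Some(x), latestB)
        case Right(y) => (latestA, Some(y))
      }
      // Emit only once both sides have a value; earlier elements are overwritten.
      val emitted = for (x <- a; y <- b) yield (x, y)
      (a, b, acc ++ emitted)
    }
    out
  }
}
```

For the scenario in the test with 5 elements, the arrivals are five `Left` values followed by one `Right`:

```scala
val events = (1 to 5).map(i => Left(i): Either[Int, Int]) :+ Right(1)
ZipLatestModel.expectedOutputs(events) // Vector((5,1))
```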

@johanandren
Member

Sorry, this got lost in the inflow of issues. I agree: I'd expect the semantics you describe, where zipLatestWith keeps pulling elements rather than backpressuring before seeing the first element from the other source.
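
Until the operator itself changes, one possible way to approximate these semantics is to seed the second source with a placeholder so that zipLatest leaves its initial state immediately, and then discard the tuples that still carry the placeholder. The sketch below is only an illustration, not a recommendation from the Akka maintainers: `ZipLatestWorkaround` and `zipLatestAfterFirst` are hypothetical names, and the code assumes Akka 2.6 or later for running the stream with just an implicit `ActorSystem` (on 2.5 an explicit materializer would be needed).

```scala
import akka.stream.scaladsl.Source

object ZipLatestWorkaround {
  // Hypothetical helper, not part of Akka.
  // Prepends a None to `right` so that zipLatest always has a value for that side,
  // then drops every tuple produced before `right` emitted a real element.
  def zipLatestAfterFirst[A, B, M](left: Source[A, M], right: Source[B, _]): Source[(A, B), M] = {
    val seeded: Source[Option[B], _] =
      right.map(Option(_)).prepend(Source.single(Option.empty[B]))

    left
      .zipLatest(seeded)
      .collect { case (a, Some(b)) => (a, b) }
  }
}
```

In the test above, `source1.zipLatestWith(source2)((_, _))` would become `ZipLatestWorkaround.zipLatestAfterFirst(source1, source2)`. Elements from the first source that arrive before the second source has emitted are paired with the placeholder and filtered out by `collect`, so they should no longer be held back by backpressure. The exact output still depends on timing, so this has not been verified against every interleaving.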

@johanandren johanandren added 1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted t:stream labels Sep 20, 2019