Merging streams causes 2 chunks to be emitted instead of 1 #1987

Closed
dmitriibundin opened this issue Aug 13, 2020 · 1 comment

Comments

dmitriibundin commented Aug 13, 2020

I'm running into an issue when merging two fs2.Streams and processing the resulting Stream further. Consider the following example:

  Ref.of[IO, Int](0).map(ref => {
    fs2.Stream.never[IO].merge(fs2.Stream.repeatEval(ref.get)).evalMap(value => {
      IO(println(s"Got value $value")) >> IO.sleep(1.second) >> ref.set(value + 1)
    })
  }).flatMap(_.compile.drain).unsafeRunSync()

Expected output:

Got value 0
Got value 1
Got value 2
Got value 3
...

Actual output:

Got value 0
Got value 0
Got value 1
Got value 1
Got value 2
Got value 2
Got value 3
Got value 3
...

As far as I can tell from digging into the implementation of merge, the issue is with the Semaphore(1). Instead of emitting 1 chunk at a time, it immediately enqueues 2 chunks, which is unexpected.

Increasing the number of permits to n causes n+1 chunks to be emitted at once.

mpilquist commented Aug 14, 2020

This is caused by the implementation of runStream: https://github.com/functional-streams-for-scala/fs2/blob/b3f4565891fd5e994abdd07eb326094def623053/core/shared/src/main/scala/fs2/Stream.scala#L1927-L1931

It pulls the next chunk from the source stream and then acquires the semaphore permit, which blocks until the previous chunk has been processed from the queue. Hence, it's always reading 1 chunk ahead.

To fix, we can move the guard acquisition to happen before the pull from source stream. One way to do this is Stream.repeatEval(guard.acquire).zipRight(s.chunks), which will first acquire a guard and then pull a chunk.
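
The difference between the two orderings can be sketched in isolation. The following is a minimal sketch, not the actual runStream code: it assumes cats-effect 3 and fs2 3.x (under cats-effect 2, Semaphore lives in cats.effect.concurrent and needs a ContextShift), and the names ReadAheadSketch, pullThenAcquire, acquireThenPull and pulledWhileBlocked are made up for illustration. The permit is never released, so each stream eventually blocks on the guard; a counter records how many source elements were evaluated before that happened.

```scala
import cats.effect.{IO, IOApp, Ref}
import cats.effect.std.Semaphore
import fs2.{Chunk, Stream}
import scala.concurrent.duration._

// Hypothetical sketch of the two orderings discussed above.
object ReadAheadSketch extends IOApp.Simple {

  // Approximates the ordering described above: pull a chunk, then acquire the guard.
  def pullThenAcquire[A](guard: Semaphore[IO], s: Stream[IO, A]): Stream[IO, Chunk[A]] =
    s.chunks.evalTap(_ => guard.acquire)

  // The suggested ordering: acquire the guard, then pull a chunk.
  def acquireThenPull[A](guard: Semaphore[IO], s: Stream[IO, A]): Stream[IO, Chunk[A]] =
    Stream.repeatEval(guard.acquire).zipRight(s.chunks)

  // Runs the gated stream with `permits` permits that are never released and
  // reports how many source elements were evaluated before it blocked.
  // The timeout only exists to cancel the blocked stream.
  def pulledWhileBlocked(permits: Long)(
      gate: (Semaphore[IO], Stream[IO, Int]) => Stream[IO, Chunk[Int]]
  ): IO[Int] =
    for {
      guard   <- Semaphore[IO](permits)
      counter <- Ref.of[IO, Int](0)
      source   = Stream.repeatEval(counter.updateAndGet(_ + 1))
      _       <- gate(guard, source).compile.drain.timeout(500.millis).attempt
      pulled  <- counter.get
    } yield pulled

  def run: IO[Unit] =
    for {
      ahead <- pulledWhileBlocked(1)((g, s) => pullThenAcquire(g, s))
      fixed <- pulledWhileBlocked(1)((g, s) => acquireThenPull(g, s))
      _     <- IO.println(s"pull-then-acquire: $ahead, acquire-then-pull: $fixed")
    } yield ()
}
```

If the explanation above holds, the first count should be one higher than the second, and raising the permit count should shift both numbers by the same amount, which would match the n+1 observation in the report.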

dmitriibundin added a commit to dmitriibundin/fs2 that referenced this issue Aug 14, 2020
dmitriibundin added a commit to dmitriibundin/fs2 that referenced this issue Aug 15, 2020
dmitriibundin added a commit to dmitriibundin/fs2 that referenced this issue Aug 15, 2020
mpilquist added a commit that referenced this issue Aug 15, 2020