Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: fs2 2.0.0 #33

Open
wants to merge 3 commits into
base: series/0.4
Choose a base branch
from
Open

Conversation

sebastianvoss
Copy link

Update to fs2 version 2.0.0

Two tests are currently failing:

  • TakeThroughDrain.early-terminated.drain
  • TakeThroughDrain.normal-termination.dont-drain

I would be thankful for hints why this happens.

@sebastianvoss
Copy link
Author

@AdamChlupacek Can you help me to figure out why these test are failing?

@poliez
Copy link

poliez commented Nov 3, 2019

why these test are failing?

To my understanding the tests fail because calling queue.dequeue completely empties the queue, so the subsequent Stream.eval(queue.dequeue1) can only evaluate to IO(None) because the queue is empty and no one enqueues in it. So the problem could be in the test itself.

You can see it running this example in the REPL (i tried with fs2 2.0.1)

import fs2._
import cats.effect.IO
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

implicit val cs = IO.contextShift(ExecutionContext.global)
implicit val ce = IO.ioConcurrentEffect

val source = Stream[IO, Int](1, 2, 3, 4, 5).covary[IO]

val program =
  fs2.concurrent.Queue
    .unbounded[IO, Int]
    .flatMap { queue =>
      (source.through(queue.enqueue).drain ++
        queue.dequeue
          .evalTap(el => IO(println(s"$el was in the queue")))
          .takeThrough(_ < 3)
          .evalTap(el => IO(println(s"$el was taken from the stream")))
          .drain ++
        Stream
          .eval(queue.dequeue1)
          .evalTap(el => IO(println(s"The queue still contains $el")))).compile.last
    }

program.unsafeRunTimed(10.second).flatten

That will output something like

1 was in the queue
1 was taken from the stream
2 was in the queue
2 was taken from the stream
3 was in the queue
3 was taken from the stream
res6: Option[Int] = None

Hope this helps!

@declspec-cdecl
Copy link

Main problem with early-terminated.drain test with fs 2.x.x is that .onFinalize in takeThroughDrain executed after last Stream.eval(queue.dequeue1). But I've found some workaround

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants