Skip to content

Commit

Permalink
typelevel#1987 format fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitriibundin committed Aug 14, 2020
1 parent 8ad16ae commit 0b203da
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 7 deletions.
6 changes: 4 additions & 2 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1924,9 +1924,11 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit])
): F2[Unit] =
Semaphore(1).flatMap {
guard => // guarantee we process only single chunk at any given time from any given side.
Stream.repeatEval(guard.acquire).zipRight(s.chunks)
Stream
.repeatEval(guard.acquire)
.zipRight(s.chunks)
.evalMap { chunk =>
resultQ.enqueue1(Some(Stream.chunk(chunk).onFinalize(guard.release)))
resultQ.enqueue1(Some(Stream.chunk(chunk).onFinalize(guard.release)))
}
.interruptWhen(interrupt.get.attempt)
.compile
Expand Down
18 changes: 13 additions & 5 deletions core/shared/src/test/scala/fs2/StreamMergeSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,19 @@ class StreamMergeSuite extends Fs2Suite {
test("merge not emit ahead") {
forAllF { (v: Int) =>
val expected = List(v, v + 1)
Ref.of[IO, Int](v).map(ref => {
Stream.repeatEval(ref.get).merge(Stream.never[IO]).evalMap(value => {
IO.sleep(100.milliseconds) >> ref.set(value + 1) >> IO(value)
}).take(2)
}).flatMap(_.compile.toList).map(result => assert(result == expected))
Ref
.of[IO, Int](v)
.map { ref =>
Stream
.repeatEval(ref.get)
.merge(Stream.never[IO])
.evalMap { value =>
IO.sleep(100.milliseconds) >> ref.set(value + 1) >> IO(value)
}
.take(2)
}
.flatMap(_.compile.toList)
.map(result => assert(result == expected))
}
}
}

0 comments on commit 0b203da

Please sign in to comment.