Skip to content
Permalink
Browse files

Merge pull request #1405 from Sebruck/fix_groupWithing_empty_chunks

Fix groupWithin to not emit empty groups, resolves #1404
  • Loading branch information...
SystemFw committed Jan 28, 2019
2 parents 9b0e472 + b111c8f commit 157cbc78edc7c8f05ab6bec7adf7b6a5ce5a8e9c
Showing with 15 additions and 1 deletion.
  1. +14 −0 core/jvm/src/test/scala/fs2/GroupWithinSpec.scala
  2. +1 −1 core/shared/src/main/scala/fs2/Stream.scala
@@ -21,6 +21,20 @@ class GroupWithinSpec extends Fs2Spec {
runLog(action) shouldBe (result)
}

"groupWithin should never emit empty groups" in forAll {
(s: PureStream[VeryShortFiniteDuration], d: ShortFiniteDuration, maxGroupSize: SmallPositive) =>
whenever(s.get.toVector.nonEmpty) {
val action =
s.get
.covary[IO]
.evalTap(shortDuration => IO.sleep(shortDuration.get))
.groupWithin(maxGroupSize.get, d.get)
.map(_.toList)

runLog(action).foreach(group => group should not be empty)
}
}

"groupWithin should never have more elements than in its specified limit" in forAll {
(s: PureStream[VeryShortFiniteDuration], d: ShortFiniteDuration, maxGroupSize: SmallPositive) =>
val maxGroupSizeAsInt = maxGroupSize.get
@@ -1268,7 +1268,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
this.chunks.map(_.asRight.some).through(q.enqueue).onFinalize(q.enqueue1(None))

def emitNonEmpty(c: Chain[Chunk[O]]): Stream[F2, Chunk[O]] =
if (c.nonEmpty) Stream.emit(Chunk.concat(c.toList))
if (c.nonEmpty && !c.forall(_.isEmpty)) Stream.emit(Chunk.concat(c.toList))
else Stream.empty

def resize(c: Chunk[O], s: Stream[F2, Chunk[O]]): (Stream[F2, Chunk[O]], Chunk[O]) =

0 comments on commit 157cbc7

Please sign in to comment.
You can’t perform that action at this time.