Skip to content
Permalink
Browse files

Merge pull request #1635 from CremboC/evalFilternot

Add `evalFilterNot` and `evalFilterNotAsync`
  • Loading branch information...
mpilquist committed Oct 4, 2019
2 parents cfd6213 + b5a02b6 commit fad332e23aa66c40f9c1b6cabb0237420f37d8c1
Showing with 101 additions and 0 deletions.
  1. +20 −0 core/shared/src/main/scala/fs2/Stream.scala
  2. +81 −0 core/shared/src/test/scala/fs2/StreamSpec.scala
@@ -1038,6 +1038,26 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Nothing, O, Unit]
f(o).map(if (_) Stream.emit(o) else Stream.empty)
}.flatten

/**
* Like `filterNot`, but allows filtering based on an effect.
*
* Note: The result Stream will consist of chunks that are empty or 1-element-long.
* If you want to operate on chunks after using it, consider buffering, e.g. by using [[buffer]].
*/
def evalFilterNot[F2[x] >: F[x]: Functor](f: O => F2[Boolean]): Stream[F2, O] =
flatMap(o => Stream.eval(f(o)).ifM(Stream.empty, Stream.emit(o)))

/**
* Like `filterNot`, but allows filtering based on an effect, with up to [[maxConcurrent]] concurrently running effects.
* The ordering of emitted elements is unchanged.
*/
def evalFilterNotAsync[F2[x] >: F[x]: Concurrent](
maxConcurrent: Int
)(f: O => F2[Boolean]): Stream[F2, O] =
parEvalMap[F2, Stream[F2, O]](maxConcurrent) { o =>
f(o).map(if (_) Stream.empty else Stream.emit(o))
}.flatten

/**
* Like `filter`, but the predicate `f` depends on the previously emitted and
* current elements.
@@ -993,6 +993,87 @@ class StreamSpec extends Fs2Spec {
.asserting((_ shouldBe ((n, 0))))
}
}

"evalFilterNot" - {
"with effectful const(true)" in forAll { s: Stream[Pure, Int] =>
val s1 = s.toList
s.evalFilterNot(_ => IO.pure(false)).compile.toList.asserting(_ shouldBe s1)
}

"with effectful const(false)" in forAll { s: Stream[Pure, Int] =>
s.evalFilterNot(_ => IO.pure(true)).compile.toList.asserting(_ shouldBe empty)
}

"with function that filters out odd elements" in {
Stream
.range(1, 10)
.evalFilterNot(e => IO(e % 2 == 0))
.compile
.toList
.asserting(_ shouldBe List(1, 3, 5, 7, 9))
}
}

"evalFilterNotAsync" - {
"with effectful const(true)" in forAll { s: Stream[Pure, Int] =>
s.covary[IO]
.evalFilterNotAsync(5)(_ => IO.pure(true))
.compile
.toList
.asserting(_ shouldBe empty)
}

"with effectful const(false)" in forAll { s: Stream[Pure, Int] =>
val s1 = s.toList
s.covary[IO]
.evalFilterNotAsync(5)(_ => IO.pure(false))
.compile
.toList
.asserting(_ shouldBe s1)
}

"with function that filters out odd elements" in {
Stream
.range(1, 10)
.evalFilterNotAsync[IO](5)(e => IO(e % 2 == 0))
.compile
.toList
.asserting(_ shouldBe List(1, 3, 5, 7, 9))
}

"filters up to N items in parallel" in {
val s = Stream.range(0, 100)
val n = 5

(Semaphore[IO](n), SignallingRef[IO, Int](0)).tupled
.flatMap {
case (sem, sig) =>
val tested = s
.covary[IO]
.evalFilterNotAsync(n) { elem =>
val ensureAcquired =
sem.tryAcquire.ifM(
IO.unit,
IO.raiseError(new Throwable("Couldn't acquire permit"))
)

ensureAcquired.bracket(
_ => sig.update(_ + 1).bracket(_ => IO.sleep(10.millis))(_ => sig.update(_ - 1))
)(_ => sem.release) *>
IO.pure(false)
}

sig.discrete
.interruptWhen(tested.drain)
.fold1(_.max(_))
.compile
.lastOrError
.product(sig.get)
}
.asserting((_ shouldBe ((n, 0))))
}
}

"evalMapAccumulate" in forAll { (s: Stream[Pure, Int], m: Int, n0: PosInt) =>
val sVector = s.toVector
val n = n0 % 20 + 1

0 comments on commit fad332e

Please sign in to comment.
You can’t perform that action at this time.