Skip to content
Permalink
Browse files

Deprecated Sink in favor of Pipe

  • Loading branch information...
mpilquist committed Jan 2, 2019
1 parent 5bcdbab commit 2392671699bc53f876c13f91c930bee8ac7ee611
@@ -20,7 +20,7 @@ class QueueBenchmark {
Queue
.unbounded[IO, Int]
.flatMap { q =>
Concurrent[IO].start(Stream.constant(1, N).take(size).to(q.enqueue).compile.drain) >>
Concurrent[IO].start(Stream.constant(1, N).take(size).through(q.enqueue).compile.drain) >>
q.dequeue.take(size).compile.drain
}
.unsafeRunSync()
@@ -30,7 +30,8 @@ class QueueBenchmark {
Queue
.unbounded[IO, Chunk[Int]]
.flatMap { q =>
Concurrent[IO].start(Stream.constant(1, N).take(size).chunks.to(q.enqueue).compile.drain) >>
Concurrent[IO].start(
Stream.constant(1, N).take(size).chunks.through(q.enqueue).compile.drain) >>
q.dequeue.flatMap(Stream.chunk).take(size).compile.drain
}
.unsafeRunSync()
@@ -624,7 +624,7 @@ class PipeSpec extends Fs2Spec {
"handle multiple consecutive observations" in {
forAll { (s: PureStream[Int], f: Failure) =>
runLog {
val sink: Sink[IO, Int] = _.evalMap(i => IO(()))
val sink: Pipe[IO, Int, Unit] = _.evalMap(i => IO(()))
val src: Stream[IO, Int] = s.get.covary[IO]
src.observe(sink).observe(sink)
} shouldBe s.get.toVector
@@ -634,7 +634,7 @@ class PipeSpec extends Fs2Spec {
forAll { (s: PureStream[Int], f: Failure) =>
swallow {
runLog {
val sink: Sink[IO, Int] =
val sink: Pipe[IO, Int, Unit] =
in => spuriousFail(in.evalMap(i => IO(i)), f).map(_ => ())
val src: Stream[IO, Int] = spuriousFail(s.get.covary[IO], f)
src.observe(sink).observe(sink)

This file was deleted.

Oops, something went wrong.
@@ -39,8 +39,8 @@ class StreamCancelationSpec extends AsyncFs2Spec {
.flatMap { i =>
Stream.sleep_(50.milliseconds) ++ Stream.emit(i)
}
.to(q.enqueue),
q.dequeue.to(Sink.showLinesStdOut)
.through(q.enqueue),
q.dequeue.showLinesStdOut
).parJoin(2)
}
}
@@ -2,6 +2,7 @@ package fs2

import cats.{Eq, ~>}
import cats.effect.IO
import cats.effect.concurrent.Ref
import cats.effect.laws.discipline.arbitrary._
import cats.effect.laws.util.TestContext
import cats.effect.laws.util.TestInstances._
@@ -452,6 +453,41 @@ class StreamSpec extends Fs2Spec with Inside {
}
}

"observeEither" - {
val s = Stream.emits(Seq(Left(1), Right("a"))).repeat.covary[IO]

"does not drop elements" in {
val is = Ref.of[IO, Vector[Int]](Vector.empty)
val as = Ref.of[IO, Vector[String]](Vector.empty)

val test = for {
iref <- is
aref <- as
iSink = (_: Stream[IO, Int]).evalMap(i => iref.update(_ :+ i))
aSink = (_: Stream[IO, String]).evalMap(a => aref.update(_ :+ a))
_ <- s.take(10).observeEither(iSink, aSink).compile.drain
iResult <- iref.get
aResult <- aref.get
} yield {
assert(iResult.length == 5)
assert(aResult.length == 5)
}

test.unsafeToFuture
}

"termination" - {

"left" in {
assert(runLog(s.observeEither[Int, String](_.take(0).void, _.void)).length == 0)
}

"right" in {
assert(runLog(s.observeEither[Int, String](_.void, _.take(0).void)).length == 0)
}
}
}

"issue #941 - scope closure issue" in {
Stream(1, 2, 3)
.map(_ + 1)
@@ -120,7 +120,8 @@ class QueueSpec extends Fs2Spec {
.eval(InspectableQueue.unbounded[IO, Int])
.flatMap { q =>
def changes =
(Stream.range(1, 6).to(q.enqueue) ++ q.dequeue).zip(Stream.fixedRate[IO](200.millis))
(Stream.range(1, 6).through(q.enqueue) ++ q.dequeue)
.zip(Stream.fixedRate[IO](200.millis))

q.size.concurrently(changes)
}
@@ -7,26 +7,31 @@ import cats.effect.{Concurrent, Sync}
import cats.implicits._

/** Companion for [[Sink]]. */
@deprecated("Use Pipe instead", "1.0.2")
object Sink {

/** Lifts a function `I => F[Unit]` to `Sink[F,I]`. */
@deprecated("Use stream.evalMap(f) instead", "1.0.2")
def apply[F[_], I](f: I => F[Unit]): Sink[F, I] = _.evalMap(f)

/** Sink that prints each string from the source to the supplied `PrintStream`. */
@deprecated("Use stream.lines(out) instead", "1.0.2")
def lines[F[_]](out: PrintStream)(implicit F: Sync[F]): Sink[F, String] =
apply(str => F.delay(out.println(str)))

/**
* Sink that prints each element from the source to the supplied `PrintStream`
* using the `Show` instance for the input type.
*/
@deprecated("Use stream.showLines(out) instead", "1.0.2")
def showLines[F[_]: Sync, I: Show](out: PrintStream): Sink[F, I] =
_.map(_.show).to(lines(out))
_.map(_.show).through(lines(out))

/**
* Sink that prints each element from the source to the standard out
* using the `Show` instance for the input type.
*/
@deprecated("Use stream.showLinesStdOut instead", "1.0.2")
def showLinesStdOut[F[_]: Sync, I: Show]: Sink[F, I] = showLines(Console.out)

/**
@@ -36,10 +41,11 @@ object Sink {
* If either of `left` or `right` fails, then resulting stream will fail.
* If either `halts` the evaluation will halt too.
*/
@deprecated("Use stream.observeEither(left, right)", "1.0.2")
def either[F[_]: Concurrent, L, R](
left: Sink[F, L],
right: Sink[F, R]
): Sink[F, Either[L, R]] =
_.observe(_.collect { case Left(l) => l } to left)
.to(_.collect { case Right(r) => r } to right)
_.observe(_.collect { case Left(l) => l }.through(left))
.through(_.collect { case Right(r) => r }.through(right))
}
Oops, something went wrong.

0 comments on commit 2392671

Please sign in to comment.
You can’t perform that action at this time.