Skip to content
Permalink
Browse files

Algebra: open the implementation classes to outside.

The subclasses of Algebra, like Output, Eval, or Acquire,
were private to the Algebra object, but that is private to fs2.

Since we no longer have the nested types, we can remove and unfold
the internal factory methods, and just call the case classes.
  • Loading branch information...
diesalbla committed Sep 12, 2019
1 parent c3cd7df commit a2220f69ac76e0434a5c64a430ea51fa72da9b4b
@@ -16,7 +16,7 @@ class FreeCBenchmark {
@Benchmark
def nestedMaps = {
val nestedMapsFreeC =
(0 to N).foldLeft(FreeC.pure[IO, Int](0): FreeC[IO, INothing, Int]) { (acc, i) =>
(0 to N).foldLeft(Result.Pure[IO, Int](0): FreeC[IO, INothing, Int]) { (acc, i) =>
acc.map(_ + i)
}
run(nestedMapsFreeC)
@@ -25,8 +25,8 @@ class FreeCBenchmark {
@Benchmark
def nestedFlatMaps = {
val nestedFlatMapsFreeC =
(0 to N).foldLeft(FreeC.pure[IO, Int](0): FreeC[IO, INothing, Int]) { (acc, i) =>
acc.flatMap(j => FreeC.pure(i + j))
(0 to N).foldLeft(Result.Pure[IO, Int](0): FreeC[IO, INothing, Int]) { (acc, i) =>
acc.flatMap(j => Result.Pure(i + j))
}
run(nestedFlatMapsFreeC)
}
@@ -1,9 +1,11 @@
package fs2

import cats._
import cats.{Eval => _, _}
import cats.arrow.FunctionK
import cats.effect._
import fs2.internal._
import fs2.internal.FreeC.Result
import fs2.internal.Algebra.Eval

/**
* A `p: Pull[F,O,R]` reads values from one or more streams, returns a
@@ -32,7 +34,7 @@ final class Pull[+F[_], +O, +R] private (private val free: FreeC[Nothing, O, R])

/** Returns a pull with the result wrapped in `Right`, or an error wrapped in `Left` if the pull has failed. */
def attempt: Pull[F, O, Either[Throwable, R]] =
Pull.fromFreeC(get[F, O, R].map(r => Right(r)).handleErrorWith(t => FreeC.pure(Left(t))))
Pull.fromFreeC(get[F, O, R].map(r => Right(r)).handleErrorWith(t => Result.Pure(Left(t))))

/**
* Interpret this `Pull` to produce a `Stream`.
@@ -73,7 +75,7 @@ final class Pull[+F[_], +O, +R] private (private val free: FreeC[Nothing, O, R])

/** Run `p2` after `this`, regardless of errors during `this`, then reraise any errors encountered during `this`. */
def onComplete[F2[x] >: F[x], O2 >: O, R2 >: R](p2: => Pull[F2, O2, R2]): Pull[F2, O2, R2] =
handleErrorWith(e => p2 >> new Pull(Algebra.raiseError[Nothing](e))) >> p2
handleErrorWith(e => p2 >> new Pull(Result.Fail[Nothing](e))) >> p2

/** If `this` terminates with `Pull.raiseError(e)`, invoke `h(e)`. */
def handleErrorWith[F2[x] >: F[x], O2 >: O, R2 >: R](
@@ -96,19 +98,18 @@ object Pull extends PullLowPriority {
*/
def attemptEval[F[_], R](fr: F[R]): Pull[F, INothing, Either[Throwable, R]] =
fromFreeC(
Algebra
.eval[F, R](fr)
Eval[F, R](fr)
.map(r => Right(r): Either[Throwable, R])
.handleErrorWith(t => Algebra.pure[F, Either[Throwable, R]](Left(t)))
.handleErrorWith(t => Result.Pure[F, Either[Throwable, R]](Left(t)))
)

/** The completed `Pull`. Reads and outputs nothing. */
val done: Pull[Pure, INothing, Unit] =
fromFreeC[Pure, INothing, Unit](Algebra.pure[Pure, Unit](()))
fromFreeC[Pure, INothing, Unit](Result.Pure[Pure, Unit](()))

/** Evaluates the supplied effectful value and returns the result as the resource of the returned pull. */
def eval[F[_], R](fr: F[R]): Pull[F, INothing, R] =
fromFreeC(Algebra.eval[F, R](fr))
fromFreeC(Eval[F, R](fr))

/**
* Repeatedly uses the output of the pull as input for the next step of the pull.
@@ -126,19 +127,19 @@ object Pull extends PullLowPriority {

/** Outputs a chunk of values. */
def output[F[x] >: Pure[x], O](os: Chunk[O]): Pull[F, O, Unit] =
if (os.isEmpty) Pull.done else fromFreeC(Algebra.output[F, O](os))
if (os.isEmpty) Pull.done else fromFreeC(Algebra.Output[F, O](os))

/** Pull that outputs nothing and has result of `r`. */
def pure[F[x] >: Pure[x], R](r: R): Pull[F, INothing, R] =
fromFreeC[F, INothing, R](Algebra.pure(r))
fromFreeC[F, INothing, R](Result.Pure(r))

/**
* Reads and outputs nothing, and fails with the given error.
*
* The `F` type must be explicitly provided (e.g., via `raiseError[IO]` or `raiseError[Fallible]`).
*/
def raiseError[F[_]: RaiseThrowable](err: Throwable): Pull[F, INothing, INothing] =
new Pull(Algebra.raiseError[Nothing](err))
new Pull(Result.Fail[Nothing](err))

final class PartiallyAppliedFromEither[F[_]] {
def apply[A](either: Either[Throwable, A])(implicit ev: RaiseThrowable[F]): Pull[F, A, Unit] =
@@ -1,13 +1,14 @@
package fs2

import cats._
import cats.{Eval => _, _}
import cats.arrow.FunctionK
import cats.data.{Chain, NonEmptyList}
import cats.effect._
import cats.effect.concurrent._
import cats.effect.implicits._
import cats.implicits.{catsSyntaxEither => _, _}
import fs2.concurrent._
import fs2.internal.Algebra.{Acquire, Eval, GetScope, Output}
import fs2.internal.FreeC.Result
import fs2.internal.{Resource => _, _}
import java.io.PrintStream
@@ -1095,7 +1096,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Nothing, O, Unit]
else {
f(hd(idx)).get.transformWith {
case Result.Pure(_) => go(idx + 1)
case Result.Fail(err) => Algebra.raiseError(err)
case Result.Fail(err) => Result.Fail(err)
case Result.Interrupted(scopeId: Token, err) =>
Stream.fromFreeC(Algebra.interruptBoundary(tl, scopeId, err)).flatMap(f).get
case Result.Interrupted(invalid, err) =>
@@ -1868,7 +1869,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Nothing, O, Unit]
* }}}
*/
def onComplete[F2[x] >: F[x], O2 >: O](s2: => Stream[F2, O2]): Stream[F2, O2] =
handleErrorWith(e => s2 ++ Stream.fromFreeC(Algebra.raiseError(e))) ++ s2
handleErrorWith(e => s2 ++ Stream.fromFreeC(Result.Fail(e))) ++ s2

/**
* Runs the supplied effectful action at the end of this stream, regardless of how the stream terminates.
@@ -1911,7 +1912,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Nothing, O, Unit]
f: ExitCase[Throwable] => F2[Unit]
)(implicit F2: Applicative[F2]): Stream[F2, O] =
Stream.fromFreeC(
Algebra.acquire[F2, O, Unit](().pure[F2], (_, ec) => f(ec)).flatMap(_ => get[F2, O])
Acquire[F2, Unit](().pure[F2], (_, ec) => f(ec)).flatMap(_ => get[F2, O])
)

/**
@@ -3045,7 +3046,7 @@ object Stream extends StreamLowPriority {
def bracketCaseWeak[F[x] >: Pure[x], R](
acquire: F[R]
)(release: (R, ExitCase[Throwable]) => F[Unit]): Stream[F, R] =
fromFreeC(Algebra.acquire[F, R, R](acquire, release).flatMap(Algebra.output1(_)))
fromFreeC(Acquire[F, R](acquire, release).flatMap(Algebra.output1(_)))

/**
* Creates a pure stream that emits the elements of the supplied chunk.
@@ -3056,7 +3057,7 @@ object Stream extends StreamLowPriority {
* }}}
*/
def chunk[F[x] >: Pure[x], O](os: Chunk[O]): Stream[F, O] =
Stream.fromFreeC(Algebra.output[F, O](os))
Stream.fromFreeC(Output[F, O](os))

/**
* Creates an infinite pure stream that always returns the supplied value.
@@ -3103,12 +3104,12 @@ object Stream extends StreamLowPriority {
os match {
case Nil => empty
case Seq(x) => emit(x)
case _ => fromFreeC(Algebra.output[F, O](Chunk.seq(os)))
case _ => fromFreeC(Algebra.Output[F, O](Chunk.seq(os)))
}

/** Empty pure stream. */
val empty: Stream[Pure, INothing] =
fromFreeC[Pure, INothing](Algebra.pure[Pure, Unit](())): Stream[Pure, INothing]
fromFreeC[Pure, INothing](Result.Pure[Pure, Unit](())): Stream[Pure, INothing]

/**
* Creates a single element stream that gets its value by evaluating the supplied effect. If the effect fails,
@@ -3125,7 +3126,7 @@ object Stream extends StreamLowPriority {
* }}}
*/
def eval[F[_], O](fo: F[O]): Stream[F, O] =
fromFreeC(Algebra.eval(fo).flatMap(Algebra.output1))
fromFreeC(Eval(fo).flatMap(Algebra.output1))

/**
* Creates a stream that evaluates the supplied `fa` for its effect, discarding the output value.
@@ -3140,11 +3141,11 @@ object Stream extends StreamLowPriority {
* }}}
*/
def eval_[F[_], A](fa: F[A]): Stream[F, INothing] =
fromFreeC(Algebra.eval(fa).map(_ => ()))
fromFreeC(Eval(fa).map(_ => ()))

/** like `eval` but resulting chunk is flatten efficiently **/
def evalUnChunk[F[_], O](fo: F[Chunk[O]]): Stream[F, O] =
fromFreeC(Algebra.eval(fo).flatMap(Algebra.output))
fromFreeC(Eval(fo).flatMap(Output(_)))

/** Like `eval`, but lifts a foldable structure. **/
def evals[F[_], S[_]: Foldable, O](fo: F[S[O]]): Stream[F, O] =
@@ -3306,7 +3307,7 @@ object Stream extends StreamLowPriority {
* This is a low-level method and generally should not be used by user code.
*/
def getScope[F[x] >: Pure[x]]: Stream[F, Scope[F]] =
Stream.fromFreeC(Algebra.getScope[F].flatMap(Algebra.output1(_)))
Stream.fromFreeC(GetScope[F]().flatMap(Algebra.output1(_)))

/**
* A stream that never emits and never terminates.
@@ -3328,7 +3329,7 @@ object Stream extends StreamLowPriority {
* }}}
*/
def raiseError[F[_]: RaiseThrowable](e: Throwable): Stream[F, INothing] =
fromFreeC(Algebra.raiseError(e))
fromFreeC(Result.Fail(e))

/**
* Creates a random stream of integers using a random seed.
@@ -4020,7 +4021,7 @@ object Stream extends StreamLowPriority {
*/
def stepLeg: Pull[F, INothing, Option[StepLeg[F, O]]] =
Pull
.fromFreeC(Algebra.getScope[F])
.fromFreeC(GetScope[F]())
.flatMap { scope =>
new StepLeg[F, O](Chunk.empty, scope.id, self.get).stepLeg
}

0 comments on commit a2220f6

Please sign in to comment.
You can’t perform that action at this time.