Permalink
Browse files

Merge pull request #948 from mpilquist/topic/translateSync

Fixed issue where translate after translateSync should restore any UnconsAsync steps
  • Loading branch information...
mpilquist committed Oct 12, 2017
2 parents a7403e1 + f98e808 commit 4bed5698f105349c0077db7190e2d3a24430c93b
@@ -109,6 +109,10 @@ class StreamSpec extends Fs2Spec with Inside {
runLog(s.get.translateSync(cats.arrow.FunctionK.id[Pure]).covary[IO]) shouldBe runLog(s.get)
}
"translateSync followed by translate" in forAll { (s: PureStream[Int]) =>
runLog(s.get.covary[IO].prefetch.translateSync(cats.arrow.FunctionK.id).translate(cats.arrow.FunctionK.id).covary[IO]) shouldBe runLog(s.get)
}
"toList" in forAll { (s: PureStream[Int]) =>
s.get.toList shouldBe runLog(s.get).toList
}
@@ -23,6 +23,8 @@ private[fs2] object Algebra {
final case class Release[F[_],O](token: Token) extends Algebra[F,O,Unit]
final case class UnconsAsync[F[_],X,Y,O](s: FreeC[Algebra[F,O,?],Unit], effect: Effect[F], ec: ExecutionContext)
extends Algebra[F,X,AsyncPull[F,Option[(Segment[O,Unit],FreeC[Algebra[F,O,?],Unit])]]]
final case class UnconsAsyncDeferred[F[_],G[_],X,Y,O](s: FreeC[Algebra[G,O,?],Unit], gToF: G ~> F, ec: ExecutionContext)
extends Algebra[F,X,AsyncPull[F,Option[(Segment[O,Unit],FreeC[Algebra[F,O,?],Unit])]]]
final case class OpenScope[F[_],O]() extends Algebra[F,O,Scope[F]]
final case class CloseScope[F[_],O](toClose: Scope[F]) extends Algebra[F,O,Unit]
final case class Suspend[F[_],O,R](thunk: () => FreeC[Algebra[F,O,?],R]) extends Algebra[F,O,R]
@@ -306,6 +308,9 @@ private[fs2] object Algebra {
}
F.flatMap(asyncPull) { ap => go(scope, asyncSupport, acc, f(Right(ap)).viewL) }
case deferred: Algebra.UnconsAsyncDeferred[_,_,_,_,_] =>
F.raiseError(new IllegalStateException("unconsAsync encountered but stream was translated via translateSync - try translating again via translate before running"))
case s: Algebra.Suspend[F2,O,_] =>
F.suspend {
try go(scope, asyncSupport, acc, FreeC.Bind(s.thunk(), f).viewL)
@@ -334,9 +339,15 @@ private[fs2] object Algebra {
case ua: UnconsAsync[F,_,_,_] =>
val uu: UnconsAsync[F2,Any,Any,Any] = ua.asInstanceOf[UnconsAsync[F2,Any,Any,Any]]
G match {
case None => Algebra.Eval(u(uu.effect.raiseError[X](new IllegalStateException("unconsAsync encountered while translating synchronously"))))
case None => UnconsAsyncDeferred(uu.s, u, uu.ec).asInstanceOf[Algebra[G,O2,X]]
case Some(ef) => UnconsAsync(uu.s.translate[Algebra[G,Any,?]](algFtoG), ef, uu.ec).asInstanceOf[Algebra[G,O2,X]]
}
case ua: UnconsAsyncDeferred[_,_,_,_,_] =>
val uu: UnconsAsyncDeferred[F2,Any,Any,Any,Any] = ua.asInstanceOf[UnconsAsyncDeferred[F2,Any,Any,Any,Any]]
G match {
case None => UnconsAsyncDeferred[G,Any,Any,Any,Any](uu.s, uu.gToF andThen u, uu.ec).asInstanceOf[Algebra[G,O2,X]]
case Some(ef) => UnconsAsync[G,Any,Any,Any](translate[Any,G,Any,Unit](uu.s, uu.gToF andThen u, G), ef, uu.ec).asInstanceOf[Algebra[G,O2,X]]
}
case s: Suspend[F,O2,X] => Suspend(() => s.thunk().translate(algFtoG))
}
}

0 comments on commit 4bed569

Please sign in to comment.