Permalink
Browse files

Merge branch 'series/0.10' into topic/translateSync

  • Loading branch information...
mpilquist committed Oct 11, 2017
2 parents 3e60422 + a7403e1 commit f98e808369abf8a9f8d8e34d1ea5a80b88288272
@@ -192,5 +192,9 @@ class StreamSpec extends Fs2Spec with Inside {
withClue("false means the delay has not passed: " + tail) { assert(tail.filterNot(_._1).map(_._2).forall { _ <= delay }) }
}
}
"issue #941 - scope closure issue" in {
Stream(1,2,3).map(_ + 1).repeat.zip(Stream(4,5,6).map(_ + 1).repeat).take(4).toList
}
}
}
@@ -27,7 +27,7 @@ object Pipe {
Algebra.runFold_(Algebra.uncons(s.get).flatMap {
case Some((hd,tl)) => Algebra.output1[Read,UO](Some((hd,Stream.fromFreeC(tl))))
case None => Algebra.pure[Read,UO,Unit](())
}, None, None: UO)((x,y) => y, new Algebra.Scope[Read])
}, None, None: UO)((x,y) => y, new Algebra.Scope[Read](None))
}
def go(s: Read[UO]): Stepper[I,O] = Stepper.Suspend { () =>
@@ -23,7 +23,7 @@ object Pipe2 {
Algebra.runFold_(Algebra.uncons(s.get).flatMap {
case Some((hd,tl)) => Algebra.output1[Read,UO](Some((hd,Stream.fromFreeC(tl))))
case None => Algebra.pure[Read,UO,Unit](())
}, None, None: UO)((x,y) => y, new Algebra.Scope[Read])
}, None, None: UO)((x,y) => y, new Algebra.Scope[Read](None))
}
def go(s: Read[UO]): Stepper[I,I2,O] = Stepper.Suspend { () =>
@@ -25,8 +25,8 @@ private[fs2] object Algebra {
extends Algebra[F,X,AsyncPull[F,Option[(Segment[O,Unit],FreeC[Algebra[F,O,?],Unit])]]]
final case class UnconsAsyncDeferred[F[_],G[_],X,Y,O](s: FreeC[Algebra[G,O,?],Unit], gToF: G ~> F, ec: ExecutionContext)
extends Algebra[F,X,AsyncPull[F,Option[(Segment[O,Unit],FreeC[Algebra[F,O,?],Unit])]]]
final case class OpenScope[F[_],O]() extends Algebra[F,O,(Scope[F],Scope[F])]
final case class CloseScope[F[_],O](toClose: Scope[F], scopeAfterClose: Scope[F]) extends Algebra[F,O,Unit]
final case class OpenScope[F[_],O]() extends Algebra[F,O,Scope[F]]
final case class CloseScope[F[_],O](toClose: Scope[F]) extends Algebra[F,O,Unit]
final case class Suspend[F[_],O,R](thunk: () => FreeC[Algebra[F,O,?],R]) extends Algebra[F,O,R]
def output[F[_],O](values: Segment[O,Unit]): FreeC[Algebra[F,O,?],Unit] =
@@ -50,17 +50,17 @@ private[fs2] object Algebra {
def unconsAsync[F[_],X,Y,O](s: FreeC[Algebra[F,O,?],Unit])(implicit F: Effect[F], ec: ExecutionContext): FreeC[Algebra[F,X,?],AsyncPull[F,Option[(Segment[O,Unit], FreeC[Algebra[F,O,?],Unit])]]] =
FreeC.Eval[Algebra[F,X,?],AsyncPull[F,Option[(Segment[O,Unit],FreeC[Algebra[F,O,?],Unit])]]](UnconsAsync(s, F, ec))
private def openScope[F[_],O]: FreeC[Algebra[F,O,?],(Scope[F],Scope[F])] =
FreeC.Eval[Algebra[F,O,?],(Scope[F],Scope[F])](OpenScope())
private def openScope[F[_],O]: FreeC[Algebra[F,O,?],Scope[F]] =
FreeC.Eval[Algebra[F,O,?],Scope[F]](OpenScope())
private def closeScope[F[_],O](toClose: Scope[F], scopeAfterClose: Scope[F]): FreeC[Algebra[F,O,?],Unit] =
FreeC.Eval[Algebra[F,O,?],Unit](CloseScope(toClose, scopeAfterClose))
private def closeScope[F[_],O](toClose: Scope[F]): FreeC[Algebra[F,O,?],Unit] =
FreeC.Eval[Algebra[F,O,?],Unit](CloseScope(toClose))
def scope[F[_],O,R](pull: FreeC[Algebra[F,O,?],R]): FreeC[Algebra[F,O,?],R] =
openScope flatMap { case (oldScope, newScope) =>
openScope flatMap { newScope =>
FreeC.Bind(pull, (e: Either[Throwable,R]) => e match {
case Left(e) => closeScope(newScope, oldScope) flatMap { _ => fail(e) }
case Right(r) => closeScope(newScope, oldScope) map { _ => r }
case Left(e) => closeScope(newScope) flatMap { _ => fail(e) }
case Right(r) => closeScope(newScope) map { _ => r }
})
}
@@ -73,7 +73,7 @@ private[fs2] object Algebra {
def suspend[F[_],O,R](f: => FreeC[Algebra[F,O,?],R]): FreeC[Algebra[F,O,?],R] =
FreeC.Eval[Algebra[F,O,?],R](Suspend(() => f))
final class Scope[F[_]](implicit F: Sync[F]) {
final class Scope[F[_]](private val parent: Option[Scope[F]])(implicit F: Sync[F]) {
private val monitor = this
private var closing: Boolean = false
private var closed: Boolean = false
@@ -107,12 +107,19 @@ private[fs2] object Algebra {
resources.remove(t)
}
def open: Scope[F] = monitor.synchronized {
if (closing || closed) throw new IllegalStateException("Cannot open new scope on a closed scope")
else {
val spawn = new Scope[F]()
spawns = spawns :+ spawn
spawn
def open: Scope[F] = {
val spawn = monitor.synchronized {
if (closing || closed) None
else {
val spawn = new Scope[F](Some(this))
spawns = spawns :+ spawn
Some(spawn)
}
}
spawn.getOrElse {
// This scope is already closed so try to promote the open to an ancestor; this can fail
// if the root scope has already been closed, in which case, we can safely throw
openAncestor.fold(_ => throw new IllegalStateException("cannot re-open root scope"), _.open)
}
}
@@ -163,6 +170,17 @@ private[fs2] object Algebra {
}
}
def openAncestor: Either[Scope[F],Scope[F]] = {
def loop(s: Scope[F]): Either[Scope[F],Scope[F]] = {
val opened = s.monitor.synchronized(!(s.closing || s.closed))
if (opened) Right(s) else s.parent match {
case Some(s2) => loop(s2)
case None => Left(s)
}
}
parent.toRight(this).flatMap(loop)
}
override def toString: String = ##.toString
}
@@ -202,7 +220,7 @@ private[fs2] object Algebra {
/** Left-fold the output of a stream. */
def runFold[F2[_],O,B](stream: FreeC[Algebra[F2,O,?],Unit], init: B)(f: (B, O) => B)(implicit F: Sync[F2]): F2[B] =
F.flatMap(F.delay(new Scope[F2]())) { scope =>
F.flatMap(F.delay(new Scope[F2](None))) { scope =>
F.flatMap(F.attempt(runFold_(stream, None, init)(f, scope))) {
case Left(t) => F.flatMap(scope.close(None))(_ => F.raiseError(t))
case Right(b) => F.flatMap(scope.close(None))(_ => F.pure(b))
@@ -261,13 +279,14 @@ private[fs2] object Algebra {
case c: Algebra.CloseScope[F2,_] =>
F.flatMap(c.toClose.close(asyncSupport)) { e =>
go(c.scopeAfterClose, asyncSupport, acc, f(e).viewL)
val scopeAfterClose = c.toClose.openAncestor.fold(identity, identity)
go(scopeAfterClose, asyncSupport, acc, f(e).viewL)
}
case o: Algebra.OpenScope[F2,_] =>
F.suspend {
val innerScope = scope.open
go(innerScope, asyncSupport, acc, f(Right(scope -> innerScope)).viewL)
go(innerScope, asyncSupport, acc, f(Right(innerScope)).viewL)
}
case unconsAsync: Algebra.UnconsAsync[F2,_,_,_] =>

0 comments on commit f98e808

Please sign in to comment.