Skip to content
Permalink
Browse files

Fix scope leasing in parJoin

  • Loading branch information...
SystemFw committed Feb 23, 2019
1 parent d0d36fd commit c9c317d5fc50af635fe71a4dd9a0633c978fb3d1
@@ -495,7 +495,6 @@ class Pipe2Spec extends Fs2Spec {
}

"interrupt (19)" in {
pending
// interruptible eval

def prg =
@@ -1943,32 +1943,34 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
// note that supplied scope's resources must be leased before the inner stream forks the execution to another thread
// and that it must be released once the inner stream terminates or fails.
def runInner(inner: Stream[F2, O2], outerScope: Scope[F2]): F2[Unit] =
outerScope.lease.flatMap {
case Some(lease) =>
available.acquire >>
incrementRunning >>
F2.start {
inner.chunks
.evalMap(s => outputQ.enqueue1(Some(s)))
.interruptWhen(done.map(_.nonEmpty)) // must be AFTER enqueue to the sync queue, otherwise the process may hang to enq last item while being interrupted
.compile
.drain
.attempt
.flatMap { r =>
lease.cancel.flatMap { cancelResult =>
available.release >>
(CompositeFailure.fromResults(r, cancelResult) match {
case Right(()) => F2.unit
case Left(err) =>
stop(Some(err))
}) >> decrementRunning
F2.uncancelable {
outerScope.lease.flatMap {
case Some(lease) =>
available.acquire >>
incrementRunning >>
F2.start {
inner.chunks
.evalMap(s => outputQ.enqueue1(Some(s)))
.interruptWhen(done.map(_.nonEmpty)) // must be AFTER enqueue to the sync queue, otherwise the process may hang to enq last item while being interrupted
.compile
.drain
.attempt
.flatMap { r =>
lease.cancel.flatMap { cancelResult =>
available.release >>
(CompositeFailure.fromResults(r, cancelResult) match {
case Right(()) => F2.unit
case Left(err) =>
stop(Some(err))
}) >> decrementRunning
}
}
}
}.void
}.void

case None =>
F2.raiseError(
new Throwable("Outer scope is closed during inner stream startup"))
case None =>
F2.raiseError(
new Throwable("Outer scope is closed during inner stream startup"))
}
}

// runs the outer stream, interrupts when kill == true, and then decrements the `running`
@@ -382,7 +382,7 @@ private[fs2] final class CompileScope[F[_]] private (
case Some(iCtx) =>
F.map(
iCtx.concurrent
.race(iCtx.deferred.get, F.attempt(iCtx.concurrent.uncancelable(f)))) {
.race(iCtx.deferred.get, F.attempt(f))) {
case Right(result) => result.leftMap(Left(_))
case Left(other) => Left(other)
}

0 comments on commit c9c317d

Please sign in to comment.
You can’t perform that action at this time.