Skip to content
Permalink
Browse files

Removed dead code path in Stream.bracket

  • Loading branch information...
mpilquist committed Dec 15, 2018
1 parent 9045033 commit 69e2297ccc93fd619b647db2c4172e60235024ae
@@ -16,3 +16,5 @@ z_local.sbt
.tags
.metaserver
test-output
.metals
.bloop
@@ -210,7 +210,8 @@ lazy val mimaSettings = Seq(
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.io.tcp.Socket.mkSocket"),
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.io.udp.Socket.mkSocket"),
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.Pipe.joinQueued"),
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.Pipe.joinAsync")
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.Pipe.joinAsync"),
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.Stream.bracketFinalizer")
)
)

@@ -236,7 +236,7 @@ object Pull extends PullLowPriority {
fromFreeC(Algebra.suspend(p.get))

private def release[F[x] >: Pure[x]](token: Token): Pull[F, INothing, Unit] =
fromFreeC[F, INothing, Unit](Algebra.release(token, None))
fromFreeC[F, INothing, Unit](Algebra.release(token))

/** `Sync` instance for `Pull`. */
implicit def syncInstance[F[_], O](
@@ -2708,7 +2708,7 @@ object Stream extends StreamLowPriority {
release: (R, ExitCase[Throwable]) => F[Unit]): Stream[F, R] =
fromFreeC(Algebra.acquire[F, R, R](acquire, release).flatMap {
case (r, token) =>
Stream.emit(r).covary[F].get[F, R].transformWith(bracketFinalizer(token))
Stream.emit(r).covary[F].get[F, R] >> Algebra.release(token)
})

/**
@@ -2732,7 +2732,7 @@ object Stream extends StreamLowPriority {
def bracketCaseCancellable[F[x] >: Pure[x], R](acquire: F[R])(
release: (R, ExitCase[Throwable]) => F[Unit]): Stream[F, (Stream[F, Unit], R)] =
bracketWithToken(acquire)(release).map {
case (token, r) => (Stream.fromFreeC(Algebra.release(token, None)) ++ Stream.emit(()), r)
case (token, r) => (Stream.fromFreeC(Algebra.release(token)) ++ Stream.emit(()), r)
}

private[fs2] def bracketWithToken[F[x] >: Pure[x], R](acquire: F[R])(
@@ -2743,33 +2743,9 @@ object Stream extends StreamLowPriority {
.emit(r)
.covary[F]
.map(o => (token, o))
.get[F, (Token, R)]
.transformWith(bracketFinalizer(token))
.get[F, (Token, R)] >> Algebra.release(token)
})

private[fs2] def bracketFinalizer[F[_], O](token: Token)(
r: Result[Unit]): FreeC[Algebra[F, O, ?], Unit] =
r match {

case Result.Fail(err) =>
Algebra.release(token, Some(err)).transformWith {
case Result.Pure(_) => Algebra.raiseError(err)
case Result.Fail(err2) =>
if (!err.eq(err2)) Algebra.raiseError(CompositeFailure(err, err2))
else Algebra.raiseError(err)
case Result.Interrupted(_, _) =>
Algebra.raiseError(new Throwable(s"Cannot interrupt while releasing resource ($err)"))
}

case Result.Interrupted(scopeId, err) =>
// this is interrupted lets leave the release util the scope terminates
Result.Interrupted(scopeId, err)

case Result.Pure(_) =>
// the stream finsihed, lets clean up any resources
Algebra.release(token, None)
}

/**
* Creates a pure stream that emits the elements of the supplied chunk.
*
@@ -35,7 +35,7 @@ private[fs2] object Algebra {
final case class Acquire[F[_], R](resource: F[R], release: (R, ExitCase[Throwable]) => F[Unit])
extends AlgEffect[F, (R, Token)]

final case class Release[F[_]](token: Token, err: Option[Throwable]) extends AlgEffect[F, Unit]
final case class Release[F[_]](token: Token) extends AlgEffect[F, Unit]

final case class OpenScope[F[_]](interruptible: Option[Concurrent[F]]) extends AlgEffect[F, Token]

@@ -62,8 +62,8 @@ private[fs2] object Algebra {
release: (R, ExitCase[Throwable]) => F[Unit]): FreeC[Algebra[F, O, ?], (R, Token)] =
FreeC.Eval[Algebra[F, O, ?], (R, Token)](Acquire(resource, release))

def release[F[_], O](token: Token, err: Option[Throwable]): FreeC[Algebra[F, O, ?], Unit] =
FreeC.Eval[Algebra[F, O, ?], Unit](Release(token, err))
def release[F[_], O](token: Token): FreeC[Algebra[F, O, ?], Unit] =
FreeC.Eval[Algebra[F, O, ?], Unit](Release(token))

/**
* Steps through the stream, providing either `uncons` or `stepLeg`.
@@ -290,11 +290,7 @@ private[fs2] object Algebra {
}

case release: Algebra.Release[F] =>
F.flatMap(
scope.releaseResource(release.token,
release.err
.map(ExitCase.error)
.getOrElse(ExitCase.Completed))) { r =>
F.flatMap(scope.releaseResource(release.token, ExitCase.Completed)) { r =>
go[X](scope, view.next(Result.fromEither(r)))
}

0 comments on commit 69e2297

Please sign in to comment.
You can’t perform that action at this time.