Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BIO: Fix F.timeout and F.raceWith erroneously making actions interruptible inside uninterruptible regions #1978

Merged
merged 3 commits into from
Jul 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import cats.effect.testkit.TestInstances
import cats.kernel.Order
import izumi.functional.bio.Exit.{CatsExit, ZIOExit}
import izumi.functional.bio.data.Morphism1
import izumi.functional.bio.{Clock1, Clock2, IO2}
import izumi.functional.bio.{Clock1, Clock2}
import org.scalacheck.{Arbitrary, Cogen, Prop}
import zio.{Clock, Duration, Executor, IO, Runtime, Scheduler, Task, Trace, UIO, Unsafe, ZIO, ZLayer}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package izumi.functional.bio.test

import izumi.functional.bio.*
import org.scalatest.Assertion
import org.scalatest.wordspec.AnyWordSpec
import zio.ZIO

class ZIOWorkaroundsTest extends AnyWordSpec {

"Issue https://github.com/zio/zio/issues/6911" should {
val runtime = UnsafeRun2.createZIO()

val runtime = zio.Runtime.default
implicit val unsafe: zio.Unsafe = zio.Unsafe.unsafe(identity)
"Issue https://github.com/zio/zio/issues/6911" should {

"not reproduce with F.parTraverse" in runtime.unsafe.run {
"not reproduce with F.parTraverse" in runtime.unsafeRun {
F.parTraverse(
List(
F.unit.forever,
Expand All @@ -25,7 +25,7 @@ class ZIOWorkaroundsTest extends AnyWordSpec {
}
}

"not reproduce with F.parTraverse_" in runtime.unsafe.run {
"not reproduce with F.parTraverse_" in runtime.unsafeRun {
F.parTraverse_(
List(
F.unit.forever,
Expand All @@ -39,7 +39,7 @@ class ZIOWorkaroundsTest extends AnyWordSpec {
}
}

"not reproduce with F.parTraverseN" in runtime.unsafe.run {
"not reproduce with F.parTraverseN" in runtime.unsafeRun {
F.parTraverseN(2)(
List(
F.unit.forever,
Expand All @@ -53,7 +53,7 @@ class ZIOWorkaroundsTest extends AnyWordSpec {
}
}

"not reproduce with F.parTraverseN_" in runtime.unsafe.run {
"not reproduce with F.parTraverseN_" in runtime.unsafeRun {
F.parTraverseN_(2)(
List(
F.unit.forever,
Expand All @@ -67,7 +67,7 @@ class ZIOWorkaroundsTest extends AnyWordSpec {
}
}

"not reproduce with F.race" in runtime.unsafe.run {
"not reproduce with F.race" in runtime.unsafeRun {
F.race(
F.unit.forever,
F.terminate(new RuntimeException("testexception")),
Expand All @@ -80,7 +80,7 @@ class ZIOWorkaroundsTest extends AnyWordSpec {
}
}

"not reproduce with F.racePairUnsafe" in runtime.unsafe.run {
"not reproduce with F.racePairUnsafe" in runtime.unsafeRun {
F.racePairUnsafe(
F.unit.forever,
F.terminate(new RuntimeException("testexception")),
Expand All @@ -92,7 +92,7 @@ class ZIOWorkaroundsTest extends AnyWordSpec {
}
}

"not reproduce with F.zipWithPar" in runtime.unsafe.run {
"not reproduce with F.zipWithPar" in runtime.unsafeRun {
F.zipWithPar(
F.unit.forever.widen[Unit],
F.terminate(new RuntimeException("testexception")).widen[Unit],
Expand All @@ -104,7 +104,7 @@ class ZIOWorkaroundsTest extends AnyWordSpec {
}
}

"not reproduce with F.zipPar" in runtime.unsafe.run {
"not reproduce with F.zipPar" in runtime.unsafeRun {
F.zipPar(
F.unit.forever,
F.terminate(new RuntimeException("testexception")),
Expand All @@ -116,7 +116,7 @@ class ZIOWorkaroundsTest extends AnyWordSpec {
}
}

"not reproduce with F.zipParLeft" in runtime.unsafe.run {
"not reproduce with F.zipParLeft" in runtime.unsafeRun {
F.zipParLeft(
F.unit.forever,
F.terminate(new RuntimeException("testexception")),
Expand All @@ -128,7 +128,7 @@ class ZIOWorkaroundsTest extends AnyWordSpec {
}
}

"not reproduce with F.zipParRight" in runtime.unsafe.run {
"not reproduce with F.zipParRight" in runtime.unsafeRun {
F.zipParRight(
F.unit.forever,
F.terminate(new RuntimeException("testexception")),
Expand All @@ -140,7 +140,7 @@ class ZIOWorkaroundsTest extends AnyWordSpec {
}
}

"guaranteeExceptOnInterrupt works correctly" in runtime.unsafe.run {
"guaranteeExceptOnInterrupt works correctly" in runtime.unsafeRun {
for {
succRes <- F.mkRef(Option.empty[Boolean])
failRes <- F.mkRef(Option.empty[Boolean])
Expand Down Expand Up @@ -175,4 +175,129 @@ class ZIOWorkaroundsTest extends AnyWordSpec {

}

"Issue https://github.com/zio/zio/issues/8243 leaking 'interruption inheritance' into BIO" should {
// related issues https://github.com/zio/zio/issues/5459 https://github.com/zio/zio/issues/3100 https://github.com/zio/zio/issues/3065 https://github.com/zio/zio/issues/1764

"F.timeout in uninterruptible region is correctly not interrupted when parent is interrupted" in {
import scala.concurrent.duration.*

def test[F[+_, +_]: Async2: Temporal2: Primitives2: Fork2]: F[Nothing, Assertion] = {
for {
fiberStarted <- F.mkLatch
stopFiber <- F.mkLatch
innerFiberNotInterrupted <- F.mkLatch
fiberNotInterrupted1 <- F.mkRef(false)
fiberNotInterrupted2 <- F.mkRef(Option.empty[Boolean])
fiber <- F.fork {
F.uninterruptible(
F.timeout(5.hours)(
(fiberStarted.succeed(()) *>
F.sleep(1.second) *>
stopFiber.await *>
innerFiberNotInterrupted.succeed(()))
.guaranteeExceptOnInterrupt(_ => fiberNotInterrupted1.set(true))
).flatMap(promiseModifiedOrTimedOut => fiberNotInterrupted2.set(promiseModifiedOrTimedOut))
)
}
_ <- fiberStarted.await
innerIsNotInterrupted <- fiber.interrupt
.as(false)
.race(
F.sleep(1.second) *>
stopFiber.succeed(()) *>
innerFiberNotInterrupted.await.as(true)
)
isNotInterrupted1 <- fiberNotInterrupted1.get
isNotInterrupted2 <- fiberNotInterrupted2.get
} yield assert(innerIsNotInterrupted && isNotInterrupted1 && (isNotInterrupted2 == Some(true)))
}

runtime.unsafeRun(test[zio.IO])
}

"F.race in uninterruptible region is correctly not interrupted when parent is interrupted" in {
import scala.concurrent.duration.*

def test[F[+_, +_]: Async2: Temporal2: Primitives2: Fork2]: F[Nothing, Assertion] = {
for {
fiberStarted <- F.mkLatch
stopFiber <- F.mkLatch
innerFiberNotInterrupted <- F.mkLatch
fiberNotInterrupted1 <- F.mkRef(false)
fiberNotInterrupted2 <- F.mkRef(Option.empty[Boolean])
fiber <- F.fork {
F.uninterruptible(
F.race(
r1 = (
fiberStarted.succeed(()) *>
F.sleep(1.second) *>
stopFiber.await *>
innerFiberNotInterrupted
.succeed(())
.map(Some(_))
).guaranteeExceptOnInterrupt(_ => fiberNotInterrupted1.set(true)),
r2 = F.sleep(5.hours).as(None),
).flatMap(promiseModifiedOrTimedOut => fiberNotInterrupted2.set(promiseModifiedOrTimedOut))
)
}
_ <- fiberStarted.await
innerIsNotInterrupted <- fiber.interrupt
.as(false)
.race(
F.sleep(1.second) *>
stopFiber.succeed(()) *>
innerFiberNotInterrupted.await.as(true)
)
isNotInterrupted1 <- fiberNotInterrupted1.get
isNotInterrupted2 <- fiberNotInterrupted2.get
} yield assert(innerIsNotInterrupted && isNotInterrupted1 && (isNotInterrupted2 == Some(true)))
}

runtime.unsafeRun(test[zio.IO])
}

}

"avoid sleep/race 'interruption inheritance' in ZIO" should {

"F.timeout interrupts the timed action correctly within an uninterruptible region" in {
import scala.concurrent.duration.*

def test[F[+_, +_]: Async2: Temporal2: Primitives2: Fork2]: F[String, Assertion] = {
for {
fiber <- F.fork {
F.uninterruptible(
F.timeout(1.seconds)(F.never)
)
}
res <- fiber.join.timeoutFail("timed out")(1.minute)
} yield assert(res.isEmpty)
}

runtime.unsafeRun(test[zio.IO])
}

"F.race interrupts the timed action correctly within an uninterruptible region" in {
import scala.concurrent.duration.*

def test[F[+_, +_]: Async2: Temporal2: Primitives2: Fork2]: F[String, Assertion] = {
for {
started <- F.mkLatch
finish <- F.mkLatch
fiber <- F.fork {
F.uninterruptible(
F.race(started.succeed(()) *> finish.await.as(1), F.never)
)
}
_ <- started.await
_ <- finish.succeed(())
res <- fiber.join.timeoutFail("timed out")(1.minute)
} yield assert(res == 1)
}

runtime.unsafeRun(test[zio.IO])
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import izumi.functional.bio.Exit.ZIOExit
import izumi.functional.bio.data.{Morphism3, RestoreInterruption3}
import izumi.functional.bio.{Async3, Exit, Fiber2, Fiber3, __PlatformSpecific}
import izumi.fundamentals.platform.language.Quirks.Discarder
import zio._izumicompat_.__ZIOWithFiberRuntime
import zio._izumicompat_.{__ZIORaceCompat, __ZIOWithFiberRuntime}
import zio.internal.stacktracer.{InteropTracer, Tracer}
import zio.stacktracer.TracingImplicits.disableAutoTrace
import zio.{ZEnvironment, ZIO}
Expand Down Expand Up @@ -226,7 +226,7 @@ open class AsyncZio extends Async3[ZIO] /*with Local3[ZIO]*/ {
@inline override final def race[R, E, A](r1: ZIO[R, E, A], r2: ZIO[R, E, A]): ZIO[R, E, A] = {
implicit val trace: zio.Trace = Tracer.instance.empty

r1.interruptible.raceFirst(r2.interruptible)
__ZIORaceCompat.raceFirst(r1.interruptible, r2.interruptible)
}

@inline override final def racePairUnsafe[R, E, A, B](
Expand All @@ -237,9 +237,10 @@ open class AsyncZio extends Async3[ZIO] /*with Local3[ZIO]*/ {

val interrupted1 = new AtomicBoolean(true)
val interrupted2 = new AtomicBoolean(true)
(ZIOExit.ZIOSignalOnNoExternalInterruptFailure(r1.interruptible)(sync(interrupted1.set(false)))
raceWith
ZIOExit.ZIOSignalOnNoExternalInterruptFailure(r2.interruptible)(sync(interrupted2.set(false))))(
__ZIORaceCompat.raceWith(
ZIOExit.ZIOSignalOnNoExternalInterruptFailure(r1.interruptible)(sync(interrupted1.set(false))),
ZIOExit.ZIOSignalOnNoExternalInterruptFailure(r2.interruptible)(sync(interrupted2.set(false))),
)(
{ case (l, f) => ZIO.succeed(Left((ZIOExit.toExit(l)(interrupted1.get()), Fiber2.fromZIO(sync(interrupted2.get()))(f)))) },
{ case (r, f) => ZIO.succeed(Right((Fiber2.fromZIO(sync(interrupted1.get()))(f), ZIOExit.toExit(r)(interrupted2.get())))) },
)
Expand Down
Loading
Loading