diff --git a/arrow-libs/fx/arrow-fx-coroutines-test/api/arrow-fx-coroutines-test.api b/arrow-libs/fx/arrow-fx-coroutines-test/api/arrow-fx-coroutines-test.api index d901eb7c0cb..7a122419aa3 100644 --- a/arrow-libs/fx/arrow-fx-coroutines-test/api/arrow-fx-coroutines-test.api +++ b/arrow-libs/fx/arrow-fx-coroutines-test/api/arrow-fx-coroutines-test.api @@ -11,6 +11,8 @@ public final class arrow/fx/coroutines/NamedThreadFactory : java/util/concurrent public final class arrow/fx/coroutines/Predef_testKt { public static final fun assertThrowable (Lkotlin/jvm/functions/Function0;)Ljava/lang/Throwable; + public static final fun awaitExitCase (Lkotlinx/coroutines/CompletableDeferred;Lkotlinx/coroutines/CompletableDeferred;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitExitCase (Lkotlinx/coroutines/channels/Channel;Lkotlinx/coroutines/CompletableDeferred;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun charRange (Lio/kotest/property/Arb$Companion;)Lio/kotest/property/Arb; public static final fun either (Larrow/core/Either;)Lio/kotest/matchers/Matcher; public static final fun either (Lio/kotest/property/Arb$Companion;Lio/kotest/property/Arb;Lio/kotest/property/Arb;)Lio/kotest/property/Arb; diff --git a/arrow-libs/fx/arrow-fx-coroutines-test/src/commonMain/kotlin/arrow/fx/coroutines/predef-test.kt b/arrow-libs/fx/arrow-fx-coroutines-test/src/commonMain/kotlin/arrow/fx/coroutines/predef-test.kt index 1da3ebc7432..8a399f90b7a 100644 --- a/arrow-libs/fx/arrow-fx-coroutines-test/src/commonMain/kotlin/arrow/fx/coroutines/predef-test.kt +++ b/arrow-libs/fx/arrow-fx-coroutines-test/src/commonMain/kotlin/arrow/fx/coroutines/predef-test.kt @@ -25,9 +25,6 @@ import io.kotest.property.arbitrary.list import io.kotest.property.arbitrary.long import io.kotest.property.arbitrary.map import io.kotest.property.arbitrary.string -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.asFlow import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED @@ -35,7 +32,12 @@ import kotlin.coroutines.intrinsics.intercepted import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn import kotlin.coroutines.resume import kotlin.coroutines.startCoroutine +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.awaitCancellation import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.flow.emptyFlow @@ -217,3 +219,15 @@ public fun either(e: Either): Matcher> = is Either.Right -> equalityMatcher(e).test(value) } } + +public suspend fun awaitExitCase(send: Channel, exit: CompletableDeferred): A = + guaranteeCase({ + send.receive() + awaitCancellation() + }) { ex -> exit.complete(ex) } + +public suspend fun awaitExitCase(start: CompletableDeferred, exit: CompletableDeferred): A = + guaranteeCase({ + start.complete(Unit) + awaitCancellation() + }) { ex -> exit.complete(ex) } diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/ParZip.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/ParZip.kt index ca6f286dcc9..bdb6be36de2 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/ParZip.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/ParZip.kt @@ -7,6 +7,7 @@ import kotlinx.coroutines.coroutineScope import kotlin.coroutines.ContinuationInterceptor import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext +import kotlinx.coroutines.awaitAll /** * Runs [fa], [fb] in parallel on [Dispatchers.Default] and combines their results using the provided function. @@ -76,9 +77,10 @@ public suspend inline fun parZip( crossinline fb: suspend CoroutineScope.() -> B, crossinline f: suspend CoroutineScope.(A, B) -> C ): C = coroutineScope { - val a = async(ctx) { fa() } - val b = async(ctx) { fb() } - f(a.await(), b.await()) + val faa = async(ctx) { fa() } + val fbb = async(ctx) { fb() } + val (a, b) = awaitAll(faa, fbb) + f(a as A, b as B) } /** @@ -155,10 +157,11 @@ public suspend inline fun parZip( crossinline fc: suspend CoroutineScope.() -> C, crossinline f: suspend CoroutineScope.(A, B, C) -> D ): D = coroutineScope { - val a = async(ctx) { fa() } - val b = async(ctx) { fb() } - val c = async(ctx) { fc() } - f(a.await(), b.await(), c.await()) + val faa = async(ctx) { fa() } + val fbb = async(ctx) { fb() } + val fcc = async(ctx) { fc() } + val (a, b, c) = awaitAll(faa, fbb, fcc) + f(a as A, b as B, c as C) } /** @@ -242,11 +245,12 @@ public suspend inline fun parZip( crossinline fd: suspend CoroutineScope.() -> D, crossinline f: suspend CoroutineScope.(A, B, C, D) -> E ): E = coroutineScope { - val a = async(ctx) { fa() } - val b = async(ctx) { fb() } - val c = async(ctx) { fc() } - val d = async(ctx) { fd() } - f(a.await(), b.await(), c.await(), d.await()) + val faa = async(ctx) { fa() } + val fbb = async(ctx) { fb() } + val fcc = async(ctx) { fc() } + val fdd = async(ctx) { fd() } + val (a, b, c, d) = awaitAll(faa, fbb, fcc, fdd) + f(a as A, b as B, c as C, d as D) } /** @@ -337,12 +341,13 @@ public suspend inline fun parZip( crossinline fe: suspend CoroutineScope.() -> E, crossinline f: suspend CoroutineScope.(A, B, C, D, E) -> F ): F = coroutineScope { - val a = async(ctx) { fa() } - val b = async(ctx) { fb() } - val c = async(ctx) { fc() } - val d = async(ctx) { fd() } - val e = async(ctx) { fe() } - f(a.await(), b.await(), c.await(), d.await(), e.await()) + val faa = async(ctx) { fa() } + val fbb = async(ctx) { fb() } + val fcc = async(ctx) { fc() } + val fdd = async(ctx) { fd() } + val fee = async(ctx) { fe() } + val (a, b, c, d, e) = awaitAll(faa, fbb, fcc, fdd, fee) + f(a as A, b as B, c as C, d as D, e as E) } /** @@ -439,13 +444,14 @@ public suspend inline fun parZip( crossinline ff: suspend CoroutineScope.() -> F, crossinline f: suspend CoroutineScope.(A, B, C, D, E, F) -> G ): G = coroutineScope { - val a = async(ctx) { fa() } - val b = async(ctx) { fb() } - val c = async(ctx) { fc() } - val d = async(ctx) { fd() } - val e = async(ctx) { fe() } - val g = async(ctx) { ff() } - f(a.await(), b.await(), c.await(), d.await(), e.await(), g.await()) + val faa = async(ctx) { fa() } + val fbb = async(ctx) { fb() } + val fcc = async(ctx) { fc() } + val fdd = async(ctx) { fd() } + val fee = async(ctx) { fe() } + val fgg = async(ctx) { ff() } + val res = awaitAll(faa, fbb, fcc, fdd, fee, fgg) + f(res[0] as A, res[1] as B, res[2] as C, res[3] as D, res[4] as E, res[5] as F) } /** @@ -548,14 +554,15 @@ public suspend inline fun parZip( crossinline fg: suspend CoroutineScope.() -> G, crossinline f: suspend CoroutineScope.(A, B, C, D, E, F, G) -> H ): H = coroutineScope { - val a = async(ctx) { fa() } - val b = async(ctx) { fb() } - val c = async(ctx) { fc() } - val d = async(ctx) { fd() } - val e = async(ctx) { fe() } + val faa = async(ctx) { fa() } + val fbb = async(ctx) { fb() } + val fcc = async(ctx) { fc() } + val fdd = async(ctx) { fd() } + val fee = async(ctx) { fe() } val fDef = async(ctx) { ff() } - val g = async(ctx) { fg() } - f(a.await(), b.await(), c.await(), d.await(), e.await(), fDef.await(), g.await()) + val fgg = async(ctx) { fg() } + val res = awaitAll(faa, fbb, fcc, fdd, fee, fDef, fgg) + f(res[0] as A, res[1] as B, res[2] as C, res[3] as D, res[4] as E, res[5] as F, res[6] as G) } /** @@ -663,13 +670,14 @@ public suspend inline fun parZip( crossinline fh: suspend CoroutineScope.() -> H, crossinline f: suspend CoroutineScope.(A, B, C, D, E, F, G, H) -> I ): I = coroutineScope { - val a = async(ctx) { fa() } - val b = async(ctx) { fb() } - val c = async(ctx) { fc() } - val d = async(ctx) { fd() } - val e = async(ctx) { fe() } + val faa = async(ctx) { fa() } + val fbb = async(ctx) { fb() } + val fcc = async(ctx) { fc() } + val fdd = async(ctx) { fd() } + val fee = async(ctx) { fe() } val fDef = async(ctx) { ff() } - val g = async(ctx) { fg() } - val h = async(ctx) { fh() } - f(a.await(), b.await(), c.await(), d.await(), e.await(), fDef.await(), g.await(), h.await()) + val fgg = async(ctx) { fg() } + val fhh = async(ctx) { fh() } + val res = awaitAll(faa, fbb, fcc, fdd, fee, fDef, fgg, fhh) + f(res[0] as A, res[1] as B, res[2] as C, res[3] as D, res[4] as E, res[5] as F, res[6] as G, res[7] as H) } diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/BracketCaseTest.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/BracketCaseTest.kt index 41fffe9796d..169069090ea 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/BracketCaseTest.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/BracketCaseTest.kt @@ -16,20 +16,28 @@ class BracketCaseTest : ArrowFxSpec( spec = { "Immediate acquire bracketCase finishes successfully" { checkAll(Arb.int(), Arb.int()) { a, b -> + var once = true bracketCase( acquire = { a }, use = { aa -> Pair(aa, b) }, - release = { _, _ -> Unit } + release = { _, _ -> + require(once) + once = false + } ) shouldBe Pair(a, b) } } "Suspended acquire bracketCase finishes successfully" { checkAll(Arb.int(), Arb.int()) { a, b -> + var once = true bracketCase( acquire = { a.suspend() }, use = { aa -> Pair(aa, b) }, - release = { _, _ -> Unit } + release = { _, _ -> + require(once) + once = false + } ) shouldBe Pair(a, b) } } @@ -60,20 +68,28 @@ class BracketCaseTest : ArrowFxSpec( "Immediate use bracketCase finishes successfully" { checkAll(Arb.int(), Arb.int()) { a, b -> + var once = true bracketCase( acquire = { a }, use = { aa -> Pair(aa, b).suspend() }, - release = { _, _ -> Unit } + release = { _, _ -> + require(once) + once = false + } ) shouldBe Pair(a, b) } } "Suspended use bracketCase finishes successfully" { checkAll(Arb.int(), Arb.int()) { a, b -> + var once = true bracketCase( acquire = { a }, use = { aa -> Pair(aa, b).suspend() }, - release = { _, _ -> Unit } + release = { _, _ -> + require(once) + once = false + } ) shouldBe Pair(a, b) } } @@ -309,7 +325,7 @@ class BracketCaseTest : ArrowFxSpec( mVar.send(y) }, use = { never() }, - release = { _, exitCase -> p.complete(exitCase) } + release = { _, exitCase -> require(p.complete(exitCase)) } ) } diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/GuaranteeCaseTest.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/GuaranteeCaseTest.kt index b7dd5401a8c..144c81db9d3 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/GuaranteeCaseTest.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/GuaranteeCaseTest.kt @@ -17,7 +17,7 @@ class GuaranteeCaseTest : ArrowFxSpec( val res = guaranteeCase( fa = { i }, - finalizer = { ex -> p.complete(ex) } + finalizer = { ex -> require(p.complete(ex)) } ) p.await() shouldBe ExitCase.Completed @@ -31,7 +31,7 @@ class GuaranteeCaseTest : ArrowFxSpec( val attempted = Either.catch { guaranteeCase( fa = { throw e }, - finalizer = { ex -> p.complete(ex) } + finalizer = { ex -> require(p.complete(ex)) } ) } @@ -50,7 +50,7 @@ class GuaranteeCaseTest : ArrowFxSpec( start.complete(Unit) never() }, - finalizer = { ex -> p.complete(ex) } + finalizer = { ex -> require(p.complete(ex)) } ) } diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap2Test.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap2Test.kt index 33bef476544..9f8ef59c261 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap2Test.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap2Test.kt @@ -4,6 +4,7 @@ import arrow.core.Either import arrow.fx.coroutines.ArrowFxSpec import arrow.fx.coroutines.Atomic import arrow.fx.coroutines.ExitCase +import arrow.fx.coroutines.awaitExitCase import arrow.fx.coroutines.guaranteeCase import arrow.fx.coroutines.leftException import arrow.fx.coroutines.never @@ -11,15 +12,20 @@ import arrow.fx.coroutines.parZip import arrow.fx.coroutines.throwable import io.kotest.matchers.should import io.kotest.matchers.shouldBe -import io.kotest.matchers.types.shouldBeInstanceOf +import io.kotest.matchers.types.shouldBeTypeOf import io.kotest.property.Arb import io.kotest.property.arbitrary.boolean import io.kotest.property.arbitrary.int -import kotlinx.coroutines.CoroutineScope +import io.kotest.property.arbitrary.string +import kotlin.time.ExperimentalTime +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.async +import kotlinx.coroutines.awaitCancellation import kotlinx.coroutines.channels.Channel +@OptIn(ExperimentalTime::class) class ParMap2Test : ArrowFxSpec( spec = { @@ -51,8 +57,10 @@ class ParMap2Test : ArrowFxSpec( val pa = CompletableDeferred>() val pb = CompletableDeferred>() - val loserA: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pa.complete(Pair(a, ex)) } } - val loserB: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pb.complete(Pair(b, ex)) } } + val loserA: suspend CoroutineScope.() -> Int = + { guaranteeCase({ s.receive(); never() }) { ex -> pa.complete(Pair(a, ex)) } } + val loserB: suspend CoroutineScope.() -> Int = + { guaranteeCase({ s.receive(); never() }) { ex -> pb.complete(Pair(b, ex)) } } val f = async { parZip(loserA, loserB) { _a, _b -> Pair(_a, _b) } } @@ -62,38 +70,50 @@ class ParMap2Test : ArrowFxSpec( pa.await().let { (res, exit) -> res shouldBe a - exit.shouldBeInstanceOf() + exit.shouldBeTypeOf() } pb.await().let { (res, exit) -> res shouldBe b - exit.shouldBeInstanceOf() + exit.shouldBeTypeOf() } } } "parMapN 2 cancels losers if a failure occurs in one of the tasks" { - checkAll( - Arb.throwable(), - Arb.boolean(), - Arb.int() - ) { e, leftWinner, a -> + checkAll(Arb.throwable(), Arb.boolean()) { e, leftWinner -> val s = Channel() - val pa = CompletableDeferred>() + val pa = CompletableDeferred() val winner: suspend CoroutineScope.() -> Unit = { s.send(Unit); throw e } - val loserA: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pa.complete(Pair(a, ex)) } } + val loserA: suspend CoroutineScope.() -> Int = + { guaranteeCase({ s.receive(); awaitCancellation() }) { ex -> pa.complete(ex) } } val r = Either.catch { if (leftWinner) parZip(winner, loserA) { _, _ -> Unit } else parZip(loserA, winner) { _, _ -> Unit } } - pa.await().let { (res, exit) -> - res shouldBe a - exit.shouldBeInstanceOf() - } + pa.await().shouldBeTypeOf() r should leftException(e) } } + + "parMapN CancellationException on right can cancel rest" { + checkAll(Arb.string()) { msg -> + val exit = CompletableDeferred() + val start = CompletableDeferred() + try { + parZip({ + awaitExitCase(start, exit) + }, { + start.await() + throw CancellationException(msg) + }) { _, _ -> } + } catch (e: CancellationException) { + e.message shouldBe msg + } + exit.await().shouldBeTypeOf() + } + } } ) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap3Test.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap3Test.kt index d971893199f..cd2fce7349a 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap3Test.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap3Test.kt @@ -4,17 +4,18 @@ import arrow.core.Either import arrow.fx.coroutines.ArrowFxSpec import arrow.fx.coroutines.Atomic import arrow.fx.coroutines.ExitCase -import arrow.fx.coroutines.guaranteeCase +import arrow.fx.coroutines.awaitExitCase import arrow.fx.coroutines.leftException -import arrow.fx.coroutines.never import arrow.fx.coroutines.parZip import arrow.fx.coroutines.throwable import io.kotest.matchers.should import io.kotest.matchers.shouldBe -import io.kotest.matchers.types.shouldBeInstanceOf +import io.kotest.matchers.types.shouldBeTypeOf import io.kotest.property.Arb import io.kotest.property.arbitrary.element import io.kotest.property.arbitrary.int +import io.kotest.property.arbitrary.string +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel @@ -51,18 +52,18 @@ class ParMap3Test : ArrowFxSpec( } "Cancelling parMapN 3 cancels all participants" { - checkAll(Arb.int(), Arb.int(), Arb.int()) { a, b, c -> + checkAll { val s = Channel() - val pa = CompletableDeferred>() - val pb = CompletableDeferred>() - val pc = CompletableDeferred>() + val pa = CompletableDeferred() + val pb = CompletableDeferred() + val pc = CompletableDeferred() val loserA: suspend CoroutineScope.() -> Int = - { guaranteeCase({ s.receive(); never() }) { ex -> pa.complete(Pair(a, ex)) } } + { awaitExitCase(s, pa) } val loserB: suspend CoroutineScope.() -> Int = - { guaranteeCase({ s.receive(); never() }) { ex -> pb.complete(Pair(b, ex)) } } + { awaitExitCase(s, pb) } val loserC: suspend CoroutineScope.() -> Int = - { guaranteeCase({ s.receive(); never() }) { ex -> pc.complete(Pair(c, ex)) } } + { awaitExitCase(s, pc) } val f = async { parZip(loserA, loserB, loserC) { _a, _b, _c -> Triple(_a, _b, _c) } } @@ -71,18 +72,9 @@ class ParMap3Test : ArrowFxSpec( s.send(Unit) f.cancel() - pa.await().let { (res, exit) -> - res shouldBe a - exit.shouldBeInstanceOf() - } - pb.await().let { (res, exit) -> - res shouldBe b - exit.shouldBeInstanceOf() - } - pc.await().let { (res, exit) -> - res shouldBe c - exit.shouldBeInstanceOf() - } + pa.await().shouldBeTypeOf() + pb.await().shouldBeTypeOf() + pc.await().shouldBeTypeOf() } } @@ -90,37 +82,53 @@ class ParMap3Test : ArrowFxSpec( checkAll( Arb.throwable(), Arb.element(listOf(1, 2, 3)), - Arb.int(), - Arb.int() - ) { e, winningTask, a, b -> + ) { e, winningTask -> val s = Channel() - val pa = CompletableDeferred>() - val pb = CompletableDeferred>() + val pa = CompletableDeferred() + val pb = CompletableDeferred() val winner: suspend CoroutineScope.() -> Int = { s.send(Unit); s.send(Unit); throw e } val loserA: suspend CoroutineScope.() -> Int = - { guaranteeCase({ s.receive(); never() }) { ex -> pa.complete(Pair(a, ex)) } } + { awaitExitCase(s, pa) } val loserB: suspend CoroutineScope.() -> Int = - { guaranteeCase({ s.receive(); never() }) { ex -> pb.complete(Pair(b, ex)) } } + { awaitExitCase(s, pb) } val r = Either.catch { when (winningTask) { - 1 -> parZip(winner, loserA, loserB) { _, _, _ -> Unit } - 2 -> parZip(loserA, winner, loserB) { _, _, _ -> Unit } - else -> parZip(loserA, loserB, winner) { _, _, _ -> Unit } + 1 -> parZip(winner, loserA, loserB) { _, _, _ -> } + 2 -> parZip(loserA, winner, loserB) { _, _, _ -> } + else -> parZip(loserA, loserB, winner) { _, _, _ -> } } } - pa.await().let { (res, exit) -> - res shouldBe a - exit.shouldBeInstanceOf() - } - pb.await().let { (res, exit) -> - res shouldBe b - exit.shouldBeInstanceOf() - } + pa.await().shouldBeTypeOf() + pb.await().shouldBeTypeOf() r should leftException(e) } } + + "parMapN CancellationException on right can cancel rest" { + checkAll(Arb.string(), Arb.int(1..3)) { msg, cancel -> + val s = Channel() + val pa = CompletableDeferred() + val pb = CompletableDeferred() + + val winner: suspend CoroutineScope.() -> Int = { repeat(2) { s.send(Unit) }; throw CancellationException(msg) } + val loserA: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pa) } + val loserB: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pb) } + + try { + when (cancel) { + 1 -> parZip(winner, loserA, loserB) { _, _, _ -> } + 2 -> parZip(loserA, winner, loserB) { _, _, _ -> } + else -> parZip(loserA, loserB, winner) { _, _, _ -> } + } + } catch (e: CancellationException) { + e.message shouldBe msg + } + pa.await().shouldBeTypeOf() + pb.await().shouldBeTypeOf() + } + } } ) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap4Test.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap4Test.kt index 72887eb709f..b3cbc921b31 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap4Test.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap4Test.kt @@ -5,17 +5,18 @@ import arrow.core.Tuple4 import arrow.fx.coroutines.ArrowFxSpec import arrow.fx.coroutines.Atomic import arrow.fx.coroutines.ExitCase -import arrow.fx.coroutines.guaranteeCase +import arrow.fx.coroutines.awaitExitCase import arrow.fx.coroutines.leftException -import arrow.fx.coroutines.never import arrow.fx.coroutines.parZip import arrow.fx.coroutines.throwable import io.kotest.matchers.should import io.kotest.matchers.shouldBe -import io.kotest.matchers.types.shouldBeInstanceOf +import io.kotest.matchers.types.shouldBeTypeOf import io.kotest.property.Arb import io.kotest.property.arbitrary.element import io.kotest.property.arbitrary.int +import io.kotest.property.arbitrary.string +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel @@ -58,40 +59,27 @@ class ParMap4Test : ArrowFxSpec( } "Cancelling parMapN 4 cancels all participants" { - checkAll(Arb.int(), Arb.int(), Arb.int(), Arb.int()) { a, b, c, d -> + checkAll { val s = Channel() - val pa = CompletableDeferred>() - val pb = CompletableDeferred>() - val pc = CompletableDeferred>() - val pd = CompletableDeferred>() + val pa = CompletableDeferred() + val pb = CompletableDeferred() + val pc = CompletableDeferred() + val pd = CompletableDeferred() - val loserA: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pa.complete(Pair(a, ex)) } } - val loserB: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pb.complete(Pair(b, ex)) } } - val loserC: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pc.complete(Pair(c, ex)) } } - val loserD: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pd.complete(Pair(d, ex)) } } + val loserA: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pa) } + val loserB: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pb) } + val loserC: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pc) } + val loserD: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pd) } val f = async { parZip(loserA, loserB, loserC, loserD) { _a, _b, _c, _d -> Tuple4(_a, _b, _c, _d) } } repeat(4) { s.send(Unit) } // Suspend until all racers started f.cancel() - pa.await().let { (res, exit) -> - res shouldBe a - exit.shouldBeInstanceOf() - } - pb.await().let { (res, exit) -> - res shouldBe b - exit.shouldBeInstanceOf() - } - pc.await().let { (res, exit) -> - res shouldBe c - exit.shouldBeInstanceOf() - } - - pd.await().let { (res, exit) -> - res shouldBe d - exit.shouldBeInstanceOf() - } + pa.await().shouldBeTypeOf() + pb.await().shouldBeTypeOf() + pc.await().shouldBeTypeOf() + pd.await().shouldBeTypeOf() } } @@ -99,43 +87,59 @@ class ParMap4Test : ArrowFxSpec( checkAll( Arb.throwable(), Arb.element(listOf(1, 2, 3, 4)), - Arb.int(), - Arb.int(), - Arb.int() - ) { e, winningTask, a, b, c -> + ) { e, winningTask -> val s = Channel() - val pa = CompletableDeferred>() - val pb = CompletableDeferred>() - val pc = CompletableDeferred>() + val pa = CompletableDeferred() + val pb = CompletableDeferred() + val pc = CompletableDeferred() val winner: suspend CoroutineScope.() -> Int = { repeat(3) { s.send(Unit) }; throw e } - val loserA: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pa.complete(Pair(a, ex)) } } - val loserB: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pb.complete(Pair(b, ex)) } } - val loserC: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pc.complete(Pair(c, ex)) } } + val loserA: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pa) } + val loserB: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pb) } + val loserC: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pc) } val r = Either.catch { when (winningTask) { - 1 -> parZip(winner, loserA, loserB, loserC) { _, _, _, _ -> Unit } - 2 -> parZip(loserA, winner, loserB, loserC) { _, _, _, _ -> Unit } - 3 -> parZip(loserA, loserB, winner, loserC) { _, _, _, _ -> Unit } - else -> parZip(loserA, loserB, loserC, winner) { _, _, _, _ -> Unit } + 1 -> parZip(winner, loserA, loserB, loserC) { _, _, _, _ -> } + 2 -> parZip(loserA, winner, loserB, loserC) { _, _, _, _ -> } + 3 -> parZip(loserA, loserB, winner, loserC) { _, _, _, _ -> } + else -> parZip(loserA, loserB, loserC, winner) { _, _, _, _ -> } } } - pa.await().let { (res, exit) -> - res shouldBe a - exit.shouldBeInstanceOf() - } - pb.await().let { (res, exit) -> - res shouldBe b - exit.shouldBeInstanceOf() - } - pc.await().let { (res, exit) -> - res shouldBe c - exit.shouldBeInstanceOf() - } + pa.await().shouldBeTypeOf() + pb.await().shouldBeTypeOf() + pc.await().shouldBeTypeOf() r should leftException(e) } } + + "parMapN CancellationException on right can cancel rest" { + checkAll(Arb.string(), Arb.int(1..4)) { msg, cancel -> + val s = Channel() + val pa = CompletableDeferred() + val pb = CompletableDeferred() + val pc = CompletableDeferred() + + val winner: suspend CoroutineScope.() -> Int = { repeat(3) { s.send(Unit) }; throw CancellationException(msg) } + val loserA: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pa) } + val loserB: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pb) } + val loserC: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pc) } + + try { + when (cancel) { + 1 -> parZip(winner, loserA, loserB, loserC) { _, _, _, _ -> } + 2 -> parZip(loserA, winner, loserB, loserC) { _, _, _, _ -> } + 3 -> parZip(loserA, loserB, winner, loserC) { _, _, _, _ -> } + else -> parZip(loserA, loserB, loserC, winner) { _, _, _, _ -> } + } + } catch (e: CancellationException) { + e.message shouldBe msg + } + pa.await().shouldBeTypeOf() + pb.await().shouldBeTypeOf() + pc.await().shouldBeTypeOf() + } + } } ) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap5Test.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap5Test.kt index 330db340334..6ceeb1a7519 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap5Test.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap5Test.kt @@ -5,17 +5,18 @@ import arrow.core.Tuple5 import arrow.fx.coroutines.ArrowFxSpec import arrow.fx.coroutines.Atomic import arrow.fx.coroutines.ExitCase -import arrow.fx.coroutines.guaranteeCase +import arrow.fx.coroutines.awaitExitCase import arrow.fx.coroutines.leftException -import arrow.fx.coroutines.never import arrow.fx.coroutines.parZip import arrow.fx.coroutines.throwable import io.kotest.matchers.should import io.kotest.matchers.shouldBe -import io.kotest.matchers.types.shouldBeInstanceOf +import io.kotest.matchers.types.shouldBeTypeOf import io.kotest.property.Arb import io.kotest.property.arbitrary.element import io.kotest.property.arbitrary.int +import io.kotest.property.arbitrary.string +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel @@ -64,19 +65,19 @@ class ParMap5Test : ArrowFxSpec( } "Cancelling parMapN 5 cancels all participants" { - checkAll(Arb.int(), Arb.int(), Arb.int(), Arb.int(), Arb.int()) { a, b, c, d, e -> + checkAll { val s = Channel() - val pa = CompletableDeferred>() - val pb = CompletableDeferred>() - val pc = CompletableDeferred>() - val pd = CompletableDeferred>() - val pe = CompletableDeferred>() - - val loserA: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pa.complete(Pair(a, ex)) } } - val loserB: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pb.complete(Pair(b, ex)) } } - val loserC: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pc.complete(Pair(c, ex)) } } - val loserD: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pd.complete(Pair(d, ex)) } } - val loserE: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pe.complete(Pair(e, ex)) } } + val pa = CompletableDeferred() + val pb = CompletableDeferred() + val pc = CompletableDeferred() + val pd = CompletableDeferred() + val pe = CompletableDeferred() + + val loserA: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pa) } + val loserB: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pb) } + val loserC: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pc) } + val loserD: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pd) } + val loserE: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pe) } val f = async { parZip(loserA, loserB, loserC, loserD, loserE) { _a, _b, _c, _d, _e -> @@ -93,28 +94,11 @@ class ParMap5Test : ArrowFxSpec( repeat(5) { s.send(Unit) } // Suspend until all racers started f.cancel() - pa.await().let { (res, exit) -> - res shouldBe a - exit.shouldBeInstanceOf() - } - pb.await().let { (res, exit) -> - res shouldBe b - exit.shouldBeInstanceOf() - } - pc.await().let { (res, exit) -> - res shouldBe c - exit.shouldBeInstanceOf() - } - - pd.await().let { (res, exit) -> - res shouldBe d - exit.shouldBeInstanceOf() - } - - pe.await().let { (res, exit) -> - res shouldBe e - exit.shouldBeInstanceOf() - } + pa.await().shouldBeTypeOf() + pb.await().shouldBeTypeOf() + pc.await().shouldBeTypeOf() + pd.await().shouldBeTypeOf() + pe.await().shouldBeTypeOf() } } @@ -122,51 +106,67 @@ class ParMap5Test : ArrowFxSpec( checkAll( Arb.throwable(), Arb.element(listOf(1, 2, 3, 4, 5)), - Arb.int(), - Arb.int(), - Arb.int(), - Arb.int() - ) { e, winningTask, a, b, c, d -> + ) { e, winningTask -> val s = Channel() - val pa = CompletableDeferred>() - val pb = CompletableDeferred>() - val pc = CompletableDeferred>() - val pd = CompletableDeferred>() + val pa = CompletableDeferred() + val pb = CompletableDeferred() + val pc = CompletableDeferred() + val pd = CompletableDeferred() val winner: suspend CoroutineScope.() -> Int = { repeat(4) { s.send(Unit) }; throw e } - val loserA: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pa.complete(Pair(a, ex)) } } - val loserB: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pb.complete(Pair(b, ex)) } } - val loserC: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pc.complete(Pair(c, ex)) } } - val loserD: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pd.complete(Pair(d, ex)) } } + val loserA: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pa) } + val loserB: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pb) } + val loserC: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pc) } + val loserD: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pd) } val r = Either.catch { when (winningTask) { - 1 -> parZip(winner, loserA, loserB, loserC, loserD) { _, _, _, _, _ -> Unit } - 2 -> parZip(loserA, winner, loserB, loserC, loserD) { _, _, _, _, _ -> Unit } - 3 -> parZip(loserA, loserB, winner, loserC, loserD) { _, _, _, _, _ -> Unit } - 4 -> parZip(loserA, loserB, loserC, winner, loserD) { _, _, _, _, _ -> Unit } - else -> parZip(loserA, loserB, loserC, loserD, winner) { _, _, _, _, _ -> Unit } + 1 -> parZip(winner, loserA, loserB, loserC, loserD) { _, _, _, _, _ -> } + 2 -> parZip(loserA, winner, loserB, loserC, loserD) { _, _, _, _, _ -> } + 3 -> parZip(loserA, loserB, winner, loserC, loserD) { _, _, _, _, _ -> } + 4 -> parZip(loserA, loserB, loserC, winner, loserD) { _, _, _, _, _ -> } + else -> parZip(loserA, loserB, loserC, loserD, winner) { _, _, _, _, _ -> } } } - pa.await().let { (res, exit) -> - res shouldBe a - exit.shouldBeInstanceOf() - } - pb.await().let { (res, exit) -> - res shouldBe b - exit.shouldBeInstanceOf() - } - pc.await().let { (res, exit) -> - res shouldBe c - exit.shouldBeInstanceOf() - } - pd.await().let { (res, exit) -> - res shouldBe d - exit.shouldBeInstanceOf() - } + pa.await().shouldBeTypeOf() + pb.await().shouldBeTypeOf() + pc.await().shouldBeTypeOf() + pd.await().shouldBeTypeOf() r should leftException(e) } } + + "parMapN CancellationException on right can cancel rest" { + checkAll(Arb.string(), Arb.int(1..5)) { msg, cancel -> + val s = Channel() + val pa = CompletableDeferred() + val pb = CompletableDeferred() + val pc = CompletableDeferred() + val pd = CompletableDeferred() + + val winner: suspend CoroutineScope.() -> Int = { repeat(4) { s.send(Unit) }; throw CancellationException(msg) } + val loserA: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pa) } + val loserB: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pb) } + val loserC: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pc) } + val loserD: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pd) } + + try { + when (cancel) { + 1 -> parZip(winner, loserA, loserB, loserC, loserD) { _, _, _, _, _ -> } + 2 -> parZip(loserA, winner, loserB, loserC, loserD) { _, _, _, _, _ -> } + 3 -> parZip(loserA, loserB, winner, loserC, loserD) { _, _, _, _, _ -> } + 4 -> parZip(loserA, loserB, loserC, winner, loserD) { _, _, _, _, _ -> } + else -> parZip(loserA, loserB, loserC, loserD, winner) { _, _, _, _, _ -> } + } + } catch (e: CancellationException) { + e.message shouldBe msg + } + pa.await().shouldBeTypeOf() + pb.await().shouldBeTypeOf() + pc.await().shouldBeTypeOf() + pd.await().shouldBeTypeOf() + } + } } ) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap6Test.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap6Test.kt index c4ca33db54a..78982a64066 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap6Test.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap6Test.kt @@ -5,18 +5,18 @@ import arrow.core.Tuple6 import arrow.fx.coroutines.ArrowFxSpec import arrow.fx.coroutines.Atomic import arrow.fx.coroutines.ExitCase -import arrow.fx.coroutines.guaranteeCase +import arrow.fx.coroutines.awaitExitCase import arrow.fx.coroutines.leftException -import arrow.fx.coroutines.never import arrow.fx.coroutines.parZip import arrow.fx.coroutines.throwable import io.kotest.matchers.should import io.kotest.matchers.shouldBe -import io.kotest.matchers.types.shouldBeInstanceOf +import io.kotest.matchers.types.shouldBeTypeOf import io.kotest.property.Arb import io.kotest.property.arbitrary.element import io.kotest.property.arbitrary.int -import io.kotest.property.arbitrary.next +import io.kotest.property.arbitrary.string +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel @@ -71,21 +71,21 @@ class ParMap6Test : ArrowFxSpec( } "Cancelling parMapN 6 cancels all participants" { - checkAll(Arb.int(), Arb.int(), Arb.int(), Arb.int(), Arb.int(), Arb.int()) { a, b, c, d, e, f -> + checkAll { val s = Channel() - val pa = CompletableDeferred>() - val pb = CompletableDeferred>() - val pc = CompletableDeferred>() - val pd = CompletableDeferred>() - val pe = CompletableDeferred>() - val pf = CompletableDeferred>() - - val loserA: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pa.complete(Pair(a, ex)) } } - val loserB: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pb.complete(Pair(b, ex)) } } - val loserC: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pc.complete(Pair(c, ex)) } } - val loserD: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pd.complete(Pair(d, ex)) } } - val loserE: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pe.complete(Pair(e, ex)) } } - val loserF: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pf.complete(Pair(f, ex)) } } + val pa = CompletableDeferred() + val pb = CompletableDeferred() + val pc = CompletableDeferred() + val pd = CompletableDeferred() + val pe = CompletableDeferred() + val pf = CompletableDeferred() + + val loserA: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pa) } + val loserB: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pb) } + val loserC: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pc) } + val loserD: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pd) } + val loserE: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pe) } + val loserF: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pf) } val fork = async { parZip(loserA, loserB, loserC, loserD, loserE, loserF) { _a, _b, _c, _d, _e, _f -> @@ -96,30 +96,12 @@ class ParMap6Test : ArrowFxSpec( repeat(6) { s.send(Unit) } // Suspend until all racers started fork.cancel() - pa.await().let { (res, exit) -> - res shouldBe a - exit.shouldBeInstanceOf() - } - pb.await().let { (res, exit) -> - res shouldBe b - exit.shouldBeInstanceOf() - } - pc.await().let { (res, exit) -> - res shouldBe c - exit.shouldBeInstanceOf() - } - pd.await().let { (res, exit) -> - res shouldBe d - exit.shouldBeInstanceOf() - } - pe.await().let { (res, exit) -> - res shouldBe e - exit.shouldBeInstanceOf() - } - pf.await().let { (res, exit) -> - res shouldBe f - exit.shouldBeInstanceOf() - } + pa.await().shouldBeTypeOf() + pb.await().shouldBeTypeOf() + pc.await().shouldBeTypeOf() + pd.await().shouldBeTypeOf() + pe.await().shouldBeTypeOf() + pf.await().shouldBeTypeOf() } } @@ -128,61 +110,74 @@ class ParMap6Test : ArrowFxSpec( Arb.throwable(), Arb.element(listOf(1, 2, 3, 4, 5, 6)) ) { e, winningTask -> - - val intGen = Arb.int() - val a = intGen.next() - val b = intGen.next() - val c = intGen.next() - val d = intGen.next() - val f = intGen.next() - val s = Channel() - val pa = CompletableDeferred>() - val pb = CompletableDeferred>() - val pc = CompletableDeferred>() - val pd = CompletableDeferred>() - val pf = CompletableDeferred>() + val pa = CompletableDeferred() + val pb = CompletableDeferred() + val pc = CompletableDeferred() + val pd = CompletableDeferred() + val pf = CompletableDeferred() val winner: suspend CoroutineScope.() -> Int = { repeat(5) { s.send(Unit) }; throw e } - val loserA: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pa.complete(Pair(a, ex)) } } - val loserB: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pb.complete(Pair(b, ex)) } } - val loserC: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pc.complete(Pair(c, ex)) } } - val loserD: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pd.complete(Pair(d, ex)) } } - val loserF: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pf.complete(Pair(f, ex)) } } + val loserA: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pa) } + val loserB: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pb) } + val loserC: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pc) } + val loserD: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pd) } + val loserF: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pf) } val r = Either.catch { when (winningTask) { - 1 -> parZip(winner, loserA, loserB, loserC, loserD, loserF) { _, _, _, _, _, _ -> Unit } - 2 -> parZip(loserA, winner, loserB, loserC, loserD, loserF) { _, _, _, _, _, _ -> Unit } - 3 -> parZip(loserA, loserB, winner, loserC, loserD, loserF) { _, _, _, _, _, _ -> Unit } - 4 -> parZip(loserA, loserB, loserC, winner, loserD, loserF) { _, _, _, _, _, _ -> Unit } - 5 -> parZip(loserA, loserB, loserC, loserD, winner, loserF) { _, _, _, _, _, _ -> Unit } - else -> parZip(loserA, loserB, loserC, loserD, loserF, winner) { _, _, _, _, _, _ -> Unit } + 1 -> parZip(winner, loserA, loserB, loserC, loserD, loserF) { _, _, _, _, _, _ -> } + 2 -> parZip(loserA, winner, loserB, loserC, loserD, loserF) { _, _, _, _, _, _ -> } + 3 -> parZip(loserA, loserB, winner, loserC, loserD, loserF) { _, _, _, _, _, _ -> } + 4 -> parZip(loserA, loserB, loserC, winner, loserD, loserF) { _, _, _, _, _, _ -> } + 5 -> parZip(loserA, loserB, loserC, loserD, winner, loserF) { _, _, _, _, _, _ -> } + else -> parZip(loserA, loserB, loserC, loserD, loserF, winner) { _, _, _, _, _, _ -> } } } - pa.await().let { (res, exit) -> - res shouldBe a - exit.shouldBeInstanceOf() - } - pb.await().let { (res, exit) -> - res shouldBe b - exit.shouldBeInstanceOf() - } - pc.await().let { (res, exit) -> - res shouldBe c - exit.shouldBeInstanceOf() - } - pd.await().let { (res, exit) -> - res shouldBe d - exit.shouldBeInstanceOf() - } - pf.await().let { (res, exit) -> - res shouldBe f - exit.shouldBeInstanceOf() - } + pa.await().shouldBeTypeOf() + pb.await().shouldBeTypeOf() + pc.await().shouldBeTypeOf() + pd.await().shouldBeTypeOf() + pf.await().shouldBeTypeOf() r should leftException(e) } } + + "parMapN CancellationException on right can cancel rest" { + checkAll(Arb.string(), Arb.int(1..6)) { msg, cancel -> + val s = Channel() + val pa = CompletableDeferred() + val pb = CompletableDeferred() + val pc = CompletableDeferred() + val pd = CompletableDeferred() + val pe = CompletableDeferred() + + val winner: suspend CoroutineScope.() -> Int = { repeat(5) { s.send(Unit) }; throw CancellationException(msg) } + val loserA: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pa) } + val loserB: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pb) } + val loserC: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pc) } + val loserD: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pd) } + val loserF: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pe) } + + try { + when (cancel) { + 1 -> parZip(winner, loserA, loserB, loserC, loserD, loserF) { _, _, _, _, _, _ -> } + 2 -> parZip(loserA, winner, loserB, loserC, loserD, loserF) { _, _, _, _, _, _ -> } + 3 -> parZip(loserA, loserB, winner, loserC, loserD, loserF) { _, _, _, _, _, _ -> } + 4 -> parZip(loserA, loserB, loserC, winner, loserD, loserF) { _, _, _, _, _, _ -> } + 5 -> parZip(loserA, loserB, loserC, loserD, winner, loserF) { _, _, _, _, _, _ -> } + else -> parZip(loserA, loserB, loserC, loserD, loserF, winner) { _, _, _, _, _, _ -> } + } + } catch (e: CancellationException) { + e.message shouldBe msg + } + pa.await().shouldBeTypeOf() + pb.await().shouldBeTypeOf() + pc.await().shouldBeTypeOf() + pd.await().shouldBeTypeOf() + pe.await().shouldBeTypeOf() + } + } } ) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap7Test.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap7Test.kt index 6d6dc71c3b0..9d77f712eed 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap7Test.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap7Test.kt @@ -5,18 +5,18 @@ import arrow.core.Tuple7 import arrow.fx.coroutines.ArrowFxSpec import arrow.fx.coroutines.Atomic import arrow.fx.coroutines.ExitCase -import arrow.fx.coroutines.guaranteeCase +import arrow.fx.coroutines.awaitExitCase import arrow.fx.coroutines.leftException -import arrow.fx.coroutines.never import arrow.fx.coroutines.parZip import arrow.fx.coroutines.throwable import io.kotest.matchers.should import io.kotest.matchers.shouldBe -import io.kotest.matchers.types.shouldBeInstanceOf +import io.kotest.matchers.types.shouldBeTypeOf import io.kotest.property.Arb import io.kotest.property.arbitrary.element import io.kotest.property.arbitrary.int -import io.kotest.property.arbitrary.next +import io.kotest.property.arbitrary.string +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel @@ -77,23 +77,23 @@ class ParMap7Test : ArrowFxSpec( } "Cancelling parMapN 7 cancels all participants" { - checkAll(Arb.int(), Arb.int(), Arb.int(), Arb.int(), Arb.int(), Arb.int(), Arb.int()) { a, b, c, d, e, f, g -> + checkAll { val s = Channel() - val pa = CompletableDeferred>() - val pb = CompletableDeferred>() - val pc = CompletableDeferred>() - val pd = CompletableDeferred>() - val pe = CompletableDeferred>() - val pf = CompletableDeferred>() - val pg = CompletableDeferred>() - - val loserA: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pa.complete(Pair(a, ex)) } } - val loserB: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pb.complete(Pair(b, ex)) } } - val loserC: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pc.complete(Pair(c, ex)) } } - val loserD: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pd.complete(Pair(d, ex)) } } - val loserE: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pe.complete(Pair(e, ex)) } } - val loserF: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pf.complete(Pair(f, ex)) } } - val loserG: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pg.complete(Pair(g, ex)) } } + val pa = CompletableDeferred() + val pb = CompletableDeferred() + val pc = CompletableDeferred() + val pd = CompletableDeferred() + val pe = CompletableDeferred() + val pf = CompletableDeferred() + val pg = CompletableDeferred() + + val loserA: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pa) } + val loserB: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pb) } + val loserC: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pc) } + val loserD: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pd) } + val loserE: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pe) } + val loserF: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pf) } + val loserG: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pg) } val fork = async { parZip(loserA, loserB, loserC, loserD, loserE, loserF, loserG) { a, b, c, d, e, f, g -> @@ -104,34 +104,13 @@ class ParMap7Test : ArrowFxSpec( repeat(7) { s.send(Unit) } // Suspend until all racers started fork.cancel() - pa.await().let { (res, exit) -> - res shouldBe a - exit.shouldBeInstanceOf() - } - pb.await().let { (res, exit) -> - res shouldBe b - exit.shouldBeInstanceOf() - } - pc.await().let { (res, exit) -> - res shouldBe c - exit.shouldBeInstanceOf() - } - pd.await().let { (res, exit) -> - res shouldBe d - exit.shouldBeInstanceOf() - } - pe.await().let { (res, exit) -> - res shouldBe e - exit.shouldBeInstanceOf() - } - pf.await().let { (res, exit) -> - res shouldBe f - exit.shouldBeInstanceOf() - } - pg.await().let { (res, exit) -> - res shouldBe g - exit.shouldBeInstanceOf() - } + pa.await().shouldBeTypeOf() + pb.await().shouldBeTypeOf() + pc.await().shouldBeTypeOf() + pd.await().shouldBeTypeOf() + pe.await().shouldBeTypeOf() + pf.await().shouldBeTypeOf() + pg.await().shouldBeTypeOf() } } @@ -141,68 +120,82 @@ class ParMap7Test : ArrowFxSpec( Arb.element(listOf(1, 2, 3, 4, 5, 6, 7)) ) { e, winningTask -> - val intGen = Arb.int() - val a = intGen.next() - val b = intGen.next() - val c = intGen.next() - val d = intGen.next() - val f = intGen.next() - val g = intGen.next() - val s = Channel() - val pa = CompletableDeferred>() - val pb = CompletableDeferred>() - val pc = CompletableDeferred>() - val pd = CompletableDeferred>() - val pf = CompletableDeferred>() - val pg = CompletableDeferred>() + val pa = CompletableDeferred() + val pb = CompletableDeferred() + val pc = CompletableDeferred() + val pd = CompletableDeferred() + val pf = CompletableDeferred() + val pg = CompletableDeferred() val winner: suspend CoroutineScope.() -> Int = { repeat(6) { s.send(Unit) }; throw e } - val loserA: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pa.complete(Pair(a, ex)) } } - val loserB: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pb.complete(Pair(b, ex)) } } - val loserC: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pc.complete(Pair(c, ex)) } } - val loserD: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pd.complete(Pair(d, ex)) } } - val loserF: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pf.complete(Pair(f, ex)) } } - val loserG: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pg.complete(Pair(g, ex)) } } + val loserA: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pa) } + val loserB: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pb) } + val loserC: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pc) } + val loserD: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pd) } + val loserF: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pf) } + val loserG: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pg) } val r = Either.catch { when (winningTask) { - 1 -> parZip(winner, loserA, loserB, loserC, loserD, loserF, loserG) { _, _, _, _, _, _, _ -> Unit } - 2 -> parZip(loserA, winner, loserB, loserC, loserD, loserF, loserG) { _, _, _, _, _, _, _ -> Unit } - 3 -> parZip(loserA, loserB, winner, loserC, loserD, loserF, loserG) { _, _, _, _, _, _, _ -> Unit } - 4 -> parZip(loserA, loserB, loserC, winner, loserD, loserF, loserG) { _, _, _, _, _, _, _ -> Unit } - 5 -> parZip(loserA, loserB, loserC, loserD, winner, loserF, loserG) { _, _, _, _, _, _, _ -> Unit } - 6 -> parZip(loserA, loserB, loserC, loserD, loserF, winner, loserG) { _, _, _, _, _, _, _ -> Unit } - else -> parZip(loserA, loserB, loserC, loserD, loserF, loserG, winner) { _, _, _, _, _, _, _ -> Unit } + 1 -> parZip(winner, loserA, loserB, loserC, loserD, loserF, loserG) { _, _, _, _, _, _, _ -> } + 2 -> parZip(loserA, winner, loserB, loserC, loserD, loserF, loserG) { _, _, _, _, _, _, _ -> } + 3 -> parZip(loserA, loserB, winner, loserC, loserD, loserF, loserG) { _, _, _, _, _, _, _ -> } + 4 -> parZip(loserA, loserB, loserC, winner, loserD, loserF, loserG) { _, _, _, _, _, _, _ -> } + 5 -> parZip(loserA, loserB, loserC, loserD, winner, loserF, loserG) { _, _, _, _, _, _, _ -> } + 6 -> parZip(loserA, loserB, loserC, loserD, loserF, winner, loserG) { _, _, _, _, _, _, _ -> } + else -> parZip(loserA, loserB, loserC, loserD, loserF, loserG, winner) { _, _, _, _, _, _, _ -> } } } - pa.await().let { (res, exit) -> - res shouldBe a - exit.shouldBeInstanceOf() - } - pb.await().let { (res, exit) -> - res shouldBe b - exit.shouldBeInstanceOf() - } - pc.await().let { (res, exit) -> - res shouldBe c - exit.shouldBeInstanceOf() - } - pd.await().let { (res, exit) -> - res shouldBe d - exit.shouldBeInstanceOf() - } - pf.await().let { (res, exit) -> - res shouldBe f - exit.shouldBeInstanceOf() - } - pg.await().let { (res, exit) -> - res shouldBe g - exit.shouldBeInstanceOf() - } + pa.await().shouldBeTypeOf() + pb.await().shouldBeTypeOf() + pc.await().shouldBeTypeOf() + pd.await().shouldBeTypeOf() + pf.await().shouldBeTypeOf() + pg.await().shouldBeTypeOf() r should leftException(e) } } + + "parMapN CancellationException on right can cancel rest" { + checkAll(Arb.string(), Arb.int(1..7)) { msg, cancel -> + val s = Channel() + val pa = CompletableDeferred() + val pb = CompletableDeferred() + val pc = CompletableDeferred() + val pd = CompletableDeferred() + val pe = CompletableDeferred() + val pf = CompletableDeferred() + + val winner: suspend CoroutineScope.() -> Int = { repeat(6) { s.send(Unit) }; throw CancellationException(msg) } + val loserA: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pa) } + val loserB: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pb) } + val loserC: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pc) } + val loserD: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pd) } + val loserF: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pe) } + val loserG: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pf) } + + try { + when (cancel) { + 1 -> parZip(winner, loserA, loserB, loserC, loserD, loserF, loserG) { _, _, _, _, _, _, _ -> } + 2 -> parZip(loserA, winner, loserB, loserC, loserD, loserF, loserG) { _, _, _, _, _, _, _ -> } + 3 -> parZip(loserA, loserB, winner, loserC, loserD, loserF, loserG) { _, _, _, _, _, _, _ -> } + 4 -> parZip(loserA, loserB, loserC, winner, loserD, loserF, loserG) { _, _, _, _, _, _, _ -> } + 5 -> parZip(loserA, loserB, loserC, loserD, winner, loserF, loserG) { _, _, _, _, _, _, _ -> } + 6 -> parZip(loserA, loserB, loserC, loserD, loserF, winner, loserG) { _, _, _, _, _, _, _ -> } + else -> parZip(loserA, loserB, loserC, loserD, loserF, loserG, winner) { _, _, _, _, _, _, _ -> } + } + } catch (e: CancellationException) { + e.message shouldBe msg + } + pa.await().shouldBeTypeOf() + pb.await().shouldBeTypeOf() + pc.await().shouldBeTypeOf() + pd.await().shouldBeTypeOf() + pe.await().shouldBeTypeOf() + pf.await().shouldBeTypeOf() + } + } } ) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap8Test.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap8Test.kt index f32655fe6ed..471acc0990c 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap8Test.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/parMapN/ParMap8Test.kt @@ -5,18 +5,18 @@ import arrow.core.Tuple8 import arrow.fx.coroutines.ArrowFxSpec import arrow.fx.coroutines.Atomic import arrow.fx.coroutines.ExitCase -import arrow.fx.coroutines.guaranteeCase +import arrow.fx.coroutines.awaitExitCase import arrow.fx.coroutines.leftException -import arrow.fx.coroutines.never import arrow.fx.coroutines.parZip import arrow.fx.coroutines.throwable import io.kotest.matchers.should import io.kotest.matchers.shouldBe -import io.kotest.matchers.types.shouldBeInstanceOf +import io.kotest.matchers.types.shouldBeTypeOf import io.kotest.property.Arb import io.kotest.property.arbitrary.element import io.kotest.property.arbitrary.int -import io.kotest.property.arbitrary.next +import io.kotest.property.arbitrary.string +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel @@ -83,25 +83,25 @@ class ParMap8Test : ArrowFxSpec( } "Cancelling parMapN 8 cancels all participants" { - checkAll(Arb.int(), Arb.int(), Arb.int(), Arb.int(), Arb.int(), Arb.int(), Arb.int(), Arb.int()) { a, b, c, d, e, f, g, h -> + checkAll { val s = Channel() - val pa = CompletableDeferred>() - val pb = CompletableDeferred>() - val pc = CompletableDeferred>() - val pd = CompletableDeferred>() - val pe = CompletableDeferred>() - val pf = CompletableDeferred>() - val pg = CompletableDeferred>() - val ph = CompletableDeferred>() - - val loserA: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pa.complete(Pair(a, ex)) } } - val loserB: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pb.complete(Pair(b, ex)) } } - val loserC: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pc.complete(Pair(c, ex)) } } - val loserD: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pd.complete(Pair(d, ex)) } } - val loserE: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pe.complete(Pair(e, ex)) } } - val loserF: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pf.complete(Pair(f, ex)) } } - val loserG: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pg.complete(Pair(g, ex)) } } - val loserH: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> ph.complete(Pair(h, ex)) } } + val pa = CompletableDeferred() + val pb = CompletableDeferred() + val pc = CompletableDeferred() + val pd = CompletableDeferred() + val pe = CompletableDeferred() + val pf = CompletableDeferred() + val pg = CompletableDeferred() + val ph = CompletableDeferred() + + val loserA: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pa) } + val loserB: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pb) } + val loserC: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pc) } + val loserD: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pd) } + val loserE: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pe) } + val loserF: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pf) } + val loserG: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pg) } + val loserH: suspend CoroutineScope.() -> Int = { awaitExitCase(s, ph) } val fork = async { parZip(loserA, loserB, loserC, loserD, loserE, loserF, loserG, loserH) { a, b, c, d, e, f, g, h -> @@ -112,38 +112,14 @@ class ParMap8Test : ArrowFxSpec( repeat(8) { s.send(Unit) } // Suspend until all racers started fork.cancel() - pa.await().let { (res, exit) -> - res shouldBe a - exit.shouldBeInstanceOf() - } - pb.await().let { (res, exit) -> - res shouldBe b - exit.shouldBeInstanceOf() - } - pc.await().let { (res, exit) -> - res shouldBe c - exit.shouldBeInstanceOf() - } - pd.await().let { (res, exit) -> - res shouldBe d - exit.shouldBeInstanceOf() - } - pe.await().let { (res, exit) -> - res shouldBe e - exit.shouldBeInstanceOf() - } - pf.await().let { (res, exit) -> - res shouldBe f - exit.shouldBeInstanceOf() - } - pg.await().let { (res, exit) -> - res shouldBe g - exit.shouldBeInstanceOf() - } - ph.await().let { (res, exit) -> - res shouldBe h - exit.shouldBeInstanceOf() - } + pa.await().shouldBeTypeOf() + pb.await().shouldBeTypeOf() + pc.await().shouldBeTypeOf() + pd.await().shouldBeTypeOf() + pe.await().shouldBeTypeOf() + pf.await().shouldBeTypeOf() + pg.await().shouldBeTypeOf() + ph.await().shouldBeTypeOf() } } @@ -152,77 +128,90 @@ class ParMap8Test : ArrowFxSpec( Arb.throwable(), Arb.element(listOf(1, 2, 3, 4, 5, 6, 7, 8)) ) { e, winningTask -> - - val intGen = Arb.int() - val a = intGen.next() - val b = intGen.next() - val c = intGen.next() - val d = intGen.next() - val f = intGen.next() - val g = intGen.next() - val h = intGen.next() - val s = Channel() - val pa = CompletableDeferred>() - val pb = CompletableDeferred>() - val pc = CompletableDeferred>() - val pd = CompletableDeferred>() - val pf = CompletableDeferred>() - val pg = CompletableDeferred>() - val ph = CompletableDeferred>() + val pa = CompletableDeferred() + val pb = CompletableDeferred() + val pc = CompletableDeferred() + val pd = CompletableDeferred() + val pf = CompletableDeferred() + val pg = CompletableDeferred() + val ph = CompletableDeferred() val winner: suspend CoroutineScope.() -> Int = { repeat(7) { s.send(Unit) }; throw e } - val loserA: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pa.complete(Pair(a, ex)) } } - val loserB: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pb.complete(Pair(b, ex)) } } - val loserC: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pc.complete(Pair(c, ex)) } } - val loserD: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pd.complete(Pair(d, ex)) } } - val loserF: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pf.complete(Pair(f, ex)) } } - val loserG: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> pg.complete(Pair(g, ex)) } } - val loserH: suspend CoroutineScope.() -> Int = { guaranteeCase({ s.receive(); never() }) { ex -> ph.complete(Pair(h, ex)) } } + val loserA: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pa) } + val loserB: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pb) } + val loserC: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pc) } + val loserD: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pd) } + val loserF: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pf) } + val loserG: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pg) } + val loserH: suspend CoroutineScope.() -> Int = { awaitExitCase(s, ph) } val r = Either.catch { when (winningTask) { - 1 -> parZip(winner, loserA, loserB, loserC, loserD, loserF, loserG, loserH) { _, _, _, _, _, _, _, _ -> Unit } - 2 -> parZip(loserA, winner, loserB, loserC, loserD, loserF, loserG, loserH) { _, _, _, _, _, _, _, _ -> Unit } - 3 -> parZip(loserA, loserB, winner, loserC, loserD, loserF, loserG, loserH) { _, _, _, _, _, _, _, _ -> Unit } - 4 -> parZip(loserA, loserB, loserC, winner, loserD, loserF, loserG, loserH) { _, _, _, _, _, _, _, _ -> Unit } - 5 -> parZip(loserA, loserB, loserC, loserD, winner, loserF, loserG, loserH) { _, _, _, _, _, _, _, _ -> Unit } - 6 -> parZip(loserA, loserB, loserC, loserD, loserF, winner, loserG, loserH) { _, _, _, _, _, _, _, _ -> Unit } - 7 -> parZip(loserA, loserB, loserC, loserD, loserF, loserG, winner, loserH) { _, _, _, _, _, _, _, _ -> Unit } - else -> parZip(loserA, loserB, loserC, loserD, loserF, loserG, loserH, winner) { _, _, _, _, _, _, _, _ -> Unit } + 1 -> parZip(winner, loserA, loserB, loserC, loserD, loserF, loserG, loserH) { _, _, _, _, _, _, _, _ -> } + 2 -> parZip(loserA, winner, loserB, loserC, loserD, loserF, loserG, loserH) { _, _, _, _, _, _, _, _ -> } + 3 -> parZip(loserA, loserB, winner, loserC, loserD, loserF, loserG, loserH) { _, _, _, _, _, _, _, _ -> } + 4 -> parZip(loserA, loserB, loserC, winner, loserD, loserF, loserG, loserH) { _, _, _, _, _, _, _, _ -> } + 5 -> parZip(loserA, loserB, loserC, loserD, winner, loserF, loserG, loserH) { _, _, _, _, _, _, _, _ -> } + 6 -> parZip(loserA, loserB, loserC, loserD, loserF, winner, loserG, loserH) { _, _, _, _, _, _, _, _ -> } + 7 -> parZip(loserA, loserB, loserC, loserD, loserF, loserG, winner, loserH) { _, _, _, _, _, _, _, _ -> } + else -> parZip(loserA, loserB, loserC, loserD, loserF, loserG, loserH, winner) { _, _, _, _, _, _, _, _ -> } } } - pa.await().let { (res, exit) -> - res shouldBe a - exit.shouldBeInstanceOf() - } - pb.await().let { (res, exit) -> - res shouldBe b - exit.shouldBeInstanceOf() - } - pc.await().let { (res, exit) -> - res shouldBe c - exit.shouldBeInstanceOf() - } - pd.await().let { (res, exit) -> - res shouldBe d - exit.shouldBeInstanceOf() - } - pf.await().let { (res, exit) -> - res shouldBe f - exit.shouldBeInstanceOf() - } - pg.await().let { (res, exit) -> - res shouldBe g - exit.shouldBeInstanceOf() - } - ph.await().let { (res, exit) -> - res shouldBe h - exit.shouldBeInstanceOf() - } + pa.await().shouldBeTypeOf() + pb.await().shouldBeTypeOf() + pc.await().shouldBeTypeOf() + pd.await().shouldBeTypeOf() + pf.await().shouldBeTypeOf() + pg.await().shouldBeTypeOf() + ph.await().shouldBeTypeOf() r should leftException(e) } } + + "parMapN CancellationException on right can cancel rest" { + checkAll(Arb.string(), Arb.int(1..8)) { msg, cancel -> + val s = Channel() + val pa = CompletableDeferred() + val pb = CompletableDeferred() + val pc = CompletableDeferred() + val pd = CompletableDeferred() + val pe = CompletableDeferred() + val pf = CompletableDeferred() + val pg = CompletableDeferred() + + val winner: suspend CoroutineScope.() -> Int = { repeat(7) { s.send(Unit) }; throw CancellationException(msg) } + val loserA: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pa) } + val loserB: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pb) } + val loserC: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pc) } + val loserD: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pd) } + val loserF: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pe) } + val loserG: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pf) } + val loserH: suspend CoroutineScope.() -> Int = { awaitExitCase(s, pg) } + + try { + when (cancel) { + 1 -> parZip(winner, loserA, loserB, loserC, loserD, loserF, loserG, loserH) { _, _, _, _, _, _, _, _ -> } + 2 -> parZip(loserA, winner, loserB, loserC, loserD, loserF, loserG, loserH) { _, _, _, _, _, _, _, _ -> } + 3 -> parZip(loserA, loserB, winner, loserC, loserD, loserF, loserG, loserH) { _, _, _, _, _, _, _, _ -> } + 4 -> parZip(loserA, loserB, loserC, winner, loserD, loserF, loserG, loserH) { _, _, _, _, _, _, _, _ -> } + 5 -> parZip(loserA, loserB, loserC, loserD, winner, loserF, loserG, loserH) { _, _, _, _, _, _, _, _ -> } + 6 -> parZip(loserA, loserB, loserC, loserD, loserF, winner, loserG, loserH) { _, _, _, _, _, _, _, _ -> } + 7 -> parZip(loserA, loserB, loserC, loserD, loserF, loserG, winner, loserH) { _, _, _, _, _, _, _, _ -> } + else -> parZip(loserA, loserB, loserC, loserD, loserF, loserG, loserH, winner) { _, _, _, _, _, _, _, _ -> } + } + } catch (e: CancellationException) { + e.message shouldBe msg + } + pa.await().shouldBeTypeOf() + pb.await().shouldBeTypeOf() + pc.await().shouldBeTypeOf() + pd.await().shouldBeTypeOf() + pe.await().shouldBeTypeOf() + pf.await().shouldBeTypeOf() + pg.await().shouldBeTypeOf() + } + } } )