diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/builders.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/builders.kt index 938a0d33961..4acd577d018 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/builders.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/builders.kt @@ -1,8 +1,10 @@ package arrow.fx.coroutines -import kotlinx.coroutines.suspendCancellableCoroutine +import kotlinx.coroutines.awaitCancellation +// TODO deprecate? public suspend fun never(): A = - suspendCancellableCoroutine {} + awaitCancellation() +// TODO deprecate? public suspend fun unit(): Unit = Unit diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/FlowTest.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/FlowTest.kt index cb209dde436..44ffc09672c 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/FlowTest.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/FlowTest.kt @@ -5,10 +5,11 @@ import io.kotest.matchers.shouldBe import io.kotest.matchers.types.shouldBeTypeOf import io.kotest.property.Arb import io.kotest.property.arbitrary.int -import io.kotest.property.checkAll import io.kotest.property.arbitrary.positiveInts -import kotlinx.coroutines.CancellationException +import kotlin.time.ExperimentalTime import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.awaitCancellation +import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOf @@ -16,7 +17,6 @@ import kotlinx.coroutines.flow.reduce import kotlinx.coroutines.flow.toList import kotlinx.coroutines.flow.toSet import kotlinx.coroutines.launch -import kotlin.time.ExperimentalTime @ExperimentalTime class FlowTest : ArrowFxSpec( @@ -72,28 +72,22 @@ class FlowTest : ArrowFxSpec( } "parMap - triggers cancel signal" { - checkAll(Arb.int(), Arb.int(1..2)) { i, n -> + checkAll { val latch = CompletableDeferred() - val exit = CompletableDeferred>() + val exit = CompletableDeferred() - assertThrowable { - flowOf(1, 2).parMap { index -> - if (index == n) { - guaranteeCase({ - latch.complete(Unit) - never() - }, { ex -> exit.complete(Pair(i, ex)) }) - } else { - latch.await() - throw CancellationException(null, null) - } + val job = launch { + flowOf(1).parMap { index -> + guaranteeCase({ + latch.complete(Unit) + awaitCancellation() + }, { ex -> exit.complete(ex) }) }.collect() - fail("Cannot reach here. CancellationException should be thrown.") - }.shouldBeTypeOf() - - val (ii, ex) = exit.await() - ii shouldBe i - ex.shouldBeTypeOf() + } + latch.await() + job.cancelAndJoin() + job.isCancelled shouldBe true + exit.await().shouldBeTypeOf() } } @@ -107,7 +101,7 @@ class FlowTest : ArrowFxSpec( if (index == n) { guaranteeCase({ latch.complete(Unit) - never() + awaitCancellation() }, { ex -> exit.complete(Pair(i, ex)) }) } else { latch.await() @@ -133,7 +127,7 @@ class FlowTest : ArrowFxSpec( flowOf(1, 2).parMap { index -> guaranteeCase({ if (index == 2) latch.complete(Unit) - never() + awaitCancellation() }, { ex -> if (index == 1) exitA.complete(Pair(i, ex)) else exitB.complete(Pair(i2, ex)) @@ -175,28 +169,23 @@ class FlowTest : ArrowFxSpec( } "parMapUnordered - triggers cancel signal" { - checkAll(Arb.int(), Arb.int(1..2)) { i, n -> + checkAll { val latch = CompletableDeferred() - val exit = CompletableDeferred>() + val exit = CompletableDeferred() - assertThrowable { - flowOf(1, 2).parMapUnordered { index -> - if (index == n) { - guaranteeCase({ - latch.complete(Unit) - never() - }, { ex -> exit.complete(Pair(i, ex)) }) - } else { - latch.await() - throw CancellationException(null, null) - } + val job = launch { + flowOf(1).parMapUnordered { + guaranteeCase({ + latch.complete(Unit) + awaitCancellation() + }, { ex -> exit.complete(ex) }) }.collect() - fail("Cannot reach here. CancellationException should be thrown.") - }.shouldBeTypeOf() + } + latch.await() + job.cancelAndJoin() - val (ii, ex) = exit.await() - ii shouldBe i - ex.shouldBeTypeOf() + job.isCancelled shouldBe true + exit.await().shouldBeTypeOf() } } @@ -210,7 +199,7 @@ class FlowTest : ArrowFxSpec( if (index == n) { guaranteeCase({ latch.complete(Unit) - never() + awaitCancellation() }, { ex -> exit.complete(Pair(i, ex)) }) } else { latch.await() @@ -236,7 +225,7 @@ class FlowTest : ArrowFxSpec( flowOf(1, 2).parMapUnordered { index -> guaranteeCase({ if (index == 2) latch.complete(Unit) - never() + awaitCancellation() }, { ex -> if (index == 1) exitA.complete(Pair(i, ex)) else exitB.complete(Pair(i2, ex)) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/arrow/fx/coroutines/FlowJvmTest.kt b/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/arrow/fx/coroutines/FlowJvmTest.kt index fd7ae9a5b8d..bfcfec5d811 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/arrow/fx/coroutines/FlowJvmTest.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/arrow/fx/coroutines/FlowJvmTest.kt @@ -14,6 +14,7 @@ import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.toList import kotlinx.coroutines.flow.toSet +import kotlinx.coroutines.test.currentTime import kotlinx.coroutines.test.runBlockingTest import kotlin.time.ExperimentalTime import kotlin.time.milliseconds diff --git a/arrow-libs/fx/arrow-fx-stm/src/commonTest/kotlin/arrow/fx/stm/TMapTest.kt b/arrow-libs/fx/arrow-fx-stm/src/commonTest/kotlin/arrow/fx/stm/TMapTest.kt index f26a2b6181b..97bf982db01 100644 --- a/arrow-libs/fx/arrow-fx-stm/src/commonTest/kotlin/arrow/fx/stm/TMapTest.kt +++ b/arrow-libs/fx/arrow-fx-stm/src/commonTest/kotlin/arrow/fx/stm/TMapTest.kt @@ -4,7 +4,6 @@ import arrow.fx.coroutines.ArrowFxSpec import io.kotest.matchers.shouldBe import io.kotest.property.Arb import io.kotest.property.arbitrary.int -import io.kotest.property.arbitrary.pair import io.kotest.property.arbitrary.map class TMapTest : ArrowFxSpec( @@ -17,7 +16,7 @@ class TMapTest : ArrowFxSpec( } } "insert multiple values" { - checkAll(Arb.list(Arb.pair(Arb.int(), Arb.int())).map { it.distinct() }) { pairs -> + checkAll(Arb.map(Arb.int(), Arb.int())) { pairs -> val map = TMap.new() atomically { for ((k, v) in pairs) map.insert(k, v) @@ -28,7 +27,7 @@ class TMapTest : ArrowFxSpec( } } "insert multiple colliding values" { - checkAll(Arb.list(Arb.pair(Arb.int(), Arb.int()))) { pairs -> + checkAll(Arb.map(Arb.int(), Arb.int())) { pairs -> val map = TMap.new { 0 } // hash function that always returns 0 atomically { for ((k, v) in pairs) map.insert(k, v) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index b7d9409a654..257fbfe4150 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -2,7 +2,7 @@ animalSniffer = "1.5.0" arrowGradleConfig = "0.6.0-alpha.4" assertj = "3.21.0" -coroutines = "1.5.2" +coroutines = "1.6.0" classgraph = "4.8.137" dokka = "1.5.30" jUnit = "4.12"