From f72ce81d8a37bd3c2c7cdf4256903dbe23e55239 Mon Sep 17 00:00:00 2001
From: i-walker <46971368+i-walker@users.noreply.github.com>
Date: Wed, 23 Dec 2020 11:32:22 +0100
Subject: [PATCH 1/3] group module build settings, inline sleep/ timeOutOrNull
and replace arrow Duration with kotlin.time.Duration
---
arrow-fx-coroutines-stream/build.gradle | 8 --
.../arrow/fx/coroutines/stream/Stream.kt | 7 +-
.../arrow/fx/coroutines/stream/BracketTest.kt | 8 +-
.../fx/coroutines/stream/CallbackTest.kt | 10 +--
.../fx/coroutines/stream/ConcurrentlyTest.kt | 10 +--
.../fx/coroutines/stream/InterruptionTest.kt | 44 +++++-----
.../arrow/fx/coroutines/stream/ParJoinTest.kt | 4 +-
.../coroutines/stream/concurrent/QueueTest.kt | 14 +--
arrow-fx-coroutines-test/build.gradle | 8 --
arrow-fx-coroutines/build.gradle | 8 --
.../kotlin/arrow/fx/coroutines/Schedule.kt | 5 +-
.../arrow/fx/coroutines/BracketCaseTest.kt | 3 +-
.../arrow/fx/coroutines/CancelBoundary.kt | 3 +-
.../arrow/fx/coroutines/CancellableF.kt | 3 +-
.../arrow/fx/coroutines/CircuitBreakerTest.kt | 5 +-
.../arrow/fx/coroutines/ConcurrentVarTest.kt | 13 +--
.../arrow/fx/coroutines/EnvironmentTest.kt | 3 +-
arrow-fx-stm/build.gradle | 8 --
.../src/test/kotlin/arrow/fx/stm/STMTest.kt | 14 +--
.../src/test/kotlin/arrow/fx/stm/TVarTest.kt | 10 +--
arrow-fx/src/test/kotlin/arrow/fx/IOTest.kt | 42 +++++----
build.gradle | 88 ++++++++++++-------
22 files changed, 163 insertions(+), 155 deletions(-)
diff --git a/arrow-fx-coroutines-stream/build.gradle b/arrow-fx-coroutines-stream/build.gradle
index fd5b84db1..d530240cc 100644
--- a/arrow-fx-coroutines-stream/build.gradle
+++ b/arrow-fx-coroutines-stream/build.gradle
@@ -1,13 +1,5 @@
-plugins {
- id "org.jetbrains.kotlin.jvm"
- id "org.jlleitschuh.gradle.ktlint"
-}
-
apply plugin: 'kotlinx-atomicfu'
-apply from: "$SUB_PROJECT"
-apply from: "$DOC_CREATION"
-
dependencies {
api "io.arrow-kt:arrow-core:$VERSION_NAME"
implementation project(":arrow-fx-coroutines")
diff --git a/arrow-fx-coroutines-stream/src/main/kotlin/arrow/fx/coroutines/stream/Stream.kt b/arrow-fx-coroutines-stream/src/main/kotlin/arrow/fx/coroutines/stream/Stream.kt
index 0232b811a..9e21c2390 100644
--- a/arrow-fx-coroutines-stream/src/main/kotlin/arrow/fx/coroutines/stream/Stream.kt
+++ b/arrow-fx-coroutines-stream/src/main/kotlin/arrow/fx/coroutines/stream/Stream.kt
@@ -9,7 +9,6 @@ import arrow.core.Some
import arrow.core.extensions.list.foldable.foldLeft
import arrow.core.identity
import arrow.fx.coroutines.ComputationPool
-import arrow.fx.coroutines.Duration
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.Fiber
import arrow.fx.coroutines.ForkAndForget
@@ -20,9 +19,11 @@ import arrow.fx.coroutines.guaranteeCase
import arrow.fx.coroutines.stream.concurrent.Signal
import arrow.typeclasses.Monoid
import arrow.typeclasses.Semigroup
+import kotlinx.coroutines.delay
import java.util.concurrent.TimeoutException
import kotlin.coroutines.CoroutineContext
import kotlin.random.Random
+import kotlin.time.Duration
class ForStream private constructor() {
companion object
@@ -1447,7 +1448,7 @@ inline fun StreamOf.fix(): Stream =
* Interrupts this stream after the specified duration has passed.
*/
fun interruptAfter(duration: Duration): Stream =
- interruptWhen { Right(arrow.fx.coroutines.sleep(duration)) }
+ interruptWhen { Right(delay(duration)) }
/**
* Transforms this stream using the given `Pipe`.
@@ -1478,7 +1479,7 @@ inline fun StreamOf.fix(): Stream =
* A single-element `Stream` that waits for the duration `d` before emitting unit.
*/
fun sleep(d: Duration): Stream =
- effect { arrow.fx.coroutines.sleep(d) }
+ effect { delay(d) }
/**
* Alias for `sleep(d).void`. Often used in conjunction with [append] (i.e., `sleep_(..).append { s }`) as a more
diff --git a/arrow-fx-coroutines-stream/src/test/kotlin/arrow/fx/coroutines/stream/BracketTest.kt b/arrow-fx-coroutines-stream/src/test/kotlin/arrow/fx/coroutines/stream/BracketTest.kt
index 682300dc5..923703544 100644
--- a/arrow-fx-coroutines-stream/src/test/kotlin/arrow/fx/coroutines/stream/BracketTest.kt
+++ b/arrow-fx-coroutines-stream/src/test/kotlin/arrow/fx/coroutines/stream/BracketTest.kt
@@ -8,9 +8,8 @@ import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.ForkAndForget
import arrow.fx.coroutines.Promise
import arrow.fx.coroutines.leftException
-import arrow.fx.coroutines.milliseconds
+import kotlin.time.milliseconds
import arrow.fx.coroutines.parTupledN
-import arrow.fx.coroutines.sleep
import io.kotest.matchers.should
import io.kotest.matchers.shouldBe
import io.kotest.matchers.types.shouldBeInstanceOf
@@ -19,6 +18,7 @@ import io.kotest.property.arbitrary.bool
import io.kotest.property.arbitrary.int
import io.kotest.property.arbitrary.list
import io.kotest.property.arbitrary.string
+import kotlinx.coroutines.delay
class BracketTest : StreamSpec(spec = {
@@ -243,7 +243,7 @@ class BracketTest : StreamSpec(spec = {
exit.complete(ex)
throw e
}).flatMap { Stream.never() }
- .interruptWhen { Right(sleep(50.milliseconds)) }
+ .interruptWhen { Right(delay(50.milliseconds)) }
.drain()
} shouldBe e
@@ -346,7 +346,7 @@ class BracketTest : StreamSpec(spec = {
)
}
- parTupledN({ latch.get() }, { sleep(50.milliseconds) })
+ parTupledN({ latch.get() }, { delay(50.milliseconds) })
f.cancel()
diff --git a/arrow-fx-coroutines-stream/src/test/kotlin/arrow/fx/coroutines/stream/CallbackTest.kt b/arrow-fx-coroutines-stream/src/test/kotlin/arrow/fx/coroutines/stream/CallbackTest.kt
index 35856d601..e869a28bb 100644
--- a/arrow-fx-coroutines-stream/src/test/kotlin/arrow/fx/coroutines/stream/CallbackTest.kt
+++ b/arrow-fx-coroutines-stream/src/test/kotlin/arrow/fx/coroutines/stream/CallbackTest.kt
@@ -11,13 +11,13 @@ import arrow.fx.coroutines.UnsafePromise
import arrow.fx.coroutines.cancelBoundary
import arrow.fx.coroutines.milliseconds
import arrow.fx.coroutines.parTupledN
-import arrow.fx.coroutines.sleep
import arrow.fx.coroutines.startCoroutineCancellable
import io.kotest.matchers.shouldBe
import io.kotest.property.Arb
import io.kotest.property.arbitrary.int
import io.kotest.property.arbitrary.list
import io.kotest.property.arbitrary.map
+import kotlinx.coroutines.delay
class CallbackTest : StreamSpec(iterations = 250, spec = {
@@ -93,7 +93,7 @@ class CallbackTest : StreamSpec(iterations = 250, spec = {
Stream.callback {
emit(1)
- sleep(500.milliseconds)
+ delay(500.milliseconds.millis)
emit(2)
ref.set(true)
end()
@@ -145,7 +145,7 @@ class CallbackTest : StreamSpec(iterations = 250, spec = {
)
}
- parTupledN({ latch.get() }, { sleep(20.milliseconds) })
+ parTupledN({ latch.get() }, { delay(20.milliseconds.millis) })
f.cancel()
@@ -180,7 +180,7 @@ class CallbackTest : StreamSpec(iterations = 250, spec = {
ForkConnected { cancel.invoke() }
// Let cancel schedule
- sleep(10.milliseconds)
+ delay(10.milliseconds.millis)
start.complete(Unit) // Continue cancellableF
@@ -213,7 +213,7 @@ private suspend fun countToCallback(
arrow.fx.coroutines.repeat(Schedule.recurs(iterations)) {
i += 1
cb(map(i))
- sleep(500.milliseconds)
+ delay(500.milliseconds.millis)
}
onEnd()
}
diff --git a/arrow-fx-coroutines-stream/src/test/kotlin/arrow/fx/coroutines/stream/ConcurrentlyTest.kt b/arrow-fx-coroutines-stream/src/test/kotlin/arrow/fx/coroutines/stream/ConcurrentlyTest.kt
index 4a2a51fea..fd51e83f5 100644
--- a/arrow-fx-coroutines-stream/src/test/kotlin/arrow/fx/coroutines/stream/ConcurrentlyTest.kt
+++ b/arrow-fx-coroutines-stream/src/test/kotlin/arrow/fx/coroutines/stream/ConcurrentlyTest.kt
@@ -5,13 +5,13 @@ import arrow.fx.coroutines.Atomic
import arrow.fx.coroutines.Promise
import arrow.fx.coroutines.Semaphore
import arrow.fx.coroutines.leftException
-import arrow.fx.coroutines.milliseconds
+import kotlin.time.milliseconds
import arrow.fx.coroutines.never
-import arrow.fx.coroutines.sleep
import io.kotest.matchers.should
import io.kotest.matchers.shouldBe
import io.kotest.property.Arb
import io.kotest.property.arbitrary.int
+import kotlinx.coroutines.delay
class ConcurrentlyTest : StreamSpec(spec = {
@@ -38,7 +38,7 @@ class ConcurrentlyTest : StreamSpec(spec = {
"when primary stream fails, overall stream fails and background stream is terminated" {
checkAll(Arb.throwable()) { e ->
val semaphore = Semaphore(0)
- val bg = Stream.effect { sleep(50.milliseconds) }.repeat().onFinalize { semaphore.release() }
+ val bg = Stream.effect { delay(50.milliseconds) }.repeat().onFinalize { semaphore.release() }
val fg = Stream.raiseError(e).delayBy(25.milliseconds)
assertThrowable {
@@ -53,7 +53,7 @@ class ConcurrentlyTest : StreamSpec(spec = {
checkAll(Arb.stream(Arb.int())) { s ->
val semaphore = Semaphore(0)
- val bg = Stream.effect { sleep(50.milliseconds) }.repeat().onFinalize { semaphore.release() }
+ val bg = Stream.effect { delay(50.milliseconds) }.repeat().onFinalize { semaphore.release() }
val fg = s.delayBy(25.milliseconds)
fg.concurrently(bg)
@@ -82,7 +82,7 @@ class ConcurrentlyTest : StreamSpec(spec = {
val runner = Stream.bracket(
{ runnerRun.set(true) },
{
- sleep(100.milliseconds) // assure this inner finalizer always take longer run than `outer`
+ delay(100.milliseconds) // assure this inner finalizer always take longer run than `outer`
finRef.update { it + "Inner" } // signal finalizer invoked
throw e // signal a failure
}).flatMap { // flag the concurrently had chance to start, as if the `s` will be empty `runner` may not be evaluated at all.
diff --git a/arrow-fx-coroutines-stream/src/test/kotlin/arrow/fx/coroutines/stream/InterruptionTest.kt b/arrow-fx-coroutines-stream/src/test/kotlin/arrow/fx/coroutines/stream/InterruptionTest.kt
index c6125e287..7f8ddf5cc 100644
--- a/arrow-fx-coroutines-stream/src/test/kotlin/arrow/fx/coroutines/stream/InterruptionTest.kt
+++ b/arrow-fx-coroutines-stream/src/test/kotlin/arrow/fx/coroutines/stream/InterruptionTest.kt
@@ -9,17 +9,17 @@ import arrow.fx.coroutines.Promise
import arrow.fx.coroutines.ForkAndForget
import arrow.fx.coroutines.guaranteeCase
import arrow.fx.coroutines.guarantee
-import arrow.fx.coroutines.timeOutOrNull
import arrow.fx.coroutines.Semaphore
import arrow.fx.coroutines.ExitCase
-import arrow.fx.coroutines.milliseconds
-import arrow.fx.coroutines.sleep
+import kotlin.time.milliseconds
import arrow.fx.coroutines.never
import io.kotest.matchers.shouldBe
import io.kotest.matchers.shouldNotBe
import io.kotest.matchers.types.shouldBeInstanceOf
import io.kotest.property.Arb
import io.kotest.property.arbitrary.int
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.withTimeoutOrNull
class InterruptionTest : StreamSpec(spec = {
"can cancel a hung effect" - {
@@ -31,7 +31,7 @@ class InterruptionTest : StreamSpec(spec = {
s.append { Stream(1) } // Make sure is not empty
.effectMap {
guaranteeCase({ latch.complete(Unit); never() }) { ex -> exit.complete(ex) }
- }.interruptWhen { Right(latch.get().also { sleep(20.milliseconds) }) }
+ }.interruptWhen { Right(latch.get().also { delay(20.milliseconds) }) }
.toList()
}
@@ -44,7 +44,7 @@ class InterruptionTest : StreamSpec(spec = {
"can interrupt a hung effect" - {
checkAll(Arb.stream(Arb.int())) { s ->
s.effectMap { never() }
- .interruptWhen { Right(sleep(20.milliseconds)) }
+ .interruptWhen { Right(delay(20.milliseconds)) }
.toList() shouldBe emptyList()
}
}
@@ -65,7 +65,7 @@ class InterruptionTest : StreamSpec(spec = {
"constant stream" - {
checkAll(Arb.int()) { i ->
Stream.constant(i)
- .interruptWhen { Right(sleep(20.milliseconds)) }
+ .interruptWhen { Right(delay(20.milliseconds)) }
.drain() // Finishes and gets interrupted
}
}
@@ -73,7 +73,7 @@ class InterruptionTest : StreamSpec(spec = {
"constant stream with a flatMap" - {
checkAll(Arb.int()) { i ->
Stream.constant(i)
- .interruptWhen { Right(sleep(20.milliseconds)) }
+ .interruptWhen { Right(delay(20.milliseconds)) }
.flatMap { Stream(1) }
.drain()
}
@@ -84,7 +84,7 @@ class InterruptionTest : StreamSpec(spec = {
Stream(i).flatMap { i -> Stream(i).append { loop(i + 1) } }
loop(0)
- .interruptWhen { Right(sleep(20.milliseconds)) }
+ .interruptWhen { Right(delay(20.milliseconds)) }
.drain()
}
@@ -93,7 +93,7 @@ class InterruptionTest : StreamSpec(spec = {
Stream.effect { Unit }.flatMap { loop() }
loop()
- .interruptWhen { Right(sleep(20.milliseconds)) }
+ .interruptWhen { Right(delay(20.milliseconds)) }
.drain()
}
@@ -102,19 +102,19 @@ class InterruptionTest : StreamSpec(spec = {
Stream(Unit).flatMap { loop() }
loop()
- .interruptWhen { Right(sleep(20.milliseconds)) }
+ .interruptWhen { Right(delay(20.milliseconds)) }
.drain()
}
"effect stream" - {
Stream.effect { Unit }.repeat()
- .interruptWhen { Right(sleep(20.milliseconds)) }
+ .interruptWhen { Right(delay(20.milliseconds)) }
.drain()
}
"Constant drained stream" - {
Stream.constant(true)
- .interruptWhen { Right(sleep(20.milliseconds)) }
+ .interruptWhen { Right(delay(20.milliseconds)) }
.drain()
}
@@ -148,7 +148,7 @@ class InterruptionTest : StreamSpec(spec = {
"stream that never terminates in flatMap" - {
checkAll(Arb.stream(Arb.int())) { s ->
- s.interruptWhen { Right(sleep(20.milliseconds)) }
+ s.interruptWhen { Right(delay(20.milliseconds)) }
.flatMap { Stream.never() }
.toList() shouldBe emptyList()
}
@@ -160,7 +160,7 @@ class InterruptionTest : StreamSpec(spec = {
Stream.effect { Semaphore(0) }.flatMap { semaphore ->
Stream(1)
.append { s }
- .interruptWhen { sleep(20.milliseconds); Either.Left(e) }
+ .interruptWhen { delay(20.milliseconds); Either.Left(e) }
.flatMap { Stream.effect_ { semaphore.acquire() } }
}.toList()
} shouldBe Either.Left(e)
@@ -169,7 +169,7 @@ class InterruptionTest : StreamSpec(spec = {
"resume on append" - {
Stream.never()
- .interruptWhen { Right(sleep(20.milliseconds)) }
+ .interruptWhen { Right(delay(20.milliseconds)) }
.append { Stream(5) }
.toList() shouldBe listOf(5)
}
@@ -178,7 +178,7 @@ class InterruptionTest : StreamSpec(spec = {
checkAll(Arb.stream(Arb.int())) { s ->
val expected = s.toList()
- s.interruptWhen { Right(sleep(20.milliseconds)) }
+ s.interruptWhen { Right(delay(20.milliseconds)) }
.effectMap { never() }
.void()
.append { s }
@@ -190,7 +190,7 @@ class InterruptionTest : StreamSpec(spec = {
checkAll(Arb.stream(Arb.int())) { s ->
val expected = s.toList()
- s.interruptWhen { Right(sleep(20.milliseconds)) }
+ s.interruptWhen { Right(delay(20.milliseconds)) }
.effectMap { never