Skip to content
This repository has been archived by the owner on Feb 24, 2021. It is now read-only.

inline sleep and timeOutOrNull #358

Merged
merged 6 commits into from Dec 28, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions arrow-fx-coroutines-stream/build.gradle
Expand Up @@ -19,3 +19,15 @@ dependencies {
testImplementation "io.kotest:kotest-assertions-core-jvm:$KOTEST_VERSION" // for kotest core jvm assertions
testImplementation "io.kotest:kotest-property-jvm:$KOTEST_VERSION" // for kotest property test
}

// usage of kotlin.time
compileKotlin {
kotlinOptions {
freeCompilerArgs = ["-Xopt-in=kotlin.time.ExperimentalTime"]
}
}
compileTestKotlin {
kotlinOptions {
freeCompilerArgs = ["-Xopt-in=kotlin.time.ExperimentalTime"]
}
}
Expand Up @@ -9,7 +9,6 @@ import arrow.core.Some
import arrow.core.extensions.list.foldable.foldLeft
import arrow.core.identity
import arrow.fx.coroutines.ComputationPool
import arrow.fx.coroutines.Duration
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.Fiber
import arrow.fx.coroutines.ForkAndForget
Expand All @@ -20,9 +19,11 @@ import arrow.fx.coroutines.guaranteeCase
import arrow.fx.coroutines.stream.concurrent.Signal
import arrow.typeclasses.Monoid
import arrow.typeclasses.Semigroup
import kotlinx.coroutines.delay
import java.util.concurrent.TimeoutException
import kotlin.coroutines.CoroutineContext
import kotlin.random.Random
import kotlin.time.Duration

class ForStream private constructor() {
companion object
Expand Down Expand Up @@ -1447,7 +1448,7 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
* Interrupts this stream after the specified duration has passed.
*/
fun interruptAfter(duration: Duration): Stream<O> =
interruptWhen { Right(arrow.fx.coroutines.sleep(duration)) }
interruptWhen { Right(delay(duration)) }

/**
* Transforms this stream using the given `Pipe`.
Expand Down Expand Up @@ -1478,7 +1479,7 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
* A single-element `Stream` that waits for the duration `d` before emitting unit.
*/
fun sleep(d: Duration): Stream<Unit> =
effect { arrow.fx.coroutines.sleep(d) }
effect { delay(d) }

/**
* Alias for `sleep(d).void`. Often used in conjunction with [append] (i.e., `sleep_(..).append { s }`) as a more
Expand Down
Expand Up @@ -8,9 +8,8 @@ import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.ForkAndForget
import arrow.fx.coroutines.Promise
import arrow.fx.coroutines.leftException
import arrow.fx.coroutines.milliseconds
import kotlin.time.milliseconds
import arrow.fx.coroutines.parTupledN
import arrow.fx.coroutines.sleep
import io.kotest.matchers.should
import io.kotest.matchers.shouldBe
import io.kotest.matchers.types.shouldBeInstanceOf
Expand All @@ -19,6 +18,7 @@ import io.kotest.property.arbitrary.bool
import io.kotest.property.arbitrary.int
import io.kotest.property.arbitrary.list
import io.kotest.property.arbitrary.string
import kotlinx.coroutines.delay

class BracketTest : StreamSpec(spec = {

Expand Down Expand Up @@ -243,7 +243,7 @@ class BracketTest : StreamSpec(spec = {
exit.complete(ex)
throw e
}).flatMap { Stream.never<Unit>() }
.interruptWhen { Right(sleep(50.milliseconds)) }
.interruptWhen { Right(delay(50.milliseconds)) }
.drain()
} shouldBe e

Expand Down Expand Up @@ -346,7 +346,7 @@ class BracketTest : StreamSpec(spec = {
)
}

parTupledN({ latch.get() }, { sleep(50.milliseconds) })
parTupledN({ latch.get() }, { delay(50.milliseconds) })

f.cancel()

Expand Down
Expand Up @@ -9,15 +9,15 @@ import arrow.fx.coroutines.Promise
import arrow.fx.coroutines.Schedule
import arrow.fx.coroutines.UnsafePromise
import arrow.fx.coroutines.cancelBoundary
import arrow.fx.coroutines.milliseconds
import kotlin.time.milliseconds
import arrow.fx.coroutines.parTupledN
import arrow.fx.coroutines.sleep
import arrow.fx.coroutines.startCoroutineCancellable
import io.kotest.matchers.shouldBe
import io.kotest.property.Arb
import io.kotest.property.arbitrary.int
import io.kotest.property.arbitrary.list
import io.kotest.property.arbitrary.map
import kotlinx.coroutines.delay

class CallbackTest : StreamSpec(iterations = 250, spec = {

Expand Down Expand Up @@ -93,7 +93,7 @@ class CallbackTest : StreamSpec(iterations = 250, spec = {

Stream.callback {
emit(1)
sleep(500.milliseconds)
delay(500.milliseconds)
emit(2)
ref.set(true)
end()
Expand Down Expand Up @@ -145,7 +145,7 @@ class CallbackTest : StreamSpec(iterations = 250, spec = {
)
}

parTupledN({ latch.get() }, { sleep(20.milliseconds) })
parTupledN({ latch.get() }, { delay(20.milliseconds) })

f.cancel()

Expand Down Expand Up @@ -180,7 +180,7 @@ class CallbackTest : StreamSpec(iterations = 250, spec = {
ForkConnected { cancel.invoke() }

// Let cancel schedule
sleep(10.milliseconds)
delay(10.milliseconds)

start.complete(Unit) // Continue cancellableF

Expand Down Expand Up @@ -213,7 +213,7 @@ private suspend fun <A> countToCallback(
arrow.fx.coroutines.repeat(Schedule.recurs(iterations)) {
i += 1
cb(map(i))
sleep(500.milliseconds)
delay(500.milliseconds)
}
onEnd()
}
Expand Up @@ -5,13 +5,13 @@ import arrow.fx.coroutines.Atomic
import arrow.fx.coroutines.Promise
import arrow.fx.coroutines.Semaphore
import arrow.fx.coroutines.leftException
import arrow.fx.coroutines.milliseconds
import kotlin.time.milliseconds
import arrow.fx.coroutines.never
import arrow.fx.coroutines.sleep
import io.kotest.matchers.should
import io.kotest.matchers.shouldBe
import io.kotest.property.Arb
import io.kotest.property.arbitrary.int
import kotlinx.coroutines.delay

class ConcurrentlyTest : StreamSpec(spec = {

Expand All @@ -38,7 +38,7 @@ class ConcurrentlyTest : StreamSpec(spec = {
"when primary stream fails, overall stream fails and background stream is terminated" {
checkAll(Arb.throwable()) { e ->
val semaphore = Semaphore(0)
val bg = Stream.effect { sleep(50.milliseconds) }.repeat().onFinalize { semaphore.release() }
val bg = Stream.effect { delay(50.milliseconds) }.repeat().onFinalize { semaphore.release() }
val fg = Stream.raiseError<Unit>(e).delayBy(25.milliseconds)

assertThrowable {
Expand All @@ -53,7 +53,7 @@ class ConcurrentlyTest : StreamSpec(spec = {
checkAll(Arb.stream(Arb.int())) { s ->
val semaphore = Semaphore(0)

val bg = Stream.effect { sleep(50.milliseconds) }.repeat().onFinalize { semaphore.release() }
val bg = Stream.effect { delay(50.milliseconds) }.repeat().onFinalize { semaphore.release() }
val fg = s.delayBy(25.milliseconds)

fg.concurrently(bg)
Expand Down Expand Up @@ -82,7 +82,7 @@ class ConcurrentlyTest : StreamSpec(spec = {
val runner = Stream.bracket(
{ runnerRun.set(true) },
{
sleep(100.milliseconds) // assure this inner finalizer always take longer run than `outer`
delay(100.milliseconds) // assure this inner finalizer always take longer run than `outer`
finRef.update { it + "Inner" } // signal finalizer invoked
throw e // signal a failure
}).flatMap { // flag the concurrently had chance to start, as if the `s` will be empty `runner` may not be evaluated at all.
Expand Down
Expand Up @@ -9,17 +9,17 @@ import arrow.fx.coroutines.Promise
import arrow.fx.coroutines.ForkAndForget
import arrow.fx.coroutines.guaranteeCase
import arrow.fx.coroutines.guarantee
import arrow.fx.coroutines.timeOutOrNull
import arrow.fx.coroutines.Semaphore
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.milliseconds
import arrow.fx.coroutines.sleep
import kotlin.time.milliseconds
import arrow.fx.coroutines.never
import io.kotest.matchers.shouldBe
import io.kotest.matchers.shouldNotBe
import io.kotest.matchers.types.shouldBeInstanceOf
import io.kotest.property.Arb
import io.kotest.property.arbitrary.int
import kotlinx.coroutines.delay
import kotlinx.coroutines.withTimeoutOrNull

class InterruptionTest : StreamSpec(spec = {
"can cancel a hung effect" - {
Expand All @@ -31,7 +31,7 @@ class InterruptionTest : StreamSpec(spec = {
s.append { Stream(1) } // Make sure is not empty
.effectMap {
guaranteeCase({ latch.complete(Unit); never<Unit>() }) { ex -> exit.complete(ex) }
}.interruptWhen { Right(latch.get().also { sleep(20.milliseconds) }) }
}.interruptWhen { Right(latch.get().also { delay(20.milliseconds) }) }
.toList()
}

Expand All @@ -44,7 +44,7 @@ class InterruptionTest : StreamSpec(spec = {
"can interrupt a hung effect" - {
checkAll(Arb.stream(Arb.int())) { s ->
s.effectMap { never<Unit>() }
.interruptWhen { Right(sleep(20.milliseconds)) }
.interruptWhen { Right(delay(20.milliseconds)) }
.toList() shouldBe emptyList()
}
}
Expand All @@ -65,15 +65,15 @@ class InterruptionTest : StreamSpec(spec = {
"constant stream" - {
checkAll(Arb.int()) { i ->
Stream.constant(i)
.interruptWhen { Right(sleep(20.milliseconds)) }
.interruptWhen { Right(delay(20.milliseconds)) }
.drain() // Finishes and gets interrupted
}
}

"constant stream with a flatMap" - {
checkAll(Arb.int()) { i ->
Stream.constant(i)
.interruptWhen { Right(sleep(20.milliseconds)) }
.interruptWhen { Right(delay(20.milliseconds)) }
.flatMap { Stream(1) }
.drain()
}
Expand All @@ -84,7 +84,7 @@ class InterruptionTest : StreamSpec(spec = {
Stream(i).flatMap { i -> Stream(i).append { loop(i + 1) } }

loop(0)
.interruptWhen { Right(sleep(20.milliseconds)) }
.interruptWhen { Right(delay(20.milliseconds)) }
.drain()
}

Expand All @@ -93,7 +93,7 @@ class InterruptionTest : StreamSpec(spec = {
Stream.effect { Unit }.flatMap { loop() }

loop()
.interruptWhen { Right(sleep(20.milliseconds)) }
.interruptWhen { Right(delay(20.milliseconds)) }
.drain()
}

Expand All @@ -102,19 +102,19 @@ class InterruptionTest : StreamSpec(spec = {
Stream(Unit).flatMap { loop() }

loop()
.interruptWhen { Right(sleep(20.milliseconds)) }
.interruptWhen { Right(delay(20.milliseconds)) }
.drain()
}

"effect stream" - {
Stream.effect { Unit }.repeat()
.interruptWhen { Right(sleep(20.milliseconds)) }
.interruptWhen { Right(delay(20.milliseconds)) }
.drain()
}

"Constant drained stream" - {
Stream.constant(true)
.interruptWhen { Right(sleep(20.milliseconds)) }
.interruptWhen { Right(delay(20.milliseconds)) }
.drain()
}

Expand Down Expand Up @@ -148,7 +148,7 @@ class InterruptionTest : StreamSpec(spec = {

"stream that never terminates in flatMap" - {
checkAll(Arb.stream(Arb.int())) { s ->
s.interruptWhen { Right(sleep(20.milliseconds)) }
s.interruptWhen { Right(delay(20.milliseconds)) }
.flatMap { Stream.never<Int>() }
.toList() shouldBe emptyList()
}
Expand All @@ -160,7 +160,7 @@ class InterruptionTest : StreamSpec(spec = {
Stream.effect { Semaphore(0) }.flatMap { semaphore ->
Stream(1)
.append { s }
.interruptWhen { sleep(20.milliseconds); Either.Left(e) }
.interruptWhen { delay(20.milliseconds); Either.Left(e) }
.flatMap { Stream.effect_ { semaphore.acquire() } }
}.toList()
} shouldBe Either.Left(e)
Expand All @@ -169,7 +169,7 @@ class InterruptionTest : StreamSpec(spec = {

"resume on append" - {
Stream.never<Unit>()
.interruptWhen { Right(sleep(20.milliseconds)) }
.interruptWhen { Right(delay(20.milliseconds)) }
.append { Stream(5) }
.toList() shouldBe listOf(5)
}
Expand All @@ -178,7 +178,7 @@ class InterruptionTest : StreamSpec(spec = {
checkAll(Arb.stream(Arb.int())) { s ->
val expected = s.toList()

s.interruptWhen { Right(sleep(20.milliseconds)) }
s.interruptWhen { Right(delay(20.milliseconds)) }
.effectMap { never<Int>() }
.void()
.append { s }
Expand All @@ -190,7 +190,7 @@ class InterruptionTest : StreamSpec(spec = {
checkAll(Arb.stream(Arb.int())) { s ->
val expected = s.toList()

s.interruptWhen { Right(sleep(20.milliseconds)) }
s.interruptWhen { Right(delay(20.milliseconds)) }
.effectMap { never<Option<Int>>() }
.append { s.map { Some(it) } }
.filterOption()
Expand All @@ -203,7 +203,7 @@ class InterruptionTest : StreamSpec(spec = {
val expected = s.toList()

s.append { Stream(1) }
.interruptWhen { Right(sleep(50.milliseconds)) }
.interruptWhen { Right(delay(50.milliseconds)) }
.map { None }
.append { s.map { Some(it) } }
.flatMap {
Expand Down Expand Up @@ -250,7 +250,7 @@ class InterruptionTest : StreamSpec(spec = {
"resume on append with pull" - {
Stream(1)
.unchunk()
.interruptWhen { Right(sleep(20.milliseconds)) }
.interruptWhen { Right(delay(20.milliseconds)) }
.asPull()
.unconsOrNull()
.flatMap { uncons ->
Expand All @@ -267,7 +267,7 @@ class InterruptionTest : StreamSpec(spec = {

"resume with append after evalMap interruption" - {
Stream(1)
.interruptWhen { Right(sleep(20.milliseconds)) }
.interruptWhen { Right(delay(20.milliseconds)) }
.effectMap { never<Int>() }
.append { Stream(5) }
.toList() shouldBe listOf(5)
Expand All @@ -276,7 +276,7 @@ class InterruptionTest : StreamSpec(spec = {
"interrupted effect is cancelled" - {
val latch = Promise<Unit>()

timeOutOrNull(500.milliseconds) {
withTimeoutOrNull(500.milliseconds) {
Stream.effect { guarantee({ latch.get() }) { latch.complete(Unit) } }
.interruptAfter(50.milliseconds)
.drain()
Expand All @@ -290,7 +290,7 @@ class InterruptionTest : StreamSpec(spec = {
io.kotest.property.checkAll(500, Arb.stream(Arb.int())) { s ->
val expected = s.toList()

s.interruptWhen { Right(sleep(50.milliseconds)) }
s.interruptWhen { Right(delay(50.milliseconds)) }
.map { None }
.append { s.map { Option(it) } }
.interruptWhen { Right(never()) }
Expand Down