Skip to content
This repository has been archived by the owner on Feb 24, 2021. It is now read-only.

Deprecate arrow.fx.coroutines.Duration #338

Closed
wants to merge 8 commits into from
Closed
10 changes: 6 additions & 4 deletions arrow-docs/docs/fx/async/README.md
Expand Up @@ -101,14 +101,16 @@ suspend fun main(): Unit {
Once the function specifies a valid return, we can observe how the returned non-blocking value is bound on the left-hand side.

```kotlin:ank:playground
import kotlin.time.milliseconds
import kotlinx.coroutines.delay
import arrow.fx.coroutines.*

//sampleStart
suspend fun loser(): Unit =
never<Unit>() // Never wins

suspend fun winner(): Int {
sleep(5.milliseconds)
delay(5.milliseconds)
return 5
}

Expand Down Expand Up @@ -160,7 +162,7 @@ All operators found in Arrow Fx check for cancellation. In the small example of
```kotlin:ank
tailrec suspend fun sleeper(): Unit {
println("I am sleepy. I'm going to nap")
sleep(1.seconds) // <-- cancellation check-point
delay(1.seconds) // <-- cancellation check-point
println("1 second nap.. Going to sleep some more")
sleeper()
}
Expand Down Expand Up @@ -198,7 +200,7 @@ So how can you execute of `suspend fun` with guarantee that it cannot be cancell

```kotlin:ank
suspend fun uncancellableSleep(duration: Duration): Unit =
uncancellable { sleep(duration) }
uncancellable { delay(duration) }
```

If we now re-implement our previous `sleeper`, than it will behave a little different from before. The cancellation check before and after `uncancellableSleep` but note that the `sleep` istelf will not be cancelled.
Expand All @@ -221,7 +223,7 @@ import arrow.fx.coroutines.*

suspend fun main(): Unit {
val r = timeOutOrNull(1.seconds) {
uncancellable { sleep(2.seconds) }
uncancellable { delay(2.seconds) }
} // r is null, but took 2 seconds.
}
```
Expand Down
Expand Up @@ -4,7 +4,6 @@ import arrow.fx.coroutines.AtomicRefW
import arrow.fx.coroutines.CancelToken
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.Promise
import arrow.fx.coroutines.cancelBoundary
import arrow.fx.coroutines.cancellable
import arrow.fx.coroutines.cancellableF
import arrow.fx.coroutines.guaranteeCase
Expand All @@ -22,6 +21,7 @@ import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestCoroutineDispatcher
import kotlinx.coroutines.test.TestCoroutineExceptionHandler
Expand All @@ -32,7 +32,7 @@ import kotlinx.coroutines.test.TestCoroutineScope
class ExtensionsTest : StringSpec({

fun Arb.Companion.throwable(): Arb<Throwable> =
Arb.string().map(::RuntimeException)
string().map(::RuntimeException)

// --------------- suspendCancellable ---------------

Expand All @@ -44,11 +44,11 @@ class ExtensionsTest : StringSpec({
scope.launch {
suspendCancellable {
val first = i + 1
cancelBoundary()
coroutineContext.ensureActive()
val second = first + 1
cancelBoundary()
coroutineContext.ensureActive()
val third = second + 1
cancelBoundary()
coroutineContext.ensureActive()
third
} shouldBe i + 3
}
Expand Down Expand Up @@ -79,7 +79,7 @@ class ExtensionsTest : StringSpec({
val ref = AtomicRefW<Int?>(i)
scope.cancel()
scope.launch {
cancelBoundary()
coroutineContext.ensureActive()
ref.value = null
}

Expand Down
8 changes: 0 additions & 8 deletions arrow-fx-coroutines-stream/build.gradle
@@ -1,13 +1,5 @@
plugins {
id "org.jetbrains.kotlin.jvm"
id "org.jlleitschuh.gradle.ktlint"
}

apply plugin: 'kotlinx-atomicfu'

apply from: "$SUB_PROJECT"
apply from: "$DOC_CREATION"

dependencies {
api "io.arrow-kt:arrow-core:$VERSION_NAME"
implementation project(":arrow-fx-coroutines")
Expand Down
Expand Up @@ -9,12 +9,13 @@ import arrow.core.nonFatalOrThrow
import arrow.core.right
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.Platform
import arrow.fx.coroutines.cancelBoundary
import arrow.fx.coroutines.stream.R.Done
import arrow.fx.coroutines.stream.R.Out
import arrow.fx.coroutines.stream.Pull.Result
import arrow.fx.coroutines.stream.R.Interrupted
import kotlinx.coroutines.ensureActive
import java.util.concurrent.CancellationException
import kotlin.coroutines.coroutineContext

internal sealed class R<out O> {
data class Done(val scope: Scope) : R<Nothing>()
Expand Down Expand Up @@ -122,7 +123,9 @@ internal fun <O> interruptBoundary(
}

internal suspend inline fun interruptGuard(scope: Scope): Result<Any?>? =
when (val isInterrupted = scope.isInterrupted().also { cancelBoundary() }) {
when (val isInterrupted = scope.isInterrupted().also {
coroutineContext.ensureActive()
}) {
None -> null
is Some -> when (val eith = isInterrupted.t) {
is Either.Left -> Result.Fail(eith.a)
Expand Down
Expand Up @@ -6,14 +6,14 @@ import arrow.core.Option
import arrow.core.Some
import arrow.core.extensions.option.monad.flatten
import arrow.core.orElse
import arrow.fx.coroutines.ComputationPool
import arrow.fx.coroutines.ForkAndForget
import arrow.fx.coroutines.Platform
import arrow.fx.coroutines.Semaphore
import arrow.fx.coroutines.stream.concurrent.NoneTerminatedQueue
import arrow.fx.coroutines.stream.concurrent.Queue
import arrow.fx.coroutines.stream.concurrent.SignallingAtomic
import arrow.fx.coroutines.uncancellable
import kotlinx.coroutines.Dispatchers
import kotlin.coroutines.CoroutineContext

// stops the join evaluation
Expand Down Expand Up @@ -149,8 +149,8 @@ internal suspend fun signalResult(done: SignallingAtomic<Option<Option<Throwable
*
* ```kotlin:ank:playground
* import arrow.fx.coroutines.IOPool
* import arrow.fx.coroutines.milliseconds
* import arrow.fx.coroutines.sleep
* import kotlin.time.milliseconds
* import kotlinx.coroutines.delay
* import arrow.fx.coroutines.stream.*
*
* //sampleStart
Expand All @@ -171,7 +171,7 @@ internal suspend fun signalResult(done: SignallingAtomic<Option<Option<Throwable
*/
fun <O> Stream<Stream<O>>.parJoin(
maxOpen: Int,
ctx: CoroutineContext = ComputationPool
ctx: CoroutineContext = Dispatchers.Default
): Stream<O> {
require(maxOpen > 0) { "maxOpen must be > 0, was: $maxOpen" }

Expand Down Expand Up @@ -204,5 +204,5 @@ fun <O> Stream<Stream<O>>.parJoin(
}

/** Like [parJoin] but races all inner streams simultaneously without limit. */
fun <O> Stream<Stream<O>>.parJoinUnbounded(ctx: CoroutineContext = ComputationPool): Stream<O> =
fun <O> Stream<Stream<O>>.parJoinUnbounded(ctx: CoroutineContext = Dispatchers.Default): Stream<O> =
parJoin(Int.MAX_VALUE, ctx)
@@ -1,3 +1,4 @@
@file:Suppress("")
package arrow.fx.coroutines.stream

import arrow.core.Either
Expand All @@ -8,8 +9,7 @@ import arrow.core.Option
import arrow.core.Some
import arrow.core.extensions.list.foldable.foldLeft
import arrow.core.identity
import arrow.fx.coroutines.ComputationPool
import arrow.fx.coroutines.Duration
import kotlin.time.Duration
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.Fiber
import arrow.fx.coroutines.ForkAndForget
Expand All @@ -20,6 +20,8 @@ import arrow.fx.coroutines.guaranteeCase
import arrow.fx.coroutines.stream.concurrent.Signal
import arrow.typeclasses.Monoid
import arrow.typeclasses.Semigroup
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import java.util.concurrent.TimeoutException
import kotlin.coroutines.CoroutineContext
import kotlin.random.Random
Expand Down Expand Up @@ -592,14 +594,16 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
* Like [mapAccumulate], but accepts a function returning an suspend fun.
*
* ```kotlin:ank:playground
* import kotlin.time.milliseconds
* import kotlinx.coroutines.delay
* import arrow.fx.coroutines.*
* import arrow.fx.coroutines.stream.*
*
* //sampleStart
* suspend fun main(): Unit =
* Stream(1,2,3,4)
* .effectMapAccumulate(0) { acc, i ->
* sleep((i * 10).milliseconds)
* delay(i * 10).milliseconds)
* Pair(i, acc + i)
* }
* .toList()
Expand Down Expand Up @@ -1303,7 +1307,7 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
* //sampleEnd
* ```
*/
fun <O2> concurrently(other: Stream<O2>, ctx: CoroutineContext = ComputationPool): Stream<O> =
fun <O2> concurrently(other: Stream<O2>, ctx: CoroutineContext = Dispatchers.Default): Stream<O> =
effect { Pair(Promise<Unit>(), Promise<Either<Throwable, Unit>>()) }
.flatMap { (interrupt, doneR) ->
bracket(
Expand Down Expand Up @@ -1338,14 +1342,14 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
* Merges both Streams into an Stream of A and B represented by Either<A, B>.
* This operation is equivalent to a normal merge but for different types.
*/
fun <B> either(ctx: CoroutineContext = ComputationPool, other: Stream<B>): Stream<Either<O, B>> =
fun <B> either(ctx: CoroutineContext = Dispatchers.Default, other: Stream<B>): Stream<Either<O, B>> =
Stream(this.map { Left(it) }, other.map { Right(it) })
.parJoin(2, ctx)

/**
* Starts this stream and cancels it as finalization of the returned stream.
*/
fun spawn(ctx: CoroutineContext = ComputationPool): Stream<Fiber<Unit>> =
fun spawn(ctx: CoroutineContext = Dispatchers.Default): Stream<Fiber<Unit>> =
supervise(ctx) { drain() }

/**
Expand Down Expand Up @@ -1447,7 +1451,7 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
* Interrupts this stream after the specified duration has passed.
*/
fun interruptAfter(duration: Duration): Stream<O> =
interruptWhen { Right(arrow.fx.coroutines.sleep(duration)) }
interruptWhen { Right(delay(duration)) }

/**
* Transforms this stream using the given `Pipe`.
Expand Down Expand Up @@ -1478,7 +1482,7 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
* A single-element `Stream` that waits for the duration `d` before emitting unit.
*/
fun sleep(d: Duration): Stream<Unit> =
effect { arrow.fx.coroutines.sleep(d) }
effect { delay(d) }

/**
* Alias for `sleep(d).void`. Often used in conjunction with [append] (i.e., `sleep_(..).append { s }`) as a more
Expand Down Expand Up @@ -1770,7 +1774,7 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
/**
* Starts the supplied task and cancels it as finalization of the returned stream.
*/
fun <A> supervise(ctx: CoroutineContext = ComputationPool, fa: suspend () -> A): Stream<Fiber<A>> =
fun <A> supervise(ctx: CoroutineContext = Dispatchers.Default, fa: suspend () -> A): Stream<Fiber<A>> =
bracket(acquire = { fa.forkAndForget(ctx) }, release = { it.cancel() })

/**
Expand Down
Expand Up @@ -8,9 +8,7 @@ import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.ForkAndForget
import arrow.fx.coroutines.Promise
import arrow.fx.coroutines.leftException
import arrow.fx.coroutines.milliseconds
import arrow.fx.coroutines.parTupledN
import arrow.fx.coroutines.sleep
import io.kotest.matchers.should
import io.kotest.matchers.shouldBe
import io.kotest.matchers.types.shouldBeInstanceOf
Expand All @@ -19,6 +17,8 @@ import io.kotest.property.arbitrary.bool
import io.kotest.property.arbitrary.int
import io.kotest.property.arbitrary.list
import io.kotest.property.arbitrary.string
import kotlinx.coroutines.delay
import kotlin.time.milliseconds

class BracketTest : StreamSpec(spec = {

Expand Down Expand Up @@ -243,7 +243,7 @@ class BracketTest : StreamSpec(spec = {
exit.complete(ex)
throw e
}).flatMap { Stream.never<Unit>() }
.interruptWhen { Right(sleep(50.milliseconds)) }
.interruptWhen { Right(delay(50.milliseconds)) }
.drain()
} shouldBe e

Expand Down Expand Up @@ -346,7 +346,7 @@ class BracketTest : StreamSpec(spec = {
)
}

parTupledN({ latch.get() }, { sleep(50.milliseconds) })
parTupledN({ latch.get() }, { delay(50.milliseconds) })

f.cancel()

Expand Down
Expand Up @@ -8,16 +8,17 @@ import arrow.fx.coroutines.ForkConnected
import arrow.fx.coroutines.Promise
import arrow.fx.coroutines.Schedule
import arrow.fx.coroutines.UnsafePromise
import arrow.fx.coroutines.cancelBoundary
import arrow.fx.coroutines.milliseconds
import kotlin.time.milliseconds
import arrow.fx.coroutines.parTupledN
import arrow.fx.coroutines.sleep
import kotlinx.coroutines.delay
import arrow.fx.coroutines.startCoroutineCancellable
import io.kotest.matchers.shouldBe
import io.kotest.property.Arb
import io.kotest.property.arbitrary.int
import io.kotest.property.arbitrary.list
import io.kotest.property.arbitrary.map
import kotlinx.coroutines.ensureActive
import io.kotest.property.checkAll

class CallbackTest : StreamSpec(iterations = 250, spec = {

Expand Down Expand Up @@ -93,7 +94,7 @@ class CallbackTest : StreamSpec(iterations = 250, spec = {

Stream.callback {
emit(1)
sleep(500.milliseconds)
delay(500.milliseconds)
emit(2)
ref.set(true)
end()
Expand Down Expand Up @@ -145,7 +146,7 @@ class CallbackTest : StreamSpec(iterations = 250, spec = {
)
}

parTupledN({ latch.get() }, { sleep(20.milliseconds) })
parTupledN({ latch.get() }, { delay(20.milliseconds) })

f.cancel()

Expand All @@ -163,7 +164,7 @@ class CallbackTest : StreamSpec(iterations = 250, spec = {
Stream.cancellable {
latch.complete(Unit)
start.get()
cancelBoundary()
coroutineContext.ensureActive()
emit(Unit)
done.complete(i)
CancelToken.unit
Expand All @@ -180,7 +181,7 @@ class CallbackTest : StreamSpec(iterations = 250, spec = {
ForkConnected { cancel.invoke() }

// Let cancel schedule
sleep(10.milliseconds)
delay(10.milliseconds)

start.complete(Unit) // Continue cancellableF

Expand Down Expand Up @@ -213,7 +214,7 @@ private suspend fun <A> countToCallback(
arrow.fx.coroutines.repeat(Schedule.recurs(iterations)) {
i += 1
cb(map(i))
sleep(500.milliseconds)
delay(500.milliseconds)
}
onEnd()
}