Skip to content
This repository has been archived by the owner on Feb 24, 2021. It is now read-only.

Deprecate arrow.fx.coroutines.Duration #338

Closed
wants to merge 8 commits into from
Expand Up @@ -6,14 +6,14 @@ import arrow.core.Option
import arrow.core.Some
import arrow.core.extensions.option.monad.flatten
import arrow.core.orElse
import arrow.fx.coroutines.ComputationPool
import arrow.fx.coroutines.ForkAndForget
import arrow.fx.coroutines.Platform
import arrow.fx.coroutines.Semaphore
import arrow.fx.coroutines.stream.concurrent.NoneTerminatedQueue
import arrow.fx.coroutines.stream.concurrent.Queue
import arrow.fx.coroutines.stream.concurrent.SignallingAtomic
import arrow.fx.coroutines.uncancellable
import kotlinx.coroutines.Dispatchers
import kotlin.coroutines.CoroutineContext

// stops the join evaluation
Expand Down Expand Up @@ -171,7 +171,7 @@ internal suspend fun signalResult(done: SignallingAtomic<Option<Option<Throwable
*/
fun <O> Stream<Stream<O>>.parJoin(
maxOpen: Int,
ctx: CoroutineContext = ComputationPool
ctx: CoroutineContext = Dispatchers.Default
): Stream<O> {
require(maxOpen > 0) { "maxOpen must be > 0, was: $maxOpen" }

Expand Down Expand Up @@ -204,5 +204,5 @@ fun <O> Stream<Stream<O>>.parJoin(
}

/** Like [parJoin] but races all inner streams simultaneously without limit. */
fun <O> Stream<Stream<O>>.parJoinUnbounded(ctx: CoroutineContext = ComputationPool): Stream<O> =
fun <O> Stream<Stream<O>>.parJoinUnbounded(ctx: CoroutineContext = Dispatchers.Default): Stream<O> =
parJoin(Int.MAX_VALUE, ctx)
Expand Up @@ -9,7 +9,6 @@ import arrow.core.Option
import arrow.core.Some
import arrow.core.extensions.list.foldable.foldLeft
import arrow.core.identity
import arrow.fx.coroutines.ComputationPool
import kotlin.time.Duration
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.Fiber
Expand All @@ -21,6 +20,7 @@ import arrow.fx.coroutines.guaranteeCase
import arrow.fx.coroutines.stream.concurrent.Signal
import arrow.typeclasses.Monoid
import arrow.typeclasses.Semigroup
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import java.util.concurrent.TimeoutException
import kotlin.coroutines.CoroutineContext
Expand Down Expand Up @@ -1307,7 +1307,7 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
* //sampleEnd
* ```
*/
fun <O2> concurrently(other: Stream<O2>, ctx: CoroutineContext = ComputationPool): Stream<O> =
fun <O2> concurrently(other: Stream<O2>, ctx: CoroutineContext = Dispatchers.Default): Stream<O> =
effect { Pair(Promise<Unit>(), Promise<Either<Throwable, Unit>>()) }
.flatMap { (interrupt, doneR) ->
bracket(
Expand Down Expand Up @@ -1342,14 +1342,14 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
* Merges both Streams into an Stream of A and B represented by Either<A, B>.
* This operation is equivalent to a normal merge but for different types.
*/
fun <B> either(ctx: CoroutineContext = ComputationPool, other: Stream<B>): Stream<Either<O, B>> =
fun <B> either(ctx: CoroutineContext = Dispatchers.Default, other: Stream<B>): Stream<Either<O, B>> =
Stream(this.map { Left(it) }, other.map { Right(it) })
.parJoin(2, ctx)

/**
* Starts this stream and cancels it as finalization of the returned stream.
*/
fun spawn(ctx: CoroutineContext = ComputationPool): Stream<Fiber<Unit>> =
fun spawn(ctx: CoroutineContext = Dispatchers.Default): Stream<Fiber<Unit>> =
supervise(ctx) { drain() }

/**
Expand Down Expand Up @@ -1774,7 +1774,7 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
/**
* Starts the supplied task and cancels it as finalization of the returned stream.
*/
fun <A> supervise(ctx: CoroutineContext = ComputationPool, fa: suspend () -> A): Stream<Fiber<A>> =
fun <A> supervise(ctx: CoroutineContext = Dispatchers.Default, fa: suspend () -> A): Stream<Fiber<A>> =
bracket(acquire = { fa.forkAndForget(ctx) }, release = { it.cancel() })

/**
Expand Down
Expand Up @@ -9,7 +9,6 @@ import arrow.fx.coroutines.Promise
import arrow.fx.coroutines.ForkAndForget
import arrow.fx.coroutines.guaranteeCase
import arrow.fx.coroutines.guarantee
import arrow.fx.coroutines.timeOutOrNull
import arrow.fx.coroutines.Semaphore
import arrow.fx.coroutines.ExitCase
import kotlin.time.milliseconds
Expand All @@ -20,6 +19,7 @@ import io.kotest.matchers.shouldNotBe
import io.kotest.matchers.types.shouldBeInstanceOf
import io.kotest.property.Arb
import io.kotest.property.arbitrary.int
import kotlinx.coroutines.withTimeoutOrNull

class InterruptionTest : StreamSpec(spec = {
"can cancel a hung effect" - {
Expand Down Expand Up @@ -276,13 +276,13 @@ class InterruptionTest : StreamSpec(spec = {
"interrupted effect is cancelled" - {
val latch = Promise<Unit>()

timeOutOrNull(500.milliseconds) {
Stream.effect { guarantee({ latch.get() }) { latch.complete(Unit) } }
.interruptAfter(50.milliseconds)
.drain()
withTimeoutOrNull(500.milliseconds) {
Stream.effect { guarantee({ latch.get() }) { latch.complete(Unit) } }
.interruptAfter(50.milliseconds)
.drain()

latch.get()
true
latch.get()
true
} shouldBe true
}

Expand Down
Expand Up @@ -6,18 +6,18 @@ import arrow.core.extensions.list.foldable.combineAll
import arrow.core.extensions.list.foldable.foldMap
import arrow.core.extensions.monoid
import arrow.core.extensions.semigroup
import kotlin.time.milliseconds
import arrow.fx.coroutines.timeOutOrNull
import io.kotest.matchers.shouldBe
import io.kotest.property.Arb
import io.kotest.property.arbitrary.int
import io.kotest.property.arbitrary.list
import io.kotest.property.arbitrary.orNull
import io.kotest.property.arbitrary.positiveInts
import io.kotest.property.arbitrary.set
import kotlinx.coroutines.withTimeoutOrNull
import kotlin.math.absoluteValue
import kotlin.math.max
import kotlin.random.Random
import kotlin.time.milliseconds

class StreamTest : StreamSpec(spec = {
"constructors" - {
Expand All @@ -32,7 +32,7 @@ class StreamTest : StreamSpec(spec = {
}

"never() should timeout" {
timeOutOrNull(10.milliseconds) {
withTimeoutOrNull(10.milliseconds) {
Stream.never<Int>().toList()
} shouldBe null
}
Expand Down
Expand Up @@ -4,25 +4,25 @@ import arrow.core.Option
import arrow.fx.coroutines.ForkAndForget
import arrow.fx.coroutines.ForkConnected
import arrow.fx.coroutines.Promise
import arrow.fx.coroutines.stream.StreamSpec
import kotlin.time.milliseconds
import kotlin.time.seconds
import kotlinx.coroutines.delay
import arrow.fx.coroutines.stream.Stream
import arrow.fx.coroutines.stream.StreamSpec
import arrow.fx.coroutines.stream.append
import arrow.fx.coroutines.stream.drain
import arrow.fx.coroutines.stream.noneTerminate
import arrow.fx.coroutines.stream.parJoinUnbounded
import arrow.fx.coroutines.stream.terminateOnNone
import arrow.fx.coroutines.stream.toList
import arrow.fx.coroutines.timeOutOrNull
import io.kotest.assertions.assertSoftly
import io.kotest.matchers.ints.shouldBeLessThan
import io.kotest.matchers.shouldBe
import io.kotest.property.Arb
import io.kotest.property.arbitrary.int
import io.kotest.property.arbitrary.positiveInts
import kotlinx.coroutines.delay
import kotlinx.coroutines.withTimeoutOrNull
import kotlin.math.max
import kotlin.time.milliseconds
import kotlin.time.seconds

class QueueTest : StreamSpec(spec = {

Expand Down Expand Up @@ -185,7 +185,7 @@ class QueueTest : StreamSpec(spec = {

"cancel" {
val q = Queue.unbounded<Int>()
timeOutOrNull(100.milliseconds) {
withTimeoutOrNull(100.milliseconds) {
q.dequeue1()
} shouldBe null
q.enqueue1(1)
Expand Down
Expand Up @@ -3,7 +3,6 @@ package arrow.fx.coroutines.stream
import arrow.core.Either
import arrow.core.identity
import arrow.fx.coroutines.CancelToken
import arrow.fx.coroutines.ComputationPool
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.ForkAndForget
import arrow.fx.coroutines.Promise
Expand All @@ -26,6 +25,7 @@ import io.kotest.property.arbitrary.long
import io.kotest.property.arbitrary.map
import io.kotest.property.arbitrary.string
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.Dispatchers
import java.io.ByteArrayOutputStream
import java.io.OutputStream
import java.io.PrintStream
Expand Down Expand Up @@ -158,7 +158,7 @@ internal fun <A> Result<A>.toEither(): Either<Throwable, A> =

internal suspend fun Throwable.suspend(): Nothing =
suspendCoroutineUninterceptedOrReturn { cont ->
suspend { throw this }.startCoroutine(Continuation(ComputationPool) {
suspend { throw this }.startCoroutine(Continuation(Dispatchers.Default) {
cont.intercepted().resumeWith(it)
})

Expand All @@ -167,7 +167,7 @@ internal suspend fun Throwable.suspend(): Nothing =

internal suspend fun <A> A.suspend(): A =
suspendCoroutineUninterceptedOrReturn { cont ->
suspend { this }.startCoroutine(Continuation(ComputationPool) {
suspend { this }.startCoroutine(Continuation(Dispatchers.Default) {
cont.intercepted().resumeWith(it)
})

Expand All @@ -179,7 +179,7 @@ internal fun <A> A.suspended(): suspend () -> A =

internal suspend fun <A> Either<Throwable, A>.suspend(): A =
suspendCoroutineUninterceptedOrReturn { cont ->
suspend { this }.startCoroutine(Continuation(ComputationPool) {
suspend { this }.startCoroutine(Continuation(Dispatchers.Default) {
it.fold(
{
it.fold(
Expand Down
Expand Up @@ -19,6 +19,7 @@ import io.kotest.property.arbitrary.long
import io.kotest.property.arbitrary.map
import io.kotest.property.arbitrary.string
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.Dispatchers
import java.io.ByteArrayOutputStream
import java.io.OutputStream
import java.io.PrintStream
Expand Down Expand Up @@ -151,7 +152,7 @@ fun <A> Result<A>.toEither(): Either<Throwable, A> =

suspend fun Throwable.suspend(): Nothing =
suspendCoroutineUninterceptedOrReturn { cont ->
suspend { throw this }.startCoroutine(Continuation(ComputationPool) {
suspend { throw this }.startCoroutine(Continuation(Dispatchers.Default) {
cont.intercepted().resumeWith(it)
})

Expand All @@ -160,7 +161,7 @@ suspend fun Throwable.suspend(): Nothing =

suspend fun <A> A.suspend(): A =
suspendCoroutineUninterceptedOrReturn { cont ->
suspend { this }.startCoroutine(Continuation(ComputationPool) {
suspend { this }.startCoroutine(Continuation(Dispatchers.Default) {
cont.intercepted().resumeWith(it)
})

Expand All @@ -172,7 +173,7 @@ fun <A> A.suspended(): suspend () -> A =

suspend fun <A> Either<Throwable, A>.suspend(): A =
suspendCoroutineUninterceptedOrReturn { cont ->
suspend { this }.startCoroutine(Continuation(ComputationPool) {
suspend { this }.startCoroutine(Continuation(Dispatchers.Default) {
it.fold(
{
it.fold(
Expand Down
@@ -1,18 +1,20 @@
package arrow.fx.coroutines

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine
import kotlinx.coroutines.delay

/**
* An [Environment] can run [suspend] programs using [startCoroutine] and [startCoroutineCancellable].
*
* An [Environment] runs on a certain [CoroutineContext] which is used to start the programs on.
* Since coroutines always return where they were started, this [CoroutineContext] also defines where you return after operators like [sleep] or [evalOn].
* Since coroutines always return where they were started, this [CoroutineContext] also defines where you return after operators like [delay] or [evalOn].
* Therefore it's advised to always run on [ComputationPool] which is the default setting.
*
* [Environment] also has an [asyncErrorHandler], which by default redirects to [Throwable.printStackTrace].
Expand Down Expand Up @@ -82,7 +84,7 @@ interface Environment {
fun <A> unsafeRunAsyncCancellable(fa: suspend () -> A, e: (Throwable) -> Unit, a: (A) -> Unit): Disposable

companion object {
operator fun invoke(ctx: CoroutineContext = ComputationPool): Environment =
operator fun invoke(ctx: CoroutineContext = Dispatchers.Default): Environment =
DefaultEnvironment(ctx)
}
}
Expand Down
13 changes: 7 additions & 6 deletions arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Fiber.kt
Expand Up @@ -2,6 +2,7 @@ package arrow.fx.coroutines

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelAndJoin
Expand Down Expand Up @@ -70,7 +71,7 @@ internal fun <A> Fiber(promise: UnsafePromise<A>, conn: SuspendConnection): Fibe
"Use Deferred with KotlinX Coroutines Structured Concurrency",
ReplaceWith("async(ctx) { f() }", "kotlinx.coroutines.Deferred")
)
suspend fun <A> ForkConnected(ctx: CoroutineContext = ComputationPool, f: suspend () -> A): Fiber<A> {
suspend fun <A> ForkConnected(ctx: CoroutineContext = Dispatchers.Default, f: suspend () -> A): Fiber<A> {
val def = CoroutineScope(coroutineContext[Job] ?: Job()).async(ctx) {
runCatching { f.invoke() }
}
Expand All @@ -86,7 +87,7 @@ suspend fun <A> ForkConnected(ctx: CoroutineContext = ComputationPool, f: suspen
"Use Deferred with KotlinX Coroutines Structured Concurrency",
ReplaceWith("async(ctx) { invoke() }", "kotlinx.coroutines.Deferred")
)
suspend fun <A> (suspend () -> A).forkConnected(ctx: CoroutineContext = ComputationPool): Fiber<A> =
suspend fun <A> (suspend () -> A).forkConnected(ctx: CoroutineContext = Dispatchers.Default): Fiber<A> =
ForkConnected(ctx, this)

/**
Expand Down Expand Up @@ -124,7 +125,7 @@ suspend fun <A> (suspend () -> A).forkConnected(ctx: CoroutineContext = Computat
*/
// TODO provide proper deprecation annotation
suspend fun <A> ForkScoped(
ctx: CoroutineContext = ComputationPool,
ctx: CoroutineContext = Dispatchers.Default,
interruptWhen: suspend () -> Unit,
f: suspend () -> A
): Fiber<A> {
Expand All @@ -137,7 +138,7 @@ suspend fun <A> ForkScoped(
/** @see ForkScoped */
// TODO provide proper deprecation annotation
suspend fun <A> (suspend () -> A).forkScoped(
ctx: CoroutineContext = ComputationPool,
ctx: CoroutineContext = Dispatchers.Default,
interruptWhen: suspend () -> Unit
): Fiber<A> = ForkScoped(ctx, interruptWhen, this)

Expand All @@ -151,12 +152,12 @@ suspend fun <A> (suspend () -> A).forkScoped(
* @see ForkConnected for a fork operation that wires cancellation to its parent in a safe way.
*/
// TODO provide proper deprecation annotation
suspend fun <A> ForkAndForget(ctx: CoroutineContext = ComputationPool, f: suspend () -> A): Fiber<A> =
suspend fun <A> ForkAndForget(ctx: CoroutineContext = Dispatchers.Default, f: suspend () -> A): Fiber<A> =
f.forkAndForget(ctx)

/** @see ForkAndForget */
// TODO provide proper deprecation annotation
suspend fun <A> (suspend () -> A).forkAndForget(ctx: CoroutineContext = ComputationPool): Fiber<A> =
suspend fun <A> (suspend () -> A).forkAndForget(ctx: CoroutineContext = Dispatchers.Default): Fiber<A> =
CoroutineScope(ctx).async {
invoke()
}.toFiber()
Expand Down
Expand Up @@ -5,7 +5,7 @@ import kotlin.coroutines.CoroutineContext

@Deprecated("Use withContext", replaceWith = ReplaceWith("withContext(ctx, block)", "kotlinx.coroutines.withContext"))
i-walker marked this conversation as resolved.
Show resolved Hide resolved
suspend fun <T> (suspend () -> T).evalOn(ctx: CoroutineContext): T =
evalOn(ctx, this)
withContext<T>(ctx) { this@evalOn.invoke() }

/**
* Executes a task on [context] and comes back to the original [CoroutineContext].
Expand Down
@@ -1,5 +1,6 @@
package arrow.fx.coroutines

import kotlinx.coroutines.Dispatchers
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.intrinsics.createCoroutineUnintercepted
Expand All @@ -16,7 +17,7 @@ abstract class CancellableContinuation<A> internal constructor() : Continuation<
@Suppress("FunctionName")
@Deprecated("Use KotlinX structured concurrency as unsafe Environment to launch side-effects from non-suspending code")
fun <A> CancellableContinuation(
ctx: CoroutineContext = ComputationPool,
ctx: CoroutineContext = Dispatchers.Default,
resumeWith: (Result<A>) -> Unit
): CancellableContinuation<A> = CancellableContinuation(ctx, SuspendConnection(), resumeWith)

Expand All @@ -43,7 +44,7 @@ fun <A> (suspend () -> A).startCoroutineCancellable(completion: CancellableConti
@Suppress("FunctionName")
@Deprecated("Use KotlinX structured concurrency as unsafe Environment to launch side-effects from non-suspending code")
internal fun <A> CancellableContinuation(
ctx: CoroutineContext = ComputationPool,
ctx: CoroutineContext = Dispatchers.Default,
conn: SuspendConnection,
resumeWith: (Result<A>) -> Unit
): CancellableContinuation<A> = object : CancellableContinuation<A>() {
Expand Down