Skip to content
This repository has been archived by the owner on Feb 24, 2021. It is now read-only.

remove EvalOnTest, inline ComputationPool, evalOn, #362

Merged
merged 3 commits into from
Dec 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import arrow.core.Option
import arrow.core.Some
import arrow.core.extensions.option.monad.flatten
import arrow.core.orElse
import arrow.fx.coroutines.ComputationPool
import arrow.fx.coroutines.ForkAndForget
import arrow.fx.coroutines.Platform
import arrow.fx.coroutines.Semaphore
import arrow.fx.coroutines.stream.concurrent.NoneTerminatedQueue
import arrow.fx.coroutines.stream.concurrent.Queue
import arrow.fx.coroutines.stream.concurrent.SignallingAtomic
import arrow.fx.coroutines.uncancellable
import kotlinx.coroutines.Dispatchers
import kotlin.coroutines.CoroutineContext

// stops the join evaluation
Expand Down Expand Up @@ -171,7 +171,7 @@ internal suspend fun signalResult(done: SignallingAtomic<Option<Option<Throwable
*/
fun <O> Stream<Stream<O>>.parJoin(
maxOpen: Int,
ctx: CoroutineContext = ComputationPool
ctx: CoroutineContext = Dispatchers.Default
): Stream<O> {
require(maxOpen > 0) { "maxOpen must be > 0, was: $maxOpen" }

Expand Down Expand Up @@ -204,5 +204,5 @@ fun <O> Stream<Stream<O>>.parJoin(
}

/** Like [parJoin] but races all inner streams simultaneously without limit. */
fun <O> Stream<Stream<O>>.parJoinUnbounded(ctx: CoroutineContext = ComputationPool): Stream<O> =
fun <O> Stream<Stream<O>>.parJoinUnbounded(ctx: CoroutineContext = Dispatchers.Default): Stream<O> =
parJoin(Int.MAX_VALUE, ctx)
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import arrow.core.Option
import arrow.core.Some
import arrow.core.extensions.list.foldable.foldLeft
import arrow.core.identity
import arrow.fx.coroutines.ComputationPool
import arrow.fx.coroutines.Duration
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.Fiber
Expand All @@ -20,6 +19,7 @@ import arrow.fx.coroutines.guaranteeCase
import arrow.fx.coroutines.stream.concurrent.Signal
import arrow.typeclasses.Monoid
import arrow.typeclasses.Semigroup
import kotlinx.coroutines.Dispatchers
import java.util.concurrent.TimeoutException
import kotlin.coroutines.CoroutineContext
import kotlin.random.Random
Expand Down Expand Up @@ -1303,7 +1303,7 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
* //sampleEnd
* ```
*/
fun <O2> concurrently(other: Stream<O2>, ctx: CoroutineContext = ComputationPool): Stream<O> =
fun <O2> concurrently(other: Stream<O2>, ctx: CoroutineContext = Dispatchers.Default): Stream<O> =
effect { Pair(Promise<Unit>(), Promise<Either<Throwable, Unit>>()) }
.flatMap { (interrupt, doneR) ->
bracket(
Expand Down Expand Up @@ -1338,14 +1338,14 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
* Merges both Streams into an Stream of A and B represented by Either<A, B>.
* This operation is equivalent to a normal merge but for different types.
*/
fun <B> either(ctx: CoroutineContext = ComputationPool, other: Stream<B>): Stream<Either<O, B>> =
fun <B> either(ctx: CoroutineContext = Dispatchers.Default, other: Stream<B>): Stream<Either<O, B>> =
Stream(this.map { Left(it) }, other.map { Right(it) })
.parJoin(2, ctx)

/**
* Starts this stream and cancels it as finalization of the returned stream.
*/
fun spawn(ctx: CoroutineContext = ComputationPool): Stream<Fiber<Unit>> =
fun spawn(ctx: CoroutineContext = Dispatchers.Default): Stream<Fiber<Unit>> =
supervise(ctx) { drain() }

/**
Expand Down Expand Up @@ -1770,7 +1770,7 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
/**
* Starts the supplied task and cancels it as finalization of the returned stream.
*/
fun <A> supervise(ctx: CoroutineContext = ComputationPool, fa: suspend () -> A): Stream<Fiber<A>> =
fun <A> supervise(ctx: CoroutineContext = Dispatchers.Default, fa: suspend () -> A): Stream<Fiber<A>> =
bracket(acquire = { fa.forkAndForget(ctx) }, release = { it.cancel() })

/**
Expand Down Expand Up @@ -2597,7 +2597,7 @@ private fun <A, B, C> Pull<A, Unit>.zipWith_(
}

typealias ZipWithCont<I, O> =
(Either<Pair<Chunk<I>, Pull<I, Unit>>, Pull<I, Unit>>) -> Pull<O, Unit>
(Either<Pair<Chunk<I>, Pull<I, Unit>>, Pull<I, Unit>>) -> Pull<O, Unit>

/** `Monoid` instance for `Stream`. */
fun <O> Stream.Companion.monoid(): Monoid<Stream<O>> =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package arrow.fx.coroutines.stream
import arrow.core.Either
import arrow.core.identity
import arrow.fx.coroutines.CancelToken
import arrow.fx.coroutines.ComputationPool
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.ForkAndForget
import arrow.fx.coroutines.Promise
Expand All @@ -26,6 +25,7 @@ import io.kotest.property.arbitrary.long
import io.kotest.property.arbitrary.map
import io.kotest.property.arbitrary.string
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.Dispatchers
import java.io.ByteArrayOutputStream
import java.io.OutputStream
import java.io.PrintStream
Expand Down Expand Up @@ -158,7 +158,7 @@ internal fun <A> Result<A>.toEither(): Either<Throwable, A> =

internal suspend fun Throwable.suspend(): Nothing =
suspendCoroutineUninterceptedOrReturn { cont ->
suspend { throw this }.startCoroutine(Continuation(ComputationPool) {
suspend { throw this }.startCoroutine(Continuation(Dispatchers.Default) {
cont.intercepted().resumeWith(it)
})

Expand All @@ -167,7 +167,7 @@ internal suspend fun Throwable.suspend(): Nothing =

internal suspend fun <A> A.suspend(): A =
suspendCoroutineUninterceptedOrReturn { cont ->
suspend { this }.startCoroutine(Continuation(ComputationPool) {
suspend { this }.startCoroutine(Continuation(Dispatchers.Default) {
cont.intercepted().resumeWith(it)
})

Expand All @@ -179,7 +179,7 @@ internal fun <A> A.suspended(): suspend () -> A =

internal suspend fun <A> Either<Throwable, A>.suspend(): A =
suspendCoroutineUninterceptedOrReturn { cont ->
suspend { this }.startCoroutine(Continuation(ComputationPool) {
suspend { this }.startCoroutine(Continuation(Dispatchers.Default) {
it.fold(
{
it.fold(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import io.kotest.property.arbitrary.long
import io.kotest.property.arbitrary.map
import io.kotest.property.arbitrary.string
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.Dispatchers
import java.io.ByteArrayOutputStream
import java.io.OutputStream
import java.io.PrintStream
Expand Down Expand Up @@ -162,7 +163,7 @@ fun <A> Result<A>.toEither(): Either<Throwable, A> =

suspend fun Throwable.suspend(): Nothing =
suspendCoroutineUninterceptedOrReturn { cont ->
suspend { throw this }.startCoroutine(Continuation(ComputationPool) {
suspend { throw this }.startCoroutine(Continuation(Dispatchers.Default) {
cont.intercepted().resumeWith(it)
})

Expand All @@ -171,7 +172,7 @@ suspend fun Throwable.suspend(): Nothing =

suspend fun <A> A.suspend(): A =
suspendCoroutineUninterceptedOrReturn { cont ->
suspend { this }.startCoroutine(Continuation(ComputationPool) {
suspend { this }.startCoroutine(Continuation(Dispatchers.Default) {
cont.intercepted().resumeWith(it)
})

Expand Down
20 changes: 10 additions & 10 deletions arrow-fx-coroutines/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
plugins {
id "org.jetbrains.kotlin.jvm"
id "org.jlleitschuh.gradle.ktlint"
id "org.jetbrains.kotlin.jvm"
id "org.jlleitschuh.gradle.ktlint"
}

apply plugin: 'kotlinx-atomicfu'
Expand All @@ -9,13 +9,13 @@ apply from: "$SUB_PROJECT"
apply from: "$DOC_CREATION"

dependencies {
api "io.arrow-kt:arrow-core:$VERSION_NAME"
implementation project(":arrow-fx-suspend-connection")
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$KOTLINX_COROUTINES_VERSION"
api "io.arrow-kt:arrow-core:$VERSION_NAME"
implementation project(":arrow-fx-suspend-connection")
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$KOTLINX_COROUTINES_VERSION"

testImplementation project(":arrow-fx-coroutines-test")
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:$KOTLINX_COROUTINES_VERSION"
testImplementation "io.kotest:kotest-runner-junit5-jvm:$KOTEST_VERSION" // for kotest framework
testImplementation "io.kotest:kotest-assertions-core-jvm:$KOTEST_VERSION" // for kotest core jvm assertions
testImplementation "io.kotest:kotest-property-jvm:$KOTEST_VERSION" // for kotest property test
testImplementation project(":arrow-fx-coroutines-test")
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:$KOTLINX_COROUTINES_VERSION"
testImplementation "io.kotest:kotest-runner-junit5-jvm:$KOTEST_VERSION" // for kotest framework
testImplementation "io.kotest:kotest-assertions-core-jvm:$KOTEST_VERSION" // for kotest core jvm assertions
testImplementation "io.kotest:kotest-property-jvm:$KOTEST_VERSION" // for kotest property test
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package arrow.fx.coroutines
import arrow.core.Either
import arrow.core.identity
import arrow.fx.coroutines.CircuitBreaker.State.Closed
import arrow.fx.coroutines.CircuitBreaker.State.HalfOpen
import arrow.fx.coroutines.CircuitBreaker.State.Open

class CircuitBreaker constructor(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package arrow.fx.coroutines

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
Expand Down Expand Up @@ -82,7 +83,7 @@ interface Environment {
fun <A> unsafeRunAsyncCancellable(fa: suspend () -> A, e: (Throwable) -> Unit, a: (A) -> Unit): Disposable

companion object {
operator fun invoke(ctx: CoroutineContext = ComputationPool): Environment =
operator fun invoke(ctx: CoroutineContext = Dispatchers.Default): Environment =
DefaultEnvironment(ctx)
}
}
Expand Down
13 changes: 7 additions & 6 deletions arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Fiber.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package arrow.fx.coroutines

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelAndJoin
Expand Down Expand Up @@ -68,7 +69,7 @@ internal fun <A> Fiber(promise: UnsafePromise<A>, conn: SuspendConnection): Fibe
"Use Deferred with KotlinX Coroutines Structured Concurrency",
ReplaceWith("async(ctx) { f() }", "kotlinx.coroutines.Deferred")
)
suspend fun <A> ForkConnected(ctx: CoroutineContext = ComputationPool, f: suspend () -> A): Fiber<A> {
suspend fun <A> ForkConnected(ctx: CoroutineContext = Dispatchers.Default, f: suspend () -> A): Fiber<A> {
val def = CoroutineScope(coroutineContext[Job] ?: Job()).async(ctx) {
runCatching { f.invoke() }
}
Expand All @@ -84,7 +85,7 @@ suspend fun <A> ForkConnected(ctx: CoroutineContext = ComputationPool, f: suspen
"Use Deferred with KotlinX Coroutines Structured Concurrency",
ReplaceWith("async(ctx) { invoke() }", "kotlinx.coroutines.Deferred")
)
suspend fun <A> (suspend () -> A).forkConnected(ctx: CoroutineContext = ComputationPool): Fiber<A> =
suspend fun <A> (suspend () -> A).forkConnected(ctx: CoroutineContext = Dispatchers.Default): Fiber<A> =
ForkConnected(ctx, this)

/**
Expand Down Expand Up @@ -120,7 +121,7 @@ suspend fun <A> (suspend () -> A).forkConnected(ctx: CoroutineContext = Computat
*/
// TODO provide proper deprecation annotation
suspend fun <A> ForkScoped(
ctx: CoroutineContext = ComputationPool,
ctx: CoroutineContext = Dispatchers.Default,
interruptWhen: suspend () -> Unit,
f: suspend () -> A
): Fiber<A> {
Expand All @@ -133,7 +134,7 @@ suspend fun <A> ForkScoped(
/** @see ForkScoped */
// TODO provide proper deprecation annotation
suspend fun <A> (suspend () -> A).forkScoped(
ctx: CoroutineContext = ComputationPool,
ctx: CoroutineContext = Dispatchers.Default,
interruptWhen: suspend () -> Unit
): Fiber<A> = ForkScoped(ctx, interruptWhen, this)

Expand All @@ -147,12 +148,12 @@ suspend fun <A> (suspend () -> A).forkScoped(
* @see ForkConnected for a fork operation that wires cancellation to its parent in a safe way.
*/
// TODO provide proper deprecation annotation
suspend fun <A> ForkAndForget(ctx: CoroutineContext = ComputationPool, f: suspend () -> A): Fiber<A> =
suspend fun <A> ForkAndForget(ctx: CoroutineContext = Dispatchers.Default, f: suspend () -> A): Fiber<A> =
f.forkAndForget(ctx)

/** @see ForkAndForget */
// TODO provide proper deprecation annotation
suspend fun <A> (suspend () -> A).forkAndForget(ctx: CoroutineContext = ComputationPool): Fiber<A> =
suspend fun <A> (suspend () -> A).forkAndForget(ctx: CoroutineContext = Dispatchers.Default): Fiber<A> =
CoroutineScope(ctx).async {
invoke()
}.toFiber()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package arrow.fx.coroutines

import arrow.fx.coroutines.ForwardCancellable.Companion.State.Active
import arrow.fx.coroutines.ForwardCancellable.Companion.State.Empty
import kotlinx.atomicfu.atomic
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ import kotlin.coroutines.CoroutineContext

@Deprecated("Use withContext", replaceWith = ReplaceWith("withContext(ctx, block)", "kotlinx.coroutines.withContext"))
suspend fun <T> (suspend () -> T).evalOn(ctx: CoroutineContext): T =
evalOn(ctx, this)
withContext(ctx) { this@evalOn.invoke() }

/**
* Executes a task on [context] and comes back to the original [CoroutineContext].
*
* State of [context] and previous [CoroutineContext] is merged
*/
@Deprecated("Use withContext", replaceWith = ReplaceWith("withContext(context, block)", "kotlinx.coroutines.withContext"))
@Deprecated(
"Use withContext",
replaceWith = ReplaceWith("withContext(context, block)", "kotlinx.coroutines.withContext")
)
suspend fun <T> evalOn(
context: CoroutineContext,
block: suspend () -> T
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package arrow.fx.coroutines

import kotlinx.coroutines.Dispatchers
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.intrinsics.createCoroutineUnintercepted
Expand All @@ -16,7 +17,7 @@ abstract class CancellableContinuation<A> internal constructor() : Continuation<
@Suppress("FunctionName")
@Deprecated("Use KotlinX structured concurrency as unsafe Environment to launch side-effects from non-suspending code")
fun <A> CancellableContinuation(
ctx: CoroutineContext = ComputationPool,
ctx: CoroutineContext = Dispatchers.Default,
resumeWith: (Result<A>) -> Unit
): CancellableContinuation<A> = CancellableContinuation(ctx, SuspendConnection(), resumeWith)

Expand All @@ -43,7 +44,7 @@ fun <A> (suspend () -> A).startCoroutineCancellable(completion: CancellableConti
@Suppress("FunctionName")
@Deprecated("Use KotlinX structured concurrency as unsafe Environment to launch side-effects from non-suspending code")
internal fun <A> CancellableContinuation(
ctx: CoroutineContext = ComputationPool,
ctx: CoroutineContext = Dispatchers.Default,
conn: SuspendConnection,
resumeWith: (Result<A>) -> Unit
): CancellableContinuation<A> = object : CancellableContinuation<A>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import arrow.core.Either
import io.kotest.assertions.fail
import io.kotest.assertions.throwables.shouldThrow
import io.kotest.matchers.shouldBe
import java.lang.RuntimeException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext

class CircuitBreakerTest : ArrowFxSpec(spec = {

Expand All @@ -18,7 +19,7 @@ class CircuitBreakerTest : ArrowFxSpec(spec = {
val cb = CircuitBreaker.of(maxFailures = maxFailures, resetTimeout = resetTimeout)!!
var effect = 0
repeat(Schedule.recurs(10_000)) {
cb.protect { evalOn(ComputationPool) { effect += 1 } }
cb.protect { withContext(Dispatchers.Default) { effect += 1 } }
}
effect shouldBe 10_001
}
Expand Down Expand Up @@ -265,7 +266,7 @@ fun <A> recurAndCollect(n: Int): Schedule<A, List<A>> =

tailrec suspend fun stackSafeSuspend(cb: CircuitBreaker, n: Int, acc: Int): Int =
if (n > 0) {
val s = cb.protect { evalOn(ComputationPool) { acc + 1 } }
val s = cb.protect { withContext(Dispatchers.Default) { acc + 1 } }
stackSafeSuspend(cb, n - 1, s)
} else acc

Expand Down