-
Notifications
You must be signed in to change notification settings - Fork 437
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Attempt to make DeferredK rerunnable #1157
Changes from 2 commits
85431e2
7a2dc76
3c6b93b
c8032a4
c57f8f9
fde6406
46501bc
3238371
55da6e1
45f2f0f
ee16856
9f44f44
5e12e53
60a87bd
6dbbdc1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,29 +9,68 @@ import kotlinx.coroutines.* | |
import kotlin.coroutines.CoroutineContext | ||
|
||
fun <A> Deferred<A>.k(): DeferredK<A> = | ||
DeferredK(this) | ||
DeferredK.Wrapped(memoized = this) | ||
|
||
fun <A> CoroutineScope.asyncK(ctx: CoroutineContext = Dispatchers.Default, start: CoroutineStart = CoroutineStart.LAZY, f: suspend CoroutineScope.() -> A): DeferredK<A> = | ||
this.async(ctx, start, f).k() | ||
DeferredK.Generated(ctx, start, this) { f() } | ||
|
||
fun <A> DeferredKOf<A>.value(): Deferred<A> = this.fix() | ||
fun <A> DeferredKOf<A>.value(): Deferred<A> = this.fix().memoized | ||
|
||
fun <A> DeferredKOf<A>.scope(): CoroutineScope = this.fix().scope | ||
|
||
@higherkind | ||
@ExperimentalCoroutinesApi | ||
data class DeferredK<out A>(private val deferred: Deferred<A>, val scope: CoroutineScope = GlobalScope) : DeferredKOf<A>, Deferred<A> by deferred { | ||
sealed class DeferredK<A>( | ||
val scope: CoroutineScope = GlobalScope, | ||
val memoized: Deferred<A> | ||
) : DeferredKOf<A>, Deferred<A> by memoized { | ||
|
||
/** | ||
* Pure wrapper for already constructed deferred instances, does nothing special really | ||
*/ | ||
class Wrapped<A>(scope: CoroutineScope = GlobalScope, memoized: Deferred<A>) : DeferredK<A>(scope = scope, memoized = memoized) | ||
|
||
/** | ||
* Represents a DeferredK that can generate an instance of deferred on every await | ||
* | ||
* It does not memoize results and thus can be rerun just as expected from a MonadDefer | ||
* However one can still break this system by ie returning or using a deferred in one of the functions | ||
* only when creating all deferred instances inside DeferredK or using DeferredK's methods | ||
* one can guarantee not having memoization | ||
*/ | ||
class Generated<A>( | ||
val ctx: CoroutineContext = Dispatchers.Default, | ||
val coroutineStart: CoroutineStart = CoroutineStart.LAZY, | ||
scope: CoroutineScope = GlobalScope, | ||
val generator: suspend () -> A | ||
) : DeferredK<A>(scope, scope.async(ctx, coroutineStart) { generator() }) { | ||
|
||
override suspend fun await(): A = | ||
if (memoized.isCompleted || memoized.isActive || memoized.isCancelled) { | ||
scope.async(ctx, coroutineStart) { generator() }.await() | ||
} else { | ||
memoized.await() | ||
} | ||
} | ||
|
||
fun <B> map(f: (A) -> B): DeferredK<B> = | ||
flatMap { a: A -> just(f(a)) } | ||
|
||
fun <B> ap(fa: DeferredKOf<(A) -> B>): DeferredK<B> = | ||
flatMap { a -> fa.fix().map { ff -> ff(a) } } | ||
|
||
fun <B> flatMap(f: (A) -> DeferredKOf<B>): DeferredK<B> = | ||
scope.asyncK(Dispatchers.Unconfined, CoroutineStart.LAZY) { | ||
f(await()).await() | ||
fun <B> flatMap(f: (A) -> DeferredKOf<B>): DeferredK<B> = when (this) { | ||
is Generated -> Generated(ctx, coroutineStart, scope) { | ||
f( | ||
scope.async(ctx, coroutineStart) { generator() }.await() | ||
).await() | ||
} | ||
is Wrapped -> Generated(Dispatchers.Unconfined, CoroutineStart.LAZY, scope) { | ||
f( | ||
memoized.await() | ||
).await() | ||
} | ||
} | ||
|
||
fun <B> bracketCase(use: (A) -> DeferredK<B>, release: (A, ExitCase<Throwable>) -> DeferredK<Unit>): DeferredK<B> = | ||
flatMap { a -> | ||
|
@@ -43,42 +82,41 @@ data class DeferredK<out A>(private val deferred: Deferred<A>, val scope: Corout | |
} | ||
} | ||
|
||
fun continueOn(ctx: CoroutineContext): DeferredK<A> = | ||
scope.asyncK(ctx, CoroutineStart.LAZY) { | ||
deferred.await() | ||
fun continueOn(ctx: CoroutineContext): DeferredK<A> = when (this) { | ||
is Generated -> Generated(ctx, coroutineStart, scope) { | ||
scope.async(this@DeferredK.ctx, coroutineStart) { | ||
generator() | ||
}.await() | ||
} | ||
is Wrapped -> scope.asyncK(ctx, CoroutineStart.LAZY) { memoized.await() } | ||
} | ||
|
||
override fun equals(other: Any?): Boolean = | ||
when (other) { | ||
is DeferredK<*> -> this.deferred == other.deferred | ||
is Deferred<*> -> this.deferred == other | ||
is DeferredK<*> -> this.memoized == other.memoized | ||
is Deferred<*> -> this.memoized == other | ||
else -> false | ||
} | ||
|
||
override fun hashCode(): Int = deferred.hashCode() | ||
override fun hashCode(): Int = memoized.hashCode() | ||
|
||
companion object { | ||
fun unit(): DeferredK<Unit> = | ||
CompletableDeferred(Unit).k() | ||
|
||
fun <A> just(a: A): DeferredK<A> = | ||
CompletableDeferred(a).k() | ||
fun unit(): DeferredK<Unit> = just(Unit) | ||
|
||
fun <A> defer(scope: CoroutineScope = GlobalScope, ctx: CoroutineContext = Dispatchers.Default, start: CoroutineStart = CoroutineStart.LAZY, f: suspend () -> A): DeferredK<A> = | ||
scope.asyncK(ctx, start) { f() } | ||
fun <A> just(a: A): DeferredK<A> = CompletableDeferred(a).k() | ||
|
||
fun <A> defer(scope: CoroutineScope = GlobalScope, ctx: CoroutineContext = Dispatchers.Default, start: CoroutineStart = CoroutineStart.LAZY, fa: () -> DeferredKOf<A>): DeferredK<A> = | ||
scope.asyncK(ctx, start) { fa().await() } | ||
Generated(ctx, start, scope) { fa().await() } | ||
|
||
operator fun <A> invoke(scope: CoroutineScope = GlobalScope, ctx: CoroutineContext = Dispatchers.Default, start: CoroutineStart = CoroutineStart.LAZY, f: () -> A): DeferredK<A> = | ||
scope.asyncK(ctx, start) { f() } | ||
|
||
fun <A> failed(t: Throwable): DeferredK<A> = | ||
CompletableDeferred<A>().apply { completeExceptionally(t) }.k() | ||
operator fun <A> invoke(scope: CoroutineScope = GlobalScope, ctx: CoroutineContext = Dispatchers.Default, start: CoroutineStart = CoroutineStart.LAZY, f: suspend () -> A): DeferredK<A> = | ||
Generated(ctx, start, scope, f) | ||
|
||
fun <A> raiseError(t: Throwable): DeferredK<A> = | ||
failed(t) | ||
|
||
fun <A> failed(t: Throwable): DeferredK<A> = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. squint why is this here? why do we need an alias? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know I wrote it, I cannot remember why. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Other than providing a more intuitive constructor for those that have never used raiseError I don't see the point either :) I'll remove it for now |
||
CompletableDeferred<A>().apply { completeExceptionally(t) }.k() | ||
|
||
/** | ||
* Starts a coroutine that'll run [Proc]. | ||
* | ||
|
@@ -87,7 +125,7 @@ data class DeferredK<out A>(private val deferred: Deferred<A>, val scope: Corout | |
* and its [CoroutineStart] is [CoroutineStart.DEFAULT]. | ||
*/ | ||
fun <A> async(scope: CoroutineScope = GlobalScope, ctx: CoroutineContext = Dispatchers.Default, start: CoroutineStart = CoroutineStart.LAZY, fa: Proc<A>): DeferredK<A> = | ||
scope.asyncK(ctx, start) { | ||
Generated(ctx, start, scope) { | ||
CompletableDeferred<A>().apply { | ||
fa { | ||
it.fold(this::completeExceptionally, this::complete) | ||
|
@@ -98,7 +136,7 @@ data class DeferredK<out A>(private val deferred: Deferred<A>, val scope: Corout | |
fun <A, B> tailRecM(a: A, f: (A) -> DeferredKOf<Either<A, B>>): DeferredK<B> = | ||
f(a).value().let { initial: Deferred<Either<A, B>> -> | ||
var current: Deferred<Either<A, B>> = initial | ||
GlobalScope.async(Dispatchers.Unconfined, CoroutineStart.LAZY) { | ||
Generated(Dispatchers.Unconfined, CoroutineStart.LAZY, GlobalScope) { | ||
val result: B | ||
while (true) { | ||
val actual: Either<A, B> = current.await() | ||
|
@@ -110,13 +148,13 @@ data class DeferredK<out A>(private val deferred: Deferred<A>, val scope: Corout | |
} | ||
} | ||
result | ||
}.k() | ||
} | ||
} | ||
} | ||
} | ||
|
||
fun <A> DeferredKOf<A>.handleErrorWith(f: (Throwable) -> DeferredK<A>): DeferredK<A> = | ||
scope().asyncK(Dispatchers.Unconfined, CoroutineStart.LAZY) { | ||
DeferredK.Generated(Dispatchers.Unconfined, CoroutineStart.LAZY) { | ||
Try { await() }.fold({ f(it).await() }, ::identity) | ||
} | ||
|
||
|
@@ -127,13 +165,13 @@ fun <A> DeferredKOf<A>.unsafeRunSync(): A = | |
runBlocking { await() } | ||
|
||
fun <A> DeferredKOf<A>.runAsync(cb: (Either<Throwable, A>) -> DeferredKOf<Unit>): DeferredK<Unit> = | ||
DeferredK.invoke(scope(), Dispatchers.Unconfined, CoroutineStart.DEFAULT) { | ||
DeferredK.invoke(fix().scope(), Dispatchers.Unconfined, CoroutineStart.DEFAULT) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix shouldn't be necessary here, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
fix().forceExceptionPropagation() | ||
unsafeRunAsync(cb.andThen { it.unsafeRunAsync { } }) | ||
} | ||
|
||
fun <A> DeferredKOf<A>.runAsyncCancellable(onCancel: OnCancel = OnCancel.Silent, cb: (Either<Throwable, A>) -> DeferredKOf<Unit>): DeferredK<Disposable> = | ||
DeferredK.invoke(scope(), Dispatchers.Unconfined, CoroutineStart.DEFAULT) { | ||
DeferredK.invoke(fix().scope(), Dispatchers.Unconfined, CoroutineStart.DEFAULT) { | ||
fix().forceExceptionPropagation() | ||
val call = CompletableDeferred<Unit>(parent = runAsync(cb)) | ||
val disposable: Disposable = { | ||
|
@@ -146,7 +184,7 @@ fun <A> DeferredKOf<A>.runAsyncCancellable(onCancel: OnCancel = OnCancel.Silent, | |
} | ||
|
||
fun <A> DeferredKOf<A>.unsafeRunAsync(cb: (Either<Throwable, A>) -> Unit): Unit = | ||
scope().async(Dispatchers.Unconfined, CoroutineStart.DEFAULT) { | ||
fix().scope().async(Dispatchers.Unconfined, CoroutineStart.DEFAULT) { | ||
Try { await() }.fold({ cb(Left(it)) }, { cb(Right(it)) }) | ||
}.forceExceptionPropagation() | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
5k is enough to trigger a SO, 50k makes them too slow maybe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done