Skip to content

Commit

Permalink
AwaitScope
Browse files Browse the repository at this point in the history
  • Loading branch information
serras committed May 25, 2024
1 parent 9894996 commit 05d9cf2
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 0 deletions.
12 changes: 12 additions & 0 deletions arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api
Original file line number Diff line number Diff line change
Expand Up @@ -267,3 +267,15 @@ public final class arrow/fx/coroutines/ScopedRaiseAccumulate : arrow/core/raise/
public fun getCoroutineContext ()Lkotlin/coroutines/CoroutineContext;
}

public final class arrow/fx/coroutines/await/AwaitScope : kotlinx/coroutines/CoroutineScope {
public fun <init> (Lkotlinx/coroutines/CoroutineScope;)V
public final fun async (Lkotlin/coroutines/CoroutineContext;Lkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/Deferred;
public static synthetic fun async$default (Larrow/fx/coroutines/await/AwaitScope;Lkotlin/coroutines/CoroutineContext;Lkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/Deferred;
public fun getCoroutineContext ()Lkotlin/coroutines/CoroutineContext;
}

public final class arrow/fx/coroutines/await/AwaitScopeKt {
public static final fun await (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun await (Lkotlinx/coroutines/CoroutineScope;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ final class <#A: kotlin/Any?> arrow.fx.coroutines/ScopedRaiseAccumulate : arrow.
final val coroutineContext // arrow.fx.coroutines/ScopedRaiseAccumulate.coroutineContext|{}coroutineContext[0]
final fun <get-coroutineContext>(): kotlin.coroutines/CoroutineContext // arrow.fx.coroutines/ScopedRaiseAccumulate.coroutineContext.<get-coroutineContext>|<get-coroutineContext>(){}[0]
}
final class arrow.fx.coroutines.await/AwaitScope : kotlinx.coroutines/CoroutineScope { // arrow.fx.coroutines.await/AwaitScope|null[0]
constructor <init>(kotlinx.coroutines/CoroutineScope) // arrow.fx.coroutines.await/AwaitScope.<init>|<init>(kotlinx.coroutines.CoroutineScope){}[0]
final fun <#A1: kotlin/Any?> async(kotlin.coroutines/CoroutineContext = ..., kotlinx.coroutines/CoroutineStart = ..., kotlin.coroutines/SuspendFunction1<kotlinx.coroutines/CoroutineScope, #A1>): kotlinx.coroutines/Deferred<#A1> // arrow.fx.coroutines.await/AwaitScope.async|async(kotlin.coroutines.CoroutineContext;kotlinx.coroutines.CoroutineStart;kotlin.coroutines.SuspendFunction1<kotlinx.coroutines.CoroutineScope,0:0>){0§<kotlin.Any?>}[0]
final val coroutineContext // arrow.fx.coroutines.await/AwaitScope.coroutineContext|{}coroutineContext[0]
final fun <get-coroutineContext>(): kotlin.coroutines/CoroutineContext // arrow.fx.coroutines.await/AwaitScope.coroutineContext.<get-coroutineContext>|<get-coroutineContext>(){}[0]
}
final class arrow.fx.coroutines/CountDownLatch { // arrow.fx.coroutines/CountDownLatch|null[0]
constructor <init>(kotlin/Long) // arrow.fx.coroutines/CountDownLatch.<init>|<init>(kotlin.Long){}[0]
final fun count(): kotlin/Long // arrow.fx.coroutines/CountDownLatch.count|count(){}[0]
Expand Down Expand Up @@ -57,6 +63,8 @@ final suspend fun <#A: kotlin/Any?, #B: kotlin/Any?> (kotlin.collections/Iterabl
final suspend fun <#A: kotlin/Any?, #B: kotlin/Any?> (kotlin.collections/Iterable<#A>).arrow.fx.coroutines/parMapNotNull(kotlin.coroutines/CoroutineContext = ..., kotlin/Int, kotlin.coroutines/SuspendFunction2<kotlinx.coroutines/CoroutineScope, #A, #B?>): kotlin.collections/List<#B> // arrow.fx.coroutines/parMapNotNull|parMapNotNull@kotlin.collections.Iterable<0:0>(kotlin.coroutines.CoroutineContext;kotlin.Int;kotlin.coroutines.SuspendFunction2<kotlinx.coroutines.CoroutineScope,0:0,0:1?>){0§<kotlin.Any?>;1§<kotlin.Any?>}[0]
final suspend fun <#A: kotlin/Any?, #B: kotlin/Any?> (kotlin.coroutines/SuspendFunction1<arrow.fx.coroutines/ResourceScope, #A>).arrow.fx.coroutines/use(kotlin.coroutines/SuspendFunction1<#A, #B>): #B // arrow.fx.coroutines/use|use@kotlin.coroutines.SuspendFunction1<arrow.fx.coroutines.ResourceScope,0:0>(kotlin.coroutines.SuspendFunction1<0:0,0:1>){0§<kotlin.Any?>;1§<kotlin.Any?>}[0]
final suspend fun <#A: kotlin/Any?> (kotlin.coroutines/SuspendFunction1<arrow.fx.coroutines/ResourceScope, #A>).arrow.fx.coroutines/allocated(): kotlin/Pair<#A, kotlin.coroutines/SuspendFunction1<arrow.fx.coroutines/ExitCase, kotlin/Unit>> // arrow.fx.coroutines/allocated|allocated@kotlin.coroutines.SuspendFunction1<arrow.fx.coroutines.ResourceScope,0:0>(){0§<kotlin.Any?>}[0]
final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines/CoroutineScope).arrow.fx.coroutines.await/await(kotlin.coroutines/SuspendFunction1<arrow.fx.coroutines.await/AwaitScope, #A>): #A // arrow.fx.coroutines.await/await|await@kotlinx.coroutines.CoroutineScope(kotlin.coroutines.SuspendFunction1<arrow.fx.coroutines.await.AwaitScope,0:0>){0§<kotlin.Any?>}[0]
final suspend fun <#A: kotlin/Any?> arrow.fx.coroutines.await/await(kotlin.coroutines/SuspendFunction1<arrow.fx.coroutines.await/AwaitScope, #A>): #A // arrow.fx.coroutines.await/await|await(kotlin.coroutines.SuspendFunction1<arrow.fx.coroutines.await.AwaitScope,0:0>){0§<kotlin.Any?>}[0]
final suspend fun <#A: kotlin/Any?> arrow.fx.coroutines/resourceScope(kotlin.coroutines/SuspendFunction1<arrow.fx.coroutines/ResourceScope, #A>): #A // arrow.fx.coroutines/resourceScope|resourceScope(kotlin.coroutines.SuspendFunction1<arrow.fx.coroutines.ResourceScope,0:0>){0§<kotlin.Any?>}[0]
final suspend fun arrow.fx.coroutines/cancelAndCompose(kotlinx.coroutines/Deferred<*>, kotlinx.coroutines/Deferred<*>) // arrow.fx.coroutines/cancelAndCompose|cancelAndCompose(kotlinx.coroutines.Deferred<*>;kotlinx.coroutines.Deferred<*>){}[0]
final suspend inline fun <#A: kotlin/Any?, #B: kotlin/Any?, #C: kotlin/Any?, #D: kotlin/Any?, #E: kotlin/Any?, #F: kotlin/Any?, #G: kotlin/Any?, #H: kotlin/Any?, #I: kotlin/Any?, #J: kotlin/Any?, #K: kotlin/Any?> (arrow.core.raise/Raise<#A>).arrow.fx.coroutines/parZipOrAccumulate(crossinline kotlin/Function2<#A, #A, #A>, crossinline kotlin.coroutines/SuspendFunction1<arrow.fx.coroutines/ScopedRaiseAccumulate<#A>, #B>, crossinline kotlin.coroutines/SuspendFunction1<arrow.fx.coroutines/ScopedRaiseAccumulate<#A>, #C>, crossinline kotlin.coroutines/SuspendFunction1<arrow.fx.coroutines/ScopedRaiseAccumulate<#A>, #D>, crossinline kotlin.coroutines/SuspendFunction1<arrow.fx.coroutines/ScopedRaiseAccumulate<#A>, #E>, crossinline kotlin.coroutines/SuspendFunction1<arrow.fx.coroutines/ScopedRaiseAccumulate<#A>, #F>, crossinline kotlin.coroutines/SuspendFunction1<arrow.fx.coroutines/ScopedRaiseAccumulate<#A>, #G>, crossinline kotlin.coroutines/SuspendFunction1<arrow.fx.coroutines/ScopedRaiseAccumulate<#A>, #H>, crossinline kotlin.coroutines/SuspendFunction1<arrow.fx.coroutines/ScopedRaiseAccumulate<#A>, #I>, crossinline kotlin.coroutines/SuspendFunction1<arrow.fx.coroutines/ScopedRaiseAccumulate<#A>, #J>, crossinline kotlin.coroutines/SuspendFunction10<kotlinx.coroutines/CoroutineScope, #B, #C, #D, #E, #F, #G, #H, #I, #J, #K>): #K // arrow.fx.coroutines/parZipOrAccumulate|parZipOrAccumulate@arrow.core.raise.Raise<0:0>(kotlin.Function2<0:0,0:0,0:0>;kotlin.coroutines.SuspendFunction1<arrow.fx.coroutines.ScopedRaiseAccumulate<0:0>,0:1>;kotlin.coroutines.SuspendFunction1<arrow.fx.coroutines.ScopedRaiseAccumulate<0:0>,0:2>;kotlin.coroutines.SuspendFunction1<arrow.fx.coroutines.ScopedRaiseAccumulate<0:0>,0:3>;kotlin.coroutines.SuspendFunction1<arrow.fx.coroutines.ScopedRaiseAccumulate<0:0>,0:4>;kotlin.coroutines.SuspendFunction1<arrow.fx.coroutines.ScopedRaiseAccumulate<0:0>,0:5>;kotlin.coroutines.SuspendFunction1<arrow.fx.coroutines.ScopedRaiseAccumulate<0:0>,0:6>;kotlin.coroutines.SuspendFunction1<arrow.fx.coroutines.ScopedRaiseAccumulate<0:0>,0:7>;kotlin.coroutines.SuspendFunction1<arrow.fx.coroutines.ScopedRaiseAccumulate<0:0>,0:8>;kotlin.coroutines.SuspendFunction1<arrow.fx.coroutines.ScopedRaiseAccumulate<0:0>,0:9>;kotlin.coroutines.SuspendFunction10<kotlinx.coroutines.CoroutineScope,0:1,0:2,0:3,0:4,0:5,0:6,0:7,0:8,0:9,0:10>){0§<kotlin.Any?>;1§<kotlin.Any?>;2§<kotlin.Any?>;3§<kotlin.Any?>;4§<kotlin.Any?>;5§<kotlin.Any?>;6§<kotlin.Any?>;7§<kotlin.Any?>;8§<kotlin.Any?>;9§<kotlin.Any?>;10§<kotlin.Any?>}[0]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package arrow.fx.coroutines.await

import arrow.atomic.Atomic
import arrow.atomic.update
import kotlinx.coroutines.async as coroutinesAsync
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

public suspend fun <A> await(
block: suspend AwaitScope.() -> A
): A = coroutineScope { block(AwaitScope(this)) }

public suspend fun <A> CoroutineScope.await(
block: suspend AwaitScope.() -> A
): A = block(AwaitScope(this))

/**
* Within an [AwaitScope], any call to [kotlinx.coroutines.Deferred.await]
* causes all the other [Deferred] in the same block to be awaited too.
* That way you can get more concurrency without having to sacrifice
* readability.
*
* ```kotlin
* suspend fun loadUserInfo(id: UserId): UserInfo = await {
* val name = async { loadUserFromDb(id) }
* val avatar = async { loadAvatar(id) }
* UserInfo(
* name.await(), // <- at this point every 'async' is 'await'ed
* avatar.await() // <- so when you reach this 'await', the value is already there
* )
* }
*
* suspend fun loadUserInfoWithoutAwait(id: UserId): UserInfo {
* val name = async { loadUserFromDb(id) }
* val avatar = async { loadAvatar(id) }
* awaitAll(name, avatar) // <- this is required otherwise
* return UserInfo(
* name.await(),
* avatar.await()
* )
* }
* ```
*/
public class AwaitScope(
private val scope: CoroutineScope
): CoroutineScope by scope {
private val tasks: Atomic<List<Deferred<*>>> = Atomic(emptyList())

public fun <T> async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
val deferred = coroutinesAsync(context, start, block)
tasks.update { it + deferred }
return Await(deferred)
}

private inner class Await<T>(
private val deferred: Deferred<T>
): Deferred<T> by deferred {
override suspend fun await(): T {
tasks.getAndSet(emptyList()).awaitAll()
return deferred.await()
}
}
}

0 comments on commit 05d9cf2

Please sign in to comment.