Permalink
Cannot retrieve contributors at this time
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
271 lines (246 sloc)
12 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. | |
*/ | |
@file:JvmMultifileClass | |
@file:JvmName("BuildersKt") | |
@file:OptIn(ExperimentalContracts::class) | |
package kotlinx.coroutines | |
import kotlinx.atomicfu.* | |
import kotlinx.coroutines.internal.* | |
import kotlinx.coroutines.intrinsics.* | |
import kotlinx.coroutines.selects.* | |
import kotlin.contracts.* | |
import kotlin.coroutines.* | |
import kotlin.coroutines.intrinsics.* | |
import kotlin.jvm.* | |
// --------------- launch --------------- | |
/**
 * Starts a new coroutine without blocking the calling thread and hands back a [Job] that refers to it.
 * [Cancelling][Job.cancel] the returned job cancels the coroutine.
 *
 * The coroutine context is taken from the receiver [CoroutineScope]; the [context] argument contributes
 * additional elements. When the resulting context has neither a dispatcher nor any other
 * [ContinuationInterceptor], [Dispatchers.Default] is used. The parent job is taken from the
 * [CoroutineScope] too, unless a corresponding element of [context] overrides it.
 *
 * By default the coroutine is scheduled for execution immediately. Other behaviour is selected through the
 * [start] parameter, see [CoroutineStart] for details. Passing [CoroutineStart.LAZY] starts the coroutine
 * _lazily_: its [Job] is created in the _new_ state, can be started explicitly with [start][Job.start],
 * and is started implicitly by the first call to [join][Job.join].
 *
 * Unless a [CoroutineExceptionHandler] is explicitly specified, an uncaught exception in this coroutine
 * cancels the parent job found in the context. Consequently, when `launch` is used with the context of
 * another coroutine, any uncaught exception leads to the cancellation of that parent coroutine.
 *
 * See [newCoroutineContext] for a description of the debugging facilities available to a newly created coroutine.
 *
 * @param context context elements added on top of [CoroutineScope.coroutineContext].
 * @param start coroutine start option; defaults to [CoroutineStart.DEFAULT].
 * @param block the coroutine code, invoked in the context of the provided scope.
 */
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val combinedContext = newCoroutineContext(context)
    // A lazy start needs the variant that keeps hold of `block` until the job is actually started.
    val job = when {
        start.isLazy -> LazyStandaloneCoroutine(combinedContext, block)
        else -> StandaloneCoroutine(combinedContext, active = true)
    }
    // The coroutine object doubles as the receiver (the CoroutineScope) of `block`.
    job.start(start, job, block)
    return job
}
// --------------- async --------------- | |
/**
 * Creates a coroutine and returns its future result as an implementation of [Deferred].
 * [Cancelling][Job.cancel] the returned deferred cancels the running coroutine.
 *
 * Unlike similar primitives in other languages and frameworks, the resulting coroutine cancels its parent job
 * (or outer scope) on failure in order to enforce the *structured concurrency* paradigm.
 * A supervising parent ([SupervisorJob] or [supervisorScope]) can be used to change that behaviour.
 *
 * The coroutine context is taken from the receiver [CoroutineScope]; the [context] argument contributes
 * additional elements. When the resulting context has neither a dispatcher nor any other
 * [ContinuationInterceptor], [Dispatchers.Default] is used. The parent job is taken from the
 * [CoroutineScope] too, unless a corresponding element of [context] overrides it.
 *
 * By default the coroutine is scheduled for execution immediately. Other behaviour is selected through the
 * [start] parameter, see [CoroutineStart] for details. Passing [CoroutineStart.LAZY] starts the coroutine
 * _lazily_: the resulting [Deferred] is created in the _new_ state, can be started explicitly with
 * [start][Job.start], and is started implicitly by the first call to [join][Job.join],
 * [await][Deferred.await] or [awaitAll].
 *
 * @param context context elements added on top of the scope's own coroutine context.
 * @param start coroutine start option; defaults to [CoroutineStart.DEFAULT].
 * @param block the coroutine code.
 */
public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val combinedContext = newCoroutineContext(context)
    // A lazy start needs the variant that keeps hold of `block` until the deferred is actually started.
    val deferred = when {
        start.isLazy -> LazyDeferredCoroutine(combinedContext, block)
        else -> DeferredCoroutine<T>(combinedContext, active = true)
    }
    // The coroutine object doubles as the receiver (the CoroutineScope) of `block`.
    deferred.start(start, deferred, block)
    return deferred
}
/**
 * Coroutine created by [async] for a non-lazy start; it is also the base class of [LazyDeferredCoroutine].
 *
 * Every member delegates to the corresponding `*Internal` function it inherits. The unchecked casts
 * to [T] are suppressed for the whole class.
 */
@Suppress("UNCHECKED_CAST")
private open class DeferredCoroutine<T>(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<T>(parentContext, initParentJob = true, active = active), Deferred<T>, SelectClause1<T> {
    // This object is its own select clause.
    override val onAwait: SelectClause1<T>
        get() = this

    override fun getCompleted(): T {
        return getCompletedInternal() as T
    }

    override suspend fun await(): T {
        return awaitInternal() as T
    }

    override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (T) -> R) {
        registerSelectClause1Internal(select, block)
    }
}
/**
 * Variant of [DeferredCoroutine] created by [async] for a lazy start.
 *
 * It is constructed with `active = false`; the coroutine built from `block` is only started from [onStart].
 */
private class LazyDeferredCoroutine<T>(
    parentContext: CoroutineContext,
    block: suspend CoroutineScope.() -> T
) : DeferredCoroutine<T>(parentContext, active = false) {
    // Created up front with this object as both receiver and completion, but not started yet.
    private val pendingBody = block.createCoroutineUnintercepted(this, this)

    override fun onStart() {
        pendingBody.startCoroutineCancellable(this)
    }
}
// --------------- withContext --------------- | |
/**
 * Runs the given suspending [block] with the given coroutine [context], suspends until it completes,
 * and returns its result.
 *
 * The context the [block] runs in is obtained by merging the current [coroutineContext] with the
 * supplied [context] as `coroutineContext + context` (see [CoroutineContext.plus]).
 * This suspending function is cancellable: it immediately checks the resulting context for cancellation
 * and throws [CancellationException] if it is not [active][CoroutineContext.isActive].
 *
 * When [context] provides a [CoroutineDispatcher] that differs from the current one, extra dispatches are
 * unavoidable: the [block] cannot run immediately and has to be dispatched onto the passed
 * [CoroutineDispatcher], and once the [block] completes, execution has to shift back to the original dispatcher.
 *
 * The result of a `withContext` call is dispatched into the original context in a cancellable way
 * with a **prompt cancellation guarantee**: if the original [coroutineContext] in which `withContext` was
 * invoked is cancelled by the time its dispatcher starts to execute the code, the result of `withContext`
 * is discarded and [CancellationException] is thrown.
 *
 * This cancellation behaviour is enabled if and only if the dispatcher is being changed.
 * For example, `withContext(NonCancellable) { ... }` does not change the dispatcher, so the call is
 * cancelled neither on entry to the block inside `withContext` nor on exit from it.
 *
 * @param context context elements merged on top of the current [coroutineContext].
 * @param block the code to run in the merged context; by contract it is invoked exactly once, in place.
 */
public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        val callerContext = uCont.context
        // Merge the contexts (this also copies CopyableThreadContextElement if necessary).
        val targetContext = callerContext.newCoroutineContext(context)
        // Cancellation of the merged context is checked unconditionally, before any path is chosen.
        targetContext.ensureActive()

        // FAST PATH #1 -- merging produced the very same context instance, nothing to switch.
        if (targetContext === callerContext) {
            val scoped = ScopeCoroutine(targetContext, uCont)
            return@sc scoped.startUndispatchedOrReturn(scoped, block)
        }

        // FAST PATH #2 -- something in the context changed, but the dispatcher did not.
        // `equals` is used by design (see the equals implementation in wrapper contexts
        // such as ExecutorCoroutineDispatcher).
        val sameInterceptor = targetContext[ContinuationInterceptor] == callerContext[ContinuationInterceptor]
        if (sameInterceptor) {
            val undispatched = UndispatchedCoroutine(targetContext, uCont)
            // The context did change, so the current thread has to be updated while the block starts.
            withCoroutineContext(targetContext, null) {
                return@sc undispatched.startUndispatchedOrReturn(undispatched, block)
            }
        }

        // SLOW PATH -- the dispatcher changes, so the block is started through the new dispatcher.
        val dispatched = DispatchedCoroutine(targetContext, uCont)
        block.startCoroutineCancellable(dispatched, dispatched)
        dispatched.getResult()
    }
}
/**
 * Runs the given suspending [block] with this [CoroutineDispatcher] as its context, suspends until
 * the block completes, and returns its result.
 *
 * This inline function simply forwards to [withContext].
 */
public suspend inline operator fun <T> CoroutineDispatcher.invoke(
    noinline block: suspend CoroutineScope.() -> T
): T {
    return withContext(context = this, block = block)
}
// --------------- implementation --------------- | |
/**
 * Coroutine created by [launch] for a non-lazy start; it is also the base class of [LazyStandaloneCoroutine].
 */
private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {

    /**
     * Passes [exception] to [handleCoroutineException] together with this coroutine's context
     * and always returns `true` (presumably reporting the exception as handled -- confirm against
     * the contract of the overridden function).
     */
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}
/**
 * Variant of [StandaloneCoroutine] created by [launch] for a lazy start.
 *
 * It is constructed with `active = false`; the coroutine built from `block` is only started from [onStart].
 */
private class LazyStandaloneCoroutine(
    parentContext: CoroutineContext,
    block: suspend CoroutineScope.() -> Unit
) : StandaloneCoroutine(parentContext, active = false) {
    // Created up front with this object as both receiver and completion, but not started yet.
    private val pendingBody = block.createCoroutineUnintercepted(this, this)

    override fun onStart() {
        pendingBody.startCoroutineCancellable(this)
    }
}
/**
 * Used by [withContext] when the context changes but the dispatcher stays the same.
 * This is only the expected declaration; the actual implementations are not part of this file.
 */
internal expect class UndispatchedCoroutine<in T>(context: CoroutineContext, uCont: Continuation<T>) : ScopeCoroutine<T>
// Values of the decision state machine used by DispatchedCoroutine below.
private const val UNDECIDED = 0 // neither getResult() nor the completion has claimed the decision yet
private const val SUSPENDED = 1 // getResult() came first and returned COROUTINE_SUSPENDED
private const val RESUMED = 2 // the completion came first, so getResult() returns the result directly

/**
 * Used by [withContext] when the context's dispatcher changes.
 *
 * [getResult] and [afterResume] race on a single atomic decision: whichever moves it away from
 * [UNDECIDED] first determines whether the caller suspends and is resumed later through the
 * original continuation, or receives the result directly from [getResult].
 */
internal class DispatchedCoroutine<in T>(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
    // This duplicates the decision state machine that the cancellable continuation implementation has.
    // todo: we may somehow abstract it via inline class
    private val _decision = atomic(UNDECIDED)

    // Claims the decision for the suspending side; false means the completion has already claimed it.
    private fun trySuspend(): Boolean {
        _decision.loop { current ->
            if (current == RESUMED) return false
            check(current == UNDECIDED) { "Already suspended" }
            if (_decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
            // CAS lost a race -- the loop re-reads the decision and tries again.
        }
    }

    // Claims the decision for the completing side; false means getResult() has already suspended.
    private fun tryResume(): Boolean {
        _decision.loop { current ->
            if (current == SUSPENDED) return false
            check(current == UNDECIDED) { "Already resumed" }
            if (_decision.compareAndSet(UNDECIDED, RESUMED)) return true
            // CAS lost a race -- the loop re-reads the decision and tries again.
        }
    }

    override fun afterCompletion(state: Any?) {
        // afterCompletion delegates to afterResume and not the other way round, because stack size
        // matters more for the afterResume implementation.
        afterResume(state)
    }

    override fun afterResume(state: Any?) {
        // Completed before getResult() was invoked: getResult() will pick the result up itself.
        if (tryResume()) return
        // The caller is already suspended. Resume it in a cancellable way, because execution has to
        // switch back to the original dispatcher.
        val callerContinuation = uCont.intercepted()
        val outcome = recoverResult(state, uCont)
        callerContinuation.resumeCancellableWith(outcome)
    }

    /**
     * Returns [COROUTINE_SUSPENDED] when the coroutine has not completed yet. Otherwise returns the
     * completed value, or throws the failure cause if the coroutine completed exceptionally.
     */
    fun getResult(): Any? {
        if (trySuspend()) return COROUTINE_SUSPENDED
        // Otherwise the completion path has already run tryResume, and the result is in the state.
        val finalState = this.state.unboxState()
        if (finalState is CompletedExceptionally) throw finalState.cause
        @Suppress("UNCHECKED_CAST")
        return finalState as T
    }
}