Skip to content

Commit

Permalink
Remove atomic cancellation support
Browse files Browse the repository at this point in the history
This is a problematic for Android when Main dispatcher is cancelled on destroyed activity.
Atomic nature of channels is designed to prevent loss of elements,
which is really not an issue for a typical application, but creates problem when used with channels.

* Internal suspendAtomicCancellableCoroutine -> suspendCancellableCoroutine
* Internal suspendAtomicCancellableCoroutineReusable -> suspendCancellableCoroutineReusable
* Remove atomic cancellation from docs
* Ensures that flowOn does not resume downstream after cancellation.
* MODE_ATOMIC_DEFAULT renamed into MODE_ATOMIC
* Introduced MODE_CANCELLABLE_REUSABLE to track suspendCancellableCoroutineReusable
* Better documentation for MODE_XXX constants.
* Added stress test for proper handling of MODE_CANCELLABLE_REUSABLE
  and fixed test for #1123 bug with job.join (working in MODE_CANCELLABLE) that was not
  properly failing in the absence of the proper code in CancellableContinuationImpl.getResult
* Added test for Flow.combine that should be fixed
* Support extended invokeOnCancellation contract
* Introduced internal tryResumeAtomic

Fixes #1265
Fixes #1813
Fixes #1915
  • Loading branch information
elizarov committed Apr 22, 2020
1 parent ee3776a commit 3144a2a
Show file tree
Hide file tree
Showing 31 changed files with 492 additions and 373 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ class NonCancellableChannel : SimpleChannel() {
}

class CancellableChannel : SimpleChannel() {
override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutine {
override suspend fun suspendReceive(): Int = suspendCancellableCoroutine {
consumer = it.intercepted()
COROUTINE_SUSPENDED
}

override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutine<Unit> {
override suspend fun suspendSend(element: Int) = suspendCancellableCoroutine<Unit> {
enqueuedValue = element
producer = it.intercepted()
COROUTINE_SUSPENDED
Expand All @@ -84,13 +84,13 @@ class CancellableChannel : SimpleChannel() {

class CancellableReusableChannel : SimpleChannel() {
@Suppress("INVISIBLE_MEMBER")
override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutineReusable {
override suspend fun suspendReceive(): Int = suspendCancellableCoroutineReusable {
consumer = it.intercepted()
COROUTINE_SUSPENDED
}

@Suppress("INVISIBLE_MEMBER")
override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutineReusable<Unit> {
override suspend fun suspendSend(element: Int) = suspendCancellableCoroutineReusable<Unit> {
enqueuedValue = element
producer = it.intercepted()
COROUTINE_SUSPENDED
Expand Down
3 changes: 0 additions & 3 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,6 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/

public final class kotlinx/coroutines/CancellableContinuationKt {
public static final fun disposeOnCancellation (Lkotlinx/coroutines/CancellableContinuation;Lkotlinx/coroutines/DisposableHandle;)V
public static final fun suspendAtomicCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun suspendAtomicCancellableCoroutine (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun suspendAtomicCancellableCoroutine$default (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun suspendCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

Expand Down
71 changes: 27 additions & 44 deletions kotlinx-coroutines-core/common/src/CancellableContinuation.kt
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ public interface CancellableContinuation<in T> : Continuation<T> {
@InternalCoroutinesApi
public fun tryResume(value: T, idempotent: Any? = null): Any?

/**
* Same as [tryResume] but with [onCancellation] handler that called if and only if the value is not
* delivered to the caller because of the dispatch in the process, so that atomicity delivery
* guaranteed can be provided by having a cancellation fallback.
*/
@InternalCoroutinesApi
public fun tryResumeAtomic(value: T, idempotent: Any?, onCancellation: (cause: Throwable) -> Unit): Any?

/**
* Tries to resume this continuation with the specified [exception] and returns a non-null object token if successful,
* or `null` otherwise (it was already resumed or cancelled). When a non-null object is returned,
Expand Down Expand Up @@ -118,8 +126,8 @@ public interface CancellableContinuation<in T> : Continuation<T> {
public fun cancel(cause: Throwable? = null): Boolean

/**
* Registers a [handler] to be **synchronously** invoked on cancellation (regular or exceptional) of this continuation.
* When the continuation is already cancelled, the handler will be immediately invoked
* Registers a [handler] to be **synchronously** invoked on [cancellation][cancel] (regular or exceptional) of this continuation.
* When the continuation is already cancelled, the handler is immediately invoked
* with the cancellation exception. Otherwise, the handler will be invoked as soon as this
* continuation is cancelled.
*
Expand All @@ -128,7 +136,12 @@ public interface CancellableContinuation<in T> : Continuation<T> {
* processed as an uncaught exception in the context of the current coroutine
* (see [CoroutineExceptionHandler]).
*
* At most one [handler] can be installed on a continuation.
* At most one [handler] can be installed on a continuation. Attempt to call `invokeOnCancellation` second
* time produces [IllegalStateException].
*
* This handler is also called when this continuation [resumes][resume] normally (with a value) and then
* is cancelled while waiting to be dispatched. More generally speaking, this handler is called whenever
* the caller of [suspendCancellableCoroutine] is getting a [CancellationException].
*
* **Note**: Implementation of `CompletionHandler` must be fast, non-blocking, and thread-safe.
* This `handler` can be invoked concurrently with the surrounding code.
Expand Down Expand Up @@ -204,40 +217,24 @@ public suspend inline fun <T> suspendCancellableCoroutine(
}

/**
* Suspends the coroutine like [suspendCancellableCoroutine], but with *atomic cancellation*.
*
* When the suspended function throws a [CancellationException], it means that the continuation was not resumed.
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
* continue to execute even after it was cancelled from the same thread in the case when the continuation
* was already resumed and was posted for execution to the thread's queue.
*
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
public suspend inline fun <T> suspendAtomicCancellableCoroutine(
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_ATOMIC_DEFAULT)
block(cancellable)
cancellable.getResult()
}

/**
* Suspends coroutine similar to [suspendAtomicCancellableCoroutine], but an instance of [CancellableContinuationImpl] is reused if possible.
* Suspends the coroutine similar to [suspendCancellableCoroutine], but an instance of
* [CancellableContinuationImpl] is reused.
*/
internal suspend inline fun <T> suspendAtomicCancellableCoroutineReusable(
internal suspend inline fun <T> suspendCancellableCoroutineReusable(
crossinline block: (CancellableContinuation<T>) -> Unit
): T = suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted())
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted(), resumeMode = MODE_CANCELLABLE_REUSABLE)
block(cancellable)
cancellable.getResult()
}

internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>): CancellableContinuationImpl<T> {
internal fun <T> getOrCreateCancellableContinuation(
delegate: Continuation<T>, resumeMode: Int
): CancellableContinuationImpl<T> {
assert { resumeMode.isReusableMode }
// If used outside of our dispatcher
if (delegate !is DispatchedContinuation<T>) {
return CancellableContinuationImpl(delegate, resumeMode = MODE_ATOMIC_DEFAULT)
return CancellableContinuationImpl(delegate, resumeMode)
}
/*
* Attempt to claim reusable instance.
Expand All @@ -253,24 +250,10 @@ internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>):
* thus leaking CC instance for indefinite time.
* 2) Continuation was cancelled. Then we should prevent any further reuse and bail out.
*/
return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetState() }
?: return CancellableContinuationImpl(delegate, MODE_ATOMIC_DEFAULT)
return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetState(resumeMode) }
?: return CancellableContinuationImpl(delegate, resumeMode)
}

/**
* @suppress **Deprecated**
*/
@Deprecated(
message = "holdCancellability parameter is deprecated and is no longer used",
replaceWith = ReplaceWith("suspendAtomicCancellableCoroutine(block)")
)
@InternalCoroutinesApi
public suspend inline fun <T> suspendAtomicCancellableCoroutine(
holdCancellability: Boolean = false,
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendAtomicCancellableCoroutine(block)

/**
* Removes the specified [node] on cancellation.
*/
Expand Down
Loading

0 comments on commit 3144a2a

Please sign in to comment.