Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework reusability control in cancellable contination #2581

Merged
merged 6 commits into from
Mar 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 5 additions & 3 deletions kotlinx-coroutines-core/common/src/CancellableContinuation.kt
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,10 @@ public interface CancellableContinuation<in T> : Continuation<T> {
public fun completeResume(token: Any)

/**
* Legacy function that turned on cancellation behavior in [suspendCancellableCoroutine] before kotlinx.coroutines 1.1.0.
* This function does nothing and is left only for binary compatibility with old compiled code.
* Internal function that setups cancellation behavior in [suspendCancellableCoroutine].
* It's illegal to call this function in any non-`kotlinx.coroutines` code and
* such calls lead to undefined behaviour.
* Exposed in our ABI since 1.0.0 withing `suspendCancellableCoroutine` body.
*
* @suppress **This is unstable API and it is subject to change.**
*/
Expand Down Expand Up @@ -332,7 +334,7 @@ internal suspend inline fun <T> suspendCancellableCoroutineReusable(
internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>): CancellableContinuationImpl<T> {
// If used outside of our dispatcher
if (delegate !is DispatchedContinuation<T>) {
return CancellableContinuationImpl(delegate, MODE_CANCELLABLE_REUSABLE)
return CancellableContinuationImpl(delegate, MODE_CANCELLABLE)
}
/*
* Attempt to claim reusable instance.
Expand Down
116 changes: 70 additions & 46 deletions kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,7 @@ internal open class CancellableContinuationImpl<in T>(
*/
private val _state = atomic<Any?>(Active)

private val _parentHandle = atomic<DisposableHandle?>(null)
private var parentHandle: DisposableHandle?
get() = _parentHandle.value
set(value) { _parentHandle.value = value }
private var parentHandle: DisposableHandle? = null

internal val state: Any? get() = _state.value

Expand All @@ -93,7 +90,21 @@ internal open class CancellableContinuationImpl<in T>(
}

public override fun initCancellability() {
setupCancellation()
/*
* Invariant: at the moment of invocation, `this` has not yet
* leaked to user code and no one is able to invoke `resume` or `cancel`
* on it yet. Also, this function is not invoked for reusable continuations.
*/
val handle = installParentHandle()
?: return // fast path -- don't do anything without parent
// now check our state _after_ registering, could have completed while we were registering,
// but only if parent was cancelled. Parent could be in a "cancelling" state for a while,
// so we are helping it and cleaning the node ourselves
if (isCompleted) {
// Can be invoked concurrently in 'parentCancelled', no problems here
handle.dispose()
parentHandle = NonDisposableHandle
}
}

private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable(this)
Expand All @@ -118,40 +129,6 @@ internal open class CancellableContinuationImpl<in T>(
return true
}

/**
* Setups parent cancellation and checks for postponed cancellation in the case of reusable continuations.
* It is only invoked from an internal [getResult] function for reusable continuations
* and from [suspendCancellableCoroutine] to establish a cancellation before registering CC anywhere.
*/
private fun setupCancellation() {
if (checkCompleted()) return
if (parentHandle !== null) return // fast path 2 -- was already initialized
val parent = delegate.context[Job] ?: return // fast path 3 -- don't do anything without parent
val handle = parent.invokeOnCompletion(
onCancelling = true,
handler = ChildContinuation(this).asHandler
)
parentHandle = handle
// now check our state _after_ registering (could have completed while we were registering)
// Also note that we do not dispose parent for reusable continuations, dispatcher will do that for us
if (isCompleted && !isReusable()) {
handle.dispose() // it is Ok to call dispose twice -- here and in disposeParentHandle
parentHandle = NonDisposableHandle // release it just in case, to aid GC
}
}

private fun checkCompleted(): Boolean {
val completed = isCompleted
if (!resumeMode.isReusableMode) return completed // Do not check postponed cancellation for non-reusable continuations
val dispatched = delegate as? DispatchedContinuation<*> ?: return completed
val cause = dispatched.checkPostponedCancellation(this) ?: return completed
if (!completed) {
// Note: this cancel may fail if one more concurrent cancel is currently being invoked
cancel(cause)
}
return true
}

public override val callerFrame: CoroutineStackFrame?
get() = delegate as? CoroutineStackFrame

Expand Down Expand Up @@ -188,7 +165,9 @@ internal open class CancellableContinuationImpl<in T>(
*/
private fun cancelLater(cause: Throwable): Boolean {
if (!resumeMode.isReusableMode) return false
val dispatched = (delegate as? DispatchedContinuation<*>) ?: return false
// Ensure that we are postponing cancellation to the right instance
if (!isReusable()) return false
val dispatched = delegate as DispatchedContinuation<*>
return dispatched.postponeCancellation(cause)
}

Expand Down Expand Up @@ -216,7 +195,7 @@ internal open class CancellableContinuationImpl<in T>(

private inline fun callCancelHandlerSafely(block: () -> Unit) {
try {
block()
block()
} catch (ex: Throwable) {
// Handler should never fail, if it does -- it is an unhandled exception
handleCoroutineException(
Expand Down Expand Up @@ -276,9 +255,33 @@ internal open class CancellableContinuationImpl<in T>(

@PublishedApi
internal fun getResult(): Any? {
setupCancellation()
if (trySuspend()) return COROUTINE_SUSPENDED
val isReusable = isReusable()
// trySuspend may fail either if 'block' has resumed/cancelled a continuation
// or we got async cancellation from parent.
if (trySuspend()) {
/*
* We were neither resumed nor cancelled, time to suspend.
* But first we have to install parent cancellation handle (if we didn't yet),
* so CC could be properly resumed on parent cancellation.
*/
if (parentHandle == null) {
installParentHandle()
}
/*
* Release the continuation after installing the handle (if needed).
* If we were successful, then do nothing, it's ok to reuse the instance now.
* Otherwise, dispose the handle by ourselves.
*/
if (isReusable) {
releaseClaimedReusableContinuation()
}
return COROUTINE_SUSPENDED
}
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
if (isReusable) {
// release claimed reusable continuation for the future reuse
releaseClaimedReusableContinuation()
}
val state = this.state
if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
// if the parent job was already cancelled, then throw the corresponding cancellation exception
Expand All @@ -296,6 +299,28 @@ internal open class CancellableContinuationImpl<in T>(
return getSuccessfulResult(state)
}

private fun installParentHandle(): DisposableHandle? {
val parent = context[Job] ?: return null // don't do anything without a parent
// Install the handle
val handle = parent.invokeOnCompletion(
onCancelling = true,
handler = ChildContinuation(this).asHandler
)
parentHandle = handle
return handle
}

/**
* Tries to release reusable continuation. It can fail is there was an asynchronous cancellation,
* in which case it detaches from the parent and cancels this continuation.
*/
private fun releaseClaimedReusableContinuation() {
// Cannot be casted if e.g. invoked from `installParentHandleReusable` for context without dispatchers, but with Job in it
val cancellationCause = (delegate as? DispatchedContinuation<*>)?.tryReleaseClaimedContinuation(this) ?: return
detachChild()
cancel(cancellationCause)
}

override fun resumeWith(result: Result<T>) =
resumeImpl(result.toState(this), resumeMode)

Expand Down Expand Up @@ -462,11 +487,10 @@ internal open class CancellableContinuationImpl<in T>(

/**
* Detaches from the parent.
* Invariant: used from [CoroutineDispatcher.releaseInterceptedContinuation] iff [isReusable] is `true`
*/
internal fun detachChild() {
val handle = parentHandle
handle?.dispose()
val handle = parentHandle ?: return
handle.dispose()
parentHandle = NonDisposableHandle
}

Expand Down
7 changes: 6 additions & 1 deletion kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,12 @@ public abstract class CoroutineDispatcher :

@InternalCoroutinesApi
public override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
(continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild()
/*
* Unconditional cast is safe here: we only return DispatchedContinuation from `interceptContinuation`,
* any ClassCastException can only indicate compiler bug
*/
val dispatched = continuation as DispatchedContinuation<*>
dispatched.release()
}

/**
Expand Down
2 changes: 2 additions & 0 deletions kotlinx-coroutines-core/common/src/JobSupport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -1228,6 +1228,8 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
* thrown and not a JobCancellationException.
*/
val cont = AwaitContinuation(uCont.intercepted(), this)
// we are mimicking suspendCancellableCoroutine here and call initCancellability, too.
cont.initCancellability()
cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(cont).asHandler))
cont.getResult()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,15 @@ internal class DispatchedContinuation<in T>(
* 4) [Throwable] continuation was cancelled with this cause while being in [suspendCancellableCoroutineReusable],
* [CancellableContinuationImpl.getResult] will check for cancellation later.
*
* [REUSABLE_CLAIMED] state is required to prevent the lost resume in the channel.
* AbstractChannel.receive method relies on the fact that the following pattern
* [REUSABLE_CLAIMED] state is required to prevent double-use of the reused continuation.
* In the `getResult`, we have the following code:
* ```
* suspendCancellableCoroutineReusable { cont ->
* val result = pollFastPath()
* if (result != null) cont.resume(result)
* if (trySuspend()) {
* // <- at this moment current continuation can be redispatched and claimed again.
* attachChildToParent()
* releaseClaimedContinuation()
* }
* ```
* always succeeds.
* To make it always successful, we actually postpone "reusable" cancellation
* to this phase and set cancellation only at the moment of instantiation.
*/
private val _reusableCancellableContinuation = atomic<Any?>(null)

Expand All @@ -66,9 +64,9 @@ internal class DispatchedContinuation<in T>(
public fun isReusable(requester: CancellableContinuationImpl<*>): Boolean {
/*
* Reusability control:
* `null` -> no reusability at all, false
* `null` -> no reusability at all, `false`
* If current state is not CCI, then we are within `suspendCancellableCoroutineReusable`, true
* Else, if result is CCI === requester.
* Else, if result is CCI === requester, then it's our reusable continuation
* Identity check my fail for the following pattern:
* ```
* loop:
Expand All @@ -82,6 +80,27 @@ internal class DispatchedContinuation<in T>(
return true
}


/**
* Awaits until previous call to `suspendCancellableCoroutineReusable` will
* stop mutating cached instance
*/
public fun awaitReusability() {
_reusableCancellableContinuation.loop { it ->
if (it !== REUSABLE_CLAIMED) return
}
}

public fun release() {
/*
* Called from `releaseInterceptedContinuation`, can be concurrent with
* the code in `getResult` right after `trySuspend` returned `true`, so we have
* to wait for a release here.
*/
awaitReusability()
reusableCancellableContinuation?.detachChild()
}

/**
* Claims the continuation for [suspendCancellableCoroutineReusable] block,
* so all cancellations will be postponed.
Expand All @@ -103,11 +122,20 @@ internal class DispatchedContinuation<in T>(
_reusableCancellableContinuation.value = REUSABLE_CLAIMED
return null
}
// potentially competing with cancel
state is CancellableContinuationImpl<*> -> {
if (_reusableCancellableContinuation.compareAndSet(state, REUSABLE_CLAIMED)) {
return state as CancellableContinuationImpl<T>
}
}
state === REUSABLE_CLAIMED -> {
// Do nothing, wait until reusable instance will be returned from
// getResult() of a previous `suspendCancellableCoroutineReusable`
}
state is Throwable -> {
// Also do nothing, Throwable can only indicate that the CC
// is in REUSABLE_CLAIMED state, but with postponed cancellation
}
else -> error("Inconsistent state $state")
}
}
Expand All @@ -127,14 +155,13 @@ internal class DispatchedContinuation<in T>(
*
* See [CancellableContinuationImpl.getResult].
*/
fun checkPostponedCancellation(continuation: CancellableContinuation<*>): Throwable? {
fun tryReleaseClaimedContinuation(continuation: CancellableContinuation<*>): Throwable? {
_reusableCancellableContinuation.loop { state ->
// not when(state) to avoid Intrinsics.equals call
when {
state === REUSABLE_CLAIMED -> {
if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null
}
state === null -> return null
state is Throwable -> {
require(_reusableCancellableContinuation.compareAndSet(state, null))
return state
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines
Expand Down Expand Up @@ -52,4 +52,4 @@ class CancelledAwaitStressTest : TestBase() {
private fun keepMe(a: ByteArray) {
// does nothing, makes sure the variable is kept in state-machine
}
}
}
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/jvm/test/FieldWalker.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines
Expand Down Expand Up @@ -56,7 +56,7 @@ object FieldWalker {
* Reflectively starts to walk through object graph and map to all the reached object to their path
* in from root. Use [showPath] do display a path if needed.
*/
private fun walkRefs(root: Any?, rootStatics: Boolean): Map<Any, Ref> {
private fun walkRefs(root: Any?, rootStatics: Boolean): IdentityHashMap<Any, Ref> {
val visited = IdentityHashMap<Any, Ref>()
if (root == null) return visited
visited[root] = Ref.RootRef
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlinx.coroutines.channels.*
import org.junit.Test
import kotlin.test.*

class ReusableCancellableContinuationLeakStressTest : TestBase() {

@Suppress("UnnecessaryVariable")
private suspend fun <T : Any> ReceiveChannel<T>.receiveBatch(): T {
val r = receive() // DO NOT MERGE LINES, otherwise TCE will kick in
return r
}

private val iterations = 100_000 * stressTestMultiplier

class Leak(val i: Int)

@Test // Simplified version of #2564
fun testReusableContinuationLeak() = runTest {
val channel = produce(capacity = 1) { // from the main thread
(0 until iterations).forEach {
send(Leak(it))
}
}

launch(Dispatchers.Default) {
repeat (iterations) {
val value = channel.receiveBatch()
assertEquals(it, value.i)
}
(channel as Job).join()

FieldWalker.assertReachableCount(0, coroutineContext.job, false) { it is Leak }
}
}
}