Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework reusability control in cancellable contination #2581

Merged
merged 6 commits into from
Mar 24, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 4 additions & 2 deletions kotlinx-coroutines-core/common/src/CancellableContinuation.kt
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,10 @@ public interface CancellableContinuation<in T> : Continuation<T> {
public fun completeResume(token: Any)

/**
* Legacy function that turned on cancellation behavior in [suspendCancellableCoroutine] before kotlinx.coroutines 1.1.0.
* This function does nothing and is left only for binary compatibility with old compiled code.
* Internal function that setups cancellation behavior in [suspendCancellableCoroutine].
* It's illegal to call this function in any non-`kotlinx.coroutines` code and
* such calls lead to undefined behaviour.
* Exposed in our ABI since 1.0.0 withing `suspendCancellableCoroutine` body.
*
* @suppress **This is unstable API and it is subject to change.**
*/
Expand Down
115 changes: 70 additions & 45 deletions kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,7 @@ internal open class CancellableContinuationImpl<in T>(
*/
private val _state = atomic<Any?>(Active)

private val _parentHandle = atomic<DisposableHandle?>(null)
private var parentHandle: DisposableHandle?
get() = _parentHandle.value
set(value) { _parentHandle.value = value }
private var parentHandle: DisposableHandle? = null

internal val state: Any? get() = _state.value

Expand All @@ -93,7 +90,25 @@ internal open class CancellableContinuationImpl<in T>(
}

public override fun initCancellability() {
setupCancellation()
/*
* Invariant: at the moment of invocation, `this` has not yet
* leaked to user code and no one is able to invoke `resume` or `cancel`
* on it yet. Also, this function is not invoked for reusable continuations.
*/
val parent = context[Job] ?: return // fast path -- don't do anything without parent
val handle = parent.invokeOnCompletion(
onCancelling = true,
handler = ChildContinuation(this).asHandler
)
parentHandle = handle
// now check our state _after_ registering, could have completed while we were registering,
// but only if parent was cancelled. Parent could be in a "cancelling" state for a while,
// so we are helping him and cleaning the node ourselves
if (isCompleted) {
// Can be invoked concurrently in 'parentCancelled', no problems here
handle.dispose()
parentHandle = NonDisposableHandle
}
}

private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable(this)
Expand All @@ -118,40 +133,6 @@ internal open class CancellableContinuationImpl<in T>(
return true
}

/**
* Setups parent cancellation and checks for postponed cancellation in the case of reusable continuations.
* It is only invoked from an internal [getResult] function for reusable continuations
* and from [suspendCancellableCoroutine] to establish a cancellation before registering CC anywhere.
*/
private fun setupCancellation() {
if (checkCompleted()) return
if (parentHandle !== null) return // fast path 2 -- was already initialized
val parent = delegate.context[Job] ?: return // fast path 3 -- don't do anything without parent
val handle = parent.invokeOnCompletion(
onCancelling = true,
handler = ChildContinuation(this).asHandler
)
parentHandle = handle
// now check our state _after_ registering (could have completed while we were registering)
// Also note that we do not dispose parent for reusable continuations, dispatcher will do that for us
if (isCompleted && !isReusable()) {
handle.dispose() // it is Ok to call dispose twice -- here and in disposeParentHandle
parentHandle = NonDisposableHandle // release it just in case, to aid GC
}
}

private fun checkCompleted(): Boolean {
val completed = isCompleted
if (!resumeMode.isReusableMode) return completed // Do not check postponed cancellation for non-reusable continuations
val dispatched = delegate as? DispatchedContinuation<*> ?: return completed
val cause = dispatched.checkPostponedCancellation(this) ?: return completed
if (!completed) {
// Note: this cancel may fail if one more concurrent cancel is currently being invoked
cancel(cause)
}
return true
}

public override val callerFrame: CoroutineStackFrame?
get() = delegate as? CoroutineStackFrame

Expand Down Expand Up @@ -216,7 +197,7 @@ internal open class CancellableContinuationImpl<in T>(

private inline fun callCancelHandlerSafely(block: () -> Unit) {
try {
block()
block()
} catch (ex: Throwable) {
// Handler should never fail, if it does -- it is an unhandled exception
handleCoroutineException(
Expand Down Expand Up @@ -276,8 +257,24 @@ internal open class CancellableContinuationImpl<in T>(

@PublishedApi
internal fun getResult(): Any? {
setupCancellation()
if (trySuspend()) return COROUTINE_SUSPENDED
val isReusable = isReusable()
// trySuspend may fail either if 'block' has resumed/cancelled a continuation
// or we got async cancellation from parent.
if (trySuspend()) {
/*
* We were neither resumed nor cancelled, time to suspend.
* But first we have to install parent cancellation handle (if we didn't yet),
* so CC could be properly resumed on parent cancellation.
*/
if (parentHandle == null) {
installParentHandleReusable()
} else if (isReusable) {
releaseClaimedReusableContinuation()
}
return COROUTINE_SUSPENDED
} else if (isReusable) {
releaseClaimedReusableContinuation()
}
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
val state = this.state
if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
Expand All @@ -296,6 +293,31 @@ internal open class CancellableContinuationImpl<in T>(
return getSuccessfulResult(state)
}

private fun installParentHandleReusable() {
val parent = context[Job] ?: return // don't do anything without parent or if completed
// Install the handle
parentHandle = parent.invokeOnCompletion(
onCancelling = true,
handler = ChildContinuation(this).asHandler
)
/*
* Finally release the continuation after installing the handle. If we were successful, then
* do nothing, it's ok to reuse the instance now.
* Otherwise, dispose the handle by ourselves.
*/
releaseClaimedReusableContinuation()
}

private fun releaseClaimedReusableContinuation() {
// Cannot be casted if e.g. invoked from `installParentHandleReusable` for context without dispatchers, but with Job in it
val cancellationCause = (delegate as? DispatchedContinuation<*>)?.tryReleaseClaimedContinuation(this) ?: return
parentHandle?.let {
it.dispose()
parentHandle = NonDisposableHandle
}
cancel(cancellationCause)
}

override fun resumeWith(result: Result<T>) =
resumeImpl(result.toState(this), resumeMode)

Expand Down Expand Up @@ -462,12 +484,15 @@ internal open class CancellableContinuationImpl<in T>(

/**
* Detaches from the parent.
* Invariant: used from [CoroutineDispatcher.releaseInterceptedContinuation] iff [isReusable] is `true`
* * Used from [CoroutineDispatcher.releaseInterceptedContinuation] iff [isReusable] is `true`
* * Used from [parentCancelled] iff [isReusable] is `false`
*/
internal fun detachChild() {
val handle = parentHandle
handle?.dispose()
parentHandle = NonDisposableHandle
if (handle != null) {
handle.dispose()
parentHandle = NonDisposableHandle
}
}

// Note: Always returns RESUME_TOKEN | null
Expand Down
7 changes: 6 additions & 1 deletion kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,12 @@ public abstract class CoroutineDispatcher :

@InternalCoroutinesApi
public override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
(continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild()
/*
* Unconditional cast is safe here: we only return DispatchedContinuation from `interceptContinuation`,
* any ClassCastException can only indicate compiler bug
*/
val dispatched = continuation as DispatchedContinuation<*>
dispatched.release()
}

/**
Expand Down
5 changes: 5 additions & 0 deletions kotlinx-coroutines-core/common/src/JobSupport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,11 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
delegate: Continuation<T>,
private val job: JobSupport
) : CancellableContinuationImpl<T>(delegate, MODE_CANCELLABLE) {

init {
initCancellability()
}

override fun getContinuationCancellationCause(parent: Job): Throwable {
val state = job.state
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,15 @@ internal class DispatchedContinuation<in T>(
* 4) [Throwable] continuation was cancelled with this cause while being in [suspendCancellableCoroutineReusable],
* [CancellableContinuationImpl.getResult] will check for cancellation later.
*
* [REUSABLE_CLAIMED] state is required to prevent the lost resume in the channel.
* AbstractChannel.receive method relies on the fact that the following pattern
* [REUSABLE_CLAIMED] state is required to prevent double-use of the reused continuation.
* In the `getResult`, we have the following code:
* ```
* suspendCancellableCoroutineReusable { cont ->
* val result = pollFastPath()
* if (result != null) cont.resume(result)
* if (trySuspend()) {
* // <- at this moment current continuation can be redispatched and claimed again.
* attachChildToParent()
* releaseClaimedContinuation()
* }
* ```
* always succeeds.
* To make it always successful, we actually postpone "reusable" cancellation
* to this phase and set cancellation only at the moment of instantiation.
*/
private val _reusableCancellableContinuation = atomic<Any?>(null)

Expand All @@ -66,9 +64,9 @@ internal class DispatchedContinuation<in T>(
public fun isReusable(requester: CancellableContinuationImpl<*>): Boolean {
/*
* Reusability control:
* `null` -> no reusability at all, false
* `null` -> no reusability at all, `false`
* If current state is not CCI, then we are within `suspendCancellableCoroutineReusable`, true
* Else, if result is CCI === requester.
* Else, if result is CCI === requester, then it's our reusable continuation
* Identity check my fail for the following pattern:
* ```
* loop:
Expand All @@ -82,6 +80,27 @@ internal class DispatchedContinuation<in T>(
return true
}


/**
* Awaits until previous call to `suspendCancellableCoroutineReusable` will
* stop mutating cached instance
*/
public fun awaitReusability() {
_reusableCancellableContinuation.loop { it ->
if (it !== REUSABLE_CLAIMED) return
}
}

public fun release() {
/*
* Called from `releaseInterceptedContinuation`, can be concurrent with
* the code in `getResult` right after `trySuspend` returned `true`, so we have
* to wait for a release here.
*/
awaitReusability()
reusableCancellableContinuation?.detachChild()
}

/**
* Claims the continuation for [suspendCancellableCoroutineReusable] block,
* so all cancellations will be postponed.
Expand All @@ -103,11 +122,16 @@ internal class DispatchedContinuation<in T>(
_reusableCancellableContinuation.value = REUSABLE_CLAIMED
return null
}
// potentially competing with cancel
state is CancellableContinuationImpl<*> -> {
if (_reusableCancellableContinuation.compareAndSet(state, REUSABLE_CLAIMED)) {
return state as CancellableContinuationImpl<T>
}
}
state === REUSABLE_CLAIMED -> {
// Do nothing, wait until reusable instance will be returned from
// getResult() of a previous `suspendCancellableCoroutineReusable`
}
else -> error("Inconsistent state $state")
}
}
Expand All @@ -127,14 +151,13 @@ internal class DispatchedContinuation<in T>(
*
* See [CancellableContinuationImpl.getResult].
*/
fun checkPostponedCancellation(continuation: CancellableContinuation<*>): Throwable? {
fun tryReleaseClaimedContinuation(continuation: CancellableContinuation<*>): Throwable? {
_reusableCancellableContinuation.loop { state ->
// not when(state) to avoid Intrinsics.equals call
when {
state === REUSABLE_CLAIMED -> {
if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null
}
state === null -> return null
state is Throwable -> {
require(_reusableCancellableContinuation.compareAndSet(state, null))
return state
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines
Expand Down Expand Up @@ -52,4 +52,4 @@ class CancelledAwaitStressTest : TestBase() {
private fun keepMe(a: ByteArray) {
// does nothing, makes sure the variable is kept in state-machine
}
}
}
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/jvm/test/FieldWalker.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines
Expand Down Expand Up @@ -56,7 +56,7 @@ object FieldWalker {
* Reflectively starts to walk through object graph and map to all the reached object to their path
* in from root. Use [showPath] do display a path if needed.
*/
private fun walkRefs(root: Any?, rootStatics: Boolean): Map<Any, Ref> {
private fun walkRefs(root: Any?, rootStatics: Boolean): IdentityHashMap<Any, Ref> {
val visited = IdentityHashMap<Any, Ref>()
if (root == null) return visited
visited[root] = Ref.RootRef
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlinx.coroutines.channels.*
import org.junit.Test
import kotlin.test.*

class ReusableCancellableContinuationLeakStressTest : TestBase() {

@Suppress("UnnecessaryVariable")
private suspend fun <T : Any> ReceiveChannel<T>.receiveBatch(): T {
val r = receive() // DO NOT MERGE LINES, otherwise TCE will kick in
return r
}

private val iterations = 100_000 * stressTestMultiplier

class Leak(val i: Int)

@Test // Simplified version of #2564
fun testReusableContinuationLeak() = runTest {
val channel = produce(capacity = 1) { // from the main thread
(0 until iterations).forEach {
send(Leak(it))
}
}

launch(Dispatchers.Default) {
repeat (iterations) {
val value = channel.receiveBatch()
assertEquals(it, value.i)
}
(channel as Job).join()

FieldWalker.assertReachableCount(0, coroutineContext.job, false) { it is Leak }
}
}
}