Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement optional thread interrupt on coroutine cancellation (#57) #1922

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ public abstract class kotlinx/coroutines/AbstractCoroutine : kotlinx/coroutines/
public synthetic fun <init> (Lkotlin/coroutines/CoroutineContext;ZILkotlin/jvm/internal/DefaultConstructorMarker;)V
protected fun afterResume (Ljava/lang/Object;)V
protected fun cancellationExceptionMessage ()Ljava/lang/String;
public final fun completeCoroutine (Ljava/lang/Object;)Ljava/lang/Object;
public final fun getContext ()Lkotlin/coroutines/CoroutineContext;
public fun getCoroutineContext ()Lkotlin/coroutines/CoroutineContext;
public fun isActive ()Z
Expand Down Expand Up @@ -187,6 +188,30 @@ public final class kotlinx/coroutines/CoroutineExceptionHandlerKt {
public static final fun handleCoroutineException (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Throwable;)V
}

public abstract class kotlinx/coroutines/CoroutineInterruptController : kotlin/coroutines/AbstractCoroutineContextElement {
public static final field Key Lkotlinx/coroutines/CoroutineInterruptController$Key;
public fun <init> ()V
public abstract fun updateCoroutineCompleteState (Ljava/lang/Object;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/CoroutineInterruptController$Key : kotlin/coroutines/CoroutineContext$Key {
}

public final class kotlinx/coroutines/CoroutineInterruptible : kotlinx/coroutines/CoroutineInterruptController, kotlinx/coroutines/ThreadContextElement {
public static final field INSTANCE Lkotlinx/coroutines/CoroutineInterruptible;
public synthetic fun restoreThreadContext (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Object;)V
public fun restoreThreadContext (Lkotlin/coroutines/CoroutineContext;Lkotlinx/coroutines/CoroutineInterruptible$ThreadState;)V
public fun updateCoroutineCompleteState (Ljava/lang/Object;)Ljava/lang/Object;
public synthetic fun updateThreadContext (Lkotlin/coroutines/CoroutineContext;)Ljava/lang/Object;
public fun updateThreadContext (Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/CoroutineInterruptible$ThreadState;
}

public final class kotlinx/coroutines/CoroutineInterruptible$ThreadState {
public fun <init> ()V
public final fun clearInterrupt ()V
public final fun initInterrupt (Lkotlinx/coroutines/Job;)V
}

public final class kotlinx/coroutines/CoroutineName : kotlin/coroutines/AbstractCoroutineContextElement {
public static final field Key Lkotlinx/coroutines/CoroutineName$Key;
public fun <init> (Ljava/lang/String;)V
Expand Down
13 changes: 12 additions & 1 deletion kotlinx-coroutines-core/common/src/AbstractCoroutine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,22 @@ public abstract class AbstractCoroutine<in T>(
onCompleted(state as T)
}

/**
* Completes execution of this coroutine with the specified state.
*/
public fun completeCoroutine(state: Any?): Any? {
var completeState = state
context[CoroutineInterruptController]?.let {
completeState = it.updateCoroutineCompleteState(completeState)
}
return makeCompletingOnce(completeState)
}

/**
* Completes execution of this with coroutine with the specified result.
*/
public final override fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
val state = completeCoroutine(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
Expand Down
55 changes: 55 additions & 0 deletions kotlinx-coroutines-core/common/src/CoroutineInterruptController.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

/**
* This [CoroutineContext] element makes a coroutine interruptible.
*
* With this element, the thread executing the coroutine is interrupted when the coroutine is canceled, making
* blocking procedures stop. Exceptions that indicate an interrupted procedure, eg., InterruptedException on JVM
* are transformed into [CancellationException] at the end of the coroutine. Thus, everything else goes as if this
* element is not present. In particular, the parent coroutine won't be canceled by those exceptions.
*
* This is an abstract element and will be implemented by each individual platform (or won't be implemented).
* The JVM implementation is named CoroutineInterruptible.
*
* Example:
* ```
* GlobalScope.launch(Dispatchers.IO + CoroutineInterruptible) {
* async {
* // This block will throw [CancellationException] instead of an exception indicating
* // interruption, such as InterruptedException on JVM.
* withContext(CoroutineName) {
* doSomethingUseful()
*
* // This blocking procedure will be interrupted when this coroutine is canceled
* // by Exception thrown by the below async block.
* doSomethingElseUsefulInterruptible()
* }
* }
*
* async {
* delay(500L)
* throw Exception()
* }
* }
* ```
*/
abstract class CoroutineInterruptController : AbstractCoroutineContextElement(Key) {
/**
* Key for [CoroutineInterruptController] instance in the coroutine context.
*/
@InternalCoroutinesApi
companion object Key : CoroutineContext.Key<CoroutineInterruptController>

/**
* Update the complete state of a coroutine, mainly for exception transformation.
*/
@InternalCoroutinesApi
abstract fun updateCoroutineCompleteState(completeState: Any?): Any?
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ private inline fun <T> ScopeCoroutine<T>.undispatchedResult(
* not a timeout exception.
*/
if (result === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED // (1)
val state = makeCompletingOnce(result)
val state = completeCoroutine(result)
if (state === COMPLETING_WAITING_CHILDREN) return COROUTINE_SUSPENDED // (2)
return if (state is CompletedExceptionally) { // (3)
when {
Expand Down
152 changes: 152 additions & 0 deletions kotlinx-coroutines-core/jvm/src/CoroutineInterruptible.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.loop
import kotlin.coroutines.CoroutineContext

/**
* This is the [CoroutineInterruptController] implementation on JVM. See [CoroutineInterruptController] for detailed
* description and examples.
*/
object CoroutineInterruptible :
CoroutineInterruptController(), ThreadContextElement<CoroutineInterruptible.ThreadState?> {

/**
* Update the complete state of a coroutine on JVM.
* Transforms [InterruptedException] into [CancellationException] for coroutines with this context element.
*/
@InternalCoroutinesApi
override fun updateCoroutineCompleteState(completeState: Any?): Any? =
if (completeState is CompletedExceptionally && completeState.cause is InterruptedException)
CompletedExceptionally(CancellationException())
else
completeState

/**
* Updates context of the current thread.
* This function is invoked before the coroutine in the specified [context] is resumed in the current thread.
* Prepares interruption for this execution, watching the [Job] for cancellation and interrupt this executing
* thread on cancellation.
*/
@InternalCoroutinesApi
override fun updateThreadContext(context: CoroutineContext): ThreadState? {
// Fast path: no Job in this context
val job = context[Job] ?: return null
// Slow path
val threadState = ThreadState()
threadState.initInterrupt(job)
return threadState
}

/**
* Restores context of the current thread.
* This function is invoked after the coroutine in the specified [context] is suspended in the current thread.
* Stops watching the [Job] for cancellation and do clean-up work.
*/
@InternalCoroutinesApi
override fun restoreThreadContext(context: CoroutineContext, oldState: ThreadState?) {
// Fast path: no Job in this context
val threadState = oldState ?: return
// Slow path
threadState.clearInterrupt()
}

/**
* Holds the state of executions for interruption.
*/
@InternalCoroutinesApi
class ThreadState {
fun initInterrupt(job: Job) {
initInvokeOnCancel(job)
initThread()
}

fun clearInterrupt() {
state.loop { s ->
when {
s is Working -> {
if (state.compareAndSet(s, Finish)) {
s.cancelHandle?.let { it.dispose() } // no more watching
return
}
}
s === Interrupting -> Thread.yield() // eases the thread
s === Interrupted -> { Thread.interrupted(); return } // no interrupt leak
s === Init || s === Finish -> throw IllegalStateException("impossible state")
else -> throw IllegalStateException("unknown state")
}
}
}

private fun initInvokeOnCancel(job: Job) {
// watches the job for cancellation
val cancelHandle =
job.invokeOnCompletion(onCancelling = true, invokeImmediately = true, handler = CancelHandler())
// remembers the cancel handle or drops it
state.loop { s ->
when {
s === Init -> if (state.compareAndSet(s, Working(null, cancelHandle))) return
s is Working -> if (state.compareAndSet(s, Working(s.thread, cancelHandle))) return
s === Finish -> { cancelHandle.dispose(); return } // no more watching needed
s === Interrupting || s === Interrupted -> return
else -> throw IllegalStateException("unknown state")
}
}
}

private fun initThread() {
val thread = Thread.currentThread()
state.loop { s ->
when {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps when (s) {?

s === Init -> if (state.compareAndSet(s, Working(thread, null))) return
s is Working -> if (state.compareAndSet(s, Working(thread, s.cancelHandle))) return
s === Interrupted -> { thread.interrupt(); return } // interrupted before the thread is set
s === Finish || s === Interrupting -> throw IllegalStateException("impossible state")
else -> throw IllegalStateException("unknown state")
}
}
}

private inner class CancelHandler : CompletionHandler {
override fun invoke(cause: Throwable?) {
state.loop { s ->
when {
s === Init || (s is Working && s.thread === null) -> {
Copy link

@taralx taralx Apr 19, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you swap initThread and initInvokeOnCancel then s.thread === null can't happen, right?

Copy link
Contributor Author

@jxdabc jxdabc Apr 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems a great idea, I will check it

if (state.compareAndSet(s, Interrupted))
return
}
s is Working -> {
if (state.compareAndSet(s, Interrupting)) {
s.thread!!.interrupt()
state.value = Interrupted
return
}
}
s === Finish -> return
s === Interrupting || s === Interrupted -> return
else -> throw IllegalStateException("unknown state")
}
}
}
}

private val state: AtomicRef<State> = atomic(Init)

private interface State
// initial state
private object Init : State
// cancellation watching is setup and/or the continuation is running
private data class Working(val thread: Thread?, val cancelHandle: DisposableHandle?) : State
// the continuation done running without interruption
private object Finish : State
// interrupting this thread
private object Interrupting: State
// done interrupting
private object Interrupted: State
}
}
Loading