Skip to content

Commit

Permalink
Structured concurrency for Completable/Listenable futures
Browse files Browse the repository at this point in the history
BREAKING BEHAVIOR CHANGE:

* kotlinx-coroutines-jdk8 and -guava integration modules
  future { ... } builders now honor structured concurrency in
  the same way as all other builders -- a failure inside the child
  (builder) code now cancels parent coroutine.
  Note that is does not affect non-structured (typical) usage like
  GlobalScope.future { ... }

MINOR BEHAVIOR CHANGE:

* Exception in installed CancellableCoroutine.invokeOnCancellation
  handler does not cancel the parent job, but is considered to be an
  uncaught exception, so it goes to CoroutineExceptionHandler.

Internal changes:

* JobSupport.cancelsParents=true is now a default, since there are
  only a fewer exceptions for builder that throw their exception from block
* JobSupport.handleJobException has additional "handled" parameter to
  distinguish cases when parent did/did-not handle it.
* handleCoroutineException logic is updated. It never cancels parent,
  since parent cancellation is taken care of by structured concurrency.
* handleCoroutineException is always invoked with current coroutine's
  context (as opposed to parent)

Fixes #1007
  • Loading branch information
elizarov committed Feb 25, 2019
1 parent 4451d72 commit 092480c
Show file tree
Hide file tree
Showing 27 changed files with 162 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,7 @@ public final class kotlinx/coroutines/CoroutineExceptionHandler$Key : kotlin/cor

public final class kotlinx/coroutines/CoroutineExceptionHandlerKt {
public static final fun CoroutineExceptionHandler (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/CoroutineExceptionHandler;
public static final fun handleCoroutineException (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Throwable;Lkotlinx/coroutines/Job;)V
public static synthetic fun handleCoroutineException$default (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Throwable;Lkotlinx/coroutines/Job;ILjava/lang/Object;)V
public static final fun handleExceptionViaHandler (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Throwable;)V
public static final fun handleCoroutineException (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Throwable;)V
}

public final class kotlinx/coroutines/CoroutineName : kotlin/coroutines/AbstractCoroutineContextElement {
Expand Down Expand Up @@ -371,7 +369,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
protected fun getHandlesException ()Z
public final fun getKey ()Lkotlin/coroutines/CoroutineContext$Key;
public final fun getOnJoin ()Lkotlinx/coroutines/selects/SelectClause0;
protected fun handleJobException (Ljava/lang/Throwable;)V
protected fun handleJobException (Ljava/lang/Throwable;Z)V
public final fun invokeOnCompletion (Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/DisposableHandle;
public final fun invokeOnCompletion (ZZLkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/DisposableHandle;
public fun isActive ()Z
Expand Down
12 changes: 6 additions & 6 deletions integration/kotlinx-coroutines-guava/src/ListenableFuture.kt
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ public fun <T> CoroutineScope.future(

private class ListenableFutureCoroutine<T>(
context: CoroutineContext,
private val completion: SettableFuture<T>
private val future: SettableFuture<T>
) : AbstractCoroutine<T>(context), FutureCallback<T> {

/*
* We register coroutine as callback to the future this coroutine completes.
* But when future is cancelled externally, we'd like to cancel coroutine,
Expand All @@ -66,12 +65,13 @@ private class ListenableFutureCoroutine<T>(
}

override fun onCompleted(value: T) {
completion.set(value)
future.set(value)
}

override fun onCompletedExceptionally(exception: Throwable) {
if (!completion.setException(exception)) {
handleCoroutineException(parentContext, exception, this)
override fun handleJobException(exception: Throwable, handled: Boolean) {
if (!future.setException(exception) && !handled) {
// prevents loss of exception that was not handled by parent & could not be set to SettableFuture
handleCoroutineException(context, exception)
}
}
}
Expand Down
34 changes: 32 additions & 2 deletions integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,24 @@ class ListenableFutureTest : TestBase() {
}

@Test
fun testChildException() = runTest {
fun testStructuredException() = runTest(
expected = { it is TestException } // exception propagates to parent with structured concurrency
) {
val result = future<Int>(Dispatchers.Unconfined) {
throw TestException("FAIL")
}
result.checkFutureException<TestException>()
}

@Test
fun testChildException() = runTest(
expected = { it is TestException } // exception propagates to parent with structured concurrency
) {
val result = future(Dispatchers.Unconfined) {
// child crashes
launch { throw TestException("FAIL") }
42
}

result.checkFutureException<TestException>()
}

Expand Down Expand Up @@ -295,7 +306,26 @@ class ListenableFutureTest : TestBase() {
throw TestException()
}
}
result.cancel(true)
finish(3)
}

@Test
fun testUnhandledExceptionOnExternalCancellation() = runTest(
unhandled = listOf(
{ it -> it is TestException } // exception is unhandled because there is no parent
)
) {
expect(1)
// No parent here (NonCancellable), so nowhere to propagate exception
val result = future(NonCancellable + Dispatchers.Unconfined) {
try {
delay(Long.MAX_VALUE)
} finally {
expect(2)
throw TestException() // this exception cannot be handled
}
}
result.cancel(true)
finish(3)
}
Expand Down
12 changes: 6 additions & 6 deletions integration/kotlinx-coroutines-jdk8/src/future/Future.kt
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,20 @@ public fun <T> CoroutineScope.future(

private class CompletableFutureCoroutine<T>(
context: CoroutineContext,
private val completion: CompletableFuture<T>
private val future: CompletableFuture<T>
) : AbstractCoroutine<T>(context), BiConsumer<T?, Throwable?> {

override fun accept(value: T?, exception: Throwable?) {
cancel()
}

override fun onCompleted(value: T) {
completion.complete(value)
future.complete(value)
}

override fun onCompletedExceptionally(exception: Throwable) {
if (!completion.completeExceptionally(exception)) {
handleCoroutineException(parentContext, exception, this)
override fun handleJobException(exception: Throwable, handled: Boolean) {
if (!future.completeExceptionally(exception) && !handled) {
// prevents loss of exception that was not handled by parent & could not be set to CompletableFuture
handleCoroutineException(context, exception)
}
}
}
Expand Down
47 changes: 39 additions & 8 deletions integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -368,29 +368,39 @@ class FutureTest : TestBase() {
}

@Test
fun testChildException() = runTest {
fun testStructuredException() = runTest(
expected = { it is TestException } // exception propagates to parent with structured concurrency
) {
val result = future<Int>(Dispatchers.Unconfined) {
throw TestException("FAIL")
}
result.checkFutureException<TestException>()
}

@Test
fun testChildException() = runTest(
expected = { it is TestException } // exception propagates to parent with structured concurrency
) {
val result = future(Dispatchers.Unconfined) {
// child crashes
launch { throw TestException("FAIL") }
42
}

result.checkFutureException<TestException>()
}

@Test
fun testExceptionAggregation() = runTest {
fun testExceptionAggregation() = runTest(
expected = { it is TestException } // exception propagates to parent with structured concurrency
) {
val result = future(Dispatchers.Unconfined) {
// child crashes
launch(start = CoroutineStart.ATOMIC) { throw TestException1("FAIL") }
launch(start = CoroutineStart.ATOMIC) { throw TestException2("FAIL") }
throw TestException()
}

expect(1)
result.checkFutureException<TestException>(TestException1::class, TestException2::class)
yield()
finish(2) // we are not cancelled
finish(1)
}

@Test
Expand All @@ -409,7 +419,9 @@ class FutureTest : TestBase() {
}

@Test
fun testExceptionOnExternalCompletion() = runTest(expected = {it is TestException}) {
fun testExceptionOnExternalCompletion() = runTest(
expected = { it is TestException } // exception propagates to parent with structured concurrency
) {
expect(1)
val result = future(Dispatchers.Unconfined) {
try {
Expand All @@ -419,7 +431,26 @@ class FutureTest : TestBase() {
throw TestException()
}
}
result.complete(Unit)
finish(3)
}

@Test
fun testUnhandledExceptionOnExternalCompletion() = runTest(
unhandled = listOf(
{ it -> it is TestException } // exception is unhandled because there is no parent
)
) {
expect(1)
// No parent here (NonCancellable), so nowhere to propagate exception
val result = future(NonCancellable + Dispatchers.Unconfined) {
try {
delay(Long.MAX_VALUE)
} finally {
expect(2)
throw TestException() // this exception cannot be handled
}
}
result.complete(Unit)
finish(3)
}
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/AbstractCoroutine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public abstract class AbstractCoroutine<in T>(
}

internal final override fun handleOnCompletionException(exception: Throwable) {
handleCoroutineException(parentContext, exception, this)
handleCoroutineException(context, exception)
}

internal override fun nameString(): String {
Expand Down
6 changes: 3 additions & 3 deletions kotlinx-coroutines-core/common/src/Builders.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ private open class DeferredCoroutine<T>(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {
override val cancelsParent: Boolean get() = true
override fun getCompleted(): T = getCompletedInternal() as T
override suspend fun await(): T = awaitInternal() as T
override val onAwait: SelectClause1<T> get() = this
Expand Down Expand Up @@ -169,8 +168,9 @@ private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
override val cancelsParent: Boolean get() = true
override fun handleJobException(exception: Throwable) = handleExceptionViaHandler(parentContext, exception)
override fun handleJobException(exception: Throwable, handled: Boolean) {
if (!handled) handleCoroutineException(context, exception)
}
}

private class LazyStandaloneCoroutine(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ internal open class CancellableContinuationImpl<in T>(
try {
block()
} catch (ex: Throwable) {
// Handler should never fail, if it does -- it is an unhandled exception
handleCoroutineException(
context,
CompletionHandlerException("Exception in cancellation handler for $this", ex)
Expand Down
1 change: 0 additions & 1 deletion kotlinx-coroutines-core/common/src/CompletableDeferred.kt
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ private class CompletableDeferredImpl<T>(
parent: Job?
) : JobSupport(true), CompletableDeferred<T>, SelectClause1<T> {
init { initParentJobInternal(parent) }
override val cancelsParent: Boolean get() = true
override val onCancelComplete get() = true
override fun getCompleted(): T = getCompletedInternal() as T
override suspend fun await(): T = awaitInternal() as T
Expand Down
33 changes: 7 additions & 26 deletions kotlinx-coroutines-core/common/src/CoroutineExceptionHandler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,16 @@ import kotlin.coroutines.*
internal expect fun handleCoroutineExceptionImpl(context: CoroutineContext, exception: Throwable)

/**
* Helper function for coroutine builder implementations to handle uncaught exception in coroutines.
* Helper function for coroutine builder implementations to handle uncaught and unexpected exceptions in coroutines,
* that could not be otherwise handled in a normal way through structured concurrency, saving them to a future, and
* cannot be rethrown. This is a last resort handler to prevent lost exceptions.
*
* It tries to handle uncaught exception in the following way:
* If current exception is [CancellationException], it's ignored: [CancellationException] is a normal way to cancel
* coroutine.
*
* If there is a [Job] in the context and it's not a [caller], then [Job.cancel] is invoked.
* If invocation returned `true`, method terminates: now [Job] is responsible for handling an exception.
* Otherwise, If there is [CoroutineExceptionHandler] in the context, it is used. If it throws an exception during handling
* or is absent, all instances of [CoroutineExceptionHandler] found via [ServiceLoader] and [Thread.uncaughtExceptionHandler] are invoked
* If there is [CoroutineExceptionHandler] in the context, then it is used. If it throws an exception during handling
* or is absent, all instances of [CoroutineExceptionHandler] found via [ServiceLoader] and
* [Thread.uncaughtExceptionHandler] are invoked.
*/
@InternalCoroutinesApi
public fun handleCoroutineException(context: CoroutineContext, exception: Throwable, caller: Job? = null) {
// Ignore CancellationException (they are normal ways to terminate a coroutine)
if (exception is CancellationException) return // nothing to do
// Try propagate exception to parent
val job = context[Job]
@Suppress("DEPRECATION")
if (job !== null && job !== caller && job.cancel(exception)) return // handle by parent
// otherwise -- use exception handlers
handleExceptionViaHandler(context, exception)
}

/**
* @suppress This is an internal API and it is subject to change.
*/
@InternalCoroutinesApi
public fun handleExceptionViaHandler(context: CoroutineContext, exception: Throwable) {
public fun handleCoroutineException(context: CoroutineContext, exception: Throwable) {
// Invoke exception handler from the context if present
try {
context[CoroutineExceptionHandler]?.let {
Expand All @@ -47,7 +29,6 @@ public fun handleExceptionViaHandler(context: CoroutineContext, exception: Throw
handleCoroutineExceptionImpl(context, handlerException(exception, t))
return
}

// If handler is not present in the context or exception was thrown, fallback to the global handler
handleCoroutineExceptionImpl(context, exception)
}
Expand Down
19 changes: 12 additions & 7 deletions kotlinx-coroutines-core/common/src/JobSupport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
// cancelled job final state
else -> CompletedExceptionally(finalException)
}
// Now handle exception if parent can't handle it
if (finalException != null && !cancelParent(finalException)) {
handleJobException(finalException)
// Now handle the final exception
if (finalException != null) {
val handledByParent = cancelParent(finalException)
handleJobException(finalException, handledByParent)
}
// Then CAS to completed state -> it must succeed
require(_state.compareAndSet(state, finalState.boxIncomplete())) { "Unexpected state: ${_state.value}, expected: $state, update: $finalState" }
Expand Down Expand Up @@ -891,24 +892,29 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
*
* @suppress **This is unstable API and it is subject to change.*
*/
protected open val cancelsParent: Boolean get() = false
protected open val cancelsParent: Boolean get() = true

/**
* Returns `true` for jobs that handle their exceptions via [handleJobException] or integrate them
* into the job's result via [onCompletionInternal]. The only instance of the [Job] that does not
* handle its exceptions is [JobImpl].
* handle its exceptions is [JobImpl] and its subclass [SupervisorJobImpl].
*
* @suppress **This is unstable API and it is subject to change.*
*/
protected open val handlesException: Boolean get() = true

/**
* Handles the final job [exception] after it was reported to the by the parent,
* where [handled] is `true` when parent had already handled exception and `false` otherwise.
*
* This method is invoked **exactly once** when the final exception of the job is determined
* and before it becomes complete. At the moment of invocation the job and all its children are complete.
*
* Note, [handled] is always `true` when [exception] is [CancellationException].
*
* @suppress **This is unstable API and it is subject to change.*
*/
protected open fun handleJobException(exception: Throwable) {}
protected open fun handleJobException(exception: Throwable, handled: Boolean) {}

private fun cancelParent(cause: Throwable): Boolean {
// CancellationException is considered "normal" and parent is not cancelled when child produces it.
Expand Down Expand Up @@ -1184,7 +1190,6 @@ private class Empty(override val isActive: Boolean) : Incomplete {

internal open class JobImpl(parent: Job?) : JobSupport(true), CompletableJob {
init { initParentJobInternal(parent) }
override val cancelsParent: Boolean get() = true
override val onCancelComplete get() = true
override val handlesException: Boolean get() = false
override fun complete() = makeCompleting(Unit)
Expand Down
4 changes: 4 additions & 0 deletions kotlinx-coroutines-core/common/src/Timeout.kt
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ private open class TimeoutCoroutine<U, in T: U>(
override val defaultResumeMode: Int get() = MODE_DIRECT
override val callerFrame: CoroutineStackFrame? get() = (uCont as? CoroutineStackFrame)?.callerFrame
override fun getStackTraceElement(): StackTraceElement? = (uCont as? CoroutineStackFrame)?.getStackTraceElement()

override val cancelsParent: Boolean
get() = false // it throws exception to parent instead of cancelling it

@Suppress("LeakingThis", "Deprecation")
override fun run() {
cancel(TimeoutCancellationException(time, this))
Expand Down
3 changes: 1 addition & 2 deletions kotlinx-coroutines-core/common/src/channels/Broadcast.kt
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ private open class BroadcastCoroutine<E>(
protected val _channel: BroadcastChannel<E>,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active), ProducerScope<E>, BroadcastChannel<E> by _channel {
override val cancelsParent: Boolean get() = true
override val isActive: Boolean get() = super.isActive

override val channel: SendChannel<E>
Expand All @@ -106,7 +105,7 @@ private open class BroadcastCoroutine<E>(
override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
val cause = (state as? CompletedExceptionally)?.cause
val processed = _channel.close(cause)
if (cause != null && !processed && suppressed) handleExceptionViaHandler(context, cause)
if (cause != null && !processed && suppressed) handleCoroutineException(context, cause)
}
}

Expand Down
Loading

0 comments on commit 092480c

Please sign in to comment.