Skip to content

Commit

Permalink
Structured concurrency for Completable/Listenable futures
Browse files Browse the repository at this point in the history
BREAKING BEHAVIOR CHANGE:

* kotlinx-coroutines-jdk8 and -guava integration modules
  future { ... } builders now honor structured concurrency in
  the same way as all other builders -- a failure inside the child
  (builder) code now cancels parent coroutine.
  Note that is does not affect non-structured (typical) usage like
  GlobalScope.future { ... }

MINOR BEHAVIOR CHANGE:

* Exception in installed CancellableCoroutine.invokeOnCancellation
  handler does not cancel the parent job, but is considered to be an
  uncaught exception, so it goes to CoroutineExceptionHandler.

Internal changes:

* JobSupport.cancelsParents=true is now a default, since there are
  only a fewer exceptions for builder that throw their exception from block
* JobSupport.handleJobException has additional "handled" parameter to
  distinguish cases when parent did/did-not handle it.
* handleCoroutineException logic is updated. It never cancels parent,
  since parent cancellation is taken care of by structured concurrency.
* handleCoroutineException is always invoked with current coroutine's
  context (as opposed to parent)

Fixes #1007
  • Loading branch information
elizarov committed Feb 25, 2019
1 parent 4451d72 commit 91083a9
Show file tree
Hide file tree
Showing 26 changed files with 160 additions and 96 deletions.
12 changes: 6 additions & 6 deletions integration/kotlinx-coroutines-guava/src/ListenableFuture.kt
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ public fun <T> CoroutineScope.future(

private class ListenableFutureCoroutine<T>(
context: CoroutineContext,
private val completion: SettableFuture<T>
private val future: SettableFuture<T>
) : AbstractCoroutine<T>(context), FutureCallback<T> {

/*
* We register coroutine as callback to the future this coroutine completes.
* But when future is cancelled externally, we'd like to cancel coroutine,
Expand All @@ -66,12 +65,13 @@ private class ListenableFutureCoroutine<T>(
}

override fun onCompleted(value: T) {
completion.set(value)
future.set(value)
}

override fun onCompletedExceptionally(exception: Throwable) {
if (!completion.setException(exception)) {
handleCoroutineException(parentContext, exception, this)
override fun handleJobException(exception: Throwable, handled: Boolean) {
if (!future.setException(exception) && !handled) {
// prevents loss of exception that was not handled by parent & could not be set to SettableFuture
handleCoroutineException(context, exception)
}
}
}
Expand Down
34 changes: 32 additions & 2 deletions integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,24 @@ class ListenableFutureTest : TestBase() {
}

@Test
fun testChildException() = runTest {
fun testStructuredException() = runTest(
expected = { it is TestException } // exception propagates to parent with structured concurrency
) {
val result = future<Int>(Dispatchers.Unconfined) {
throw TestException("FAIL")
}
result.checkFutureException<TestException>()
}

@Test
fun testChildException() = runTest(
expected = { it is TestException } // exception propagates to parent with structured concurrency
) {
val result = future(Dispatchers.Unconfined) {
// child crashes
launch { throw TestException("FAIL") }
42
}

result.checkFutureException<TestException>()
}

Expand Down Expand Up @@ -295,7 +306,26 @@ class ListenableFutureTest : TestBase() {
throw TestException()
}
}
result.cancel(true)
finish(3)
}

@Test
fun testUnhandledExceptionOnExternalCancellation() = runTest(
unhandled = listOf(
{ it -> it is TestException } // exception is unhandled because there is no parent
)
) {
expect(1)
// No parent here (NonCancellable), so nowhere to propagate exception
val result = future(NonCancellable + Dispatchers.Unconfined) {
try {
delay(Long.MAX_VALUE)
} finally {
expect(2)
throw TestException() // this exception cannot be handled
}
}
result.cancel(true)
finish(3)
}
Expand Down
12 changes: 6 additions & 6 deletions integration/kotlinx-coroutines-jdk8/src/future/Future.kt
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,20 @@ public fun <T> CoroutineScope.future(

private class CompletableFutureCoroutine<T>(
context: CoroutineContext,
private val completion: CompletableFuture<T>
private val future: CompletableFuture<T>
) : AbstractCoroutine<T>(context), BiConsumer<T?, Throwable?> {

override fun accept(value: T?, exception: Throwable?) {
cancel()
}

override fun onCompleted(value: T) {
completion.complete(value)
future.complete(value)
}

override fun onCompletedExceptionally(exception: Throwable) {
if (!completion.completeExceptionally(exception)) {
handleCoroutineException(parentContext, exception, this)
override fun handleJobException(exception: Throwable, handled: Boolean) {
if (!future.completeExceptionally(exception) && !handled) {
// prevents loss of exception that was not handled by parent & could not be set to CompletableFuture
handleCoroutineException(context, exception)
}
}
}
Expand Down
47 changes: 39 additions & 8 deletions integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -368,29 +368,39 @@ class FutureTest : TestBase() {
}

@Test
fun testChildException() = runTest {
fun testStructuredException() = runTest(
expected = { it is TestException } // exception propagates to parent with structured concurrency
) {
val result = future<Int>(Dispatchers.Unconfined) {
throw TestException("FAIL")
}
result.checkFutureException<TestException>()
}

@Test
fun testChildException() = runTest(
expected = { it is TestException } // exception propagates to parent with structured concurrency
) {
val result = future(Dispatchers.Unconfined) {
// child crashes
launch { throw TestException("FAIL") }
42
}

result.checkFutureException<TestException>()
}

@Test
fun testExceptionAggregation() = runTest {
fun testExceptionAggregation() = runTest(
expected = { it is TestException } // exception propagates to parent with structured concurrency
) {
val result = future(Dispatchers.Unconfined) {
// child crashes
launch(start = CoroutineStart.ATOMIC) { throw TestException1("FAIL") }
launch(start = CoroutineStart.ATOMIC) { throw TestException2("FAIL") }
throw TestException()
}

expect(1)
result.checkFutureException<TestException>(TestException1::class, TestException2::class)
yield()
finish(2) // we are not cancelled
finish(1)
}

@Test
Expand All @@ -409,7 +419,9 @@ class FutureTest : TestBase() {
}

@Test
fun testExceptionOnExternalCompletion() = runTest(expected = {it is TestException}) {
fun testExceptionOnExternalCompletion() = runTest(
expected = { it is TestException } // exception propagates to parent with structured concurrency
) {
expect(1)
val result = future(Dispatchers.Unconfined) {
try {
Expand All @@ -419,7 +431,26 @@ class FutureTest : TestBase() {
throw TestException()
}
}
result.complete(Unit)
finish(3)
}

@Test
fun testUnhandledExceptionOnExternalCompletion() = runTest(
unhandled = listOf(
{ it -> it is TestException } // exception is unhandled because there is no parent
)
) {
expect(1)
// No parent here (NonCancellable), so nowhere to propagate exception
val result = future(NonCancellable + Dispatchers.Unconfined) {
try {
delay(Long.MAX_VALUE)
} finally {
expect(2)
throw TestException() // this exception cannot be handled
}
}
result.complete(Unit)
finish(3)
}
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/AbstractCoroutine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public abstract class AbstractCoroutine<in T>(
}

internal final override fun handleOnCompletionException(exception: Throwable) {
handleCoroutineException(parentContext, exception, this)
handleCoroutineException(context, exception)
}

internal override fun nameString(): String {
Expand Down
6 changes: 3 additions & 3 deletions kotlinx-coroutines-core/common/src/Builders.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ private open class DeferredCoroutine<T>(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {
override val cancelsParent: Boolean get() = true
override fun getCompleted(): T = getCompletedInternal() as T
override suspend fun await(): T = awaitInternal() as T
override val onAwait: SelectClause1<T> get() = this
Expand Down Expand Up @@ -169,8 +168,9 @@ private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
override val cancelsParent: Boolean get() = true
override fun handleJobException(exception: Throwable) = handleExceptionViaHandler(parentContext, exception)
override fun handleJobException(exception: Throwable, handled: Boolean) {
if (!handled) handleCoroutineException(context, exception)
}
}

private class LazyStandaloneCoroutine(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ internal open class CancellableContinuationImpl<in T>(
try {
block()
} catch (ex: Throwable) {
// Handler should never fail, if it does -- it is an unhandled exception
handleCoroutineException(
context,
CompletionHandlerException("Exception in cancellation handler for $this", ex)
Expand Down
1 change: 0 additions & 1 deletion kotlinx-coroutines-core/common/src/CompletableDeferred.kt
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ private class CompletableDeferredImpl<T>(
parent: Job?
) : JobSupport(true), CompletableDeferred<T>, SelectClause1<T> {
init { initParentJobInternal(parent) }
override val cancelsParent: Boolean get() = true
override val onCancelComplete get() = true
override fun getCompleted(): T = getCompletedInternal() as T
override suspend fun await(): T = awaitInternal() as T
Expand Down
33 changes: 7 additions & 26 deletions kotlinx-coroutines-core/common/src/CoroutineExceptionHandler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,16 @@ import kotlin.coroutines.*
internal expect fun handleCoroutineExceptionImpl(context: CoroutineContext, exception: Throwable)

/**
* Helper function for coroutine builder implementations to handle uncaught exception in coroutines.
* Helper function for coroutine builder implementations to handle uncaught and unexpected exceptions in coroutines,
* that could not be otherwise handled in a normal way through structured concurrency, saving them to a future, and
* cannot be rethrown. This is a last resort handler to prevent lost exceptions.
*
* It tries to handle uncaught exception in the following way:
* If current exception is [CancellationException], it's ignored: [CancellationException] is a normal way to cancel
* coroutine.
*
* If there is a [Job] in the context and it's not a [caller], then [Job.cancel] is invoked.
* If invocation returned `true`, method terminates: now [Job] is responsible for handling an exception.
* Otherwise, If there is [CoroutineExceptionHandler] in the context, it is used. If it throws an exception during handling
* or is absent, all instances of [CoroutineExceptionHandler] found via [ServiceLoader] and [Thread.uncaughtExceptionHandler] are invoked
* If there is [CoroutineExceptionHandler] in the context, then it is used. If it throws an exception during handling
* or is absent, all instances of [CoroutineExceptionHandler] found via [ServiceLoader] and
* [Thread.uncaughtExceptionHandler] are invoked.
*/
@InternalCoroutinesApi
public fun handleCoroutineException(context: CoroutineContext, exception: Throwable, caller: Job? = null) {
// Ignore CancellationException (they are normal ways to terminate a coroutine)
if (exception is CancellationException) return // nothing to do
// Try propagate exception to parent
val job = context[Job]
@Suppress("DEPRECATION")
if (job !== null && job !== caller && job.cancel(exception)) return // handle by parent
// otherwise -- use exception handlers
handleExceptionViaHandler(context, exception)
}

/**
* @suppress This is an internal API and it is subject to change.
*/
@InternalCoroutinesApi
public fun handleExceptionViaHandler(context: CoroutineContext, exception: Throwable) {
public fun handleCoroutineException(context: CoroutineContext, exception: Throwable) {
// Invoke exception handler from the context if present
try {
context[CoroutineExceptionHandler]?.let {
Expand All @@ -47,7 +29,6 @@ public fun handleExceptionViaHandler(context: CoroutineContext, exception: Throw
handleCoroutineExceptionImpl(context, handlerException(exception, t))
return
}

// If handler is not present in the context or exception was thrown, fallback to the global handler
handleCoroutineExceptionImpl(context, exception)
}
Expand Down
19 changes: 12 additions & 7 deletions kotlinx-coroutines-core/common/src/JobSupport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
// cancelled job final state
else -> CompletedExceptionally(finalException)
}
// Now handle exception if parent can't handle it
if (finalException != null && !cancelParent(finalException)) {
handleJobException(finalException)
// Now handle the final exception
if (finalException != null) {
val handledByParent = cancelParent(finalException)
handleJobException(finalException, handledByParent)
}
// Then CAS to completed state -> it must succeed
require(_state.compareAndSet(state, finalState.boxIncomplete())) { "Unexpected state: ${_state.value}, expected: $state, update: $finalState" }
Expand Down Expand Up @@ -891,24 +892,29 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
*
* @suppress **This is unstable API and it is subject to change.*
*/
protected open val cancelsParent: Boolean get() = false
protected open val cancelsParent: Boolean get() = true

/**
* Returns `true` for jobs that handle their exceptions via [handleJobException] or integrate them
* into the job's result via [onCompletionInternal]. The only instance of the [Job] that does not
* handle its exceptions is [JobImpl].
* handle its exceptions is [JobImpl] and its subclass [SupervisorJobImpl].
*
* @suppress **This is unstable API and it is subject to change.*
*/
protected open val handlesException: Boolean get() = true

/**
* Handles the final job [exception] after it was reported to the by the parent,
* where [handled] is `true` when parent had already handled exception and `false` otherwise.
*
* This method is invoked **exactly once** when the final exception of the job is determined
* and before it becomes complete. At the moment of invocation the job and all its children are complete.
*
* Note, [handled] is always `true` when [exception] is [CancellationException].
*
* @suppress **This is unstable API and it is subject to change.*
*/
protected open fun handleJobException(exception: Throwable) {}
protected open fun handleJobException(exception: Throwable, handled: Boolean) {}

private fun cancelParent(cause: Throwable): Boolean {
// CancellationException is considered "normal" and parent is not cancelled when child produces it.
Expand Down Expand Up @@ -1184,7 +1190,6 @@ private class Empty(override val isActive: Boolean) : Incomplete {

internal open class JobImpl(parent: Job?) : JobSupport(true), CompletableJob {
init { initParentJobInternal(parent) }
override val cancelsParent: Boolean get() = true
override val onCancelComplete get() = true
override val handlesException: Boolean get() = false
override fun complete() = makeCompleting(Unit)
Expand Down
4 changes: 4 additions & 0 deletions kotlinx-coroutines-core/common/src/Timeout.kt
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ private open class TimeoutCoroutine<U, in T: U>(
override val defaultResumeMode: Int get() = MODE_DIRECT
override val callerFrame: CoroutineStackFrame? get() = (uCont as? CoroutineStackFrame)?.callerFrame
override fun getStackTraceElement(): StackTraceElement? = (uCont as? CoroutineStackFrame)?.getStackTraceElement()

override val cancelsParent: Boolean
get() = false // it throws exception to parent instead of cancelling it

@Suppress("LeakingThis", "Deprecation")
override fun run() {
cancel(TimeoutCancellationException(time, this))
Expand Down
3 changes: 1 addition & 2 deletions kotlinx-coroutines-core/common/src/channels/Broadcast.kt
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ private open class BroadcastCoroutine<E>(
protected val _channel: BroadcastChannel<E>,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active), ProducerScope<E>, BroadcastChannel<E> by _channel {
override val cancelsParent: Boolean get() = true
override val isActive: Boolean get() = super.isActive

override val channel: SendChannel<E>
Expand All @@ -106,7 +105,7 @@ private open class BroadcastCoroutine<E>(
override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
val cause = (state as? CompletedExceptionally)?.cause
val processed = _channel.close(cause)
if (cause != null && !processed && suppressed) handleExceptionViaHandler(context, cause)
if (cause != null && !processed && suppressed) handleCoroutineException(context, cause)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ internal open class ChannelCoroutine<E>(
protected val _channel: Channel<E>,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active), Channel<E> by _channel {
override val cancelsParent: Boolean get() = true

val channel: Channel<E> get() = this

override fun cancel() {
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/channels/Produce.kt
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,6 @@ private class ProducerCoroutine<E>(
override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
val cause = (state as? CompletedExceptionally)?.cause
val processed = _channel.close(cause)
if (cause != null && !processed && suppressed) handleExceptionViaHandler(context, cause)
if (cause != null && !processed && suppressed) handleCoroutineException(context, cause)
}
}
3 changes: 3 additions & 0 deletions kotlinx-coroutines-core/common/src/internal/Scopes.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ internal open class ScopeCoroutine<in T>(
final override fun getStackTraceElement(): StackTraceElement? = null
override val defaultResumeMode: Int get() = MODE_DIRECT

override val cancelsParent: Boolean
get() = false // it throws exception to parent instead of cancelling it

@Suppress("UNCHECKED_CAST")
internal override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
if (state is CompletedExceptionally) {
Expand Down
Loading

0 comments on commit 91083a9

Please sign in to comment.