From 2528d4d4cf50d43a9d6cae159f20669a9d70cba2 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 26 May 2021 17:41:10 +0300 Subject: [PATCH 1/4] Prevent StackOverflowError in CompletableFuture.asDeferred and properly report exceptions from completion handlers * It turned out that 'cancel' on completed future tries to help and invoke 'whenComplete' handlers that also invoke 'cancel' on the very same future * Use top-level exception handler as a last resort to deliver an exception Fixes #2730 --- .../src/ListenableFuture.kt | 6 ++- ...eferredUnhandledCompletionExceptionTest.kt | 49 +++++++++++++++++++ .../test/ListenableFutureTest.kt | 19 +++++++ .../src/future/Future.kt | 19 ++++--- ...eferredUnhandledCompletionExceptionTest.kt | 38 ++++++++++++++ .../test/future/FutureTest.kt | 18 +++++++ kotlinx-coroutines-core/jvm/src/Future.kt | 10 ++-- 7 files changed, 145 insertions(+), 14 deletions(-) create mode 100644 integration/kotlinx-coroutines-guava/test/FutureAsDeferredUnhandledCompletionExceptionTest.kt create mode 100644 integration/kotlinx-coroutines-jdk8/test/future/FutureAsDeferredUnhandledCompletionExceptionTest.kt diff --git a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt index 35e0aeb379..8f11e0a916 100644 --- a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt +++ b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt @@ -136,11 +136,13 @@ public fun ListenableFuture.asDeferred(): Deferred { override fun onSuccess(result: T?) { // Here we work with flexible types, so we unchecked cast to trick the type system @Suppress("UNCHECKED_CAST") - deferred.complete(result as T) + runCatching { deferred.complete(result as T) } + .onFailure { handleCoroutineException(EmptyCoroutineContext, it) } } override fun onFailure(t: Throwable) { - deferred.completeExceptionally(t) + runCatching { deferred.completeExceptionally(t) } + .onFailure { handleCoroutineException(EmptyCoroutineContext, it) } } }, MoreExecutors.directExecutor()) diff --git a/integration/kotlinx-coroutines-guava/test/FutureAsDeferredUnhandledCompletionExceptionTest.kt b/integration/kotlinx-coroutines-guava/test/FutureAsDeferredUnhandledCompletionExceptionTest.kt new file mode 100644 index 0000000000..6da1b7ed5e --- /dev/null +++ b/integration/kotlinx-coroutines-guava/test/FutureAsDeferredUnhandledCompletionExceptionTest.kt @@ -0,0 +1,49 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.guava + +import com.google.common.util.concurrent.* +import kotlinx.coroutines.* +import org.junit.* +import org.junit.Test +import kotlin.test.* + +class FutureAsDeferredUnhandledCompletionExceptionTest : TestBase() { + + // This is a separate test in order to avoid interference with uncaught exception handlers in other tests + private val exceptionHandler = Thread.getDefaultUncaughtExceptionHandler() + private lateinit var caughtException: Throwable + + @Before + fun setUp() { + Thread.setDefaultUncaughtExceptionHandler { _, e -> caughtException = e } + } + + @After + fun tearDown() { + Thread.setDefaultUncaughtExceptionHandler(exceptionHandler) + } + + @Test + fun testLostExceptionOnSuccess() = runTest { + val future = SettableFuture.create() + val deferred = future.asDeferred() + deferred.invokeOnCompletion { throw TestException() } + future.set(1) + assertTrue { caughtException is CompletionHandlerException && caughtException.cause is TestException } + } + + @Test + fun testLostExceptionOnFailure() = runTest { + val future = SettableFuture.create() + val deferred = future.asDeferred() + deferred.invokeOnCompletion { throw TestException() } + val failedFuture = CompletableDeferred().apply { + completeExceptionally(TestException2()) + }.asListenableFuture() + future.setFuture(failedFuture) + assertTrue { caughtException is CompletionHandlerException && caughtException.cause is TestException } + } +} diff --git a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt index c463174a8d..8ef7c92caa 100644 --- a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt +++ b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt @@ -11,6 +11,7 @@ import org.junit.Ignore import org.junit.Test import java.util.concurrent.* import java.util.concurrent.CancellationException +import java.util.concurrent.atomic.* import kotlin.test.* class ListenableFutureTest : TestBase() { @@ -755,4 +756,22 @@ class ListenableFutureTest : TestBase() { future(start = CoroutineStart.ATOMIC) { } future(start = CoroutineStart.UNDISPATCHED) { } } + + @Test + fun testStackOverflow() = runTest { + val future = SettableFuture.create() + val completed = AtomicLong() + val count = 10000L + for (i in 0 until count) { + launch(Dispatchers.Default) { + future.asDeferred().await() + completed.incrementAndGet() + } + } + future.set(1) + withTimeout(60_000) { + coroutineContext.job.children.toList().joinAll() + assertEquals(count, completed.get()) + } + } } diff --git a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt index 7e9c349c66..dc9dab509c 100644 --- a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt +++ b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt @@ -126,13 +126,18 @@ public fun CompletionStage.asDeferred(): Deferred { } val result = CompletableDeferred() whenComplete { value, exception -> - if (exception == null) { - // the future has completed normally - result.complete(value) - } else { - // the future has completed with an exception, unwrap it consistently with fast path - // Note: In the fast-path the implementation of CompletableFuture.get() does unwrapping - result.completeExceptionally((exception as? CompletionException)?.cause ?: exception) + try { + if (exception == null) { + // the future has completed normally + result.complete(value) + } else { + // the future has completed with an exception, unwrap it consistently with fast path + // Note: In the fast-path the implementation of CompletableFuture.get() does unwrapping + result.completeExceptionally((exception as? CompletionException)?.cause ?: exception) + } + } catch (e: Throwable) { + // We can come here iff the upstream future is completed and the deferred internals threw an exception from its handler + handleCoroutineException(EmptyCoroutineContext, e) } } result.cancelFutureOnCompletion(future) diff --git a/integration/kotlinx-coroutines-jdk8/test/future/FutureAsDeferredUnhandledCompletionExceptionTest.kt b/integration/kotlinx-coroutines-jdk8/test/future/FutureAsDeferredUnhandledCompletionExceptionTest.kt new file mode 100644 index 0000000000..bf810af7aa --- /dev/null +++ b/integration/kotlinx-coroutines-jdk8/test/future/FutureAsDeferredUnhandledCompletionExceptionTest.kt @@ -0,0 +1,38 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package future + +import kotlinx.coroutines.* +import kotlinx.coroutines.future.* +import org.junit.* +import org.junit.Test +import java.util.concurrent.* +import kotlin.test.* + +class FutureAsDeferredUnhandledCompletionExceptionTest : TestBase() { + + // This is a separate test in order to avoid interference with uncaught exception handlers in other tests + private val exceptionHandler = Thread.getDefaultUncaughtExceptionHandler() + private lateinit var caughtException: Throwable + + @Before + fun setUp() { + Thread.setDefaultUncaughtExceptionHandler { _, e -> caughtException = e } + } + + @After + fun tearDown() { + Thread.setDefaultUncaughtExceptionHandler(exceptionHandler) + } + + @Test + fun testLostException() = runTest { + val future = CompletableFuture() + val deferred = future.asDeferred() + deferred.invokeOnCompletion { throw TestException() } + future.complete(1) + assertTrue { caughtException is CompletionHandlerException && caughtException.cause is TestException } + } +} diff --git a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt index 08e5cdad93..9d20da0f95 100644 --- a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt +++ b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt @@ -575,4 +575,22 @@ class FutureTest : TestBase() { future(start = CoroutineStart.ATOMIC) { } future(start = CoroutineStart.UNDISPATCHED) { } } + + @Test + fun testStackOverflow() = runTest { + val future = CompletableFuture() + val completed = AtomicLong() + val count = 10000L + for (i in 0 until count) { + launch(Dispatchers.Default) { + future.asDeferred().await() + completed.incrementAndGet() + } + } + future.complete(1) + withTimeout(60_000) { + coroutineContext.job.children.toList().joinAll() + assertEquals(count, completed.get()) + } + } } diff --git a/kotlinx-coroutines-core/jvm/src/Future.kt b/kotlinx-coroutines-core/jvm/src/Future.kt index 948ef6065c..b27a970845 100644 --- a/kotlinx-coroutines-core/jvm/src/Future.kt +++ b/kotlinx-coroutines-core/jvm/src/Future.kt @@ -13,20 +13,20 @@ import java.util.concurrent.* * Cancels a specified [future] when this job is cancelled. * This is a shortcut for the following code with slightly more efficient implementation (one fewer object created). * ``` - * invokeOnCompletion { future.cancel(false) } + * invokeOnCompletion { if (it != null) future.cancel(false) } * ``` * * @suppress **This an internal API and should not be used from general code.** */ @InternalCoroutinesApi public fun Job.cancelFutureOnCompletion(future: Future<*>): DisposableHandle = - invokeOnCompletion(handler = CancelFutureOnCompletion(future)) // TODO make it work only on cancellation as well? + invokeOnCompletion(handler = CancelFutureOnCompletion(future)) /** * Cancels a specified [future] when this job is cancelled. * This is a shortcut for the following code with slightly more efficient implementation (one fewer object created). * ``` - * invokeOnCancellation { future.cancel(false) } + * invokeOnCancellation { if (it != null) future.cancel(false) } * ``` */ public fun CancellableContinuation<*>.cancelFutureOnCancellation(future: Future<*>): Unit = @@ -38,7 +38,7 @@ private class CancelFutureOnCompletion( override fun invoke(cause: Throwable?) { // Don't interrupt when cancelling future on completion, because no one is going to reset this // interruption flag and it will cause spurious failures elsewhere - future.cancel(false) + if (cause != null) future.cancel(false) } } @@ -46,7 +46,7 @@ private class CancelFutureOnCancel(private val future: Future<*>) : CancelHandle override fun invoke(cause: Throwable?) { // Don't interrupt when cancelling future on completion, because no one is going to reset this // interruption flag and it will cause spurious failures elsewhere - future.cancel(false) + if (cause != null) future.cancel(false) } override fun toString() = "CancelFutureOnCancel[$future]" } From 9398f7fddba72006235b63c027f6318c1f8ff3ca Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 27 May 2021 16:18:33 +0300 Subject: [PATCH 2/4] ~fix a test --- .../kotlinx-coroutines-guava/test/ListenableFutureTest.kt | 5 +++-- .../kotlinx-coroutines-jdk8/test/future/FutureTest.kt | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt index 8ef7c92caa..69ba193071 100644 --- a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt +++ b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt @@ -762,15 +762,16 @@ class ListenableFutureTest : TestBase() { val future = SettableFuture.create() val completed = AtomicLong() val count = 10000L + val children = ArrayList() for (i in 0 until count) { - launch(Dispatchers.Default) { + children += launch(Dispatchers.Default) { future.asDeferred().await() completed.incrementAndGet() } } future.set(1) withTimeout(60_000) { - coroutineContext.job.children.toList().joinAll() + children.forEach { it.join() } assertEquals(count, completed.get()) } } diff --git a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt index 9d20da0f95..372e79ef1d 100644 --- a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt +++ b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt @@ -581,15 +581,16 @@ class FutureTest : TestBase() { val future = CompletableFuture() val completed = AtomicLong() val count = 10000L + val children = ArrayList() for (i in 0 until count) { - launch(Dispatchers.Default) { + children += launch(Dispatchers.Default) { future.asDeferred().await() completed.incrementAndGet() } } future.complete(1) withTimeout(60_000) { - coroutineContext.job.children.toList().joinAll() + children.forEach { it.join() } assertEquals(count, completed.get()) } } From 5263411f55fcf15e943982ca8ae2476667a072a8 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 27 May 2021 17:51:07 +0300 Subject: [PATCH 3/4] Update integration/kotlinx-coroutines-jdk8/src/future/Future.kt Co-authored-by: dkhalanskyjb <52952525+dkhalanskyjb@users.noreply.github.com> --- integration/kotlinx-coroutines-jdk8/src/future/Future.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt index dc9dab509c..caf2a3c359 100644 --- a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt +++ b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt @@ -136,7 +136,7 @@ public fun CompletionStage.asDeferred(): Deferred { result.completeExceptionally((exception as? CompletionException)?.cause ?: exception) } } catch (e: Throwable) { - // We can come here iff the upstream future is completed and the deferred internals threw an exception from its handler + // We come here iff the internals of Deferred threw an exception during its completion handleCoroutineException(EmptyCoroutineContext, e) } } From 4ac1a0c5b129982d9dbc1131e8227cb3c1e227fe Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 27 May 2021 17:51:45 +0300 Subject: [PATCH 4/4] ~simplify LF test --- .../test/FutureAsDeferredUnhandledCompletionExceptionTest.kt | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/integration/kotlinx-coroutines-guava/test/FutureAsDeferredUnhandledCompletionExceptionTest.kt b/integration/kotlinx-coroutines-guava/test/FutureAsDeferredUnhandledCompletionExceptionTest.kt index 6da1b7ed5e..d6469a947e 100644 --- a/integration/kotlinx-coroutines-guava/test/FutureAsDeferredUnhandledCompletionExceptionTest.kt +++ b/integration/kotlinx-coroutines-guava/test/FutureAsDeferredUnhandledCompletionExceptionTest.kt @@ -40,10 +40,7 @@ class FutureAsDeferredUnhandledCompletionExceptionTest : TestBase() { val future = SettableFuture.create() val deferred = future.asDeferred() deferred.invokeOnCompletion { throw TestException() } - val failedFuture = CompletableDeferred().apply { - completeExceptionally(TestException2()) - }.asListenableFuture() - future.setFuture(failedFuture) + future.setException(TestException2()) assertTrue { caughtException is CompletionHandlerException && caughtException.cause is TestException } } }