Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent StackOverflowError in CompletableFuture.asDeferred and proper… #2731

Merged
merged 4 commits into from
May 27, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 4 additions & 2 deletions integration/kotlinx-coroutines-guava/src/ListenableFuture.kt
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,13 @@ public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> {
override fun onSuccess(result: T?) {
// Here we work with flexible types, so we unchecked cast to trick the type system
@Suppress("UNCHECKED_CAST")
deferred.complete(result as T)
runCatching { deferred.complete(result as T) }
.onFailure { handleCoroutineException(EmptyCoroutineContext, it) }
}

override fun onFailure(t: Throwable) {
deferred.completeExceptionally(t)
runCatching { deferred.completeExceptionally(t) }
.onFailure { handleCoroutineException(EmptyCoroutineContext, it) }
dkhalanskyjb marked this conversation as resolved.
Show resolved Hide resolved
}
}, MoreExecutors.directExecutor())

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.guava

import com.google.common.util.concurrent.*
import kotlinx.coroutines.*
import org.junit.*
import org.junit.Test
import kotlin.test.*

class FutureAsDeferredUnhandledCompletionExceptionTest : TestBase() {

// This is a separate test in order to avoid interference with uncaught exception handlers in other tests
private val exceptionHandler = Thread.getDefaultUncaughtExceptionHandler()
private lateinit var caughtException: Throwable

@Before
fun setUp() {
Thread.setDefaultUncaughtExceptionHandler { _, e -> caughtException = e }
}

@After
fun tearDown() {
Thread.setDefaultUncaughtExceptionHandler(exceptionHandler)
}

@Test
fun testLostExceptionOnSuccess() = runTest {
val future = SettableFuture.create<Int>()
val deferred = future.asDeferred()
deferred.invokeOnCompletion { throw TestException() }
future.set(1)
assertTrue { caughtException is CompletionHandlerException && caughtException.cause is TestException }
}

@Test
fun testLostExceptionOnFailure() = runTest {
val future = SettableFuture.create<Int>()
val deferred = future.asDeferred()
deferred.invokeOnCompletion { throw TestException() }
val failedFuture = CompletableDeferred<Int>().apply {
completeExceptionally(TestException2())
}.asListenableFuture()
future.setFuture(failedFuture)
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
assertTrue { caughtException is CompletionHandlerException && caughtException.cause is TestException }
}
}
20 changes: 20 additions & 0 deletions integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.junit.Ignore
import org.junit.Test
import java.util.concurrent.*
import java.util.concurrent.CancellationException
import java.util.concurrent.atomic.*
import kotlin.test.*

class ListenableFutureTest : TestBase() {
Expand Down Expand Up @@ -755,4 +756,23 @@ class ListenableFutureTest : TestBase() {
future(start = CoroutineStart.ATOMIC) { }
future(start = CoroutineStart.UNDISPATCHED) { }
}

@Test
fun testStackOverflow() = runTest {
val future = SettableFuture.create<Int>()
val completed = AtomicLong()
val count = 10000L
val children = ArrayList<Job>()
for (i in 0 until count) {
children += launch(Dispatchers.Default) {
future.asDeferred().await()
completed.incrementAndGet()
}
}
future.set(1)
withTimeout(60_000) {
children.forEach { it.join() }
assertEquals(count, completed.get())
}
}
}
19 changes: 12 additions & 7 deletions integration/kotlinx-coroutines-jdk8/src/future/Future.kt
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,18 @@ public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> {
}
val result = CompletableDeferred<T>()
whenComplete { value, exception ->
if (exception == null) {
// the future has completed normally
result.complete(value)
} else {
// the future has completed with an exception, unwrap it consistently with fast path
// Note: In the fast-path the implementation of CompletableFuture.get() does unwrapping
result.completeExceptionally((exception as? CompletionException)?.cause ?: exception)
try {
if (exception == null) {
// the future has completed normally
result.complete(value)
} else {
// the future has completed with an exception, unwrap it consistently with fast path
// Note: In the fast-path the implementation of CompletableFuture.get() does unwrapping
result.completeExceptionally((exception as? CompletionException)?.cause ?: exception)
}
} catch (e: Throwable) {
// We can come here iff the upstream future is completed and the deferred internals threw an exception from its handler
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
handleCoroutineException(EmptyCoroutineContext, e)
}
}
result.cancelFutureOnCompletion(future)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package future

import kotlinx.coroutines.*
import kotlinx.coroutines.future.*
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
import kotlin.test.*

class FutureAsDeferredUnhandledCompletionExceptionTest : TestBase() {

// This is a separate test in order to avoid interference with uncaught exception handlers in other tests
private val exceptionHandler = Thread.getDefaultUncaughtExceptionHandler()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: wouldn't it be more symmetrical if exceptionHandler was also initialized in setUp?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find it more concise to initialize in-place everything that could be initialized in-place, [opinionated] it reads better and is easier to reason about

private lateinit var caughtException: Throwable

@Before
fun setUp() {
Thread.setDefaultUncaughtExceptionHandler { _, e -> caughtException = e }
}

@After
fun tearDown() {
Thread.setDefaultUncaughtExceptionHandler(exceptionHandler)
}

@Test
fun testLostException() = runTest {
val future = CompletableFuture<Int>()
val deferred = future.asDeferred()
deferred.invokeOnCompletion { throw TestException() }
future.complete(1)
assertTrue { caughtException is CompletionHandlerException && caughtException.cause is TestException }
}
}
19 changes: 19 additions & 0 deletions integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -575,4 +575,23 @@ class FutureTest : TestBase() {
future(start = CoroutineStart.ATOMIC) { }
future(start = CoroutineStart.UNDISPATCHED) { }
}

@Test
fun testStackOverflow() = runTest {
val future = CompletableFuture<Int>()
val completed = AtomicLong()
val count = 10000L
val children = ArrayList<Job>()
for (i in 0 until count) {
children += launch(Dispatchers.Default) {
future.asDeferred().await()
completed.incrementAndGet()
}
}
future.complete(1)
withTimeout(60_000) {
children.forEach { it.join() }
assertEquals(count, completed.get())
}
}
}
10 changes: 5 additions & 5 deletions kotlinx-coroutines-core/jvm/src/Future.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,20 @@ import java.util.concurrent.*
* Cancels a specified [future] when this job is cancelled.
* This is a shortcut for the following code with slightly more efficient implementation (one fewer object created).
* ```
* invokeOnCompletion { future.cancel(false) }
* invokeOnCompletion { if (it != null) future.cancel(false) }
* ```
*
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
public fun Job.cancelFutureOnCompletion(future: Future<*>): DisposableHandle =
invokeOnCompletion(handler = CancelFutureOnCompletion(future)) // TODO make it work only on cancellation as well?
invokeOnCompletion(handler = CancelFutureOnCompletion(future))

/**
* Cancels a specified [future] when this job is cancelled.
* This is a shortcut for the following code with slightly more efficient implementation (one fewer object created).
* ```
* invokeOnCancellation { future.cancel(false) }
* invokeOnCancellation { if (it != null) future.cancel(false) }
* ```
*/
public fun CancellableContinuation<*>.cancelFutureOnCancellation(future: Future<*>): Unit =
Expand All @@ -38,15 +38,15 @@ private class CancelFutureOnCompletion(
override fun invoke(cause: Throwable?) {
// Don't interrupt when cancelling future on completion, because no one is going to reset this
// interruption flag and it will cause spurious failures elsewhere
future.cancel(false)
if (cause != null) future.cancel(false)
}
}

private class CancelFutureOnCancel(private val future: Future<*>) : CancelHandler() {
override fun invoke(cause: Throwable?) {
// Don't interrupt when cancelling future on completion, because no one is going to reset this
// interruption flag and it will cause spurious failures elsewhere
future.cancel(false)
if (cause != null) future.cancel(false)
}
override fun toString() = "CancelFutureOnCancel[$future]"
}