From 00122c511ffe73306dc00c142df5d9180c5bcdd5 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 7 Sep 2021 15:54:40 +0300 Subject: [PATCH] Handle unexpected exceptions in the dispatcher when throwing runnable is passed as an argument --- .../common/src/internal/LimitedDispatcher.kt | 9 ++++--- .../jvm/test/LimitedParallelismTest.kt | 24 ++++++++++++++++++- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt index 38298507eb..5cbd5b8eef 100644 --- a/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt @@ -33,7 +33,11 @@ internal class LimitedDispatcher( while (true) { val task = queue.removeFirstOrNull() if (task != null) { - task.run() + try { + task.run() + } catch (e: Throwable) { + handleCoroutineException(EmptyCoroutineContext, e) + } // 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well if (++fairnessCounter >= 16 && dispatcher.isDispatchNeeded(EmptyCoroutineContext)) { // Do "yield" to let other views to execute their runnable as well @@ -62,8 +66,7 @@ internal class LimitedDispatcher( } /* - * Protect against race when the worker is finished - * right after our check + * Protect against race when the worker is finished right after our check. */ @Suppress("CAST_NEVER_SUCCEEDS") synchronized(this as SynchronizedObject) { diff --git a/kotlinx-coroutines-core/jvm/test/LimitedParallelismTest.kt b/kotlinx-coroutines-core/jvm/test/LimitedParallelismTest.kt index c58fa6ba3b..30c54117a9 100644 --- a/kotlinx-coroutines-core/jvm/test/LimitedParallelismTest.kt +++ b/kotlinx-coroutines-core/jvm/test/LimitedParallelismTest.kt @@ -4,7 +4,10 @@ package kotlinx.coroutines -import org.junit.* +import org.junit.Test +import java.util.concurrent.* +import kotlin.coroutines.* +import kotlin.test.* class LimitedParallelismTest : TestBase() { @@ -30,4 +33,23 @@ class LimitedParallelismTest : TestBase() { joinAll(j1, j2) executor.close() } + + @Test + fun testUnhandledException() = runTest { + var caughtException: Throwable? = null + val executor = Executors.newFixedThreadPool( + 1 + ) { + Thread(it).also { + it.uncaughtExceptionHandler = Thread.UncaughtExceptionHandler { _, e -> caughtException = e } + } + }.asCoroutineDispatcher() + val view = executor.limitedParallelism(1) + view.dispatch(EmptyCoroutineContext, Runnable { throw TestException() }) + withContext(view) { + // Verify it is in working state and establish happens-before + } + assertTrue { caughtException is TestException } + executor.close() + } }