diff --git a/kotlinx-coroutines-core/jvm/test/JobChildStressTest.kt b/kotlinx-coroutines-core/jvm/test/JobChildStressTest.kt index 291845e75f..a30e6393b6 100644 --- a/kotlinx-coroutines-core/jvm/test/JobChildStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/JobChildStressTest.kt @@ -1,27 +1,36 @@ package kotlinx.coroutines import kotlinx.coroutines.testing.* -import org.junit.* -import org.junit.Test import java.util.concurrent.* import java.util.concurrent.atomic.* import kotlin.test.* +/** + * Testing the procedure of attaching a child to the parent job. + */ class JobChildStressTest : TestBase() { private val N_ITERATIONS = 10_000 * stressTestMultiplier private val pool = newFixedThreadPoolContext(3, "JobChildStressTest") - @After + @AfterTest fun tearDown() { pool.close() } /** - * Perform concurrent launch of a child job & cancellation of the explicit parent job + * Tests attaching a child while the parent is trying to finalize its state. + * + * Checks the following interleavings: + * - A child attaches before the parent is cancelled. + * - A child attaches after the parent is cancelled, but before the parent notifies anyone about it. + * - A child attaches after the parent notifies the children about being cancelled, + * but before it starts waiting for its children. + * - A child attempts to attach after the parent stops waiting for its children, + * which immediately cancels the child. */ @Test @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") - fun testChild() = runTest { + fun testChildAttachmentRacingWithCancellation() = runTest { val barrier = CyclicBarrier(3) repeat(N_ITERATIONS) { var wasLaunched = false @@ -30,7 +39,7 @@ class JobChildStressTest : TestBase() { unhandledException = ex } val scope = CoroutineScope(pool + handler) - val parent = CompletableDeferred() + val parent = createCompletableDeferredForTesting(it) // concurrent child launcher val launcher = scope.launch { barrier.await() @@ -56,13 +65,27 @@ class JobChildStressTest : TestBase() { } } + /** + * Tests attaching a child while the parent is waiting for the last child job to complete. + * + * Checks the following interleavings: + * - A child attaches while the parent is already completing, but is waiting for its children. + * - A child attempts to attach after the parent stops waiting for its children, + * which immediately cancels the child. + */ @Test - fun testFailingChildIsAddedWhenJobFinalizesItsState() { + fun testChildAttachmentRacingWithLastChildCompletion() { // All exceptions should get aggregated here repeat(N_ITERATIONS) { runBlocking { val rogueJob = AtomicReference() + /** not using [createCompletableDeferredForTesting] because we don't need extra children. */ val deferred = CompletableDeferred() + // optionally, add a completion handler to the parent job, so that the child tries to enter a list with + // multiple elements, not just one. + if (it.mod(2) == 0) { + deferred.invokeOnCompletion { } + } launch(pool + deferred) { deferred.complete(Unit) // Transition deferred into "completing" state waiting for current child // **Asynchronously** submit task that launches a child so it races with completion diff --git a/kotlinx-coroutines-core/jvm/test/JobHandlersUpgradeStressTest.kt b/kotlinx-coroutines-core/jvm/test/JobHandlersUpgradeStressTest.kt index dc2314bb6c..3f085b6f20 100644 --- a/kotlinx-coroutines-core/jvm/test/JobHandlersUpgradeStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/JobHandlersUpgradeStressTest.kt @@ -30,6 +30,9 @@ class JobHandlersUpgradeStressTest : TestBase() { val state = atomic(0) } + /** + * Tests handlers not being invoked more than once. + */ @Test fun testStress() { println("--- JobHandlersUpgradeStressTest") @@ -91,4 +94,4 @@ class JobHandlersUpgradeStressTest : TestBase() { println(" Fired handler ${fired.value} times") } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/JobOnCompletionStressTest.kt b/kotlinx-coroutines-core/jvm/test/JobOnCompletionStressTest.kt new file mode 100644 index 0000000000..3df62b666e --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/JobOnCompletionStressTest.kt @@ -0,0 +1,192 @@ +package kotlinx.coroutines + +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.testing.* +import java.util.concurrent.CyclicBarrier +import java.util.concurrent.atomic.* +import kotlin.test.* +import kotlin.time.Duration.Companion.seconds + +class JobOnCompletionStressTest: TestBase() { + private val N_ITERATIONS = 10_000 * stressTestMultiplier + private val pool = newFixedThreadPoolContext(2, "JobOnCompletionStressTest") + + private val completionHandlerSeesCompletedParent = AtomicBoolean(false) + private val completionHandlerSeesCancelledParent = AtomicBoolean(false) + private val encounteredException = AtomicReference(null) + + @AfterTest + fun tearDown() { + pool.close() + } + + @Test + fun testOnCompletionRacingWithCompletion() = runTest { + testHandlerRacingWithCancellation( + onCancelling = false, + invokeImmediately = true, + parentCompletion = { complete(Unit) } + ) { + assertNull(encounteredException.get()) + assertTrue(completionHandlerSeesCompletedParent.get()) + assertFalse(completionHandlerSeesCancelledParent.get()) + } + } + + @Test + fun testOnCompletionRacingWithCancellation() = runTest { + testHandlerRacingWithCancellation( + onCancelling = false, + invokeImmediately = true, + parentCompletion = { completeExceptionally(TestException()) } + ) { + assertIs(encounteredException.get()) + assertTrue(completionHandlerSeesCompletedParent.get()) + assertTrue(completionHandlerSeesCancelledParent.get()) + } + } + + @Test + fun testOnCancellingRacingWithCompletion() = runTest { + testHandlerRacingWithCancellation( + onCancelling = true, + invokeImmediately = true, + parentCompletion = { complete(Unit) } + ) { + assertNull(encounteredException.get()) + assertTrue(completionHandlerSeesCompletedParent.get()) + assertFalse(completionHandlerSeesCancelledParent.get()) + } + } + + @Test + fun testOnCancellingRacingWithCancellation() = runTest { + testHandlerRacingWithCancellation( + onCancelling = true, + invokeImmediately = true, + parentCompletion = { completeExceptionally(TestException()) } + ) { + assertIs(encounteredException.get()) + assertTrue(completionHandlerSeesCancelledParent.get()) + } + } + + @Test + fun testNonImmediateOnCompletionRacingWithCompletion() = runTest { + testHandlerRacingWithCancellation( + onCancelling = false, + invokeImmediately = false, + parentCompletion = { complete(Unit) } + ) { + assertNull(encounteredException.get()) + assertTrue(completionHandlerSeesCompletedParent.get()) + assertFalse(completionHandlerSeesCancelledParent.get()) + } + } + + @Test + fun testNonImmediateOnCompletionRacingWithCancellation() = runTest { + testHandlerRacingWithCancellation( + onCancelling = false, + invokeImmediately = false, + parentCompletion = { completeExceptionally(TestException()) } + ) { + assertIs(encounteredException.get()) + assertTrue(completionHandlerSeesCompletedParent.get()) + assertTrue(completionHandlerSeesCancelledParent.get()) + } + } + + @Test + fun testNonImmediateOnCancellingRacingWithCompletion() = runTest { + testHandlerRacingWithCancellation( + onCancelling = true, + invokeImmediately = false, + parentCompletion = { complete(Unit) } + ) { + assertNull(encounteredException.get()) + assertTrue(completionHandlerSeesCompletedParent.get()) + assertFalse(completionHandlerSeesCancelledParent.get()) + } + } + + @Test + fun testNonImmediateOnCancellingRacingWithCancellation() = runTest { + testHandlerRacingWithCancellation( + onCancelling = true, + invokeImmediately = false, + parentCompletion = { completeExceptionally(TestException()) } + ) { + assertIs(encounteredException.get()) + assertTrue(completionHandlerSeesCancelledParent.get()) + } + } + + private suspend fun testHandlerRacingWithCancellation( + onCancelling: Boolean, + invokeImmediately: Boolean, + parentCompletion: CompletableDeferred.() -> Unit, + validate: () -> Unit, + ) { + repeat(N_ITERATIONS) { + val entered = Channel(1) + completionHandlerSeesCompletedParent.set(false) + completionHandlerSeesCancelledParent.set(false) + encounteredException.set(null) + val parent = createCompletableDeferredForTesting(it) + val barrier = CyclicBarrier(2) + val handlerInstallJob = coroutineScope { + launch(pool) { + barrier.await() + parent.parentCompletion() + } + async(pool) { + barrier.await() + parent.invokeOnCompletion( + onCancelling = onCancelling, + invokeImmediately = invokeImmediately, + ) { exception -> + encounteredException.set(exception) + completionHandlerSeesCompletedParent.set(parent.isCompleted) + completionHandlerSeesCancelledParent.set(parent.isCancelled) + entered.trySend(Unit) + } + } + } + if (invokeImmediately || handlerInstallJob.getCompleted() !== NonDisposableHandle) { + withTimeout(1.seconds) { + entered.receive() + } + try { + validate() + } catch (e: Throwable) { + println("Iteration $it failed") + println("invokeOnCompletion returned ${handlerInstallJob.getCompleted()}") + throw e + } + } else { + assertTrue(entered.isEmpty) + } + } + } +} + +/** + * Creates a [CompletableDeferred], optionally adding completion handlers and/or other children to the job depending + * on [iteration]. + * The purpose is to test not just attaching completion handlers to empty or one-element lists (see the [JobSupport] + * implementation for details on what this means), but also to lists with multiple elements. + */ +fun createCompletableDeferredForTesting(iteration: Int): CompletableDeferred { + val parent = CompletableDeferred() + /* We optionally add completion handlers and/or other children to the parent job + to test the scenarios where a child is placed into an empty list, a single-element list, + or a list with multiple elements. */ + if (iteration.mod(2) == 0) { + parent.invokeOnCompletion { } + } + if (iteration.mod(3) == 0) { + GlobalScope.launch(parent) { } + } + return parent +}