-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Rework reusability control in cancellable continuation (#2581)
* Rework reusability control in cancellable continuation * Update initCancellability documentation and implementation to be aligned with current invariants * Make parentHandle non-volatile and ensure there are no races around it * Establish new reusability invariants - Reusable continuation can be used _only_ if it states is not REUSABLE_CLAIMED - If it is, spin-loop and wait for release - Now the parent is attached to reusable continuation only if it was suspended at least once. Otherwise, the state machine can return via fast-path and no one will be able to release intercepted continuation (-> detach from parent) - It implies that the parent is attached after trySuspend call and can be concurrently reused, this is where new invariant comes into play * Leverage the fact that it's non-atomic and do not check it for cancellation prematurely. It increases the performance of fast-path, but potentially affects rare cancellation cases Fixes #2564
- Loading branch information
Showing
8 changed files
with
167 additions
and
66 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
41 changes: 41 additions & 0 deletions
41
kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
/* | ||
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. | ||
*/ | ||
|
||
package kotlinx.coroutines | ||
|
||
import kotlinx.coroutines.channels.* | ||
import org.junit.Test | ||
import kotlin.test.* | ||
|
||
class ReusableCancellableContinuationLeakStressTest : TestBase() { | ||
|
||
@Suppress("UnnecessaryVariable") | ||
private suspend fun <T : Any> ReceiveChannel<T>.receiveBatch(): T { | ||
val r = receive() // DO NOT MERGE LINES, otherwise TCE will kick in | ||
return r | ||
} | ||
|
||
private val iterations = 100_000 * stressTestMultiplier | ||
|
||
class Leak(val i: Int) | ||
|
||
@Test // Simplified version of #2564 | ||
fun testReusableContinuationLeak() = runTest { | ||
val channel = produce(capacity = 1) { // from the main thread | ||
(0 until iterations).forEach { | ||
send(Leak(it)) | ||
} | ||
} | ||
|
||
launch(Dispatchers.Default) { | ||
repeat (iterations) { | ||
val value = channel.receiveBatch() | ||
assertEquals(it, value.i) | ||
} | ||
(channel as Job).join() | ||
|
||
FieldWalker.assertReachableCount(0, coroutineContext.job, false) { it is Leak } | ||
} | ||
} | ||
} |