Skip to content

Commit

Permalink
Introduce SegmentQueueSynchronizer abstraction for synchronization …
Browse files Browse the repository at this point in the history
…primitives and `ReadWriteMutex`

Signed-off-by: Nikita Koval <ndkoval@ya.ru>
  • Loading branch information
ndkoval committed Feb 13, 2023
1 parent 43b6be5 commit b2ed1d6
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ internal abstract class SegmentQueueSynchronizer<T : Any> {
returnValue(value)
}

@Suppress("INFERRED_TYPE_VARIABLE_INTO_POSSIBLE_EMPTY_INTERSECTION")
internal fun suspendCancelled(): T? {
// Increment `suspendIdx` and find the segment
// with the corresponding id. It is guaranteed
Expand Down Expand Up @@ -238,14 +239,15 @@ internal abstract class SegmentQueueSynchronizer<T : Any> {
if (value !== BROKEN && segment.cas(i, value, TAKEN)) {
// The elimination is performed successfully,
// complete with the value stored in the cell.
@Suppress("UNCHECKED_CAST")
return value as T
}
// The cell is broken, this can happen only in the `SYNC` resumption mode.
assert { resumeMode == SYNC && segment.get(i) === BROKEN }
return null
}

@Suppress("UNCHECKED_CAST")
@Suppress("UNCHECKED_CAST", "INFERRED_TYPE_VARIABLE_INTO_POSSIBLE_EMPTY_INTERSECTION")
internal fun suspend(waiter: Waiter): Boolean {
// Increment `suspendIdx` and find the segment
// with the corresponding id. It is guaranteed
Expand Down Expand Up @@ -335,7 +337,7 @@ internal abstract class SegmentQueueSynchronizer<T : Any> {
* moves [resumeIdx] to the first possibly non-cancelled cell, i.e.,
* to the first segment id multiplied by [SEGMENT_SIZE].
*/
@Suppress("UNCHECKED_CAST")
@Suppress("UNCHECKED_CAST", "INFERRED_TYPE_VARIABLE_INTO_POSSIBLE_EMPTY_INTERSECTION")
private fun tryResumeImpl(value: T, adjustResumeIdx: Boolean): Int {
// Check that `adjustResumeIdx` is `false` in the simple cancellation mode.
assertNot { cancellationMode == SIMPLE && adjustResumeIdx }
Expand Down Expand Up @@ -561,12 +563,14 @@ internal abstract class SegmentQueueSynchronizer<T : Any> {
// provided by a concurrent `resume(..)`.
// The value could be put only in the asynchronous mode,
// so the `resume(..)` call above must not fail.
@Suppress("UNCHECKED_CAST")
resume(value as T)
} else {
// The `resume(..)` that will come to this cell should be refused.
// Mark the cell correspondingly and help a concurrent
// `resume(..)` to process its value if needed.
val value = markRefuse(index) ?: return
@Suppress("UNCHECKED_CAST")
returnRefusedValue(value as T)
}
}
Expand Down
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/common/src/sync/Mutex.kt
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ internal open class MutexImpl(locked: Boolean) : SegmentQueueSynchronizer<Unit>(
assert { this.owner.value === NO_OWNER }
when (waiter) {
is CancellableContinuation<*> -> {
@Suppress("UNCHECKED_CAST")
waiter as CancellableContinuation<Unit>
waiter.resume(Unit, null)
}
Expand Down
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/common/src/sync/ReadWriteMutex.kt
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ internal class ReadWriteMutexImpl : ReadWriteMutex, Mutex {
if (owner != null) error("ReadWriteMutex.write does not support owners")
writeLock()
}
@Suppress("OVERRIDE_DEPRECATION")
override val onLock: SelectClause2<Any?, Mutex> get() = error("ReadWriteMutex.write does not support `onLock`")
override fun holdsLock(owner: Any) = error("ReadWriteMutex.write does not support owners")
override fun unlock(owner: Any?) {
Expand Down
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/common/test/sync/MutexTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class MutexTest : TestBase() {
}

@Test
@Suppress("DEPRECATION")
fun testIllegalStateInvariant() = runTest {
val mutex = Mutex()
val owner = Any()
Expand Down
8 changes: 8 additions & 0 deletions kotlinx-coroutines-core/jvm/test/TestBase.kt
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,14 @@ public actual open class TestBase(private var disableOutCheck: Boolean) {
protected suspend fun currentDispatcher() = coroutineContext[ContinuationInterceptor]!!
}

fun <T> CancellableContinuation<T>.tryResume0(value: T, onCancellation: (Throwable?) -> Unit): Boolean {
tryResume(value, null, onCancellation).let {
if (it == null) return false
completeResume(it)
return true
}
}

/*
* We ignore tests that test **real** non-virtualized tests with time on Windows, because
* our CI Windows is virtualized itself (oh, the irony) and its clock resolution is dozens of ms,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class MutexLincheckTest : AbstractLincheckTest() {

// TODO: `onLock` with non-null owner is non-linearizable
// onLock may suspend in case of clause re-registration.
@Suppress("DEPRECATION")
@Operation(allowExtraSuspension = true, promptCancellation = true)
suspend fun onLock() = select<Unit> { mutex.onLock(null) {} }

Expand Down

0 comments on commit b2ed1d6

Please sign in to comment.