Skip to content

Commit

Permalink
Fix of a bug in the semaphore (#1477)
Browse files Browse the repository at this point in the history
* Add a failing test for a semaphore

This test consistently fails for the current implementation. It
attempts to cause the following state: after `job2` increments
`availablePermits` but before it wakes up the acquirer in the
queue, the acquirer is cancelled. Then, regardless of whether
`RESUMED` or `CANCELLED` was written first, another cell in the
queue is marked to be resumed. However, this is incorrect: on
cancellation, the acquirer incremented the number of available
permits once more, making it `1`; thus, at the same time there
exist two permits for acquiring the mutex. At the next loop
iteration, a new acquirer tries to claim ownership of the mutex and
succeeds because it goes to the thread queue and sees its cell as
`RESUMED`. Thus, two entities own a mutex at the same time.

* Fix a bug in semaphore implementation

The fix works as follows: if `availablePermits` is negative, its
absolute value denotes the logical length of the thread queue.
Increasing its value if it was negative means that this thread
promises to wake exactly one thread, and if its positive, returns
one permit to the semaphore itself.

Before, the error was in that a queue could be of negative length:
if it consisted of only `N` cells, and `N` resume queries arrived,
cancelling any threads would mean that there are more wakers then
there are sleepers, which breaks the invariants of the semaphore.
Thus, if on cancellation the acquirer detects that it leaves the
queue empty in the presence of resumers, it simply transfers the
semaphore acquisition permit to the semaphore itself, because it
knows that it, in a sense, owns it already: there is a thread that
is bound to resume this cell.
  • Loading branch information
dkhalanskyjb authored and elizarov committed Sep 3, 2019
1 parent 57cc364 commit 9a62f27
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 2 deletions.
3 changes: 2 additions & 1 deletion kotlinx-coroutines-core/common/src/sync/Semaphore.kt
Expand Up @@ -166,7 +166,8 @@ private class CancelSemaphoreAcquisitionHandler(
private val index: Int
) : CancelHandler() {
override fun invoke(cause: Throwable?) {
semaphore.incPermits()
val p = semaphore.incPermits()
if (p >= 0) return
if (segment.cancel(index)) return
semaphore.resumeNextFromQueue()
}
Expand Down
38 changes: 37 additions & 1 deletion kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt
Expand Up @@ -2,6 +2,7 @@ package kotlinx.coroutines.sync

import kotlinx.coroutines.*
import org.junit.Test
import org.junit.After
import kotlin.test.assertEquals

class SemaphoreStressTest : TestBase() {
Expand Down Expand Up @@ -57,4 +58,39 @@ class SemaphoreStressTest : TestBase() {
semaphore.release()
assertEquals(1, semaphore.availablePermits)
}
}

/**
* This checks if repeated releases that race with cancellations put
* the semaphore into an incorrect state where permits are leaked.
*/
@Test
fun stressReleaseCancelRace() = runTest {
val n = 10_000 * stressTestMultiplier
val semaphore = Semaphore(1, 1)
newSingleThreadContext("SemaphoreStressTest").use { pool ->
repeat (n) {
// Initially, we hold the permit and no one else can `acquire`,
// otherwise it's a bug.
assertEquals(0, semaphore.availablePermits)
var job1_entered_critical_section = false
val job1 = launch(start = CoroutineStart.UNDISPATCHED) {
semaphore.acquire()
job1_entered_critical_section = true
semaphore.release()
}
// check that `job1` didn't finish the call to `acquire()`
assertEquals(false, job1_entered_critical_section)
val job2 = launch(pool) {
semaphore.release()
}
// Because `job2` executes in a separate thread, this
// cancellation races with the call to `release()`.
job1.cancelAndJoin()
job2.join()
assertEquals(1, semaphore.availablePermits)
semaphore.acquire()
}
}
}

}

0 comments on commit 9a62f27

Please sign in to comment.