Skip to content

Mutex for Kotlin/Common #508

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: develop
Choose a base branch
from
Open

Conversation

bbrockbernd
Copy link
Collaborator

@dkhalanskyjb
Copy link

It would be better to set native-thread-parking as the base branch of this PR. It's more difficult to review just the mutex when the changes related to parking are also included.

* Multiplatform mutex.
* On native based on pthread system calls.
* On JVM delegates to ReentrantLock.
*/

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is user-visible documentation, and so it must explain the contracts of the data structure. What is a mutex? What would an example of its usage look like? Is it reentrant? Is it fair? It should mention that it's unsuitable for async (suspend) code, blocks the thread, and is only suitable for low-level technical work.

The same goes for every function in the file: all API entries need to be documented. When does each function return? When can it throw an exception? What does it do?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have adjusted the docs, wdyt?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great, thanks!

* On JVM delegates to ReentrantLock.
*/
expect class Mutex() {
fun isLocked(): Boolean

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if it turns out someone does need this function, they can do fun Mutex.isLocked() = if (tryLock()) { unlock(); true } else false (right?), so including this function is purely a performance optimization for a performance problem that may not even exist. So, unless we know of some specific use cases for this function, let's not take on the extra commitment of including it.

Copy link
Collaborator Author

@bbrockbernd bbrockbernd Feb 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is correct. I have tried to keep the api similar to the coroutines mutex's api. But will remove it for now.

P.S. true and false should be swapped right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On a second thought, this won't work if I am holding the lock myself since it is reentrant.

/**
* Multiplatform mutex.
* On native based on pthread system calls.
* On JVM delegates to ReentrantLock.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem accurate, though? ReentrantLock does get used in synchronized and kotlinx.atomicfu.ReentrantLock, but not in Mutex, right?

If so, there seems to be some inconsistency: on the one hand, atomicfu.ReentrantLock delegates to Java's ReentrantLock, on the other, atomicfu.Mutex doesn't, whereas on Native, atomicfu.ReentrantLock and atomcifu.Mutex are basically the same thing.
If we believe that our mutex is better, we should use it throughout; if we don't, we should use ReentrantLock throughout. Are there measurements showing which one performs better?

* Multiplatform mutex.
* On native based on pthread system calls.
* On JVM delegates to ReentrantLock.
*/

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great, thanks!

@bbrockbernd bbrockbernd force-pushed the bbrockbernd/common-mutex branch from 71dfd4b to 28f0d71 Compare March 13, 2025 14:42
@bbrockbernd bbrockbernd force-pushed the bbrockbernd/common-mutex branch 2 times, most recently from bfa21fb to cdc1df9 Compare April 25, 2025 14:18
@bbrockbernd bbrockbernd force-pushed the native-thread-parking branch from 519e59d to 0d9ea07 Compare April 29, 2025 11:59
@bbrockbernd bbrockbernd force-pushed the bbrockbernd/common-mutex branch from cdc1df9 to f9c6167 Compare May 1, 2025 08:57
@bbrockbernd bbrockbernd force-pushed the bbrockbernd/common-mutex branch from f9c6167 to 7a35ef7 Compare June 3, 2025 13:21
@bbrockbernd bbrockbernd changed the base branch from native-thread-parking to develop June 4, 2025 12:44
@bbrockbernd bbrockbernd requested a review from dkhalanskyjb June 4, 2025 12:45

final fun lock() // kotlinx.atomicfu.locks/NativeMutexNode.lock|lock(){}[0]
final fun unlock() // kotlinx.atomicfu.locks/NativeMutexNode.unlock|unlock(){}[0]
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume removing this implementation detail is safe, but cc @fzhinkin: he probably knows if this was already used by someone else.

// Dequeueing woken nodes can lead to issues when pre-unparked.
parkingQueue.dequeue()
nextParker = parkingQueue.getHead()
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't fully get it. Take a lock in one thread, then await the lock with a small timeout in another thread, time out. state == 2. unlock is called, currentState == 1. nodeWake fails, and decrementAndGet() returns 0, after which we exit without dequeueing anything at all. Repeat that a hundred times, and you get a hundred useless nodes in the queue. Is this a memory leak, or do I misunderstand some logic?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! It indeed needs to be either getAndDecrement or >= 0. I have added some validation logic to the lincheck tests to check that the queue is indeed always empty when all operations are finished.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this change broke correctness (tests fail). I'd expect getAndDecrement >= 0 to be a reason to dequeue the caller, but not a reason to attempt to wake another one up.

@bbrockbernd bbrockbernd requested a review from dkhalanskyjb July 2, 2025 17:57
@dkhalanskyjb dkhalanskyjb requested a review from fzhinkin July 3, 2025 06:40
Copy link

@dkhalanskyjb dkhalanskyjb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well done! All I have left are stylistic nitpicks.


class NativeMutexTimeoutLincheckTest {
class Counter {
@Volatile

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder what this does. Since Counter is guarded by a lock, what purpose does Volatile serve?


class NativeMutexReentrantLincheckTest {
class Counter {
@Volatile

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.


class NativeMutexLincheckTest {
class Counter {
@Volatile

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

public actual fun tryLock(timeout: Duration): Boolean = true
public actual fun lock(): Unit { state++ }
public actual fun unlock(): Unit {
if (state == 0) throw IllegalStateException("Mutex already unlocked")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style: here and everywhere else where we have if (cond) throw IllegalStateException(str), it can be replaced with check(!cond) { str }, which looks nicer.

Comment on lines +31 to +37
t.lockInt.lock()
if (t.lockInt.n % t.mod == t.id) t.lockInt.n++
if (t.lockInt.n >= t.max) {
t.lockInt.unlock()
break
}
t.lockInt.unlock()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
t.lockInt.lock()
if (t.lockInt.n % t.mod == t.id) t.lockInt.n++
if (t.lockInt.n >= t.max) {
t.lockInt.unlock()
break
}
t.lockInt.unlock()
t.lockInt.lock()
try {
if (t.lockInt.n % t.mod == t.id) t.lockInt.n++
if (t.lockInt.n >= t.max) break
} finally {
t.lockInt.unlock()
}

is a somewhat more idiomatic way to express the same thing.

Comment on lines +42 to +47
data class LockIntTest(
val lockInt: LockInt,
val max: Int,
val mod: Int,
val id: Int,
)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These can just be function parameters, I think, so there's no need for this whole class, right?

Comment on lines +20 to +24
val futureList = mutableListOf<Fut>()
repeat(nThreads) { i ->
val test = LockIntTest(lockInt, countTo, nThreads, i)
futureList.add(testWithThread(test))
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
val futureList = mutableListOf<Fut>()
repeat(nThreads) { i ->
val test = LockIntTest(lockInt, countTo, nThreads, i)
futureList.add(testWithThread(test))
}
val futureList = List(nThreads) { i ->
testWithThread(LockIntTest(lockInt, countTo, nThreads, i))
}

private const val N_REPEATS_SLOW = 100
private const val N_REPEATS_FAST = 30_000

class NativeMutexTest {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two tests look basically the same, the only two parameters that make them different are the number of repetitions (100 vs 30_000) and the length of the sleep between adding two values (10 vs 0 milliseconds). Seems worth it to abstract this logic into one function that gets called from two tests.

Comment on lines +20 to +44
val threads = mutableListOf<TestThread>()

repeat(N_THREADS) { threadId ->
val thread = testThread {
while (counter.value < TARGET_COUNT) {
// Try to acquire the lock with a timeout
if (mutex.tryLock(getRandomTimeout().milliseconds)) {
try {
// Increment the counter if lock was acquired
if (counter.value < TARGET_COUNT) {
counter.incrementAndGet()
}
// Random sleep after increment to increase variation
sleepMillis(getRandomWait())
} finally {
mutex.unlock()
}
}

// Random sleep between increment attempts to increase variation
sleepMillis(getRandomWait())
}
}
threads.add(thread)
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
val threads = mutableListOf<TestThread>()
repeat(N_THREADS) { threadId ->
val thread = testThread {
while (counter.value < TARGET_COUNT) {
// Try to acquire the lock with a timeout
if (mutex.tryLock(getRandomTimeout().milliseconds)) {
try {
// Increment the counter if lock was acquired
if (counter.value < TARGET_COUNT) {
counter.incrementAndGet()
}
// Random sleep after increment to increase variation
sleepMillis(getRandomWait())
} finally {
mutex.unlock()
}
}
// Random sleep between increment attempts to increase variation
sleepMillis(getRandomWait())
}
}
threads.add(thread)
}
val threads = List(N_THREADS) { threadId ->
testThread {
while (counter.value < TARGET_COUNT) {
// Try to acquire the lock with a timeout
if (mutex.tryLock(getRandomTimeout().milliseconds)) {
try {
// Increment the counter if lock was acquired
if (counter.value < TARGET_COUNT) {
counter.incrementAndGet()
}
// Random sleep after increment to increase variation
sleepMillis(getRandomWait())
} finally {
mutex.unlock()
}
}
// Random sleep between increment attempts to increase variation
sleepMillis(getRandomWait())
}
}
}

@fzhinkin fzhinkin self-assigned this Jul 11, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants