-
Notifications
You must be signed in to change notification settings - Fork 67
Mutex for Kotlin/Common #508
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Conversation
bbrockbernd
commented
Feb 7, 2025
- A FIFO reentrant mutex for Kotlin/Common.
- Depends on Thread parking for Kotlin/Common #498 (Parking logic is under review there)
It would be better to set |
* Multiplatform mutex. | ||
* On native based on pthread system calls. | ||
* On JVM delegates to ReentrantLock. | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is user-visible documentation, and so it must explain the contracts of the data structure. What is a mutex? What would an example of its usage look like? Is it reentrant? Is it fair? It should mention that it's unsuitable for async (suspend
) code, blocks the thread, and is only suitable for low-level technical work.
The same goes for every function in the file: all API entries need to be documented. When does each function return? When can it throw an exception? What does it do?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have adjusted the docs, wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great, thanks!
* On JVM delegates to ReentrantLock. | ||
*/ | ||
expect class Mutex() { | ||
fun isLocked(): Boolean |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even if it turns out someone does need this function, they can do fun Mutex.isLocked() = if (tryLock()) { unlock(); true } else false
(right?), so including this function is purely a performance optimization for a performance problem that may not even exist. So, unless we know of some specific use cases for this function, let's not take on the extra commitment of including it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is correct. I have tried to keep the api similar to the coroutines mutex's api. But will remove it for now.
P.S. true and false should be swapped right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On a second thought, this won't work if I am holding the lock myself since it is reentrant.
atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/Mutex.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/Mutex.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/Mutex.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/jsAndWasmSharedMain/kotlin/kotlinx/atomicfu/locks/Mutex.jsAndWasmShared.kt
Show resolved
Hide resolved
atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/Mutex.kt
Outdated
Show resolved
Hide resolved
/** | ||
* Multiplatform mutex. | ||
* On native based on pthread system calls. | ||
* On JVM delegates to ReentrantLock. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't seem accurate, though? ReentrantLock
does get used in synchronized
and kotlinx.atomicfu.ReentrantLock
, but not in Mutex
, right?
If so, there seems to be some inconsistency: on the one hand, atomicfu.ReentrantLock
delegates to Java's ReentrantLock
, on the other, atomicfu.Mutex
doesn't, whereas on Native, atomicfu.ReentrantLock
and atomcifu.Mutex
are basically the same thing.
If we believe that our mutex is better, we should use it throughout; if we don't, we should use ReentrantLock
throughout. Are there measurements showing which one performs better?
* Multiplatform mutex. | ||
* On native based on pthread system calls. | ||
* On JVM delegates to ReentrantLock. | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great, thanks!
71dfd4b
to
28f0d71
Compare
bfa21fb
to
cdc1df9
Compare
519e59d
to
0d9ea07
Compare
cdc1df9
to
f9c6167
Compare
f9c6167
to
7a35ef7
Compare
atomicfu/src/jsAndWasmSharedMain/kotlin/kotlinx/atomicfu/locks/Mutex.jsAndWasmShared.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/jsAndWasmSharedMain/kotlin/kotlinx/atomicfu/locks/Mutex.jsAndWasmShared.kt
Outdated
Show resolved
Hide resolved
|
||
final fun lock() // kotlinx.atomicfu.locks/NativeMutexNode.lock|lock(){}[0] | ||
final fun unlock() // kotlinx.atomicfu.locks/NativeMutexNode.unlock|unlock(){}[0] | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume removing this implementation detail is safe, but cc @fzhinkin: he probably knows if this was already used by someone else.
atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/locks/SynchronousMutex.kt
Show resolved
Hide resolved
atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/NativeMutex.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/NativeMutex.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/NativeMutex.kt
Outdated
Show resolved
Hide resolved
atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/NativeMutex.kt
Outdated
Show resolved
Hide resolved
// Dequeueing woken nodes can lead to issues when pre-unparked. | ||
parkingQueue.dequeue() | ||
nextParker = parkingQueue.getHead() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't fully get it. Take a lock in one thread, then await the lock with a small timeout in another thread, time out. state == 2
. unlock
is called, currentState == 1
. nodeWake
fails, and decrementAndGet()
returns 0
, after which we exit without dequeueing anything at all. Repeat that a hundred times, and you get a hundred useless nodes in the queue. Is this a memory leak, or do I misunderstand some logic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! It indeed needs to be either getAndDecrement
or >= 0
. I have added some validation logic to the lincheck tests to check that the queue is indeed always empty when all operations are finished.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like this change broke correctness (tests fail). I'd expect getAndDecrement >= 0
to be a reason to dequeue the caller, but not a reason to attempt to wake another one up.
…tion logic to lincheck tests to check that queue is empty after each execution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well done! All I have left are stylistic nitpicks.
|
||
class NativeMutexTimeoutLincheckTest { | ||
class Counter { | ||
@Volatile |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder what this does. Since Counter
is guarded by a lock, what purpose does Volatile
serve?
|
||
class NativeMutexReentrantLincheckTest { | ||
class Counter { | ||
@Volatile |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto.
|
||
class NativeMutexLincheckTest { | ||
class Counter { | ||
@Volatile |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto.
public actual fun tryLock(timeout: Duration): Boolean = true | ||
public actual fun lock(): Unit { state++ } | ||
public actual fun unlock(): Unit { | ||
if (state == 0) throw IllegalStateException("Mutex already unlocked") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Style: here and everywhere else where we have if (cond) throw IllegalStateException(str)
, it can be replaced with check(!cond) { str }
, which looks nicer.
t.lockInt.lock() | ||
if (t.lockInt.n % t.mod == t.id) t.lockInt.n++ | ||
if (t.lockInt.n >= t.max) { | ||
t.lockInt.unlock() | ||
break | ||
} | ||
t.lockInt.unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
t.lockInt.lock() | |
if (t.lockInt.n % t.mod == t.id) t.lockInt.n++ | |
if (t.lockInt.n >= t.max) { | |
t.lockInt.unlock() | |
break | |
} | |
t.lockInt.unlock() | |
t.lockInt.lock() | |
try { | |
if (t.lockInt.n % t.mod == t.id) t.lockInt.n++ | |
if (t.lockInt.n >= t.max) break | |
} finally { | |
t.lockInt.unlock() | |
} |
is a somewhat more idiomatic way to express the same thing.
data class LockIntTest( | ||
val lockInt: LockInt, | ||
val max: Int, | ||
val mod: Int, | ||
val id: Int, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These can just be function parameters, I think, so there's no need for this whole class, right?
val futureList = mutableListOf<Fut>() | ||
repeat(nThreads) { i -> | ||
val test = LockIntTest(lockInt, countTo, nThreads, i) | ||
futureList.add(testWithThread(test)) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val futureList = mutableListOf<Fut>() | |
repeat(nThreads) { i -> | |
val test = LockIntTest(lockInt, countTo, nThreads, i) | |
futureList.add(testWithThread(test)) | |
} | |
val futureList = List(nThreads) { i -> | |
testWithThread(LockIntTest(lockInt, countTo, nThreads, i)) | |
} |
private const val N_REPEATS_SLOW = 100 | ||
private const val N_REPEATS_FAST = 30_000 | ||
|
||
class NativeMutexTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These two tests look basically the same, the only two parameters that make them different are the number of repetitions (100
vs 30_000
) and the length of the sleep between adding two values (10
vs 0
milliseconds). Seems worth it to abstract this logic into one function that gets called from two tests.
val threads = mutableListOf<TestThread>() | ||
|
||
repeat(N_THREADS) { threadId -> | ||
val thread = testThread { | ||
while (counter.value < TARGET_COUNT) { | ||
// Try to acquire the lock with a timeout | ||
if (mutex.tryLock(getRandomTimeout().milliseconds)) { | ||
try { | ||
// Increment the counter if lock was acquired | ||
if (counter.value < TARGET_COUNT) { | ||
counter.incrementAndGet() | ||
} | ||
// Random sleep after increment to increase variation | ||
sleepMillis(getRandomWait()) | ||
} finally { | ||
mutex.unlock() | ||
} | ||
} | ||
|
||
// Random sleep between increment attempts to increase variation | ||
sleepMillis(getRandomWait()) | ||
} | ||
} | ||
threads.add(thread) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val threads = mutableListOf<TestThread>() | |
repeat(N_THREADS) { threadId -> | |
val thread = testThread { | |
while (counter.value < TARGET_COUNT) { | |
// Try to acquire the lock with a timeout | |
if (mutex.tryLock(getRandomTimeout().milliseconds)) { | |
try { | |
// Increment the counter if lock was acquired | |
if (counter.value < TARGET_COUNT) { | |
counter.incrementAndGet() | |
} | |
// Random sleep after increment to increase variation | |
sleepMillis(getRandomWait()) | |
} finally { | |
mutex.unlock() | |
} | |
} | |
// Random sleep between increment attempts to increase variation | |
sleepMillis(getRandomWait()) | |
} | |
} | |
threads.add(thread) | |
} | |
val threads = List(N_THREADS) { threadId -> | |
testThread { | |
while (counter.value < TARGET_COUNT) { | |
// Try to acquire the lock with a timeout | |
if (mutex.tryLock(getRandomTimeout().milliseconds)) { | |
try { | |
// Increment the counter if lock was acquired | |
if (counter.value < TARGET_COUNT) { | |
counter.incrementAndGet() | |
} | |
// Random sleep after increment to increase variation | |
sleepMillis(getRandomWait()) | |
} finally { | |
mutex.unlock() | |
} | |
} | |
// Random sleep between increment attempts to increase variation | |
sleepMillis(getRandomWait()) | |
} | |
} | |
} |