Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proposal: Condition Variables #2531

Open
paulo-raca opened this issue Feb 9, 2021 · 3 comments
Open

Proposal: Condition Variables #2531

paulo-raca opened this issue Feb 9, 2021 · 3 comments

Comments

@paulo-raca
Copy link

Condition Variables are an useful concurrency primitive, which I have been missing since starting with Kotlin Coroutines

Although it has not been optimised, I have written a very simple implementation and tests here

@qwwdfsad
Copy link
Member

qwwdfsad commented Mar 23, 2021

Could you please elaborate on the use-case of such primitive in coroutines? E.g. what domain-specific problem does it solve that cannot be solved otherwise? In general, for any kind of notifications channels or flows are preferred and can be used instead.

Apart from that, coroutines are phantom, so one cannot ensure whether the current coroutine holds the lock. E.g. in your implementation, it's possible that one thread/coroutine acquires the lock, while the other awaits on its condition without actually holding the lock.
And this won't lead to an exception, but rather to an inconsistent state (-> heisenbugs) that should've been guarded by the mutex.

Providing primitive that may be inconsistent is a time-bomb waiting to explode and we'd very much like to avoid it

@qwwdfsad qwwdfsad closed this as completed Apr 6, 2021
@paulo-raca
Copy link
Author

Hello,
Sorry, I missed you original reply.

Could you please elaborate on the use-case of such primitive in coroutines? E.g. what domain-specific problem does it solve that cannot be solved otherwise? In general, for any kind of notifications channels or flows are preferred and can be used instead.

There is no use-case that cannot be solved otherwise, since is it possible to implement Conditions from other primitives 😅

However it is perfectly suited anywhere you need to wait until <condition>.

A few examples on how I'm using on my current project:

Waiting for Debugging session

My current project involves network traffic, and I integrated it with a Wireshark capture plugin.
When debugging, I often add a waitForWireshark() in the code -- Similiar to Debug.waitForDebugger in an Android app.

This is a good/simple use-case. The (much-simplified) code looks like this:

val wiresharkSessions = HashSet<WiresharkSession>()
val mutex = Mutex()
val condition = mutex.newCondition()

suspend fun onWiresharkSessionAdded(session: WiresharkSession) {
    mutex.withLock {
        wiresharkSessions.add(session)
        condition.signal()
    }
}

suspend fun waitForWireshark() {
    mutex.withLock {
        condition.awaitUntil {  !wiresharkSessions.isEmpty()  }
    }
}

Sparse data buffer

My project also deals with data transfers, and chunks of data can arrive out of order (similar to bittorrent).

However, when consuming data, the reader must wait until it is available

class SparseDataBuffer {
    val mutex = Mutex()
    val condition = mutex.newCondition()

    val buffer = ByteBuffer(N)
    val validBufferPositions = RangeSet(Long)
    
    suspend fun writeChunk(data: ByteBuffer, pos: Long) {
        mutex.withLock {
            // Copy data to local buffer
            val n = data.remaining()
            buffer.position(pos).limit(pos + n).put(data)
            
            // Mark data as available and signal any coroutine awaiting for it
            validBufferPositions.add(Range.closedOpen(pos, pos + n))
            condition.signalAll()
        }
    }
    
    suspend fun readChunk(dst: ByteBuffer, pos: Long) {
        mutex.withLock {
            val n = dst.remaining()
            // Await until the desired chunk is available
            condition.awaitUntil { validBufferPositions.contains(Range.closedOpen(pos, pos + n)) }
            // Copy data into destination buffer
            buffer.position(pos).limit(n)
            dst.put(buffer)
        }
    }
}

Handling timeouts

A more complicated example: my project also keeps track of operations that can timeout, and specific events may extended the timeout (e.g., heartbeat messages)

A simple solution which I used initially only relies on launch and delay, and looks like this:

class Operation {
    var timeoutJob: Job? = null

    suspend fun resetOperationTimeout(newTimeout: Duration) {
        timeoutJob?.cancel()
        timeoutJob = launch {
            delay(timeout)
            operationTimedOut()
        }
    }

    suspend fun timedOut() {
        // do something
    }

However, while I only have a small-ish number of operations, I call resetOperationTimeout a lot. While coroutines are very efficient, creating it only to cancel a few microseconds later, adds up to a bottleneck.

My solution was to write my own scheduler, which stores all timeouts in a Heap and uses conditions to await until the next timeout/queue update. Timeout updates are very efficient (O(log(N)), cause no allocations, and only rarely wakes the scheduler

class Timer {
    val mutex = Mutex()
    val condition = mutex.newCondition()
    val activeOperations = PriorityQueue<Operation>()  // Operations ordered by target time


    val job = launch {
        while (true) {
            takeOp().timedOut()
        }
    }
    
    suspend fun takeOp(): Operation {
        mutex.withLock {
            while (true) {  // Wait for the first task to timeout
                val op = activeOperations.peek()
                if (op == null) {
                    condition.await()  // Await until there is something in the queue
                } else {
                    val waitTime = task.targetTime - now()
                    if (waitTime > 0) {
                        condition.await(waitTime)  // Await until the remaining time has elapsed -OR- something was added to the head of the queue
                    } else {
                        // op has timed out and is ready to execute
                        return activeOperations.remove()
                    }
                }
            }            
        }
    }

    class Operation : Comparable<Operation> {
        var targetTime: Timestamp?

        suspend fun resetOperationTimeout(newTimeout: Duration) {
            mutex.withLock {
                // Updates this element's timeout and it's position in the queue
                // Note: My solution actually keeps track of this item's position in the heap
                // and updates it in `O(log(N))`, however the added complexity doesn't matter in this example.
                activeOperations.remove(this)
                this.targetTime = now() + newTimeout
                activeOperations.add(this)

                // Wake up the executor coroutine, but only if we are now the first element.
                if (activeOperations.peek() == this) {
                    condition.signal()
                }
            }
    
        suspend fun timedOut() {
            // do something
        }
        
        fun compareTo(other: Operation): Int {
            // compare targetTime
        }
    }
}

(Again, both snippets are much simplified)

Apart from that, coroutines are phantom, so one cannot ensure whether the current coroutine holds the lock. E.g. in your implementation, it's possible that one thread/coroutine acquires the lock, while the other awaits on its condition without actually holding the lock.

I'm not sure what you mean here.

Just like you can call mutex.lock() in a coroutine and mutex.unlock in a different one, which is weird yet valid, you could use a condition in a separate coroutine from where the lock was acquired.

The contract is that the mutex should be locked when await() is called, it might be be unlocked/relocked a few times before returning, and will be locked on return -- It doesn't matter which coroutine it all happens on.

Having another coroutine acquire the mutex while condition.await() executes is expected: This allows the state to be updated, possible satisfying the condition and resuming first coroutine (as soon as the mutex is released)

@qwwdfsad qwwdfsad added the design label Apr 8, 2021
@qwwdfsad
Copy link
Member

qwwdfsad commented Apr 8, 2021

Thanks for a detailed explanation!

Indeed these usages can be abstracted away, even in more idiomatic way (but it's a matter of style and I don't encourage you to rewrite it of course).
It seems like you are using a condition as a notification mechanism. For that, Flow (whether it's StateFlow, regular flow or channel-based) is a more preferrable way. E.g., it does not creates a bottleneck during mutual exclusion where it is not really necessary.

Just like you can call mutex.lock() in a coroutine and mutex.unlock in a different one, which is weird yet valid, you could use a condition in a separate coroutine from where the lock was acquired.

Indeed, though thread-obliviousness is a much less harmful property here.

I'll keep this issue open for a while in case there are more compelling use-cases, but otherwise, benefits of conditional variable don't seem to outweigh its cons

@qwwdfsad qwwdfsad reopened this Apr 8, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants