Skip to content

Commit

Permalink
Fix non-linearizability in ArrayChannel while moving an element fro…
Browse files Browse the repository at this point in the history
…m the waiting queue to the buffer
  • Loading branch information
ndkoval committed Dec 25, 2019
1 parent 16d2606 commit 1f77783
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 37 deletions.
31 changes: 17 additions & 14 deletions kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {

/**
* Returns `true` if this channel's buffer is full.
* This operation should be atomic if it is invoked by [enqueueSend].
* @suppress **This is unstable API and it is subject to change.**
*/
protected abstract val isBufferFull: Boolean
Expand Down Expand Up @@ -140,8 +141,8 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
// ------ SendChannel ------

public final override val isClosedForSend: Boolean get() = closedForSend != null
public final override val isFull: Boolean get() = full
private val full: Boolean get() = queue.nextNode !is ReceiveOrClosed<*> && isBufferFull // TODO rename to `isFull`
public override val isFull: Boolean get() = isFullImpl
protected val isFullImpl: Boolean get() = queue.nextNode !is ReceiveOrClosed<*> && isBufferFull

public final override suspend fun send(element: E) {
// fast path -- try offer non-blocking
Expand Down Expand Up @@ -182,7 +183,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {

private suspend fun sendSuspend(element: E): Unit = suspendAtomicCancellableCoroutineReusable sc@ { cont ->
loop@ while (true) {
if (full) {
if (isFullImpl) {
val send = SendElement(element, cont)
val enqueueResult = enqueueSend(send)
when {
Expand Down Expand Up @@ -227,7 +228,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
* * ENQUEUE_FAILED -- buffer is not full (should not enqueue)
* * ReceiveOrClosed<*> -- receiver is waiting or it is closed (should not enqueue)
*/
private fun enqueueSend(send: Send): Any? {
protected open fun enqueueSend(send: Send): Any? {
if (isBufferAlwaysFull) {
queue.addLastIfPrev(send) { prev ->
if (prev is ReceiveOrClosed<*>) return@enqueueSend prev
Expand Down Expand Up @@ -382,7 +383,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
while (true) {
if (select.isSelected) return
if (full) {
if (isFullImpl) {
val node = SendSelect(element, this, select, block)
val enqueueResult = enqueueSend(node)
when {
Expand Down Expand Up @@ -495,6 +496,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E

/**
* Returns `true` if this channel's buffer is empty.
* This operation should be atomic if it is invoked by [enqueueReceive].
* @suppress **This is unstable API and it is subject to change.**
*/
protected abstract val isBufferEmpty: Boolean
Expand Down Expand Up @@ -542,8 +544,9 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E

// ------ ReceiveChannel ------

public final override val isClosedForReceive: Boolean get() = closedForReceive != null && isBufferEmpty
public final override val isEmpty: Boolean get() = queue.nextNode !is Send && isBufferEmpty
public override val isClosedForReceive: Boolean get() = closedForReceive != null && isBufferEmpty
public override val isEmpty: Boolean get() = isEmptyImpl
protected val isEmptyImpl: Boolean get() = queue.nextNode !is Send && isBufferEmpty

public final override suspend fun receive(): E {
// fast path -- try poll non-blocking
Expand Down Expand Up @@ -580,12 +583,12 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}
}

private fun enqueueReceive(receive: Receive<E>): Boolean {
val result = if (isBufferAlwaysEmpty)
queue.addLastIfPrev(receive) { it !is Send } else
queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty })
protected open fun enqueueReceiveInternal(receive: Receive<E>): Boolean = if (isBufferAlwaysEmpty)
queue.addLastIfPrev(receive) { it !is Send } else
queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty })

private fun enqueueReceive(receive: Receive<E>) = enqueueReceiveInternal(receive).also { result ->
if (result) onReceiveEnqueued()
return result
}

public final override suspend fun receiveOrNull(): E? {
Expand Down Expand Up @@ -718,7 +721,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
private fun <R> registerSelectReceiveMode(select: SelectInstance<R>, receiveMode: Int, block: suspend (Any?) -> R) {
while (true) {
if (select.isSelected) return
if (isEmpty) {
if (isEmptyImpl) {
if (enqueueReceiveSelect(select, block, receiveMode)) return
} else {
val pollResult = pollSelectInternal(select)
Expand Down Expand Up @@ -1058,7 +1061,7 @@ internal class Closed<in E>(
override fun toString(): String = "Closed@$hexAddress[$closeCause]"
}

private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
internal abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
override val offerResult get() = OFFER_SUCCESS
abstract fun resumeReceiveClosed(closed: Closed<*>)
}
Expand Down
58 changes: 35 additions & 23 deletions kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,34 +36,38 @@ internal open class ArrayChannel<E>(
*/
private var buffer: Array<Any?> = arrayOfNulls<Any?>(min(capacity, 8))
private var head: Int = 0
private var size = 0 // Invariant: size <= capacity
private val size = atomic(0) // Invariant: size <= capacity

protected final override val isBufferAlwaysEmpty: Boolean get() = false
protected final override val isBufferEmpty: Boolean get() = lock.withLock { size == 0 }
protected final override val isBufferEmpty: Boolean get() = size.value == 0
protected final override val isBufferAlwaysFull: Boolean get() = false
protected final override val isBufferFull: Boolean get() = lock.withLock { size == capacity }
protected final override val isBufferFull: Boolean get() = size.value == capacity

override val isFull: Boolean get() = lock.withLock { isFullImpl }
override val isEmpty: Boolean get() = lock.withLock { isEmptyImpl }
override val isClosedForReceive: Boolean get() = lock.withLock { super.isClosedForReceive }

// result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
protected override fun offerInternal(element: E): Any {
var receive: ReceiveOrClosed<E>? = null
lock.withLock {
val size = this.size
val size = this.size.value
closedForSend?.let { return it }
if (size < capacity) {
// tentatively put element to buffer
this.size = size + 1 // update size before checking queue (!!!)
this.size.value = size + 1 // update size before checking queue (!!!)
// check for receivers that were waiting on empty queue
if (size == 0) {
loop@ while (true) {
receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
if (receive is Closed) {
this.size = size // restore size
this.size.value = size // restore size
return receive!!
}
val token = receive!!.tryResumeReceive(element, null)
if (token != null) {
assert { token === RESUME_TOKEN }
this.size = size // restore size
this.size.value = size // restore size
return@withLock
}
}
Expand All @@ -84,26 +88,26 @@ internal open class ArrayChannel<E>(
protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
var receive: ReceiveOrClosed<E>? = null
lock.withLock {
val size = this.size
val size = this.size.value
closedForSend?.let { return it }
if (size < capacity) {
// tentatively put element to buffer
this.size = size + 1 // update size before checking queue (!!!)
this.size.value = size + 1 // update size before checking queue (!!!)
// check for receivers that were waiting on empty queue
if (size == 0) {
loop@ while (true) {
val offerOp = describeTryOffer(element)
val failure = select.performAtomicTrySelect(offerOp)
when {
failure == null -> { // offered successfully
this.size = size // restore size
this.size.value = size // restore size
receive = offerOp.result
return@withLock
}
failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
failure === RETRY_ATOMIC -> {} // retry
failure === ALREADY_SELECTED || failure is Closed<*> -> {
this.size = size // restore size
this.size.value = size // restore size
return failure
}
else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
Expand All @@ -112,7 +116,7 @@ internal open class ArrayChannel<E>(
}
// let's try to select sending this element to buffer
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
this.size = size // restore size
this.size.value = size // restore size
return ALREADY_SELECTED
}
ensureCapacity(size)
Expand All @@ -127,6 +131,10 @@ internal open class ArrayChannel<E>(
return receive!!.offerResult
}

override fun enqueueSend(send: Send): Any? = lock.withLock {
super.enqueueSend(send)
}

// Guarded by lock
private fun ensureCapacity(currentSize: Int) {
if (currentSize >= buffer.size) {
Expand All @@ -146,12 +154,12 @@ internal open class ArrayChannel<E>(
var resumed = false
var result: Any? = null
lock.withLock {
val size = this.size
val size = this.size.value
if (size == 0) return closedForSend ?: POLL_FAILED // when nothing can be read from buffer
// size > 0: not empty -- retrieve element
result = buffer[head]
buffer[head] = null
this.size = size - 1 // update size before checking queue (!!!)
this.size.value = size - 1 // update size before checking queue (!!!)
// check for senders that were waiting on full queue
var replacement: Any? = POLL_FAILED
if (size == capacity) {
Expand All @@ -167,7 +175,7 @@ internal open class ArrayChannel<E>(
}
}
if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
this.size = size // restore size
this.size.value = size // restore size
buffer[(head + size) % buffer.size] = replacement
}
head = (head + 1) % buffer.size
Expand All @@ -184,12 +192,12 @@ internal open class ArrayChannel<E>(
var success = false
var result: Any? = null
lock.withLock {
val size = this.size
val size = this.size.value
if (size == 0) return closedForSend ?: POLL_FAILED
// size > 0: not empty -- retrieve element
result = buffer[head]
buffer[head] = null
this.size = size - 1 // update size before checking queue (!!!)
this.size.value = size - 1 // update size before checking queue (!!!)
// check for senders that were waiting on full queue
var replacement: Any? = POLL_FAILED
if (size == capacity) {
Expand All @@ -206,7 +214,7 @@ internal open class ArrayChannel<E>(
failure === POLL_FAILED -> break@loop // cannot poll -> Ok to take from buffer
failure === RETRY_ATOMIC -> {} // retry
failure === ALREADY_SELECTED -> {
this.size = size // restore size
this.size.value = size // restore size
buffer[head] = result // restore head
return failure
}
Expand All @@ -221,12 +229,12 @@ internal open class ArrayChannel<E>(
}
}
if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
this.size = size // restore size
this.size.value = size // restore size
buffer[(head + size) % buffer.size] = replacement
} else {
// failed to poll or is already closed --> let's try to select receiving this element from buffer
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
this.size = size // restore size
this.size.value = size // restore size
buffer[head] = result // restore head
return ALREADY_SELECTED
}
Expand All @@ -239,16 +247,20 @@ internal open class ArrayChannel<E>(
return result
}

override fun enqueueReceiveInternal(receive: Receive<E>): Boolean = lock.withLock {
super.enqueueReceiveInternal(receive)
}

// Note: this function is invoked when channel is already closed
override fun onCancelIdempotent(wasClosed: Boolean) {
// clear buffer first, but do not wait for it in helpers
if (wasClosed) {
lock.withLock {
repeat(size) {
repeat(size.value) {
buffer[head] = 0
head = (head + 1) % buffer.size
}
size = 0
size.value = 0
}
}
// then clean all queued senders
Expand All @@ -258,5 +270,5 @@ internal open class ArrayChannel<E>(
// ------ debug ------

override val bufferDebugString: String
get() = "(buffer:capacity=$capacity,size=$size)"
get() = "(buffer:capacity=$capacity,size=${size.value})"
}

0 comments on commit 1f77783

Please sign in to comment.