diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index f8b717365e..5ef5ad62be 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -7,6 +7,7 @@ import kotlinx.coroutines.internal.* import kotlinx.coroutines.selects.* import kotlin.coroutines.* import kotlin.coroutines.intrinsics.* +import kotlin.experimental.* import kotlin.js.* import kotlin.jvm.* @@ -319,8 +320,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren private fun notifyCancelling(list: NodeList, cause: Throwable) { // first cancel our own children onCancelling(cause) - list.close(LIST_CANCELLATION_PERMISSION) - notifyHandlers(list, cause) { it.onCancelling } + notifyHandlers(list, LIST_CANCELLATION_PERMISSION, cause) { it.onCancelling } // then cancel parent cancelParent(cause) // tentative cancellation -- does not matter if there is no parent } @@ -352,13 +352,12 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren } private fun NodeList.notifyCompletion(cause: Throwable?) { - close(LIST_ON_COMPLETION_PERMISSION) - notifyHandlers(this, cause) { true } + notifyHandlers(this, LIST_ON_COMPLETION_PERMISSION, cause) { true } } - private inline fun notifyHandlers(list: NodeList, cause: Throwable?, predicate: (JobNode) -> Boolean) { + private fun notifyHandlers(list: NodeList, permissionBitmask: Byte, cause: Throwable?, predicate: (JobNode) -> Boolean) { var exception: Throwable? = null - list.forEach { node -> + list.forEach(forbidBitmask = permissionBitmask) { node -> if (node is JobNode && predicate(node)) { try { node.invoke(cause) @@ -925,13 +924,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren // process cancelling notification here -- it cancels all the children _before_ we start to wait them (sic!!!) notifyRootCause?.let { notifyCancelling(list, it) } // now wait for children - val child = list.nextChild() - if (child != null && tryWaitForChild(finishing, child, proposedUpdate)) - return COMPLETING_WAITING_CHILDREN - list.close(LIST_CHILD_PERMISSION) - val anotherChild = list.nextChild() - if (anotherChild != null && tryWaitForChild(finishing, anotherChild, proposedUpdate)) - return COMPLETING_WAITING_CHILDREN + if (shouldWaitForChildren(finishing, proposedUpdate)) return COMPLETING_WAITING_CHILDREN // otherwise -- we have not children left (all were already cancelled?) return finalizeFinishingState(finishing, proposedUpdate) } @@ -939,43 +932,44 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren private val Any?.exceptionOrNull: Throwable? get() = (this as? CompletedExceptionally)?.cause - // return false when there is no more incomplete children to wait - // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method. - private tailrec fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean { - val handle = child.childJob.invokeOnCompletion( - invokeImmediately = false, - handler = ChildCompletion(this, state, child, proposedUpdate) - ) - if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it - val nextChild = state.list.nextChild(startAfter = child) ?: return false - return tryWaitForChild(state, nextChild, proposedUpdate) + private fun shouldWaitForChildren(state: Finishing, proposedUpdate: Any?, suggestedStart: ChildHandleNode? = null): Boolean { + val list = state.list + fun tryFindChildren(suggestedStart: ChildHandleNode?, closeList: Boolean): Boolean { + var startAfter: ChildHandleNode? = suggestedStart + while (true) { + val child = run { + list.forEach(forbidBitmask = if (closeList) LIST_CHILD_PERMISSION else 0, startAfter = startAfter) { + if (it is ChildHandleNode) return@run it + } + null + } ?: break + val handle = child.childJob.invokeOnCompletion( + invokeImmediately = false, + handler = ChildCompletion(this, state, child, proposedUpdate) + ) + if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it + startAfter = child + } + return false + } + // Look for children that are currently in the list after the suggested start node. + if (tryFindChildren(suggestedStart = suggestedStart, closeList = false)) return true + // We didn't find anyone in the list after the suggested start node. Let's check the beginning now. + if (suggestedStart != null && tryFindChildren(suggestedStart = null, closeList = false)) return true + // Now we know that, at the moment this function started, there were no more children. + // We can close the list for the new children, and if we still don't find any, we can be sure there are none. + return tryFindChildren(suggestedStart = null, closeList = true) } // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method. private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?) { assert { this.state === state } // consistency check -- it cannot change while we are waiting for children - // figure out if we need to wait for next child - val waitChild = state.list.nextChild(startAfter = lastChild) - // try wait for next child - if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child - // no more children to wait -- stop accepting children - state.list.close(LIST_CHILD_PERMISSION) - // did any children get added? - val waitChildAgain = state.list.nextChild(startAfter = lastChild) - // try wait for next child - if (waitChildAgain != null && tryWaitForChild(state, waitChildAgain, proposedUpdate)) return // waiting for next child + if (shouldWaitForChildren(state, proposedUpdate, suggestedStart = lastChild)) return // waiting for the next child // no more children, now we are sure; try to update the state val finalState = finalizeFinishingState(state, proposedUpdate) afterCompletion(finalState) } - private fun NodeList.nextChild(startAfter: LockFreeLinkedListNode? = null): ChildHandleNode? { - forEach(startAfter) { - if (it is ChildHandleNode) return it - } - return null - } - public final override val children: Sequence get() = sequence { when (val state = this@JobSupport.state) { is ChildHandleNode -> yield(state.childJob) @@ -1389,9 +1383,9 @@ private val EMPTY_NEW = Empty(false) private val EMPTY_ACTIVE = Empty(true) // bit mask -private const val LIST_ON_COMPLETION_PERMISSION = 1 -private const val LIST_CHILD_PERMISSION = 2 -private const val LIST_CANCELLATION_PERMISSION = 4 +private const val LIST_ON_COMPLETION_PERMISSION = 1.toByte() +private const val LIST_CHILD_PERMISSION = 2.toByte() +private const val LIST_CANCELLATION_PERMISSION = 4.toByte() private class Empty(override val isActive: Boolean) : Incomplete { override val list: NodeList? get() = null diff --git a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt index ab58699032..c5544f0aa9 100644 --- a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt +++ b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt @@ -2,40 +2,216 @@ package kotlinx.coroutines.internal +import kotlinx.atomicfu.* +import kotlinx.coroutines.* +import kotlin.coroutines.* +import kotlin.experimental.* +import kotlin.jvm.* + /** @suppress **This is unstable API and it is subject to change.** */ -public expect open class LockFreeLinkedListNode() { +internal open class LockFreeLinkedListNode { /** - * Try putting `node` into a list. + * Try putting this node into a list. * * Returns: * - The new head of the list if the operation succeeded. * - The head of the list if someone else concurrently added this node to the list, * but no other modifications to the list were made. - * - Some garbage if the list was already edited. */ - public fun attachToList(node: LockFreeLinkedListHead): LockFreeLinkedListNode + fun attachToList(head: LockFreeLinkedListHead): LockFreeLinkedListHead { + val newAddress = head.addLastWithoutModifying(this, permissionsBitmask = 0) + assert { newAddress != null } + return if (_address.compareAndSet(null, newAddress)) { + head + } else { + _address.value!!.segment.head + } + } /** * Remove this node from the list. */ - public open fun remove() + open fun remove() { + _address.value?.let { + val segment = it.segment + segment.clearSlot(it.index) + } + } + + private val _address = atomic(null) + + val address: Address get() = _address.value!! + + internal fun trySetAddress(address: Address) = this._address.compareAndSet(null, address) } /** @suppress **This is unstable API and it is subject to change.** */ -public expect open class LockFreeLinkedListHead() { +internal open class LockFreeLinkedListHead { + private val head = LockFreeLinkedListSegment( + id = 0, + prev = null, + pointers = 2, + head = this, + ) + private val tail = atomic(head) + private val nextElement = atomic(0L) + + /** + * The list of bits that are forbidden from entering the list. + * + * TODO: we can store this in the extra bits in [head], there's enough space for that there, and it's never removed. + */ + private val forbiddenBits: AtomicInt = atomic(0) + /** * Iterates over all non-removed elements in this list, skipping every node until (and including) [startAfter]. */ - public inline fun forEach(startAfter: LockFreeLinkedListNode? = null, block: (LockFreeLinkedListNode) -> Unit) + inline fun forEach( + forbidBitmask: Byte = 0, + startAfter: LockFreeLinkedListNode? = null, + block: (LockFreeLinkedListNode) -> Unit + ) { + forbiddenBits.update { it or forbidBitmask.toInt() } + val startAddress = startAfter?.address + var segment: LockFreeLinkedListSegment? = startAddress?.segment ?: head + var startIndex: Int = startAddress?.index?.let { it + 1 } ?: 0 + while (segment != null) { + segment.forEach(forbidBitmask = forbidBitmask, startIndex = startIndex, block = block) + segment = segment.next + startIndex = 0 + } + } /** - * Closes the list for anything that requests the permission [forbiddenElementsBit]. - * Only a single permission can be forbidden at a time, but this isn't checked. + * Adds the [node] to the end of the list if every bit in [permissionsBitmask] is still allowed in the list, + * and then sets the [node]'s address to the new address. */ - public fun close(forbiddenElementsBit: Int) + fun addLast(node: LockFreeLinkedListNode, permissionsBitmask: Byte): Boolean { + val address = addLastWithoutModifying(node, permissionsBitmask) ?: return false + val success = node.trySetAddress(address) + assert { success } + return true + } /** * Adds the [node] to the end of the list if every bit in [permissionsBitmask] is still allowed in the list. + * As opposed to [addLast], doesn't modify the [node]'s address. */ - public fun addLast(node: LockFreeLinkedListNode, permissionsBitmask: Int): Boolean + fun addLastWithoutModifying(node: LockFreeLinkedListNode, permissionsBitmask: Byte): Address? { + /** First, avoid modifying the list at all if it was already closed for elements like ours. */ + if (permissionsBitmask and forbiddenBits.value.toByte() != 0.toByte()) return null + /** Obtain the place from which the desired segment will certainly be reachable. */ + val curTail = tail.value + /** Allocate a place for our element. */ + val index = nextElement.getAndIncrement() + /** Find or create a segment where the node can be stored. */ + val createNewSegment = ::createSegment // can't just pass the function, as the compiler crashes (KT-67332) + val segmentId = index / SEGMENT_SIZE + val segment = tail.findSegmentAndMoveForward(id = segmentId, curTail, createNewSegment).segment + assert { segment.id == segmentId } + val indexInSegment = (index % SEGMENT_SIZE).toInt() + /** Double-check that it's still not forbidden for the node to enter the list. */ + if (permissionsBitmask and forbiddenBits.value.toByte() != 0.toByte()) return null + /** Now we know that the list was still not closed at some point *even after the segment* was created. + * Because [forbiddenBits] is set before [forEach] traverses the list, this means that [forEach] is guaranteed + * to observe the new segment and either break the cell where [node] wants to arrive or process the [node]. + * In any case, we have linearizable behavior. */ + return if (segment.tryAdd(node, permissionsBitmask = permissionsBitmask, indexInSegment = indexInSegment)) { + Address(segment, indexInSegment) + } else { + null + } + } } + +internal open class LockFreeLinkedListSegment( + id: Long, + prev: LockFreeLinkedListSegment?, + pointers: Int, + /** Used only during promoting of a single node to a list to ensure wait-freedom of the promotion operation. + * Without this, promotion can't be implemented without a (possibly bounded) spin loop: once the node is committed + * to be part of some list, the other threads can't do anything until that one thread sets the state to be the + * head of the list. */ + @JvmField val head: LockFreeLinkedListHead, +) : Segment(id = id, prev = prev, pointers = pointers) +{ + /** Each cell is a [LockFreeLinkedListNode], a [BrokenForSomeElements], or `null`. */ + private val cells = atomicArrayOfNulls(SEGMENT_SIZE) + + override val numberOfSlots: Int get() = SEGMENT_SIZE + + fun clearSlot(index: Int) { + cells[index].value = null + onSlotCleaned() + } + + inline fun forEach(forbidBitmask: Byte, startIndex: Int, block: (LockFreeLinkedListNode) -> Unit) { + for (i in startIndex until SEGMENT_SIZE) { + val node = breakCellOrGetValue(forbidBitmask, i) + if (node != null) block(node) + } + } + + private fun breakCellOrGetValue(forbidBitmask: Byte, index: Int): LockFreeLinkedListNode? { + while (true) { + val value = cells[index].value + if (value is BrokenForSomeElements?) { + val newForbiddenBits = value.forbiddenBits or forbidBitmask + if (newForbiddenBits == value.forbiddenBits + || cells[index].compareAndSet(value, BrokenForSomeElements.fromBitmask(newForbiddenBits))) + return null + } else { + return value as LockFreeLinkedListNode + } + } + } + + /** + * Adds the [node] to the array of cells if the slot wasn't broken. + */ + fun tryAdd(node: LockFreeLinkedListNode, permissionsBitmask: Byte, indexInSegment: Int): Boolean { + if (cells[indexInSegment].compareAndSet(null, node)) return true + cells[indexInSegment].loop { value -> + // This means that some elements are forbidden from entering the list. + value as BrokenForSomeElements + // Are *we* forbidden from entering the list? + if (value.forbiddenBits and permissionsBitmask != 0.toByte()) { + cells[indexInSegment].value = BrokenForSomeElements.FULLY_BROKEN + onSlotCleaned() + return false + } + // We aren't forbidden. Let's try entering it. + if (cells[indexInSegment].compareAndSet(value, node)) return true + } + } + + override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) { + throw UnsupportedOperationException("Cancellation is not supported on LockFreeLinkedList") + } +} + +internal class Address(@JvmField val segment: LockFreeLinkedListSegment, @JvmField val index: Int) + +private fun createSegment(id: Long, prev: LockFreeLinkedListSegment): LockFreeLinkedListSegment = + LockFreeLinkedListSegment( + id = id, + prev = prev, + pointers = 0, + head = prev.head + ) + +private const val SEGMENT_SIZE = 8 + +@JvmInline +private value class BrokenForSomeElements private constructor(val forbiddenBits: Byte) { + companion object { + fun fromBitmask(forbiddenBits: Byte): BrokenForSomeElements? = when (forbiddenBits) { + 0.toByte() -> null // no one is forbidden + else -> BrokenForSomeElements(forbiddenBits) + } + + val FULLY_BROKEN = BrokenForSomeElements(255.toByte()) + } +} + +private val BrokenForSomeElements?.forbiddenBits get() = this?.forbiddenBits ?: 0 diff --git a/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt deleted file mode 100644 index e46853feac..0000000000 --- a/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt +++ /dev/null @@ -1,294 +0,0 @@ -@file:Suppress("NO_EXPLICIT_VISIBILITY_IN_API_MODE") - -package kotlinx.coroutines.internal - -import kotlinx.atomicfu.* -import kotlinx.coroutines.* -import kotlin.jvm.* - -private typealias Node = LockFreeLinkedListNode - -/** - * Doubly-linked concurrent list node with remove support. - * Based on paper - * ["Lock-Free and Practical Doubly Linked List-Based Deques Using Single-Word Compare-and-Swap"](https://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.140.4693&rep=rep1&type=pdf) - * by Sundell and Tsigas with considerable changes. - * - * The core idea of the algorithm is to maintain a doubly-linked list with an ever-present sentinel node (it is - * never removed) that serves both as a list head and tail and to linearize all operations (both insert and remove) on - * the update of the next pointer. Removed nodes have their next pointer marked with [Removed] class. - * - * Important notes: - * - There are no operations to add items to left side of the list, only to the end (right side), because we cannot - * efficiently linearize them with atomic multi-step head-removal operations. In short, - * support for [describeRemoveFirst] operation precludes ability to add items at the beginning. - * - Previous pointers are not marked for removal. We don't support linearizable backwards traversal. - * - Remove-helping logic is simplified and consolidated in [correctPrev] method. - * - * @suppress **This is unstable API and it is subject to change.** - */ -@Suppress("LeakingThis") -@InternalCoroutinesApi -public actual open class LockFreeLinkedListNode { - private val _next = atomic(this) // Node | Removed | OpDescriptor - private val _prev = atomic(this) // Node to the left (cannot be marked as removed) - private val _removedRef = atomic(null) // lazily cached removed ref to this - - private fun removed(): Removed = - _removedRef.value ?: Removed(this).also { _removedRef.lazySet(it) } - - public open val isRemoved: Boolean get() = next is Removed - - // LINEARIZABLE. Returns Node | Removed - public val next: Any get() = _next.value - - // LINEARIZABLE. Returns next non-removed Node - val nextNode: Node get() = - next.let { (it as? Removed)?.ref ?: it as Node } // unwraps the `next` node - - // LINEARIZABLE WHEN THIS NODE IS NOT REMOVED: - // Returns prev non-removed Node, makes sure prev is correct (prev.next === this) - // NOTE: if this node is removed, then returns non-removed previous node without applying - // prev.next correction, which does not provide linearizable backwards iteration, but can be used to - // resume forward iteration when current node was removed. - public val prevNode: Node - get() = correctPrev() ?: findPrevNonRemoved(_prev.value) - - private tailrec fun findPrevNonRemoved(current: Node): Node { - if (!current.isRemoved) return current - return findPrevNonRemoved(current._prev.value) - } - - // ------ addOneIfEmpty ------ - - public actual fun attachToList(node: LockFreeLinkedListHead): Node { - (node as Node)._prev.lazySet(this) - (node as Node)._next.lazySet(this) - while (true) { - val next = next - if (next !== this) return nextNode // this is not an empty list! - if (_next.compareAndSet(this, node)) { - // added successfully (linearized add) -- fixup the list - (node as Node).finishAdd(this) - return node - } - } - } - - // ------ addLastXXX ------ - - /** - * Adds last item to this list. Returns `false` if the list is closed. - */ - fun addLast(node: Node, permissionsBitmask: Int): Boolean { - while (true) { // lock-free loop on prev.next - val currentPrev = prevNode - return when { - currentPrev is ListClosed -> - currentPrev.forbiddenElementsBitmask and permissionsBitmask == 0 && - currentPrev.addLast(node, permissionsBitmask) - currentPrev.addNext(node, this) -> true - else -> continue - } - } - } - - /** - * Given: - * ``` - * +-----------------------+ - * this | node V next - * +---+---+ +---+---+ +---+---+ - * ... <-- | P | N | | P | N | | P | N | --> .... - * +---+---+ +---+---+ +---+---+ - * ^ | - * +-----------------------+ - * ``` - * Produces: - * ``` - * this node next - * +---+---+ +---+---+ +---+---+ - * ... <-- | P | N | ==> | P | N | --> | P | N | --> .... - * +---+---+ +---+---+ +---+---+ - * ^ | ^ | - * +---------+ +---------+ - * ``` - * Where `==>` denotes linearization point. - * Returns `false` if `next` was not following `this` node. - */ - @PublishedApi - internal fun addNext(node: Node, next: Node): Boolean { - node._prev.lazySet(this) - node._next.lazySet(next) - if (!_next.compareAndSet(next, node)) return false - // added successfully (linearized add) -- fixup the list - node.finishAdd(next) - return true - } - - // ------ removeXXX ------ - - /** - * Removes this node from the list. Returns `true` when removed successfully, or `false` if the node was already - * removed or if it was not added to any list in the first place. - * - * **Note**: Invocation of this operation does not guarantee that remove was actually complete if result was `false`. - * In particular, invoking [nextNode].[prevNode] might still return this node even though it is "already removed". - */ - public actual open fun remove() { - removeOrNext() - } - - // returns null if removed successfully or next node if this node is already removed - @PublishedApi - internal fun removeOrNext(): Node? { - while (true) { // lock-free loop on next - val next = this.next - if (next is Removed) return next.ref // was already removed -- don't try to help (original thread will take care) - if (next === this) return next // was not even added - val removed = (next as Node).removed() - if (_next.compareAndSet(next, removed)) { - // was removed successfully (linearized remove) -- fixup the list - next.correctPrev() - return null - } - } - } - - // This is Harris's RDCSS (Restricted Double-Compare Single Swap) operation - // It inserts "op" descriptor of when "op" status is still undecided (rolls back otherwise) - - - // ------ other helpers ------ - - /** - * Given: - * ``` - * - * prev this next - * +---+---+ +---+---+ +---+---+ - * ... <-- | P | N | --> | P | N | --> | P | N | --> .... - * +---+---+ +---+---+ +---+---+ - * ^ ^ | | - * | +---------+ | - * +-------------------------+ - * ``` - * Produces: - * ``` - * prev this next - * +---+---+ +---+---+ +---+---+ - * ... <-- | P | N | --> | P | N | --> | P | N | --> .... - * +---+---+ +---+---+ +---+---+ - * ^ | ^ | - * +---------+ +---------+ - * ``` - */ - private fun finishAdd(next: Node) { - next._prev.loop { nextPrev -> - if (this.next !== next) return // this or next was removed or another node added, remover/adder fixes up links - if (next._prev.compareAndSet(nextPrev, this)) { - // This newly added node could have been removed, and the above CAS would have added it physically again. - // Let us double-check for this situation and correct if needed - if (isRemoved) next.correctPrev() - return - } - } - } - - /** - * Returns the corrected value of the previous node while also correcting the `prev` pointer - * (so that `this.prev.next === this`) and helps complete node removals to the left ot this node. - * - * It returns `null` in two special cases: - * - * - When this node is removed. In this case there is no need to waste time on corrections, because - * remover of this node will ultimately call [correctPrev] on the next node and that will fix all - * the links from this node, too. - */ - private tailrec fun correctPrev(): Node? { - val oldPrev = _prev.value - var prev: Node = oldPrev - var last: Node? = null // will be set so that last.next === prev - while (true) { // move the left until first non-removed node - val prevNext: Any = prev._next.value - when { - // fast path to find quickly find prev node when everything is properly linked - prevNext === this -> { - if (oldPrev === prev) return prev // nothing to update -- all is fine, prev found - // otherwise need to update prev - if (!this._prev.compareAndSet(oldPrev, prev)) { - // Note: retry from scratch on failure to update prev - return correctPrev() - } - return prev // return the correct prev - } - // slow path when we need to help remove operations - this.isRemoved -> return null // nothing to do, this node was removed, bail out asap to save time - prevNext is Removed -> { - if (last !== null) { - // newly added (prev) node is already removed, correct last.next around it - if (!last._next.compareAndSet(prev, prevNext.ref)) { - return correctPrev() // retry from scratch on failure to update next - } - prev = last - last = null - } else { - prev = prev._prev.value - } - } - else -> { // prevNext is a regular node, but not this -- help delete - last = prev - prev = prevNext as Node - } - } - } - } - - internal fun validateNode(prev: Node, next: Node) { - assert { prev === this._prev.value } - assert { next === this._next.value } - } - - override fun toString(): String = "${this::classSimpleName}@${this.hexAddress}" -} - -private class Removed(@JvmField val ref: Node) { - override fun toString(): String = "Removed[$ref]" -} - -/** - * Head (sentinel) item of the linked list that is never removed. - * - * @suppress **This is unstable API and it is subject to change.** - */ -public actual open class LockFreeLinkedListHead : LockFreeLinkedListNode() { - /** - * Iterates over all elements in this list of a specified type. - */ - public actual inline fun forEach(startAfter: LockFreeLinkedListNode?, block: (Node) -> Unit) { - var cur: Node = startAfter ?: this - while (cur.isRemoved) cur = cur.prevNode // rollback to prev non-removed (or list head) - cur = cur.nextNode - while (cur !== this) { - if (!cur.isRemoved) { - block(cur) - } - cur = cur.nextNode - } - } - - // just a defensive programming -- makes sure that list head sentinel is never removed - public final override fun remove(): Nothing = error("head cannot be removed") - - // optimization: because head is never removed, we don't have to read _next.value to check these: - override val isRemoved: Boolean get() = false - - /** - * Forbids adding new items to this list. - */ - public actual fun close(forbiddenElementsBit: Int) { - addLast(ListClosed(forbiddenElementsBit), forbiddenElementsBit) - } -} - -private class ListClosed(val forbiddenElementsBitmask: Int): LockFreeLinkedListNode() diff --git a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/LinkedList.kt b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/LinkedList.kt deleted file mode 100644 index 1e1b829d33..0000000000 --- a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/LinkedList.kt +++ /dev/null @@ -1,75 +0,0 @@ -@file:Suppress("unused", "NO_EXPLICIT_RETURN_TYPE_IN_API_MODE", "NO_EXPLICIT_VISIBILITY_IN_API_MODE") - -package kotlinx.coroutines.internal - -import kotlinx.coroutines.* - -private typealias Node = LockFreeLinkedListNode - -/** @suppress **This is unstable API and it is subject to change.** */ -public actual open class LockFreeLinkedListNode { - @PublishedApi internal var _next = this - @PublishedApi internal var _prev = this - @PublishedApi internal var _removed: Boolean = false - - fun addLast(node: Node, permissionsBitmask: Int): Boolean = when (val prev = this._prev) { - is ListClosed -> - prev.forbiddenElementsBitmask and permissionsBitmask == 0 && prev.addLast(node, permissionsBitmask) - else -> { - node._next = this - node._prev = prev - prev._next = node - this._prev = node - true - } - } - - /* - * Remove that is invoked as a virtual function with a - * potentially augmented behaviour. - * I.g. `LockFreeLinkedListHead` throws, while `SendElementWithUndeliveredHandler` - * invokes handler on remove - */ - public actual open fun remove() { - if (_removed) return - val prev = this._prev - val next = this._next - prev._next = next - next._prev = prev - _removed = true - } - - public actual fun attachToList(node: LockFreeLinkedListHead): Node { - if (_next !== this) return _next - val success = addLast(node, Int.MIN_VALUE) - assert { success } - return node - } -} - -/** @suppress **This is unstable API and it is subject to change.** */ -public actual open class LockFreeLinkedListHead : Node() { - /** - * Iterates over all elements in this list of a specified type. - */ - public actual inline fun forEach(startAfter: LockFreeLinkedListNode?, block: (Node) -> Unit) { - var cur: Node = startAfter ?: this - while (cur._removed) cur = cur._prev // rollback to prev non-removed (or list head) - cur = cur._next - while (cur !== this) { - if (!cur._removed) { - block(cur) - } - cur = cur._next - } - } - - public actual fun close(forbiddenElementsBit: Int) { - addLast(ListClosed(forbiddenElementsBit), forbiddenElementsBit) - } - - // just a defensive programming -- makes sure that list head sentinel is never removed - public final override fun remove(): Nothing = throw UnsupportedOperationException() -} - -private class ListClosed(val forbiddenElementsBitmask: Int): LockFreeLinkedListNode() diff --git a/kotlinx-coroutines-core/jsAndWasmShared/test/internal/LinkedListTest.kt b/kotlinx-coroutines-core/jsAndWasmShared/test/internal/LinkedListTest.kt deleted file mode 100644 index b8550ef9da..0000000000 --- a/kotlinx-coroutines-core/jsAndWasmShared/test/internal/LinkedListTest.kt +++ /dev/null @@ -1,41 +0,0 @@ -package kotlinx.coroutines.internal - -import kotlin.test.Test -import kotlin.test.assertEquals -import kotlin.test.assertFalse -import kotlin.test.assertTrue - -class LinkedListTest { - data class IntNode(val i: Int) : LockFreeLinkedListNode() - - @Test - fun testSimpleAddLastRemove() { - val list = LockFreeLinkedListHead() - assertContents(list) - val n1 = IntNode(1).apply { list.addLast(this, Int.MAX_VALUE) } - assertContents(list, 1) - val n2 = IntNode(2).apply { list.addLast(this, Int.MAX_VALUE) } - assertContents(list, 1, 2) - val n3 = IntNode(3).apply { list.addLast(this, Int.MAX_VALUE) } - assertContents(list, 1, 2, 3) - val n4 = IntNode(4).apply { list.addLast(this, Int.MAX_VALUE) } - assertContents(list, 1, 2, 3, 4) - n1.remove() - assertContents(list, 2, 3, 4) - n3.remove() - assertContents(list, 2, 4) - n4.remove() - assertContents(list, 2) - n2.remove() - assertContents(list) - } - - private fun assertContents(list: LockFreeLinkedListHead, vararg expected: Int) { - val n = expected.size - val actual = IntArray(n) - var index = 0 - list.forEach { if (it is IntNode) actual[index++] = it.i } - assertEquals(n, index) - for (i in 0 until n) assertEquals(expected[i], actual[i], "item i") - } -} diff --git a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListLongStressTest.kt b/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListLongStressTest.kt deleted file mode 100644 index 95be1cbe62..0000000000 --- a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListLongStressTest.kt +++ /dev/null @@ -1,81 +0,0 @@ -package kotlinx.coroutines.internal - -import kotlinx.coroutines.testing.TestBase -import org.junit.Test -import java.util.* -import java.util.concurrent.atomic.AtomicInteger -import kotlin.concurrent.thread - -/** - * This stress test has 2 threads adding on one side on list, 2 more threads adding on the other, - * and 6 threads iterating and concurrently removing items. The resulting list that is being - * stressed is long. - */ -class LockFreeLinkedListLongStressTest : TestBase() { - data class IntNode(val i: Int) : LockFreeLinkedListNode() - val list = LockFreeLinkedListHead() - - val threads = mutableListOf() - private val nAdded = 10_000_000 // should not stress more, because that'll run out of memory - private val nAddThreads = 4 // must be power of 2 (!!!) - private val nRemoveThreads = 6 - private val removeProbability = 0.2 - private val workingAdders = AtomicInteger(nAddThreads) - - private fun shallRemove(i: Int) = i and 63 != 42 - - @Test - fun testStress() { - println("--- LockFreeLinkedListLongStressTest") - for (j in 0 until nAddThreads) - threads += thread(start = false, name = "adder-$j") { - for (i in j until nAdded step nAddThreads) { - list.addLast(IntNode(i), Int.MAX_VALUE) - } - println("${Thread.currentThread().name} completed") - workingAdders.decrementAndGet() - } - for (j in 0 until nRemoveThreads) - threads += thread(start = false, name = "remover-$j") { - val rnd = Random() - do { - val lastTurn = workingAdders.get() == 0 - list.forEach { node -> - if (node is IntNode && shallRemove(node.i) && (lastTurn || rnd.nextDouble() < removeProbability)) - node.remove() - } - } while (!lastTurn) - println("${Thread.currentThread().name} completed") - } - println("Starting ${threads.size} threads") - for (thread in threads) - thread.start() - println("Joining threads") - for (thread in threads) - thread.join() - // verification - println("Verify result") - list.validate() - val expected = iterator { - for (i in 0 until nAdded) - if (!shallRemove(i)) - yield(i) - } - list.forEach { node -> - require(node !is IntNode || node.i == expected.next()) - } - require(!expected.hasNext()) - } - - private fun LockFreeLinkedListHead.validate() { - var prev: LockFreeLinkedListNode = this - var cur: LockFreeLinkedListNode = next as LockFreeLinkedListNode - while (cur != this) { - val next = cur.nextNode - cur.validateNode(prev, next) - prev = cur - cur = next - } - validateNode(prev, next as LockFreeLinkedListNode) - } -}