From 03a42b9e87679cd9a2b2662790617141cf3f7fd6 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Thu, 11 Apr 2024 16:36:10 +0200 Subject: [PATCH] Reduce the memory consumption slightly In exchange, now, removal is linear in the size of number of registered handlers instead of being amortized constant. --- .../common/src/JobSupport.kt | 55 +++++++++----- .../src/internal/LockFreeLinkedList.common.kt | 76 ++++++------------- .../jvm/test/MemoryFootprintTest.kt | 8 +- 3 files changed, 65 insertions(+), 74 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index 5ef5ad62be..2c89401aaa 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -357,7 +357,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren private fun notifyHandlers(list: NodeList, permissionBitmask: Byte, cause: Throwable?, predicate: (JobNode) -> Boolean) { var exception: Throwable? = null - list.forEach(forbidBitmask = permissionBitmask) { node -> + list.forEach(forbidBitmask = permissionBitmask) { node, _, _ -> if (node is JobNode && predicate(node)) { try { node.invoke(cause) @@ -558,7 +558,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren private fun promoteSingleToNodeList(state: JobNode) { // try to promote it to list (SINGLE+ state) - _state.compareAndSet(state, state.attachToList(NodeList())) + val list = NodeList() + val address = list.addLastWithoutModifying(state, permissionsBitmask = 0) + assert { address == 0L } + _state.compareAndSet(state, list) } public final override suspend fun join() { @@ -621,7 +624,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren } is Incomplete -> { // may have a list of completion handlers // remove node from the list if there is a list - if (state.list != null) node.remove() + state.list?.remove(node) return } else -> return // it is complete and does not have any completion handlers @@ -932,39 +935,52 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren private val Any?.exceptionOrNull: Throwable? get() = (this as? CompletedExceptionally)?.cause - private fun shouldWaitForChildren(state: Finishing, proposedUpdate: Any?, suggestedStart: ChildHandleNode? = null): Boolean { + private fun shouldWaitForChildren( + state: Finishing, + proposedUpdate: Any?, + suggestedStartSegment: LockFreeLinkedListSegment? = null, + suggestedStartIndex: Int? = null + ): Boolean { val list = state.list - fun tryFindChildren(suggestedStart: ChildHandleNode?, closeList: Boolean): Boolean { - var startAfter: ChildHandleNode? = suggestedStart + fun tryFindChildren( + closeList: Boolean, + suggestedStartSegment: LockFreeLinkedListSegment? = null, + suggestedStartIndex: Int? = null, + ): Boolean { + var startSegment = suggestedStartSegment + var startIndex = suggestedStartIndex while (true) { val child = run { - list.forEach(forbidBitmask = if (closeList) LIST_CHILD_PERMISSION else 0, startAfter = startAfter) { - if (it is ChildHandleNode) return@run it + list.forEach(forbidBitmask = if (closeList) LIST_CHILD_PERMISSION else 0, startInSegment = startSegment, startAfterIndex = startIndex) { node, segment, indexInSegment -> + if (node is ChildHandleNode) { + startSegment = segment + startIndex = indexInSegment + return@run node + } } null } ?: break val handle = child.childJob.invokeOnCompletion( invokeImmediately = false, - handler = ChildCompletion(this, state, child, proposedUpdate) + handler = ChildCompletion(this, state, startSegment!!, startIndex!!, proposedUpdate) ) if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it - startAfter = child } return false } // Look for children that are currently in the list after the suggested start node. - if (tryFindChildren(suggestedStart = suggestedStart, closeList = false)) return true + if (tryFindChildren(suggestedStartSegment = suggestedStartSegment, suggestedStartIndex = suggestedStartIndex, closeList = false)) return true // We didn't find anyone in the list after the suggested start node. Let's check the beginning now. - if (suggestedStart != null && tryFindChildren(suggestedStart = null, closeList = false)) return true + if (suggestedStartSegment != null && tryFindChildren(closeList = false)) return true // Now we know that, at the moment this function started, there were no more children. // We can close the list for the new children, and if we still don't find any, we can be sure there are none. - return tryFindChildren(suggestedStart = null, closeList = true) + return tryFindChildren(closeList = true) } // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method. - private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?) { + private fun continueCompleting(state: Finishing, proposedUpdate: Any?, lastSegment: LockFreeLinkedListSegment, lastIndexInSegment: Int) { assert { this.state === state } // consistency check -- it cannot change while we are waiting for children - if (shouldWaitForChildren(state, proposedUpdate, suggestedStart = lastChild)) return // waiting for the next child + if (shouldWaitForChildren(state, proposedUpdate, suggestedStartSegment = lastSegment, suggestedStartIndex = lastIndexInSegment)) return // waiting for the next child // no more children, now we are sure; try to update the state val finalState = finalizeFinishingState(state, proposedUpdate) afterCompletion(finalState) @@ -974,7 +990,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren when (val state = this@JobSupport.state) { is ChildHandleNode -> yield(state.childJob) is Incomplete -> state.list?.let { list -> - list.forEach { if (it is ChildHandleNode) yield(it.childJob) } + list.forEach { it, _, _ -> if (it is ChildHandleNode) yield(it.childJob) } } } } @@ -1232,11 +1248,12 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren private class ChildCompletion( private val parent: JobSupport, private val state: Finishing, - private val child: ChildHandleNode, + private val segment: LockFreeLinkedListSegment, + private val indexInSegment: Int, private val proposedUpdate: Any? ) : JobNode() { override fun invoke(cause: Throwable?) { - parent.continueCompleting(state, child, proposedUpdate) + parent.continueCompleting(state, proposedUpdate, lastSegment = segment, lastIndexInSegment = indexInSegment) } override val onCancelling: Boolean get() = false } @@ -1477,7 +1494,7 @@ internal class NodeList : LockFreeLinkedListHead(), Incomplete { append(state) append("}[") var first = true - this@NodeList.forEach { node -> + this@NodeList.forEach { node, _, _ -> if (node is JobNode) { if (first) first = false else append(", ") append(node) diff --git a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt index 9b4adbf3af..3172d39348 100644 --- a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt +++ b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt @@ -11,38 +11,10 @@ import kotlin.jvm.* /** @suppress **This is unstable API and it is subject to change.** */ internal open class LockFreeLinkedListNode { /** - * Try putting this node into a list. - * - * Returns: - * - The new head of the list if the operation succeeded. - * - The head of the list if someone else concurrently added this node to the list, - * but no other modifications to the list were made. - */ - fun attachToList(head: LockFreeLinkedListHead): LockFreeLinkedListHead { - val newAddress = head.addLastWithoutModifying(this, permissionsBitmask = 0) - assert { newAddress != null } - return if (_address.compareAndSet(null, newAddress)) { - head - } else { - _address.value!!.segment.head - } - } - - /** - * Remove this node from the list. + * The default value of 0 means that either the node is not in any list or [LockFreeLinkedListHead.addLast] wasn't + * yet called on it. */ - open fun remove() { - _address.value?.let { - val segment = it.segment - segment.clearSlot(it.index) - } - } - - private val _address = atomic(null) - - val address: Address get() = _address.value!! - - internal fun trySetAddress(address: Address) = this._address.compareAndSet(null, address) + var address: Long = 0 } /** @suppress **This is unstable API and it is subject to change.** */ @@ -66,13 +38,13 @@ internal open class LockFreeLinkedListHead: LockFreeLinkedListSegment( */ inline fun forEach( forbidBitmask: Byte = 0, - startAfter: LockFreeLinkedListNode? = null, - block: (LockFreeLinkedListNode) -> Unit + startInSegment: LockFreeLinkedListSegment? = null, + startAfterIndex: Int? = null, + block: (LockFreeLinkedListNode, LockFreeLinkedListSegment, Int) -> Unit ) { forbiddenBits.update { it or forbidBitmask.toInt() } - val startAddress = startAfter?.address - var segment: LockFreeLinkedListSegment? = startAddress?.segment ?: head - var startIndex: Int = startAddress?.index?.let { it + 1 } ?: 0 + var segment: LockFreeLinkedListSegment? = startInSegment ?: this + var startIndex: Int = startAfterIndex?.let { it + 1 } ?: 0 while (segment != null) { segment.forEach(forbidBitmask = forbidBitmask, startIndex = startIndex, block = block) segment = segment.next @@ -85,9 +57,7 @@ internal open class LockFreeLinkedListHead: LockFreeLinkedListSegment( * and then sets the [node]'s address to the new address. */ fun addLast(node: LockFreeLinkedListNode, permissionsBitmask: Byte): Boolean { - val address = addLastWithoutModifying(node, permissionsBitmask) ?: return false - val success = node.trySetAddress(address) - assert { success } + node.address = addLastWithoutModifying(node, permissionsBitmask) ?: return false return true } @@ -95,7 +65,7 @@ internal open class LockFreeLinkedListHead: LockFreeLinkedListSegment( * Adds the [node] to the end of the list if every bit in [permissionsBitmask] is still allowed in the list. * As opposed to [addLast], doesn't modify the [node]'s address. */ - fun addLastWithoutModifying(node: LockFreeLinkedListNode, permissionsBitmask: Byte): Address? { + fun addLastWithoutModifying(node: LockFreeLinkedListNode, permissionsBitmask: Byte): Long? { /** First, avoid modifying the list at all if it was already closed for elements like ours. */ if (permissionsBitmask and forbiddenBits.value.toByte() != 0.toByte()) return null /** Obtain the place from which the desired segment will certainly be reachable. */ @@ -115,13 +85,21 @@ internal open class LockFreeLinkedListHead: LockFreeLinkedListSegment( * to observe the new segment and either break the cell where [node] wants to arrive or process the [node]. * In any case, we have linearizable behavior. */ return if (segment.tryAdd(node, permissionsBitmask = permissionsBitmask, indexInSegment = indexInSegment)) { - Address(segment, indexInSegment) + index } else { null } } - override val head: LockFreeLinkedListHead get() = this + fun remove(node: LockFreeLinkedListNode) { + val address = node.address + val id = address / SEGMENT_SIZE + var segment: LockFreeLinkedListSegment = this + while (segment.id < id) { segment = segment.next!! } + if (segment.id == id) { + segment.clearSlot((address % SEGMENT_SIZE).toInt(), node) + } + } } internal open class LockFreeLinkedListSegment( @@ -135,15 +113,15 @@ internal open class LockFreeLinkedListSegment( override val numberOfSlots: Int get() = SEGMENT_SIZE - fun clearSlot(index: Int) { - cells[index].value = null - onSlotCleaned() + fun clearSlot(index: Int, node: LockFreeLinkedListNode) { + if (cells[index].compareAndSet(node, null)) + onSlotCleaned() } - inline fun forEach(forbidBitmask: Byte, startIndex: Int, block: (LockFreeLinkedListNode) -> Unit) { + inline fun forEach(forbidBitmask: Byte, startIndex: Int, block: (LockFreeLinkedListNode, LockFreeLinkedListSegment, Int) -> Unit) { for (i in startIndex until SEGMENT_SIZE) { val node = breakCellOrGetValue(forbidBitmask, i) - if (node != null) block(node) + if (node != null) block(node, this, i) } } @@ -183,12 +161,8 @@ internal open class LockFreeLinkedListSegment( override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) { throw UnsupportedOperationException("Cancellation is not supported on LockFreeLinkedList") } - - open val head: LockFreeLinkedListHead get() = prev!!.head } -internal class Address(@JvmField val segment: LockFreeLinkedListSegment, @JvmField val index: Int) - private fun createSegment(id: Long, prev: LockFreeLinkedListSegment): LockFreeLinkedListSegment = LockFreeLinkedListSegment( id = id, diff --git a/kotlinx-coroutines-core/jvm/test/MemoryFootprintTest.kt b/kotlinx-coroutines-core/jvm/test/MemoryFootprintTest.kt index 97cada8af9..468d04c5a9 100644 --- a/kotlinx-coroutines-core/jvm/test/MemoryFootprintTest.kt +++ b/kotlinx-coroutines-core/jvm/test/MemoryFootprintTest.kt @@ -18,10 +18,10 @@ class MemoryFootprintTest : TestBase(true) { @Test fun testJobSize() { - assertTotalSize(jobWithChildren(1), 112) - assertTotalSize(jobWithChildren(2), 336) // originally: 192 - assertTotalSize(jobWithChildren(3), 416) // originally: 248 - assertTotalSize(jobWithChildren(4), 496) // originally: 304 + assertTotalSize(jobWithChildren(1), 120) // originally: 112 + assertTotalSize(jobWithChildren(2), 304) // originally: 192 + assertTotalSize(jobWithChildren(3), 368) // originally: 248 + assertTotalSize(jobWithChildren(4), 432) // originally: 304 } private fun jobWithChildren(numberOfChildren: Int): Job {