diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt index 2c093a07d9..b86304ea52 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt @@ -16,11 +16,15 @@ package kotlinx.coroutines.experimental.channels -import kotlinx.atomicfu.* -import kotlinx.coroutines.experimental.internal.* -import kotlinx.coroutines.experimental.internalAnnotations.* -import kotlinx.coroutines.experimental.intrinsics.* -import kotlinx.coroutines.experimental.selects.* +import kotlinx.coroutines.experimental.internal.ReentrantLock +import kotlinx.coroutines.experimental.internal.Symbol +import kotlinx.coroutines.experimental.internal.subscriberList +import kotlinx.coroutines.experimental.internal.withLock +import kotlinx.coroutines.experimental.internalAnnotations.JvmField +import kotlinx.coroutines.experimental.internalAnnotations.Volatile +import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched +import kotlinx.coroutines.experimental.selects.SelectClause2 +import kotlinx.coroutines.experimental.selects.SelectInstance /** * Broadcasts the most recently sent element (aka [value]) to all [openSubscription] subscribers. @@ -33,44 +37,22 @@ import kotlinx.coroutines.experimental.selects.* * A secondary constructor can be used to create an instance of this class that already holds a value. * This channel is also created by `BroadcastChannel(Channel.CONFLATED)` factory function invocation. * - * This implementation is fully lock-free. In this implementation + * This implementation is synchronized. In this implementation * [opening][openSubscription] and [closing][ReceiveChannel.cancel] subscription takes O(N) time, where N is the * number of subscribers. */ public class ConflatedBroadcastChannel() : BroadcastChannel { - /** - * Creates an instance of this class that already holds a value. - * - * It is as a shortcut to creating an instance with a default constructor and - * immediately sending an element: `ConflatedBroadcastChannel().apply { offer(value) }`. - */ - constructor(value: E) : this() { - _state.lazySet(State(value, null)) - } - - private val _state = atomic(INITIAL_STATE) // State | Closed - private val _updating = atomic(0) - - private companion object { - @JvmField - val CLOSED = Closed(null) - - @JvmField - val UNDEFINED = Symbol("UNDEFINED") - @JvmField - val INITIAL_STATE = State(UNDEFINED, null) + public constructor(value: E) : this() { + _state = value } - private class State( - @JvmField val value: Any?, // UNDEFINED | E - @JvmField val subscribers: Array>? - ) + private val _lock = ReentrantLock() - private class Closed(@JvmField val closeCause: Throwable?) { - val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE) - val valueException: Throwable get() = closeCause ?: IllegalStateException(DEFAULT_CLOSE_MESSAGE) - } + @Volatile + private var _state: Any? = UNDEFINED + + private var _subscribers = subscriberList() /** * The most recently sent element to this channel. @@ -80,184 +62,124 @@ public class ConflatedBroadcastChannel() : BroadcastChannel { * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_. */ @Suppress("UNCHECKED_CAST") - public val value: E get() { - _state.loop { state -> - when (state) { - is Closed -> throw state.valueException - is State<*> -> { - if (state.value === UNDEFINED) throw IllegalStateException("No value") - return state.value as E - } - else -> error("Invalid state $state") - } + public val value: E + get() { + val state = _state + if (state === UNDEFINED) throw IllegalStateException("No value") + (state as? Closed)?.run { throw valueException } + return state as E } - } /** * The most recently sent element to this channel or `null` when this class is constructed without * initial value and no value was sent yet or if it was [closed][close]. */ @Suppress("UNCHECKED_CAST") - public val valueOrNull: E? get() { - val state = _state.value - when (state) { - is Closed -> return null - is State<*> -> { - if (state.value === UNDEFINED) return null - return state.value as E + public val valueOrNull: E? + get() { + val state = _state + return when { + state === UNDEFINED -> null + state is Closed -> state.closeCause?.let { throw it } + else -> state as E } - else -> error("Invalid state $state") } - } - public override val isClosedForSend: Boolean get() = _state.value is Closed public override val isFull: Boolean get() = false - @Suppress("UNCHECKED_CAST") - public override fun openSubscription(): ReceiveChannel { - val subscriber = Subscriber(this) - _state.loop { state -> - when (state) { - is Closed -> { - subscriber.close(state.closeCause) - return subscriber - } - is State<*> -> { - if (state.value !== UNDEFINED) - subscriber.offerInternal(state.value as E) - val update = State(state.value, addSubscriber((state as State).subscribers, subscriber)) - if (_state.compareAndSet(state, update)) - return subscriber - } - else -> error("Invalid state $state") - } - } - } + public override val isClosedForSend: Boolean get() = _state is Closed - @Suppress("UNCHECKED_CAST") - private fun closeSubscriber(subscriber: Subscriber) { - _state.loop { state -> - when (state) { - is Closed -> return - is State<*> -> { - val update = State(state.value, removeSubscriber((state as State).subscribers!!, subscriber)) - if (_state.compareAndSet(state, update)) - return + public override val onSend: SelectClause2> + get() = object : SelectClause2> { + override fun registerSelectClause2(select: SelectInstance, param: E, block: suspend (SendChannel) -> R) { + if (!select.trySelect(null)) return + offerInternal(param)?.let { + select.resumeSelectCancellableWithException(it.sendException) + return } - else -> error("Invalid state $state") + block.startCoroutineUndispatched(receiver = this@ConflatedBroadcastChannel, completion = select.completion) } } - } - - private fun addSubscriber(list: Array>?, subscriber: Subscriber): Array> { - if (list == null) return Array(1) { subscriber } - return list + subscriber - } - @Suppress("UNCHECKED_CAST") - private fun removeSubscriber(list: Array>, subscriber: Subscriber): Array>? { - val n = list.size - val i = list.indexOf(subscriber) - check(i >= 0) - if (n == 1) return null - val update = arrayOfNulls>(n - 1) - arraycopy(list, 0, update, 0, i) - arraycopy(list, i + 1, update, i, n - i - 1) - return update as Array> - } + public override fun cancel(cause: Throwable?): Boolean = close(cause) - @Suppress("UNCHECKED_CAST") public override fun close(cause: Throwable?): Boolean { - _state.loop { state -> - when (state) { - is Closed -> return false - is State<*> -> { - val update = if (cause == null) CLOSED else Closed(cause) - if (_state.compareAndSet(state, update)) { - (state as State).subscribers?.forEach { it.close(cause) } - return true - } - } - else -> error("Invalid state $state") - } + _lock.withLock { + if (_state is Closed) return false + _state = Closed(cause) + + // dispose all subscribers + _subscribers.forEach { it.close(cause) } + _subscribers.clear() + return true } } - /** - * Closes this broadcast channel. Same as [close]. - */ - public override fun cancel(cause: Throwable?): Boolean = close(cause) - - /** - * Sends the value to all subscribed receives and stores this value as the most recent state for - * future subscribers. This implementation never suspends. - * It throws exception if the channel [isClosedForSend] (see [close] for details). - */ - public override suspend fun send(element: E) { - offerInternal(element)?.let { throw it.sendException } - } - - /** - * Sends the value to all subscribed receives and stores this value as the most recent state for - * future subscribers. This implementation always returns `true`. - * It throws exception if the channel [isClosedForSend] (see [close] for details). - */ public override fun offer(element: E): Boolean { offerInternal(element)?.let { throw it.sendException } return true } - @Suppress("UNCHECKED_CAST") private fun offerInternal(element: E): Closed? { - // If some other thread is updating the state in its offer operation we assume that our offer had linearized - // before that offer (we lost) and that offer overwrote us and conflated our offer. - if (!_updating.compareAndSet(0, 1)) return null - try { - _state.loop { state -> - when (state) { - is Closed -> return state - is State<*> -> { - val update = State(element, (state as State).subscribers) - if (_state.compareAndSet(state, update)) { - // Note: Using offerInternal here to ignore the case when this subscriber was - // already concurrently closed (assume the close had conflated our offer for this - // particular subscriber). - state.subscribers?.forEach { it.offerInternal(element) } - return null - } - } - else -> error("Invalid state $state") - } + if (_lock.tryLock()) { + try { + (_state as? Closed)?.let { return it } + + _state = element + _subscribers.forEach { it.tryOffer(element) } + return null + } finally { + _lock.unlock() } - } finally { - _updating.value = 0 // reset the updating flag to zero even when something goes wrong + } else { + return _state as? Closed } } - public override val onSend: SelectClause2> - get() = object : SelectClause2> { - override fun registerSelectClause2(select: SelectInstance, param: E, block: suspend (SendChannel) -> R) { - registerSelectSend(select, param, block) + public override fun openSubscription(): ReceiveChannel { + val subscriber = Subscriber() + _subscribers.add(subscriber) + + do { + val state = _state + @Suppress("UNCHECKED_CAST") + when { + state is Closed -> subscriber.close(state.closeCause) + state !== UNDEFINED -> subscriber.tryOffer(state as E) } - } + // manage offerInternal/close contention + } while (_state !== state) - private fun registerSelectSend(select: SelectInstance, element: E, block: suspend (SendChannel) -> R) { - if (!select.trySelect(null)) return - offerInternal(element)?.let { - select.resumeSelectCancellableWithException(it.sendException) - return - } - block.startCoroutineUndispatched(receiver = this, completion = select.completion) + if (subscriber.isClosedForSend) _subscribers.remove(subscriber) + return subscriber + } + + public override suspend fun send(element: E) { + offerInternal(element)?.run { throw sendException } } - private class Subscriber( - private val broadcastChannel: ConflatedBroadcastChannel - ) : ConflatedChannel(), ReceiveChannel, SubscriptionReceiveChannel { + private inner class Subscriber : ConflatedChannel() { + + /** + * Offer an element without throw exception + */ + fun tryOffer(element: E) { + super.offerInternal(element) + } + override fun cancel(cause: Throwable?): Boolean = - close(cause).also { closed -> - if (closed) broadcastChannel.closeSubscriber(this) - } + super.cancel(cause).also { closed -> + if (closed) this@ConflatedBroadcastChannel._subscribers.remove(this) + } + } - public override fun offerInternal(element: E): Any = super.offerInternal(element) + private class Closed(@JvmField val closeCause: Throwable?) { + val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE) + val valueException: Throwable get() = closeCause ?: IllegalStateException(DEFAULT_CLOSE_MESSAGE) + } + + private companion object { + @JvmField + val UNDEFINED = Symbol("UNDEFINED") } -} \ No newline at end of file +}