-
Notifications
You must be signed in to change notification settings - Fork 59
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Arkadii Ivanov
committed
Oct 10, 2020
1 parent
f38af46
commit a9b302e
Showing
2 changed files
with
107 additions
and
79 deletions.
There are no files selected for viewing
104 changes: 57 additions & 47 deletions
104
reaktive/src/jvmJsCommonMain/kotlin/com/badoo/reaktive/utils/serializer/SerializerImpl.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,76 +1,86 @@ | ||
package com.badoo.reaktive.utils.serializer | ||
|
||
import com.badoo.reaktive.utils.atomic.AtomicInt | ||
import com.badoo.reaktive.utils.queue.Queue | ||
import kotlin.jvm.Volatile | ||
|
||
internal abstract class SerializerImpl<in T>(queue: Queue<T>) : Serializer<T> { | ||
/* | ||
* Derived from RxJava SerializedEmitter | ||
*/ | ||
internal abstract class SerializerImpl<in T>( | ||
private val queue: Queue<T> | ||
) : Serializer<T> { | ||
|
||
private var queue: Queue<T>? = queue | ||
private val monitor = Any() | ||
private var isDraining = false | ||
@Volatile | ||
private var isDone = false | ||
private val counter = AtomicInt() | ||
|
||
override fun accept(value: T) { | ||
val queueToDrain = | ||
synchronized(monitor) { | ||
val queue = queue ?: return | ||
if (isDone) { | ||
return | ||
} | ||
|
||
if (isDraining) { | ||
queue.offer(value) | ||
return | ||
} | ||
if (counter.compareAndSet(0, 1)) { | ||
if (!onValue(value)) { | ||
isDone = true | ||
return | ||
} | ||
|
||
isDraining = true | ||
queue | ||
if (counter.addAndGet(-1) == 0) { | ||
return | ||
} | ||
} else { | ||
synchronized(queue) { | ||
queue.offer(value) | ||
} | ||
|
||
if (!processValue(value)) { | ||
return | ||
if (counter.addAndGet(1) > 1) { | ||
return | ||
} | ||
} | ||
|
||
queueToDrain.drain() | ||
drainLoop() | ||
} | ||
|
||
override fun clear() { | ||
synchronized(monitor) { | ||
queue?.clear() | ||
} | ||
synchronized(queue, queue::clear) | ||
} | ||
|
||
private fun Queue<T>.drain() { | ||
abstract fun onValue(value: T): Boolean | ||
|
||
private fun drainLoop() { | ||
var missed = 1 | ||
while (true) { | ||
val value = | ||
synchronized(monitor) { | ||
if (isEmpty) { | ||
onDrainFinished(false) | ||
return | ||
while (true) { | ||
if (isDone) { | ||
return | ||
} | ||
|
||
var isEmpty = false | ||
var value: T? = null | ||
|
||
synchronized(queue) { | ||
isEmpty = queue.isEmpty | ||
if (!isEmpty) { | ||
value = queue.poll() | ||
} | ||
} | ||
|
||
@Suppress("UNCHECKED_CAST") | ||
poll() as T | ||
if (isEmpty) { | ||
break | ||
} | ||
|
||
if (!processValue(value)) { | ||
return | ||
@Suppress("UNCHECKED_CAST") | ||
if (!onValue(value as T)) { | ||
isDone = true | ||
return | ||
} | ||
} | ||
} | ||
} | ||
|
||
private fun processValue(value: T): Boolean { | ||
if (!onValue(value)) { | ||
synchronized(monitor) { | ||
onDrainFinished(true) | ||
missed = counter.addAndGet(-missed) | ||
if (missed == 0) { | ||
break | ||
} | ||
return false | ||
} | ||
|
||
return true | ||
} | ||
|
||
private fun onDrainFinished(terminate: Boolean) { | ||
isDraining = false | ||
if (terminate) { | ||
queue = null | ||
} | ||
} | ||
|
||
protected abstract fun onValue(value: T): Boolean | ||
} |
82 changes: 50 additions & 32 deletions
82
reaktive/src/nativeCommonMain/kotlin/com/badoo/reaktive/utils/serializer/SerializerImpl.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,65 +1,83 @@ | ||
package com.badoo.reaktive.utils.serializer | ||
|
||
import com.badoo.reaktive.utils.atomic.AtomicBoolean | ||
import com.badoo.reaktive.utils.atomic.AtomicInt | ||
import com.badoo.reaktive.utils.atomic.AtomicReference | ||
import com.badoo.reaktive.utils.atomic.getAndUpdate | ||
import com.badoo.reaktive.utils.atomic.update | ||
import com.badoo.reaktive.utils.plusSorted | ||
|
||
/* | ||
* Derived from RxJava SerializedEmitter | ||
*/ | ||
internal abstract class SerializerImpl<in T>( | ||
private val comparator: Comparator<in T>? = null | ||
) : Serializer<T> { | ||
|
||
private val state = AtomicReference<State<T>?>(State()) | ||
private val queue = AtomicReference<List<T>>(emptyList()) | ||
private val isDone = AtomicBoolean() | ||
private val counter = AtomicInt() | ||
|
||
override fun accept(value: T) { | ||
state | ||
.getAndUpdate { state -> | ||
state?.copy( | ||
queue = state.queue.addAndSort(value, comparator), | ||
isDraining = true | ||
) | ||
if (isDone.value) { | ||
return | ||
} | ||
|
||
if (counter.compareAndSet(0, 1)) { | ||
if (!onValue(value)) { | ||
isDone.value = true | ||
return | ||
} | ||
|
||
if (counter.addAndGet(-1) == 0) { | ||
return | ||
} | ||
?.isDraining | ||
?.takeUnless { it } | ||
?.run { drain() } | ||
} else { | ||
queue.update { it.addAndSort(value, comparator) } | ||
|
||
if (counter.addAndGet(1) > 1) { | ||
return | ||
} | ||
} | ||
|
||
drainLoop() | ||
} | ||
|
||
override fun clear() { | ||
state.update { | ||
it?.copy(queue = emptyList()) | ||
} | ||
queue.value = emptyList() | ||
} | ||
|
||
private fun drain() { | ||
abstract fun onValue(value: T): Boolean | ||
|
||
private fun drainLoop() { | ||
var missed = 1 | ||
while (true) { | ||
val oldState = | ||
state.getAndUpdate { | ||
it?.copy( | ||
queue = it.queue.drop(1), | ||
isDraining = it.queue.isNotEmpty() | ||
) | ||
while (true) { | ||
if (isDone.value) { | ||
return | ||
} | ||
|
||
if ((oldState == null) || oldState.queue.isEmpty()) { | ||
return | ||
val oldQueue = queue.getAndUpdate { it.drop(1) } | ||
|
||
if (oldQueue.isEmpty()) { | ||
break | ||
} | ||
|
||
if (!onValue(oldQueue[0])) { | ||
isDone.value = true | ||
return | ||
} | ||
} | ||
|
||
if (!onValue(oldState.queue[0])) { | ||
state.value = null | ||
return | ||
missed = counter.addAndGet(-missed) | ||
if (missed == 0) { | ||
break | ||
} | ||
} | ||
} | ||
|
||
protected abstract fun onValue(value: T): Boolean | ||
|
||
private companion object { | ||
private fun <T> List<T>.addAndSort(item: T, comparator: Comparator<in T>?): List<T> = | ||
if (comparator == null) plus(item) else plusSorted(item, comparator) | ||
} | ||
|
||
private data class State<out T>( | ||
val queue: List<T> = emptyList(), | ||
val isDraining: Boolean = false | ||
) | ||
} |