Skip to content

Commit

Permalink
Derive RxJava serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkadii Ivanov committed Oct 12, 2020
1 parent f38af46 commit a88a35e
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 79 deletions.
Original file line number Diff line number Diff line change
@@ -1,76 +1,82 @@
package com.badoo.reaktive.utils.serializer

import com.badoo.reaktive.utils.atomic.AtomicInt
import com.badoo.reaktive.utils.queue.Queue
import kotlin.jvm.Volatile

internal abstract class SerializerImpl<in T>(queue: Queue<T>) : Serializer<T> {
/*
* Derived from RxJava SerializedEmitter
*/
internal abstract class SerializerImpl<in T>(
private val queue: Queue<T>
) : Serializer<T> {

private var queue: Queue<T>? = queue
private val monitor = Any()
private var isDraining = false
@Volatile
private var isDone = false
private val counter = AtomicInt()

override fun accept(value: T) {
val queueToDrain =
synchronized(monitor) {
val queue = queue ?: return
if (isDone) {
return
}

if (isDraining) {
queue.offer(value)
return
}
if (counter.compareAndSet(0, 1)) {
if (!onValue(value)) {
isDone = true
return
}

isDraining = true
queue
if (counter.addAndGet(-1) == 0) {
return
}
} else {
synchronized(queue) {
queue.offer(value)
}

if (!processValue(value)) {
return
if (counter.addAndGet(1) > 1) {
return
}
}

queueToDrain.drain()
drainLoop()
}

override fun clear() {
synchronized(monitor) {
queue?.clear()
}
synchronized(queue, queue::clear)
}

private fun Queue<T>.drain() {
abstract fun onValue(value: T): Boolean

private fun drainLoop() {
var missed = 1
while (true) {
val value =
synchronized(monitor) {
if (isEmpty) {
onDrainFinished(false)
return
while (true) {
var isEmpty = false
var value: T? = null

synchronized(queue) {
isEmpty = queue.isEmpty
if (!isEmpty) {
value = queue.poll()
}
}

@Suppress("UNCHECKED_CAST")
poll() as T
if (isEmpty) {
break
}

if (!processValue(value)) {
return
@Suppress("UNCHECKED_CAST")
if (!onValue(value as T)) {
isDone = true
return
}
}
}
}

private fun processValue(value: T): Boolean {
if (!onValue(value)) {
synchronized(monitor) {
onDrainFinished(true)
missed = counter.addAndGet(-missed)
if (missed == 0) {
break
}
return false
}

return true
}

private fun onDrainFinished(terminate: Boolean) {
isDraining = false
if (terminate) {
queue = null
}
}

protected abstract fun onValue(value: T): Boolean
}
Original file line number Diff line number Diff line change
@@ -1,65 +1,79 @@
package com.badoo.reaktive.utils.serializer

import com.badoo.reaktive.utils.atomic.AtomicBoolean
import com.badoo.reaktive.utils.atomic.AtomicInt
import com.badoo.reaktive.utils.atomic.AtomicReference
import com.badoo.reaktive.utils.atomic.getAndUpdate
import com.badoo.reaktive.utils.atomic.update
import com.badoo.reaktive.utils.plusSorted

/*
* Derived from RxJava SerializedEmitter
*/
internal abstract class SerializerImpl<in T>(
private val comparator: Comparator<in T>? = null
) : Serializer<T> {

private val state = AtomicReference<State<T>?>(State())
private val queue = AtomicReference<List<T>>(emptyList())
private val isDone = AtomicBoolean()
private val counter = AtomicInt()

override fun accept(value: T) {
state
.getAndUpdate { state ->
state?.copy(
queue = state.queue.addAndSort(value, comparator),
isDraining = true
)
if (isDone.value) {
return
}

if (counter.compareAndSet(0, 1)) {
if (!onValue(value)) {
isDone.value = true
return
}

if (counter.addAndGet(-1) == 0) {
return
}
?.isDraining
?.takeUnless { it }
?.run { drain() }
} else {
queue.update { it.addAndSort(value, comparator) }

if (counter.addAndGet(1) > 1) {
return
}
}

drainLoop()
}

override fun clear() {
state.update {
it?.copy(queue = emptyList())
}
queue.value = emptyList()
}

private fun drain() {
abstract fun onValue(value: T): Boolean

private fun drainLoop() {
var missed = 1
while (true) {
val oldState =
state.getAndUpdate {
it?.copy(
queue = it.queue.drop(1),
isDraining = it.queue.isNotEmpty()
)
while (true) {
val oldQueue = queue.getAndUpdate { it.drop(1) }

if (oldQueue.isEmpty()) {
break
}

if ((oldState == null) || oldState.queue.isEmpty()) {
return
if (!onValue(oldQueue[0])) {
isDone.value = true
return
}
}

if (!onValue(oldState.queue[0])) {
state.value = null
return
missed = counter.addAndGet(-missed)
if (missed == 0) {
break
}
}
}

protected abstract fun onValue(value: T): Boolean

private companion object {
private fun <T> List<T>.addAndSort(item: T, comparator: Comparator<in T>?): List<T> =
if (comparator == null) plus(item) else plusSorted(item, comparator)
}

private data class State<out T>(
val queue: List<T> = emptyList(),
val isDraining: Boolean = false
)
}

0 comments on commit a88a35e

Please sign in to comment.