Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimized concatMap and concat operators #546

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,37 +1,13 @@
package com.badoo.reaktive.completable

import com.badoo.reaktive.base.subscribeSafe
import com.badoo.reaktive.disposable.Disposable
import com.badoo.reaktive.utils.atomic.AtomicInt
import com.badoo.reaktive.observable.asCompletable
import com.badoo.reaktive.observable.asObservable
import com.badoo.reaktive.observable.concatMap

fun Iterable<Completable>.concat(): Completable =
completable { emitter ->
val sources = toList()

if (sources.isEmpty()) {
emitter.onComplete()
return@completable
}

val sourceIndex = AtomicInt()

val upstreamObserver =
object : CompletableObserver, CompletableCallbacks by emitter {
override fun onSubscribe(disposable: Disposable) {
emitter.setDisposable(disposable)
}

override fun onComplete() {
sourceIndex
.addAndGet(1)
.let(sources::getOrNull)
?.subscribeSafe(this)
?: emitter.onComplete()
}
}

sources[0].subscribe(upstreamObserver)
}
asObservable()
.concatMap { it.asObservable<Nothing>() }
.asCompletable()

fun concat(vararg sources: Completable): Completable =
sources
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.badoo.reaktive.maybe

import com.badoo.reaktive.observable.Observable
import com.badoo.reaktive.observable.asObservable
import com.badoo.reaktive.observable.concat
import com.badoo.reaktive.observable.concatMap

fun <T> Iterable<Maybe<T>>.concat(): Observable<T> =
map(Maybe<T>::asObservable)
.concat()
asObservable()
.concatMap { it.asObservable() }

fun <T> concat(vararg sources: Maybe<T>): Observable<T> =
sources
Expand Down
Original file line number Diff line number Diff line change
@@ -1,41 +1,8 @@
package com.badoo.reaktive.observable

import com.badoo.reaktive.base.subscribeSafe
import com.badoo.reaktive.disposable.Disposable
import com.badoo.reaktive.disposable.DisposableWrapper
import com.badoo.reaktive.utils.atomic.AtomicInt

fun <T> Iterable<Observable<T>>.concat(): Observable<T> =
observableUnsafe { observer ->
val disposableWrapper = DisposableWrapper()
observer.onSubscribe(disposableWrapper)

val sources = toList()

if (sources.isEmpty()) {
observer.onComplete()
return@observableUnsafe
}

val sourceIndex = AtomicInt()

val upstreamObserver =
object : ObservableObserver<T>, ObservableCallbacks<T> by observer {
override fun onSubscribe(disposable: Disposable) {
disposableWrapper.set(disposable)
}

override fun onComplete() {
sourceIndex
.addAndGet(1)
.let(sources::getOrNull)
?.subscribeSafe(this)
?: observer.onComplete()
}
}

sources[0].subscribeSafe(upstreamObserver)
}
asObservable()
.concatMap { it }

fun <T> concat(vararg sources: Observable<T>): Observable<T> =
sources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package com.badoo.reaktive.observable

import com.badoo.reaktive.base.ErrorCallback
import com.badoo.reaktive.base.ValueCallback
import com.badoo.reaktive.base.subscribeSafe
import com.badoo.reaktive.base.tryCatch
import com.badoo.reaktive.disposable.CompositeDisposable
import com.badoo.reaktive.disposable.Disposable
import com.badoo.reaktive.utils.ObjectReference
import com.badoo.reaktive.disposable.DisposableWrapper
import com.badoo.reaktive.disposable.addTo
import com.badoo.reaktive.utils.atomic.AtomicReference
import com.badoo.reaktive.utils.atomic.getAndUpdate
import com.badoo.reaktive.utils.atomic.updateAndGet
import com.badoo.reaktive.utils.queue.SharedQueue
import com.badoo.reaktive.utils.serializer.Serializer
import com.badoo.reaktive.utils.serializer.serializer

fun <T, R> Observable<T>.concatMap(mapper: (T) -> Observable<R>): Observable<R> =
observable { emitter ->
Expand All @@ -23,76 +24,94 @@ private class ConcatMapObserver<in T, in R>(
private val mapper: (T) -> Observable<R>
) : CompositeDisposable(), ObservableObserver<T>, ErrorCallback by callbacks {

private val state = AtomicReference(State<T>())
private val actor = serializer(::processEvent)
private val innerObserver = InnerObserver(callbacks, actor).addTo(this)
private val queue = SharedQueue<T>()
private val state = AtomicReference(State.IDLE)

override fun onSubscribe(disposable: Disposable) {
add(disposable)
}

override fun onNext(value: T) {
val oldState =
state.getAndUpdate {
it.copy(
queue = if (it.isMappedSourceActive) it.queue + value else it.queue,
isMappedSourceActive = true
)
}

if (!oldState.isMappedSourceActive) {
mapAndSubscribe(value)
}
actor.accept(value)
}

override fun onComplete() {
val newState =
state.updateAndGet {
it.copy(isUpstreamCompleted = true)
}
actor.accept(Event.UPSTREAM_COMPLETED)
}

@Suppress("UNCHECKED_CAST")
private fun processEvent(event: Any?): Boolean =
when (event) {
Event.UPSTREAM_COMPLETED -> onUpstreamCompleted()
Event.INNER_COMPLETED -> onInnerCompleted()
else -> onUpstreamValue(event as T)
}

private fun onUpstreamCompleted(): Boolean {
val oldState = state.value
state.value = State.UPSTREAM_COMPLETED

if (newState.isUpstreamCompleted && !newState.isMappedSourceActive) {
if (oldState == State.IDLE) {
callbacks.onComplete()
return false
}

return true
}

private fun mapAndSubscribe(value: T) {
callbacks.tryCatch(block = { mapper(value) }) {
it.subscribeSafe(InnerObserver())
private fun onInnerCompleted(): Boolean {
if (queue.isEmpty) {
if (state.value == State.UPSTREAM_COMPLETED) {
callbacks.onComplete()
return false
}

state.value = State.IDLE
} else {
@Suppress("UNCHECKED_CAST")
subscribe(queue.poll() as T)
}

return true
}

private data class State<out T>(
val queue: List<T> = emptyList(),
val isMappedSourceActive: Boolean = false,
val isUpstreamCompleted: Boolean = false
)
private fun onUpstreamValue(value: T): Boolean {
if (state.value == State.INNER_ACTIVE) {
queue.offer(value)
} else {
state.value = State.INNER_ACTIVE
subscribe(value)
}

return true
}

private inner class InnerObserver :
ObjectReference<Disposable?>(null),
ObservableObserver<R>,
ValueCallback<R> by callbacks,
ErrorCallback by callbacks {
private fun subscribe(value: T) {
tryCatch {
mapper(value).subscribe(innerObserver)
}
}

private class InnerObserver<R>(
private val callbacks: ObservableCallbacks<R>,
private val actor: Serializer<Any?>
) : ObservableObserver<R>, DisposableWrapper(), ValueCallback<R> by callbacks, ErrorCallback by callbacks {
override fun onSubscribe(disposable: Disposable) {
value = disposable
add(disposable)
set(disposable)
}

override fun onComplete() {
remove(value!!)

val oldState =
state.getAndUpdate {
it.copy(
queue = it.queue.drop(1),
isMappedSourceActive = it.queue.isNotEmpty()
)
}

if (oldState.queue.isNotEmpty()) {
mapAndSubscribe(oldState.queue[0])
} else if (oldState.isUpstreamCompleted) {
callbacks.onComplete()
}
actor.accept(Event.INNER_COMPLETED)
}
}

private enum class State {
IDLE, INNER_ACTIVE, UPSTREAM_COMPLETED
}

private enum class Event {
UPSTREAM_COMPLETED, INNER_COMPLETED
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.badoo.reaktive.single

import com.badoo.reaktive.observable.Observable
import com.badoo.reaktive.observable.asObservable
import com.badoo.reaktive.observable.concat
import com.badoo.reaktive.observable.concatMap

fun <T> Iterable<Single<T>>.concat(): Observable<T> =
map(Single<T>::asObservable)
.concat()
asObservable()
.concatMap { it.asObservable() }

fun <T> concat(vararg sources: Single<T>): Observable<T> =
sources
Expand Down