Skip to content
This repository has been archived by the owner on Apr 24, 2019. It is now read-only.

Commit

Permalink
Expose cancelation via boolean return value.
Browse files Browse the repository at this point in the history
  • Loading branch information
JakeWharton committed Feb 3, 2018
1 parent c06a961 commit 23c78c6
Show file tree
Hide file tree
Showing 33 changed files with 184 additions and 21 deletions.
Expand Up @@ -7,6 +7,7 @@ internal class JobDisposable(private val job: Job) : Disposable {
override fun isDisposed() = job.isCancelled

override fun dispose() {
println("DISPOSED")
job.cancel()
}
}
@@ -1,6 +1,7 @@
package reagent.rxjava2

import io.reactivex.Observer
import io.reactivex.disposables.Disposables
import io.reactivex.exceptions.CompositeException
import io.reactivex.exceptions.Exceptions
import io.reactivex.exceptions.UndeliverableException
Expand All @@ -19,6 +20,8 @@ internal class ObservableReagentToRx<I : Any>(
try {
upstream.subscribe {
observer.onNext(it)
println("ISACTIVE $isActive")
return@subscribe isActive
}
} catch (ignored: JobCancellationException) {
return@launch
Expand Down
Expand Up @@ -2,6 +2,7 @@ package reagent.rxjava2

import io.reactivex.Single
import io.reactivex.SingleObserver
import io.reactivex.disposables.Disposables
import io.reactivex.exceptions.CompositeException
import io.reactivex.exceptions.UndeliverableException
import io.reactivex.plugins.RxJavaPlugins
Expand Down
Expand Up @@ -3,8 +3,10 @@ package reagent.rxjava2
import org.junit.Test
import reagent.Observable
import reagent.source.emptyObservable
import reagent.source.observable
import reagent.source.observableOf
import reagent.source.toOne
import kotlin.test.assertEquals

class ObservableReagentToRxTest {
@Test fun empty() {
Expand Down Expand Up @@ -37,4 +39,16 @@ class ObservableReagentToRxTest {
.test()
.assertError(exception)
}

@Test fun upstreamNotified() {
println("START")
val results = mutableListOf<Boolean>()
observable<Int> {
for (i in 1..2) {
results.add(it(i))
}
}.toRx().take(2).test().assertValues(1, 2).assertComplete()
println("STOP")
assertEquals(listOf(true, false), results)
}
}
1 change: 1 addition & 0 deletions reagent-rxjs6/src/main/kotlin/reagent/rxjs6/converters.kt
Expand Up @@ -12,6 +12,7 @@ fun <I> Observable<I>.toRx(): dynamic {
try {
this@toRx.subscribe {
observer.onNext(it)
true // TODO
}
} catch (ignored: JobCancellationException) {
return@launch
Expand Down
2 changes: 1 addition & 1 deletion reagent/common/src/main/kotlin/reagent/Observable.kt
Expand Up @@ -20,4 +20,4 @@ expect abstract class Observable<out I>() {
abstract suspend fun subscribe(emit: Emitter<I>)
}

typealias Emitter<I> = suspend (item: I) -> Unit
typealias Emitter<I> = suspend (item: I) -> Boolean
3 changes: 2 additions & 1 deletion reagent/common/src/main/kotlin/reagent/operator/all.kt
Expand Up @@ -14,8 +14,9 @@ internal class ObservableAll<out I>(
upstream.subscribe {
if (result && !predicate(it)) {
result = false
// TODO this needs to be return@produce false
return@subscribe false
}
return@subscribe true
}
return result
}
Expand Down
3 changes: 2 additions & 1 deletion reagent/common/src/main/kotlin/reagent/operator/any.kt
Expand Up @@ -14,8 +14,9 @@ internal class ObserableAny<out I>(
upstream.subscribe {
if (predicate(it)) {
result = true
// TODO this needs to be return@produce true
return@subscribe false
}
return@subscribe true
}
return result
}
Expand Down
12 changes: 10 additions & 2 deletions reagent/common/src/main/kotlin/reagent/operator/concatMap.kt
Expand Up @@ -26,8 +26,16 @@ internal class ObservableConcatMapObservable<U, out D>(
private val func: (U) -> Observable<D>
) : Observable<D>() {
override suspend fun subscribe(emit: Emitter<D>) {
upstream.subscribe {
func(it).subscribe(emit)
upstream.subscribe upstream@ {
var more = true
func(it).subscribe inner@ {
if (!emit(it)) {
more = false
return@inner false
}
return@inner true
}
return@upstream more
}
}
}
1 change: 1 addition & 0 deletions reagent/common/src/main/kotlin/reagent/operator/count.kt
Expand Up @@ -12,6 +12,7 @@ internal class ObservableCount<out I>(
var count = 0
upstream.subscribe {
count++
return@subscribe true
}
return count
}
Expand Down
5 changes: 3 additions & 2 deletions reagent/common/src/main/kotlin/reagent/operator/distinct.kt
Expand Up @@ -12,10 +12,11 @@ internal class ObservableDistinct<out I>(
) : Observable<I>() {
override suspend fun subscribe(emit: Emitter<I>) {
val seen = mutableSetOf<Any?>()
upstream.subscribe {
upstream.subscribe upstream@ {
if (seen.add(selector(it))) {
emit(it)
return@upstream emit(it)
}
return@upstream true
}
}
}
Expand Up @@ -22,6 +22,8 @@ internal class ObservableDistinctUntilChanged<out I>(
} else if (selected != previous) {
previous = selected
emit(it)
} else {
true
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions reagent/common/src/main/kotlin/reagent/operator/drop.kt
Expand Up @@ -18,6 +18,8 @@ internal class ObservableDrop<out I>(
upstream.subscribe {
if (++seen > count) {
emit(it)
} else {
true
}
}
if (require && seen < count) {
Expand All @@ -36,11 +38,11 @@ internal class ObservableDropWhile<out I>(
upstream.subscribe upstream@ {
if (!taking) {
if (predicate(it)) {
return@upstream
return@upstream true
}
taking = true
}
emit(it)
return@upstream emit(it)
}
}
}
2 changes: 2 additions & 0 deletions reagent/common/src/main/kotlin/reagent/operator/filter.kt
Expand Up @@ -32,6 +32,8 @@ internal class ObservableFilter<out I>(
upstream.subscribe {
if (predicate(it)) {
emit(it)
} else {
true
}
}
}
Expand Down
1 change: 1 addition & 0 deletions reagent/common/src/main/kotlin/reagent/operator/fold.kt
Expand Up @@ -29,6 +29,7 @@ internal class ObservableFold<out I, R>(
var value = initial
upstream.subscribe {
value = operation(value, it)
return@subscribe true
}
return value
}
Expand Down
Expand Up @@ -9,6 +9,6 @@ internal class ObservableIgnoreElements(
private val upstream: Observable<*>
): Observable<Nothing>() {
override suspend fun subscribe(emit: Emitter<Nothing>) {
upstream.subscribe { }
upstream.subscribe { true }
}
}
1 change: 1 addition & 0 deletions reagent/common/src/main/kotlin/reagent/operator/minMax.kt
Expand Up @@ -25,6 +25,7 @@ internal class ObservableComparing<out I, in S : Comparable<S>>(
} else if (selector(it).compareTo(selector(max as I)).sign == order) {
max = it
}
return@subscribe true
}
if (first) {
throw NoSuchElementException("No elements to compare")
Expand Down
3 changes: 2 additions & 1 deletion reagent/common/src/main/kotlin/reagent/operator/none.kt
Expand Up @@ -14,8 +14,9 @@ internal class ObservableNone<out I>(
upstream.subscribe {
if (result && predicate(it)) {
result = false
// TODO this needs to be return@produce false
return@subscribe false
}
return@subscribe true
}
return result
}
Expand Down
1 change: 1 addition & 0 deletions reagent/common/src/main/kotlin/reagent/operator/reduce.kt
Expand Up @@ -40,6 +40,7 @@ internal class ObservableReduce<out R, out I : R>(
} else {
accumulator = operation(accumulator as R, it)
}
return@subscribe true
}
if (first) {
throw NoSuchElementException("Reduce requires a non-empty Observable")
Expand Down
8 changes: 5 additions & 3 deletions reagent/common/src/main/kotlin/reagent/operator/take.kt
Expand Up @@ -18,6 +18,8 @@ internal class ObservableTake<out I>(
upstream.subscribe {
if (seen++ < count) {
emit(it)
} else {
false
}
}
if (require && seen < count) {
Expand All @@ -33,15 +35,15 @@ internal class ObservableTakeWhile<out I>(
) : Observable<I>() {
override suspend fun subscribe(emit: Emitter<I>) {
var taking = true
upstream.subscribe {
upstream.subscribe upstream@ {
if (taking) {
if (predicate(it)) {
emit(it)
return@upstream emit(it)
} else {
taking = false
// TODO this should break out
}
}
return@upstream false
}
}
}
Expand Up @@ -5,6 +5,10 @@ import reagent.Observable

internal class ObservableArray<out I>(private val items: Array<out I>) : Observable<I>() {
override suspend fun subscribe(emit: Emitter<I>) {
items.forEach { emit(it) }
for (item in items) {
if (!emit(item)) {
return
}
}
}
}
Expand Up @@ -5,6 +5,10 @@ import reagent.Observable

internal class ObservableIterable<out I>(private val iterable: Iterable<I>): Observable<I>() {
override suspend fun subscribe(emit: Emitter<I>) {
iterable.forEach { emit(it) }
for (item in iterable) {
if (!emit(item)) {
return
}
}
}
}
Expand Up @@ -3,8 +3,12 @@ package reagent.source
import reagent.Emitter
import reagent.Observable

internal class ObservableSequence<out I>(private val iterable: Sequence<I>): Observable<I>() {
internal class ObservableSequence<out I>(private val sequence: Sequence<I>): Observable<I>() {
override suspend fun subscribe(emit: Emitter<I>) {
iterable.forEach { emit(it) }
for (item in sequence) {
if (!emit(item)) {
return
}
}
}
}
@@ -1,8 +1,10 @@
package reagent.operator

import reagent.runTest
import reagent.source.observable
import reagent.source.observableOf
import reagent.source.test.emptyActualObservable
import reagent.tester.testObservable
import reagent.tester.testOne
import kotlin.test.Test
import kotlin.test.assertEquals
Expand Down Expand Up @@ -36,4 +38,17 @@ class ObservableAllTest {
}
assertEquals(2, called)
}

@Test fun emitterReturnValue() = runTest {
var result: Boolean? = null
observable<Int> {
result = it(1)
}.all {
false
}.testObservable {
item(false)
complete()
}
assertEquals(false, result)
}
}
Expand Up @@ -2,9 +2,12 @@ package reagent.operator

import reagent.runTest
import reagent.source.observableOf
import reagent.source.observable
import reagent.source.test.emptyActualObservable
import reagent.tester.testObservable
import reagent.tester.testOne
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.fail

class ObservableAnyTest {
Expand All @@ -31,4 +34,17 @@ class ObservableAnyTest {
item(false)
}
}

@Test fun emitterReturnValue() = runTest {
var result: Boolean? = null
observable<Int> {
result = it(1)
}.any {
true
}.testObservable {
item(true)
complete()
}
assertEquals(false, result)
}
}
Expand Up @@ -2,6 +2,7 @@ package reagent.operator

import reagent.runTest
import reagent.source.emptyObservable
import reagent.source.observable
import reagent.source.observableOf
import reagent.tester.testObservable
import kotlin.test.Test
Expand Down Expand Up @@ -61,4 +62,17 @@ class ObservableDropTest {
complete()
}
}

@Test fun emitterReturnValue() = runTest {
val emits = mutableListOf<Boolean>()
observable<Int> {
for (i in 1..4) {
emits.add(it(i))
}
}.drop(2).take(1).testObservable {
item(3)
complete()
}
assertEquals(listOf(true, true, true, false), emits)
}
}

0 comments on commit 23c78c6

Please sign in to comment.