diff --git a/README.md b/README.md index 3bb9d36..8241804 100755 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ fun main(args: Array) { .subscribeBy( // named arguments for lambda Subscribers onNext = { println(it) }, onError = { it.printStackTrace() }, - onCompleted = { println("Done!") } + onComplete = { println("Done!") } ) } diff --git a/build.gradle b/build.gradle index dd4ee5d..e12f734 100755 --- a/build.gradle +++ b/build.gradle @@ -1,5 +1,5 @@ buildscript { - ext.kotlin_version = '1.0.6' + ext.kotlin_version = '1.1.0' repositories { jcenter() } dependencies { classpath 'com.netflix.nebula:gradle-rxjava-project-plugin:4.+', diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index b080e76..9bc0a69 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ -#Sun Feb 12 16:33:43 SGT 2017 +#Sun Mar 05 07:47:21 SGT 2017 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-2.14-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-2.14-all.zip diff --git a/src/examples/kotlin/rx/lang/kotlin/examples/examples.kt b/src/examples/kotlin/io/reactivex/rxkotlin/examples/examples.kt similarity index 85% rename from src/examples/kotlin/rx/lang/kotlin/examples/examples.kt rename to src/examples/kotlin/io/reactivex/rxkotlin/examples/examples.kt index 22dd9d4..eb2a919 100644 --- a/src/examples/kotlin/rx/lang/kotlin/examples/examples.kt +++ b/src/examples/kotlin/io/reactivex/rxkotlin/examples/examples.kt @@ -1,14 +1,12 @@ -package rx.lang.kotlin.examples +package io.reactivex.rxkotlin.examples import io.reactivex.Observable import io.reactivex.disposables.CompositeDisposable -import rx.lang.kotlin.addTo -import rx.lang.kotlin.combineLatest -import rx.lang.kotlin.observable -import rx.lang.kotlin.plusAssign -import rx.lang.kotlin.subscribeBy -import rx.lang.kotlin.toObservable -import rx.lang.kotlin.zip +import io.reactivex.rxkotlin.* +import io.reactivex.rxkotlin.combineLatest +import io.reactivex.rxkotlin.subscribeBy +import io.reactivex.rxkotlin.toObservable +import io.reactivex.rxkotlin.zip import java.net.URL import java.util.Scanner import java.util.concurrent.TimeUnit @@ -50,7 +48,7 @@ fun main(args: Array) { addToCompositeSubscription() } -private fun URL.toScannerObservable() = observable { s -> +private fun URL.toScannerObservable() = Observable.create { s -> this.openStream().use { stream -> Scanner(stream).useDelimiter("\\A") .toObservable() @@ -58,13 +56,13 @@ private fun URL.toScannerObservable() = observable { s -> } } -fun syncObservable(): Observable = observable { subscriber -> +fun syncObservable(): Observable = Observable.create { subscriber -> (0..75).toObservable() .map { "Sync value_$it" } .subscribe { subscriber.onNext(it) } } -fun asyncObservable(): Observable = observable { subscriber -> +fun asyncObservable(): Observable = Observable.create { subscriber -> thread { (0..75).toObservable() .map { "Async value_$it" } @@ -72,7 +70,7 @@ fun asyncObservable(): Observable = observable { subscriber -> } } -fun asyncWiki(vararg articleNames: String): Observable = observable { subscriber -> +fun asyncWiki(vararg articleNames: String): Observable = Observable.create { subscriber -> thread { articleNames.toObservable() .flatMapMaybe { name -> URL("http://en.wikipedia.org/wiki/$name").toScannerObservable().firstElement() } @@ -80,7 +78,7 @@ fun asyncWiki(vararg articleNames: String): Observable = observable { su } } -fun asyncWikiWithErrorHandling(vararg articleNames: String): Observable = observable { subscriber -> +fun asyncWikiWithErrorHandling(vararg articleNames: String): Observable = Observable.create { subscriber -> thread { articleNames.toObservable() .flatMapMaybe { name -> URL("http://en.wikipedia.org/wiki/$name").toScannerObservable().firstElement() } diff --git a/src/examples/kotlin/rx/lang/kotlin/examples/retrofit/retrofit.kt b/src/examples/kotlin/io/reactivex/rxkotlin/examples/retrofit/retrofit.kt similarity index 95% rename from src/examples/kotlin/rx/lang/kotlin/examples/retrofit/retrofit.kt rename to src/examples/kotlin/io/reactivex/rxkotlin/examples/retrofit/retrofit.kt index 5e77a44..d32b886 100644 --- a/src/examples/kotlin/rx/lang/kotlin/examples/retrofit/retrofit.kt +++ b/src/examples/kotlin/io/reactivex/rxkotlin/examples/retrofit/retrofit.kt @@ -1,4 +1,4 @@ -package rx.lang.kotlin.examples.retrofit +package io.reactivex.rxkotlin.examples.retrofit import io.reactivex.Observable import retrofit.RestAdapter diff --git a/src/main/kotlin/rx/lang/kotlin/completable.kt b/src/main/kotlin/io/reactivex/rxkotlin/completable.kt similarity index 93% rename from src/main/kotlin/rx/lang/kotlin/completable.kt rename to src/main/kotlin/io/reactivex/rxkotlin/completable.kt index 8fe11e1..2200e78 100644 --- a/src/main/kotlin/rx/lang/kotlin/completable.kt +++ b/src/main/kotlin/io/reactivex/rxkotlin/completable.kt @@ -1,4 +1,4 @@ -package rx.lang.kotlin +package io.reactivex.rxkotlin import io.reactivex.Completable import io.reactivex.functions.Action diff --git a/src/main/kotlin/io/reactivex/rxkotlin/disposable.kt b/src/main/kotlin/io/reactivex/rxkotlin/disposable.kt new file mode 100644 index 0000000..b56298b --- /dev/null +++ b/src/main/kotlin/io/reactivex/rxkotlin/disposable.kt @@ -0,0 +1,19 @@ +package io.reactivex.rxkotlin + +import io.reactivex.disposables.CompositeDisposable +import io.reactivex.disposables.Disposable + +/** + * disposable += observable.subscribe() + */ +operator fun CompositeDisposable.plusAssign(disposable: Disposable) { + add(disposable) +} + +/** + * Add the subscription to a CompositeSubscription. + * @param compositeDisposable CompositeDisposable to add this subscription to + * @return this instance + */ +fun Disposable.addTo(compositeDisposable: CompositeDisposable): Disposable + = apply { compositeDisposable.add(this) } \ No newline at end of file diff --git a/src/main/kotlin/rx/lang/kotlin/flowable.kt b/src/main/kotlin/io/reactivex/rxkotlin/flowable.kt similarity index 61% rename from src/main/kotlin/rx/lang/kotlin/flowable.kt rename to src/main/kotlin/io/reactivex/rxkotlin/flowable.kt index 1a348ee..1ccdd4b 100644 --- a/src/main/kotlin/rx/lang/kotlin/flowable.kt +++ b/src/main/kotlin/io/reactivex/rxkotlin/flowable.kt @@ -1,27 +1,16 @@ -package rx.lang.kotlin +package io.reactivex.rxkotlin -import io.reactivex.BackpressureStrategy import io.reactivex.Flowable -import io.reactivex.FlowableEmitter -import io.reactivex.Single import io.reactivex.functions.BiFunction -fun flowable( - strategy: BackpressureStrategy = BackpressureStrategy.BUFFER, - body: (FlowableEmitter) -> Unit -): Flowable = Flowable.create(body, strategy) -private fun Iterator.toIterable() = object : Iterable { - override fun iterator(): Iterator = this@toIterable -} - -fun BooleanArray.toFlowable(): Flowable = Flowable.fromArray(*this.toTypedArray()) -fun ByteArray.toFlowable(): Flowable = Flowable.fromArray(*this.toTypedArray()) -fun ShortArray.toFlowable(): Flowable = Flowable.fromArray(*this.toTypedArray()) -fun IntArray.toFlowable(): Flowable = Flowable.fromArray(*this.toTypedArray()) -fun LongArray.toFlowable(): Flowable = Flowable.fromArray(*this.toTypedArray()) -fun FloatArray.toFlowable(): Flowable = Flowable.fromArray(*this.toTypedArray()) -fun DoubleArray.toFlowable(): Flowable = Flowable.fromArray(*this.toTypedArray()) +fun BooleanArray.toFlowable(): Flowable = this.asIterable().toFlowable() +fun ByteArray.toFlowable(): Flowable = this.asIterable().toFlowable() +fun ShortArray.toFlowable(): Flowable = this.asIterable().toFlowable() +fun IntArray.toFlowable(): Flowable = this.asIterable().toFlowable() +fun LongArray.toFlowable(): Flowable = this.asIterable().toFlowable() +fun FloatArray.toFlowable(): Flowable = this.asIterable().toFlowable() +fun DoubleArray.toFlowable(): Flowable = this.asIterable().toFlowable() fun Array.toFlowable(): Flowable = Flowable.fromArray(*this) fun IntProgression.toFlowable(): Flowable = @@ -30,14 +19,11 @@ fun IntProgression.toFlowable(): Flowable = fun Iterator.toFlowable(): Flowable = toIterable().toFlowable() fun Iterable.toFlowable(): Flowable = Flowable.fromIterable(this) -fun Sequence.toFlowable(): Flowable = Flowable.fromIterable(iterator().toIterable()) +fun Sequence.toFlowable(): Flowable = asIterable().toFlowable() fun Iterable>.merge(): Flowable = Flowable.merge(this.toFlowable()) fun Iterable>.mergeDelayError(): Flowable = Flowable.mergeDelayError(this.toFlowable()) -inline fun Flowable.fold(initial: R, crossinline body: (R, T) -> R): Single - = reduce(initial) { a, e -> body(a, e) } - /** * Returns Flowable that wrap all values into [IndexedValue] and populates corresponding index value. * Works similar to [kotlin.withIndex] @@ -56,20 +42,19 @@ fun Flowable.withIndex(): Flowable> inline fun Flowable.flatMapSequence(crossinline body: (T) -> Sequence): Flowable = flatMap { body(it).toFlowable() } -fun Flowable>.switchOnNext(): Flowable = Flowable.switchOnNext(this) /** * Flowable.combineLatest(List> sources, FuncN combineFunction) */ @Suppress("UNCHECKED_CAST") -inline fun List>.combineLatest(crossinline combineFunction: (args: List) -> R): Flowable +inline fun Iterable>.combineLatest(crossinline combineFunction: (args: List) -> R): Flowable = Flowable.combineLatest(this) { combineFunction(it.asList().map { it as T }) } /** * Flowable.zip(List> sources, FuncN combineFunction) */ @Suppress("UNCHECKED_CAST") -inline fun List>.zip(crossinline zipFunction: (args: List) -> R): Flowable +inline fun Iterable>.zip(crossinline zipFunction: (args: List) -> R): Flowable = Flowable.zip(this) { zipFunction(it.asList().map { it as T }) } /** @@ -78,6 +63,10 @@ inline fun List>.zip(crossinline zipFunction: (args: List) inline fun Flowable<*>.cast(): Flowable = cast(R::class.java) /** - * Filters the items emitted by an Observable, only emitting those of the specified type. + * Filters the items emitted by an Flowable, only emitting those of the specified type. */ -inline fun Flowable<*>.ofType(): Flowable = ofType(R::class.java) \ No newline at end of file +inline fun Flowable<*>.ofType(): Flowable = ofType(R::class.java) + +private fun Iterator.toIterable() = object : Iterable { + override fun iterator(): Iterator = this@toIterable +} \ No newline at end of file diff --git a/src/main/kotlin/rx/lang/kotlin/maybe.kt b/src/main/kotlin/io/reactivex/rxkotlin/maybe.kt similarity index 95% rename from src/main/kotlin/rx/lang/kotlin/maybe.kt rename to src/main/kotlin/io/reactivex/rxkotlin/maybe.kt index 35479bf..b1c3c5d 100644 --- a/src/main/kotlin/rx/lang/kotlin/maybe.kt +++ b/src/main/kotlin/io/reactivex/rxkotlin/maybe.kt @@ -1,4 +1,4 @@ -package rx.lang.kotlin +package io.reactivex.rxkotlin import io.reactivex.Maybe import java.util.concurrent.Callable diff --git a/src/main/kotlin/rx/lang/kotlin/observable.kt b/src/main/kotlin/io/reactivex/rxkotlin/observable.kt similarity index 65% rename from src/main/kotlin/rx/lang/kotlin/observable.kt rename to src/main/kotlin/io/reactivex/rxkotlin/observable.kt index 0c30616..2b2c32d 100644 --- a/src/main/kotlin/rx/lang/kotlin/observable.kt +++ b/src/main/kotlin/io/reactivex/rxkotlin/observable.kt @@ -1,23 +1,16 @@ -package rx.lang.kotlin +package io.reactivex.rxkotlin import io.reactivex.Observable -import io.reactivex.ObservableEmitter -import io.reactivex.Single import io.reactivex.functions.BiFunction -fun observable(body: (ObservableEmitter) -> Unit): Observable = Observable.create(body) -private fun Iterator.toIterable() = object : Iterable { - override fun iterator(): Iterator = this@toIterable -} - -fun BooleanArray.toObservable(): Observable = Observable.fromArray(*this.toTypedArray()) -fun ByteArray.toObservable(): Observable = Observable.fromArray(*this.toTypedArray()) -fun ShortArray.toObservable(): Observable = Observable.fromArray(*this.toTypedArray()) -fun IntArray.toObservable(): Observable = Observable.fromArray(*this.toTypedArray()) -fun LongArray.toObservable(): Observable = Observable.fromArray(*this.toTypedArray()) -fun FloatArray.toObservable(): Observable = Observable.fromArray(*this.toTypedArray()) -fun DoubleArray.toObservable(): Observable = Observable.fromArray(*this.toTypedArray()) +fun BooleanArray.toObservable(): Observable = this.asIterable().toObservable() +fun ByteArray.toObservable(): Observable = this.asIterable().toObservable() +fun ShortArray.toObservable(): Observable = this.asIterable().toObservable() +fun IntArray.toObservable(): Observable = this.asIterable().toObservable() +fun LongArray.toObservable(): Observable = this.asIterable().toObservable() +fun FloatArray.toObservable(): Observable = this.asIterable().toObservable() +fun DoubleArray.toObservable(): Observable = this.asIterable().toObservable() fun Array.toObservable(): Observable = Observable.fromArray(*this) fun IntProgression.toObservable(): Observable = @@ -26,14 +19,11 @@ fun IntProgression.toObservable(): Observable = fun Iterator.toObservable(): Observable = toIterable().toObservable() fun Iterable.toObservable(): Observable = Observable.fromIterable(this) -fun Sequence.toObservable(): Observable = Observable.fromIterable(iterator().toIterable()) +fun Sequence.toObservable(): Observable = asIterable().toObservable() fun Iterable>.merge(): Observable = Observable.merge(this.toObservable()) fun Iterable>.mergeDelayError(): Observable = Observable.mergeDelayError(this.toObservable()) -inline fun Observable.fold(initial: R, crossinline body: (R, T) -> R): Single - = reduce(initial) { a, e -> body(a, e) } - /** * Returns Observable that wrap all values into [IndexedValue] and populates corresponding index value. * Works similar to [kotlin.withIndex] @@ -52,20 +42,19 @@ fun Observable.withIndex(): Observable> inline fun Observable.flatMapSequence(crossinline body: (T) -> Sequence): Observable = flatMap { body(it).toObservable() } -fun Observable>.switchOnNext(): Observable = Observable.switchOnNext(this) /** * Observable.combineLatest(List> sources, FuncN combineFunction) */ @Suppress("UNCHECKED_CAST") -inline fun List>.combineLatest(crossinline combineFunction: (args: List) -> R): Observable +inline fun Iterable>.combineLatest(crossinline combineFunction: (args: List) -> R): Observable = Observable.combineLatest(this) { combineFunction(it.asList().map { it as T }) } /** * Observable.zip(List> sources, FuncN combineFunction) */ @Suppress("UNCHECKED_CAST") -inline fun List>.zip(crossinline zipFunction: (args: List) -> R): Observable +inline fun Iterable>.zip(crossinline zipFunction: (args: List) -> R): Observable = Observable.zip(this) { zipFunction(it.asList().map { it as T }) } /** @@ -77,3 +66,7 @@ inline fun Observable<*>.cast(): Observable = cast(R::class * Filters the items emitted by an Observable, only emitting those of the specified type. */ inline fun Observable<*>.ofType(): Observable = ofType(R::class.java) + +private fun Iterator.toIterable() = object : Iterable { + override fun iterator(): Iterator = this@toIterable +} \ No newline at end of file diff --git a/src/main/kotlin/io/reactivex/rxkotlin/operators.kt b/src/main/kotlin/io/reactivex/rxkotlin/operators.kt new file mode 100644 index 0000000..08dd148 --- /dev/null +++ b/src/main/kotlin/io/reactivex/rxkotlin/operators.kt @@ -0,0 +1,79 @@ +package io.reactivex.rxkotlin + +import io.reactivex.Flowable +import io.reactivex.Observable +import io.reactivex.Single + +/** + * Merges the emissions of an Observable>. Same as calling `flatMap { it }`. + */ +fun Observable>.mergeAll() = flatMap { it } + +/** + * Merges the emissions of a Flowable>. Same as calling `flatMap { it }`. + */ +fun Flowable>.mergeAll() = flatMap { it } + + +/** + * Concatenates the emissions of an Observable>. Same as calling `concatMap { it }`. + */ +fun Observable>.concatAll() = concatMap { it } + +/** + * Concatenates the emissions of an Flowable>. Same as calling `concatMap { it }`. + */ +fun Flowable>.concatAll() = concatMap { it } + + +fun Observable>.switchOnNext(): Observable = Observable.switchOnNext(this) + + +fun Flowable>.switchOnNext(): Flowable = Flowable.switchOnNext(this) + + +/** + * Emits the latest `Observable` emitted through an `Observable>`. Same as calling `switchMap { it }`. + */ +fun Observable>.switchLatest() = switchMap { it } + + +/** + * Emits the latest `Flowable` emitted through an `Flowable>`. Same as calling `switchMap { it }`. + */ +fun Flowable>.switchLatest() = switchMap { it } + + +/** + * Joins the emissions of a finite `Observable` into a `String`. + * + * @param separator is the dividing character(s) between each element in the concatenated `String` + * + * @param prefix is the preceding `String` before the concatenated elements (optional) + * + * @param postfix is the succeeding `String` after the concatenated elements (optional) + */ +fun Observable.joinToString(separator: String? = null, + prefix: String? = null, + postfix: String? = null +): Single = collect({ StringBuilder(prefix ?: "") }) { builder: StringBuilder, next: T -> + builder.append(if (builder.length == prefix?.length ?: 0) "" else separator ?: "").append(next) +}.map { it.append(postfix ?: "").toString() } + + + +/** + * Joins the emissions of a finite `Flowable` into a `String`. + * + * @param separator is the dividing character(s) between each element in the concatenated `String` + * + * @param prefix is the preceding `String` before the concatenated elements (optional) + * + * @param postfix is the succeeding `String` after the concatenated elements (optional) + */ +fun Flowable.joinToString(separator: String? = null, + prefix: String? = null, + postfix: String? = null +): Single = collect({ StringBuilder(prefix ?: "") }) { builder: StringBuilder, next: T -> + builder.append(if (builder.length == prefix?.length ?: 0) "" else separator ?: "").append(next) +}.map { it.append(postfix ?: "").toString() } \ No newline at end of file diff --git a/src/main/kotlin/rx/lang/kotlin/single.kt b/src/main/kotlin/io/reactivex/rxkotlin/single.kt similarity index 72% rename from src/main/kotlin/rx/lang/kotlin/single.kt rename to src/main/kotlin/io/reactivex/rxkotlin/single.kt index c2e2de4..6d3a4e9 100644 --- a/src/main/kotlin/rx/lang/kotlin/single.kt +++ b/src/main/kotlin/io/reactivex/rxkotlin/single.kt @@ -1,11 +1,9 @@ -package rx.lang.kotlin +package io.reactivex.rxkotlin import io.reactivex.Single -import io.reactivex.SingleEmitter import java.util.concurrent.Callable import java.util.concurrent.Future -inline fun single(crossinline body: (s: SingleEmitter) -> Unit): Single = Single.create { body(it) } fun T.toSingle(): Single = Single.just(this) fun Future.toSingle(): Single = Single.fromFuture(this) fun Callable.toSingle(): Single = Single.fromCallable(this) diff --git a/src/main/kotlin/rx/lang/kotlin/subscription.kt b/src/main/kotlin/io/reactivex/rxkotlin/subscribers.kt similarity index 90% rename from src/main/kotlin/rx/lang/kotlin/subscription.kt rename to src/main/kotlin/io/reactivex/rxkotlin/subscribers.kt index 69f0e2e..6432591 100644 --- a/src/main/kotlin/rx/lang/kotlin/subscription.kt +++ b/src/main/kotlin/io/reactivex/rxkotlin/subscribers.kt @@ -1,10 +1,6 @@ -package rx.lang.kotlin +package io.reactivex.rxkotlin -import io.reactivex.Completable -import io.reactivex.Flowable -import io.reactivex.Maybe -import io.reactivex.Observable -import io.reactivex.Single +import io.reactivex.* import io.reactivex.disposables.Disposable private val onNextStub: (Any) -> Unit = {} diff --git a/src/main/kotlin/rx/lang/kotlin/disposable.kt b/src/main/kotlin/rx/lang/kotlin/disposable.kt deleted file mode 100644 index ca85585..0000000 --- a/src/main/kotlin/rx/lang/kotlin/disposable.kt +++ /dev/null @@ -1,19 +0,0 @@ -package rx.lang.kotlin - -import io.reactivex.disposables.CompositeDisposable -import io.reactivex.disposables.Disposable - -/** - * subscription += observable.subscribe() - */ -operator fun CompositeDisposable.plusAssign(subscription: Disposable) { - add(subscription) -} - -/** - * Add the subscription to a CompositeSubscription. - * @param compositeSubscription CompositeSubscription to add this subscription to - * @return this instance - */ -fun Disposable.addTo(compositeSubscription: CompositeDisposable): Disposable - = apply { compositeSubscription.add(this) } \ No newline at end of file diff --git a/src/main/kotlin/rx/lang/kotlin/subject.kt b/src/main/kotlin/rx/lang/kotlin/subject.kt deleted file mode 100644 index 7dbd24a..0000000 --- a/src/main/kotlin/rx/lang/kotlin/subject.kt +++ /dev/null @@ -1,13 +0,0 @@ -package rx.lang.kotlin - -import io.reactivex.Observable -import io.reactivex.subjects.AsyncSubject -import io.reactivex.subjects.BehaviorSubject -import io.reactivex.subjects.PublishSubject -import io.reactivex.subjects.ReplaySubject - -fun BehaviorSubject(): BehaviorSubject = BehaviorSubject.create() -fun BehaviorSubject(default: T): BehaviorSubject = BehaviorSubject.createDefault(default) -fun AsyncSubject(): AsyncSubject = AsyncSubject.create() -fun PublishSubject(): PublishSubject = PublishSubject.create() -fun ReplaySubject(capacity: Int = Observable.bufferSize()): ReplaySubject = ReplaySubject.create(capacity) diff --git a/src/test/kotlin/rx/lang/kotlin/BasicKotlinTests.kt b/src/test/kotlin/io/reactivex/rxkotlin/BasicKotlinTests.kt similarity index 97% rename from src/test/kotlin/rx/lang/kotlin/BasicKotlinTests.kt rename to src/test/kotlin/io/reactivex/rxkotlin/BasicKotlinTests.kt index 80fcd0d..7c930e5 100644 --- a/src/test/kotlin/rx/lang/kotlin/BasicKotlinTests.kt +++ b/src/test/kotlin/io/reactivex/rxkotlin/BasicKotlinTests.kt @@ -14,21 +14,15 @@ * limitations under the License. */ -package rx.lang.kotlin +package io.reactivex.rxkotlin -import io.reactivex.Notification -import io.reactivex.Observable -import io.reactivex.ObservableEmitter -import io.reactivex.ObservableOnSubscribe -import io.reactivex.Single +import io.reactivex.* import io.reactivex.functions.BiFunction import io.reactivex.functions.Function3 import org.junit.Assert.assertEquals import org.junit.Assert.fail import org.junit.Test -import org.mockito.Mockito.any -import org.mockito.Mockito.times -import org.mockito.Mockito.verify +import org.mockito.Mockito.* import kotlin.concurrent.thread /** diff --git a/src/test/kotlin/rx/lang/kotlin/CompletableTest.kt b/src/test/kotlin/io/reactivex/rxkotlin/CompletableTest.kt similarity index 94% rename from src/test/kotlin/rx/lang/kotlin/CompletableTest.kt rename to src/test/kotlin/io/reactivex/rxkotlin/CompletableTest.kt index 110714a..c61c26b 100644 --- a/src/test/kotlin/rx/lang/kotlin/CompletableTest.kt +++ b/src/test/kotlin/io/reactivex/rxkotlin/CompletableTest.kt @@ -1,11 +1,11 @@ -package rx.lang.kotlin +package io.reactivex.rxkotlin import io.reactivex.Single import io.reactivex.functions.Action import org.junit.Assert.assertEquals import org.junit.Assert.assertNotNull import org.junit.Test -import java.util.NoSuchElementException +import java.util.* import java.util.concurrent.Callable class CompletableTest { diff --git a/src/test/kotlin/rx/lang/kotlin/ExtensionTests.kt b/src/test/kotlin/io/reactivex/rxkotlin/ExtensionTests.kt similarity index 81% rename from src/test/kotlin/rx/lang/kotlin/ExtensionTests.kt rename to src/test/kotlin/io/reactivex/rxkotlin/ExtensionTests.kt index c41b09c..08ebde8 100644 --- a/src/test/kotlin/rx/lang/kotlin/ExtensionTests.kt +++ b/src/test/kotlin/io/reactivex/rxkotlin/ExtensionTests.kt @@ -1,5 +1,5 @@ /** - * Copyright 2013 Netflix, Inc. + * Copyright 2017 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ * limitations under the License. */ -package rx.lang.kotlin +package io.reactivex.rxkotlin import io.reactivex.Notification import io.reactivex.Observable @@ -26,10 +26,7 @@ import org.funktionale.partials.invoke import org.junit.Assert.assertEquals import org.junit.Assert.fail import org.junit.Test -import org.mockito.Mockito.any -import org.mockito.Mockito.inOrder -import org.mockito.Mockito.times -import org.mockito.Mockito.verify +import org.mockito.Mockito.* import java.util.concurrent.TimeUnit import kotlin.concurrent.thread @@ -40,7 +37,7 @@ class ExtensionTests : KotlinTests() { @Test fun testCreate() { - observable { subscriber -> + Observable.create { subscriber -> subscriber.onNext("Hello") subscriber.onComplete() }.subscribe { result -> @@ -52,12 +49,11 @@ class ExtensionTests : KotlinTests() { @Test fun testFilter() { listOf(1, 2, 3).toObservable().filter { it >= 2 }.subscribe(received()) - verify(a, times(0)).received(1); - verify(a, times(1)).received(2); - verify(a, times(1)).received(3); + verify(a, times(0)).received(1) + verify(a, times(1)).received(2) + verify(a, times(1)).received(3) } - @Test fun testLast() { assertEquals("three", listOf("one", "two", "three").toObservable().blockingLast()) } @@ -79,7 +75,6 @@ class ExtensionTests : KotlinTests() { verify(a, times(0)).error(any(Exception::class.java)) } - @Test fun testMerge() { listOf(listOf(1, 2, 3).toObservable(), listOf(Observable.just(6), @@ -110,7 +105,6 @@ class ExtensionTests : KotlinTests() { verify(a, times(1)).received("hello_2") } - @Test fun testFromWithIterable() { assertEquals(5, listOf(1, 2, 3, 4, 5).toObservable().count().blockingGet()) } @@ -123,45 +117,64 @@ class ExtensionTests : KotlinTests() { } @Test fun testScriptWithOnNext() { - TestFactory().observable.subscribe(received()) + TestFactory().observable + .subscribe(received()) verify(a, times(1)).received("hello_1") } @Test fun testSkipTake() { - listOf(1, 2, 3).toObservable().skip(1).take(1).subscribe(received()) + listOf(1, 2, 3) + .toObservable() + .skip(1) + .take(1) + .subscribe(received()) verify(a, times(0)).received(1) verify(a, times(1)).received(2) verify(a, times(0)).received(3) } @Test fun testSkip() { - listOf(1, 2, 3).toObservable().skip(2).subscribe(received()) + listOf(1, 2, 3) + .toObservable() + .skip(2) + .subscribe(received()) verify(a, times(0)).received(1) verify(a, times(0)).received(2) verify(a, times(1)).received(3) } @Test fun testTake() { - listOf(1, 2, 3).toObservable().take(2).subscribe(received()) + listOf(1, 2, 3) + .toObservable() + .take(2) + .subscribe(received()) verify(a, times(1)).received(1) verify(a, times(1)).received(2) verify(a, times(0)).received(3) } @Test fun testTakeLast() { - TestFactory().observable.takeLast(1).subscribe(received()) + TestFactory().observable + .takeLast(1) + .subscribe(received()) verify(a, times(1)).received("hello_1") } @Test fun testTakeWhile() { - listOf(1, 2, 3).toObservable().takeWhile { x -> x < 3 }.subscribe(received()) + listOf(1, 2, 3) + .toObservable() + .takeWhile { x -> x < 3 } + .subscribe(received()) verify(a, times(1)).received(1) verify(a, times(1)).received(2) verify(a, times(0)).received(3) } @Test fun testTakeWhileWithIndex() { - listOf(1, 2, 3).toObservable().takeWhile { x -> x < 3 }.zipWith((0..Integer.MAX_VALUE).toObservable(), BiFunction { x, i -> x }).subscribe(received()) + listOf(1, 2, 3).toObservable() + .takeWhile { x -> x < 3 } + .zipWith((0..Integer.MAX_VALUE).toObservable(), BiFunction { x: Int, i: Int -> x }) + .subscribe(received()) verify(a, times(1)).received(1) verify(a, times(1)).received(2) verify(a, times(0)).received(3) @@ -173,14 +186,14 @@ class ExtensionTests : KotlinTests() { } @Test fun testForEach() { - observable(asyncObservable).blockingForEach(received()) + Observable.create(asyncObservable).blockingForEach(received()) verify(a, times(1)).received(1) verify(a, times(1)).received(2) verify(a, times(1)).received(3) } @Test(expected = RuntimeException::class) fun testForEachWithError() { - observable(asyncObservable).blockingForEach { throw RuntimeException("err") } + Observable.create(asyncObservable).blockingForEach { throw RuntimeException("err") } fail("we expect an exception to be thrown") } @@ -215,7 +228,7 @@ class ExtensionTests : KotlinTests() { val testScheduler = TestScheduler() val worker = testScheduler.createWorker() - val observable = observable> { s -> + val observable = Observable.create> { s -> fun at(delay: Long, func: () -> Unit) { worker.schedule({ func() @@ -260,6 +273,23 @@ class ExtensionTests : KotlinTests() { } } + @Test + fun testJoinToString1() { + Observable.range(1, 5) + .joinToString(separator = ",") + .test() + .await() + .assertResult("1,2,3,4,5") + } + + @Test + fun testJoinToString2() { + Observable.range(1, 5) + .joinToString(separator = ",", prefix = "(", postfix = ")") + .test() + .await() + .assertResult("(1,2,3,4,5)") + } inner class TestFactory { var counter = 1 @@ -271,7 +301,6 @@ class ExtensionTests : KotlinTests() { get() = funOnSubscribe(p1 = counter++) // partial applied function val observable: Observable - get() = observable(onSubscribe) - + get() = Observable.create(onSubscribe) } } diff --git a/src/test/kotlin/rx/lang/kotlin/FlowableTest.kt b/src/test/kotlin/io/reactivex/rxkotlin/FlowableTest.kt similarity index 74% rename from src/test/kotlin/rx/lang/kotlin/FlowableTest.kt rename to src/test/kotlin/io/reactivex/rxkotlin/FlowableTest.kt index 2f9f981..8e210b8 100644 --- a/src/test/kotlin/rx/lang/kotlin/FlowableTest.kt +++ b/src/test/kotlin/io/reactivex/rxkotlin/FlowableTest.kt @@ -1,4 +1,4 @@ -package rx.lang.kotlin +package io.reactivex.rxkotlin import io.reactivex.Flowable import org.junit.Assert @@ -8,31 +8,34 @@ import java.util.concurrent.atomic.AtomicInteger class FlowableTest { - @Test fun testCreation() { - val o0: Flowable = Flowable.empty() - val list = flowable { s -> + private fun bufferedFlowable(source: (io.reactivex.FlowableEmitter) -> Unit) = + io.reactivex.Flowable.create(source, io.reactivex.BackpressureStrategy.BUFFER) + + @org.junit.Test fun testCreation() { + val o0: io.reactivex.Flowable = io.reactivex.Flowable.empty() + val list = bufferedFlowable { s -> s.onNext(1) s.onNext(777) s.onComplete() }.toList().blockingGet() - Assert.assertEquals(listOf(1, 777), list) - val o1: Flowable = listOf(1, 2, 3).toFlowable() - val o2: Flowable> = Flowable.just(listOf(1, 2, 3)) + org.junit.Assert.assertEquals(listOf(1, 777), list) + val o1: io.reactivex.Flowable = listOf(1, 2, 3).toFlowable() + val o2: io.reactivex.Flowable> = io.reactivex.Flowable.just(listOf(1, 2, 3)) - val o3: Flowable = Flowable.defer { flowable { s -> s.onNext(1) } } - val o4: Flowable = Array(3) { 0 }.toFlowable() - val o5: Flowable = IntArray(3).toFlowable() + val o3: io.reactivex.Flowable = io.reactivex.Flowable.defer { bufferedFlowable { s -> s.onNext(1) } } + val o4: io.reactivex.Flowable = Array(3) { 0 }.toFlowable() + val o5: io.reactivex.Flowable = IntArray(3).toFlowable() - Assert.assertNotNull(o0) - Assert.assertNotNull(o1) - Assert.assertNotNull(o2) - Assert.assertNotNull(o3) - Assert.assertNotNull(o4) - Assert.assertNotNull(o5) + org.junit.Assert.assertNotNull(o0) + org.junit.Assert.assertNotNull(o1) + org.junit.Assert.assertNotNull(o2) + org.junit.Assert.assertNotNull(o3) + org.junit.Assert.assertNotNull(o4) + org.junit.Assert.assertNotNull(o5) } - @Test fun testExampleFromReadme() { - val result = flowable { subscriber -> + @org.junit.Test fun testExampleFromReadme() { + val result = bufferedFlowable { subscriber -> subscriber.onNext("H") subscriber.onNext("e") subscriber.onNext("l") @@ -41,7 +44,7 @@ class FlowableTest { subscriber.onNext("o") subscriber.onComplete() }.filter(String::isNotEmpty). - fold(StringBuilder(), StringBuilder::append). + reduce(StringBuilder(), StringBuilder::append). map { it.toString() }. blockingGet() @@ -90,7 +93,7 @@ class FlowableTest { } @Test fun testFold() { - val result = listOf(1, 2, 3).toFlowable().fold(0) { acc, e -> acc + e }.blockingGet() + val result = listOf(1, 2, 3).toFlowable().reduce(0) { acc, e -> acc + e }.blockingGet() Assert.assertEquals(6, result) } diff --git a/src/test/kotlin/rx/lang/kotlin/KotlinTests.kt b/src/test/kotlin/io/reactivex/rxkotlin/KotlinTests.kt similarity index 97% rename from src/test/kotlin/rx/lang/kotlin/KotlinTests.kt rename to src/test/kotlin/io/reactivex/rxkotlin/KotlinTests.kt index 1f3cb39..0c9eb40 100644 --- a/src/test/kotlin/rx/lang/kotlin/KotlinTests.kt +++ b/src/test/kotlin/io/reactivex/rxkotlin/KotlinTests.kt @@ -14,7 +14,7 @@ * limitations under the License. */ -package rx.lang.kotlin +package io.reactivex.rxkotlin import org.junit.Before import org.mockito.Mock diff --git a/src/test/kotlin/rx/lang/kotlin/ObservableTest.kt b/src/test/kotlin/io/reactivex/rxkotlin/ObservableTest.kt similarity index 93% rename from src/test/kotlin/rx/lang/kotlin/ObservableTest.kt rename to src/test/kotlin/io/reactivex/rxkotlin/ObservableTest.kt index 90e727b..45fac28 100644 --- a/src/test/kotlin/rx/lang/kotlin/ObservableTest.kt +++ b/src/test/kotlin/io/reactivex/rxkotlin/ObservableTest.kt @@ -1,4 +1,4 @@ -package rx.lang.kotlin +package io.reactivex.rxkotlin import io.reactivex.Observable import io.reactivex.observers.TestObserver @@ -12,7 +12,7 @@ import java.util.concurrent.atomic.AtomicInteger class ObservableTest { @Test fun testCreation() { - val observable = observable { s -> + val observable = Observable.create { s -> s.apply { onNext(1) onNext(777) @@ -26,7 +26,7 @@ class ObservableTest { val o1: Observable = listOf(1, 2, 3).toObservable() val o2: Observable> = Observable.just(listOf(1, 2, 3)) - val o3: Observable = Observable.defer { observable { s -> s.onNext(1) } } + val o3: Observable = Observable.defer { Observable.create { s -> s.onNext(1) } } val o4: Observable = Array(3) { 0 }.toObservable() val o5: Observable = IntArray(3).toObservable() @@ -39,7 +39,7 @@ class ObservableTest { } @Test fun testExampleFromReadme() { - val observable = observable { s -> + val observable = Observable.create { s -> s.apply { onNext("H") onNext("e") @@ -52,7 +52,7 @@ class ObservableTest { } val result = observable .filter(String::isNotEmpty) - .fold(StringBuilder(), StringBuilder::append) + .reduce(StringBuilder(), StringBuilder::append) .map { it.toString() } .blockingGet() @@ -109,8 +109,8 @@ class ObservableTest { subscriber2.assertValues(IndexedValue(0, "a"), IndexedValue(1, "b"), IndexedValue(2, "c")) } - @Test fun testFold() { - val result = listOf(1, 2, 3).toObservable().fold(0) { acc, e -> acc + e }.blockingGet() + @Test fun testReduce() { + val result = listOf(1, 2, 3).toObservable().reduce(0) { acc, e -> acc + e }.blockingGet() assertEquals(6, result) } diff --git a/src/test/kotlin/rx/lang/kotlin/SingleTest.kt b/src/test/kotlin/io/reactivex/rxkotlin/SingleTest.kt similarity index 93% rename from src/test/kotlin/rx/lang/kotlin/SingleTest.kt rename to src/test/kotlin/io/reactivex/rxkotlin/SingleTest.kt index 7fc571c..e97b470 100644 --- a/src/test/kotlin/rx/lang/kotlin/SingleTest.kt +++ b/src/test/kotlin/io/reactivex/rxkotlin/SingleTest.kt @@ -1,6 +1,7 @@ -package rx.lang.kotlin +package io.reactivex.rxkotlin import io.reactivex.Observable +import io.reactivex.Single import org.junit.Test import org.mockito.Mockito import org.mockito.Mockito.mock @@ -9,7 +10,7 @@ import java.util.concurrent.Callable class SingleTest : KotlinTests() { @Test fun testCreate() { - single { s -> + Single.create { s -> s.onSuccess("Hello World!") }.subscribe { result -> a.received(result) diff --git a/src/test/kotlin/rx/lang/kotlin/SubscriptionTests.kt b/src/test/kotlin/io/reactivex/rxkotlin/SubscriptionTests.kt similarity index 96% rename from src/test/kotlin/rx/lang/kotlin/SubscriptionTests.kt rename to src/test/kotlin/io/reactivex/rxkotlin/SubscriptionTests.kt index 01c528d..a1bddab 100644 --- a/src/test/kotlin/rx/lang/kotlin/SubscriptionTests.kt +++ b/src/test/kotlin/io/reactivex/rxkotlin/SubscriptionTests.kt @@ -1,4 +1,4 @@ -package rx.lang.kotlin +package io.reactivex.rxkotlin import io.reactivex.Observable import io.reactivex.disposables.CompositeDisposable