diff --git a/build.gradle b/build.gradle index dd4ee5d..e12f734 100755 --- a/build.gradle +++ b/build.gradle @@ -1,5 +1,5 @@ buildscript { - ext.kotlin_version = '1.0.6' + ext.kotlin_version = '1.1.0' repositories { jcenter() } dependencies { classpath 'com.netflix.nebula:gradle-rxjava-project-plugin:4.+', diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index b080e76..9bc0a69 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ -#Sun Feb 12 16:33:43 SGT 2017 +#Sun Mar 05 07:47:21 SGT 2017 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-2.14-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-2.14-all.zip diff --git a/src/main/kotlin/rx/lang/kotlin/disposable.kt b/src/main/kotlin/rx/lang/kotlin/disposable.kt index ca85585..d442e90 100644 --- a/src/main/kotlin/rx/lang/kotlin/disposable.kt +++ b/src/main/kotlin/rx/lang/kotlin/disposable.kt @@ -4,16 +4,16 @@ import io.reactivex.disposables.CompositeDisposable import io.reactivex.disposables.Disposable /** - * subscription += observable.subscribe() + * disposable += observable.subscribe() */ -operator fun CompositeDisposable.plusAssign(subscription: Disposable) { - add(subscription) +operator fun CompositeDisposable.plusAssign(disposable: Disposable) { + add(disposable) } /** * Add the subscription to a CompositeSubscription. - * @param compositeSubscription CompositeSubscription to add this subscription to + * @param compositeDisposable CompositeDisposable to add this subscription to * @return this instance */ -fun Disposable.addTo(compositeSubscription: CompositeDisposable): Disposable - = apply { compositeSubscription.add(this) } \ No newline at end of file +fun Disposable.addTo(compositeDisposable: CompositeDisposable): Disposable + = apply { compositeDisposable.add(this) } \ No newline at end of file diff --git a/src/main/kotlin/rx/lang/kotlin/flowable.kt b/src/main/kotlin/rx/lang/kotlin/flowable.kt index 1a348ee..12ba34e 100644 --- a/src/main/kotlin/rx/lang/kotlin/flowable.kt +++ b/src/main/kotlin/rx/lang/kotlin/flowable.kt @@ -1,28 +1,16 @@ package rx.lang.kotlin -import io.reactivex.BackpressureStrategy import io.reactivex.Flowable -import io.reactivex.FlowableEmitter -import io.reactivex.Single import io.reactivex.functions.BiFunction -fun flowable( - strategy: BackpressureStrategy = BackpressureStrategy.BUFFER, - body: (FlowableEmitter) -> Unit -): Flowable = Flowable.create(body, strategy) -private fun Iterator.toIterable() = object : Iterable { - override fun iterator(): Iterator = this@toIterable -} - -fun BooleanArray.toFlowable(): Flowable = Flowable.fromArray(*this.toTypedArray()) -fun ByteArray.toFlowable(): Flowable = Flowable.fromArray(*this.toTypedArray()) -fun ShortArray.toFlowable(): Flowable = Flowable.fromArray(*this.toTypedArray()) -fun IntArray.toFlowable(): Flowable = Flowable.fromArray(*this.toTypedArray()) -fun LongArray.toFlowable(): Flowable = Flowable.fromArray(*this.toTypedArray()) -fun FloatArray.toFlowable(): Flowable = Flowable.fromArray(*this.toTypedArray()) -fun DoubleArray.toFlowable(): Flowable = Flowable.fromArray(*this.toTypedArray()) -fun Array.toFlowable(): Flowable = Flowable.fromArray(*this) +fun BooleanArray.toFlowable(): Flowable = this.asIterable().toFlowable() +fun ByteArray.toFlowable(): Flowable = this.asIterable().toFlowable() +fun ShortArray.toFlowable(): Flowable = this.asIterable().toFlowable() +fun IntArray.toFlowable(): Flowable = this.asIterable().toFlowable() +fun LongArray.toFlowable(): Flowable = this.asIterable().toFlowable() +fun FloatArray.toFlowable(): Flowable = this.asIterable().toFlowable() +fun DoubleArray.toFlowable(): Flowable = this.asIterable().toFlowable() fun IntProgression.toFlowable(): Flowable = if (step == 1 && last.toLong() - first < Integer.MAX_VALUE) Flowable.range(first, Math.max(0, last - first + 1)) @@ -30,14 +18,11 @@ fun IntProgression.toFlowable(): Flowable = fun Iterator.toFlowable(): Flowable = toIterable().toFlowable() fun Iterable.toFlowable(): Flowable = Flowable.fromIterable(this) -fun Sequence.toFlowable(): Flowable = Flowable.fromIterable(iterator().toIterable()) +fun Sequence.toFlowable(): Flowable = asIterable().toFlowable() fun Iterable>.merge(): Flowable = Flowable.merge(this.toFlowable()) fun Iterable>.mergeDelayError(): Flowable = Flowable.mergeDelayError(this.toFlowable()) -inline fun Flowable.fold(initial: R, crossinline body: (R, T) -> R): Single - = reduce(initial) { a, e -> body(a, e) } - /** * Returns Flowable that wrap all values into [IndexedValue] and populates corresponding index value. * Works similar to [kotlin.withIndex] @@ -62,14 +47,14 @@ fun Flowable>.switchOnNext(): Flowable = Flowable.switc * Flowable.combineLatest(List> sources, FuncN combineFunction) */ @Suppress("UNCHECKED_CAST") -inline fun List>.combineLatest(crossinline combineFunction: (args: List) -> R): Flowable +inline fun Iterable>.combineLatest(crossinline combineFunction: (args: List) -> R): Flowable = Flowable.combineLatest(this) { combineFunction(it.asList().map { it as T }) } /** * Flowable.zip(List> sources, FuncN combineFunction) */ @Suppress("UNCHECKED_CAST") -inline fun List>.zip(crossinline zipFunction: (args: List) -> R): Flowable +inline fun Iterable>.zip(crossinline zipFunction: (args: List) -> R): Flowable = Flowable.zip(this) { zipFunction(it.asList().map { it as T }) } /** @@ -78,6 +63,10 @@ inline fun List>.zip(crossinline zipFunction: (args: List) inline fun Flowable<*>.cast(): Flowable = cast(R::class.java) /** - * Filters the items emitted by an Observable, only emitting those of the specified type. + * Filters the items emitted by an Flowable, only emitting those of the specified type. */ -inline fun Flowable<*>.ofType(): Flowable = ofType(R::class.java) \ No newline at end of file +inline fun Flowable<*>.ofType(): Flowable = ofType(R::class.java) + +private fun Iterator.toIterable() = object : Iterable { + override fun iterator(): Iterator = this@toIterable +} \ No newline at end of file diff --git a/src/main/kotlin/rx/lang/kotlin/observable.kt b/src/main/kotlin/rx/lang/kotlin/observable.kt index 0c30616..eefd51e 100644 --- a/src/main/kotlin/rx/lang/kotlin/observable.kt +++ b/src/main/kotlin/rx/lang/kotlin/observable.kt @@ -5,19 +5,14 @@ import io.reactivex.ObservableEmitter import io.reactivex.Single import io.reactivex.functions.BiFunction -fun observable(body: (ObservableEmitter) -> Unit): Observable = Observable.create(body) -private fun Iterator.toIterable() = object : Iterable { - override fun iterator(): Iterator = this@toIterable -} - -fun BooleanArray.toObservable(): Observable = Observable.fromArray(*this.toTypedArray()) -fun ByteArray.toObservable(): Observable = Observable.fromArray(*this.toTypedArray()) -fun ShortArray.toObservable(): Observable = Observable.fromArray(*this.toTypedArray()) -fun IntArray.toObservable(): Observable = Observable.fromArray(*this.toTypedArray()) -fun LongArray.toObservable(): Observable = Observable.fromArray(*this.toTypedArray()) -fun FloatArray.toObservable(): Observable = Observable.fromArray(*this.toTypedArray()) -fun DoubleArray.toObservable(): Observable = Observable.fromArray(*this.toTypedArray()) +fun BooleanArray.toObservable(): Observable = this.asIterable().toObservable() +fun ByteArray.toObservable(): Observable = this.asIterable().toObservable() +fun ShortArray.toObservable(): Observable = this.asIterable().toObservable() +fun IntArray.toObservable(): Observable = this.asIterable().toObservable() +fun LongArray.toObservable(): Observable = this.asIterable().toObservable() +fun FloatArray.toObservable(): Observable = this.asIterable().toObservable() +fun DoubleArray.toObservable(): Observable = this.asIterable().toObservable() fun Array.toObservable(): Observable = Observable.fromArray(*this) fun IntProgression.toObservable(): Observable = @@ -26,14 +21,11 @@ fun IntProgression.toObservable(): Observable = fun Iterator.toObservable(): Observable = toIterable().toObservable() fun Iterable.toObservable(): Observable = Observable.fromIterable(this) -fun Sequence.toObservable(): Observable = Observable.fromIterable(iterator().toIterable()) +fun Sequence.toObservable(): Observable = asIterable().toObservable() fun Iterable>.merge(): Observable = Observable.merge(this.toObservable()) fun Iterable>.mergeDelayError(): Observable = Observable.mergeDelayError(this.toObservable()) -inline fun Observable.fold(initial: R, crossinline body: (R, T) -> R): Single - = reduce(initial) { a, e -> body(a, e) } - /** * Returns Observable that wrap all values into [IndexedValue] and populates corresponding index value. * Works similar to [kotlin.withIndex] @@ -58,14 +50,14 @@ fun Observable>.switchOnNext(): Observable = Observab * Observable.combineLatest(List> sources, FuncN combineFunction) */ @Suppress("UNCHECKED_CAST") -inline fun List>.combineLatest(crossinline combineFunction: (args: List) -> R): Observable +inline fun Iterable>.combineLatest(crossinline combineFunction: (args: List) -> R): Observable = Observable.combineLatest(this) { combineFunction(it.asList().map { it as T }) } /** * Observable.zip(List> sources, FuncN combineFunction) */ @Suppress("UNCHECKED_CAST") -inline fun List>.zip(crossinline zipFunction: (args: List) -> R): Observable +inline fun Iterable>.zip(crossinline zipFunction: (args: List) -> R): Observable = Observable.zip(this) { zipFunction(it.asList().map { it as T }) } /** @@ -77,3 +69,7 @@ inline fun Observable<*>.cast(): Observable = cast(R::class * Filters the items emitted by an Observable, only emitting those of the specified type. */ inline fun Observable<*>.ofType(): Observable = ofType(R::class.java) + +private fun Iterator.toIterable() = object : Iterable { + override fun iterator(): Iterator = this@toIterable +} \ No newline at end of file diff --git a/src/main/kotlin/rx/lang/kotlin/operators.kt b/src/main/kotlin/rx/lang/kotlin/operators.kt new file mode 100644 index 0000000..eda0e67 --- /dev/null +++ b/src/main/kotlin/rx/lang/kotlin/operators.kt @@ -0,0 +1,73 @@ +package rx.lang.kotlin + +import io.reactivex.Flowable +import io.reactivex.Observable +import io.reactivex.Single + +/** + * Merges the emissions of an Observable>. Same as calling `flatMap { it }`. + */ +fun Observable>.mergeAll() = flatMap { it } + +/** + * Merges the emissions of a Flowable>. Same as calling `flatMap { it }`. + */ +fun Flowable>.mergeAll() = flatMap { it } + + +/** + * Concatenates the emissions of an Observable>. Same as calling `concatMap { it }`. + */ +fun Observable>.concatAll() = concatMap { it } + +/** + * Concatenates the emissions of an Flowable>. Same as calling `concatMap { it }`. + */ +fun Flowable>.concatAll() = concatMap { it } + + +/** + * Emits the latest `Observable` emitted through an `Observable>`. Same as calling `switchMap { it }`. + */ +fun Observable>.switchLatest() = switchMap { it } + + +/** + * Emits the latest `Flowable` emitted through an `Flowable>`. Same as calling `switchMap { it }`. + */ +fun Flowable>.switchLatest() = switchMap { it } + + +/** + * Joins the emissions of a finite `Observable` into a `String`. + * + * @param separator is the dividing character(s) between each element in the concatenated `String` + * + * @param prefix is the preceding `String` before the concatenated elements (optional) + * + * @param postfix is the succeeding `String` after the concatenated elements (optional) + */ +fun Observable.joinToString(separator: String? = null, + prefix: String? = null, + postfix: String? = null +): Single = collect({ StringBuilder(prefix ?: "") }) { builder: StringBuilder, next: T -> + builder.append(if (builder.length == prefix?.length ?: 0) "" else separator ?: "").append(next) +}.map { it.append(postfix ?: "").toString() } + + + +/** + * Joins the emissions of a finite `Flowable` into a `String`. + * + * @param separator is the dividing character(s) between each element in the concatenated `String` + * + * @param prefix is the preceding `String` before the concatenated elements (optional) + * + * @param postfix is the succeeding `String` after the concatenated elements (optional) + */ +fun Flowable.joinToString(separator: String? = null, + prefix: String? = null, + postfix: String? = null +): Single = collect({ StringBuilder(prefix ?: "") }) { builder: StringBuilder, next: T -> + builder.append(if (builder.length == prefix?.length ?: 0) "" else separator ?: "").append(next) +}.map { it.append(postfix ?: "").toString() } \ No newline at end of file diff --git a/src/main/kotlin/rx/lang/kotlin/single.kt b/src/main/kotlin/rx/lang/kotlin/single.kt index c2e2de4..6f6f7be 100644 --- a/src/main/kotlin/rx/lang/kotlin/single.kt +++ b/src/main/kotlin/rx/lang/kotlin/single.kt @@ -5,7 +5,6 @@ import io.reactivex.SingleEmitter import java.util.concurrent.Callable import java.util.concurrent.Future -inline fun single(crossinline body: (s: SingleEmitter) -> Unit): Single = Single.create { body(it) } fun T.toSingle(): Single = Single.just(this) fun Future.toSingle(): Single = Single.fromFuture(this) fun Callable.toSingle(): Single = Single.fromCallable(this) diff --git a/src/main/kotlin/rx/lang/kotlin/subject.kt b/src/main/kotlin/rx/lang/kotlin/subject.kt deleted file mode 100644 index 7dbd24a..0000000 --- a/src/main/kotlin/rx/lang/kotlin/subject.kt +++ /dev/null @@ -1,13 +0,0 @@ -package rx.lang.kotlin - -import io.reactivex.Observable -import io.reactivex.subjects.AsyncSubject -import io.reactivex.subjects.BehaviorSubject -import io.reactivex.subjects.PublishSubject -import io.reactivex.subjects.ReplaySubject - -fun BehaviorSubject(): BehaviorSubject = BehaviorSubject.create() -fun BehaviorSubject(default: T): BehaviorSubject = BehaviorSubject.createDefault(default) -fun AsyncSubject(): AsyncSubject = AsyncSubject.create() -fun PublishSubject(): PublishSubject = PublishSubject.create() -fun ReplaySubject(capacity: Int = Observable.bufferSize()): ReplaySubject = ReplaySubject.create(capacity) diff --git a/src/main/kotlin/rx/lang/kotlin/subscription.kt b/src/main/kotlin/rx/lang/kotlin/subscribers.kt similarity index 100% rename from src/main/kotlin/rx/lang/kotlin/subscription.kt rename to src/main/kotlin/rx/lang/kotlin/subscribers.kt diff --git a/src/test/kotlin/rx/lang/kotlin/ExtensionTests.kt b/src/test/kotlin/rx/lang/kotlin/ExtensionTests.kt index c41b09c..fc93a33 100644 --- a/src/test/kotlin/rx/lang/kotlin/ExtensionTests.kt +++ b/src/test/kotlin/rx/lang/kotlin/ExtensionTests.kt @@ -52,12 +52,11 @@ class ExtensionTests : KotlinTests() { @Test fun testFilter() { listOf(1, 2, 3).toObservable().filter { it >= 2 }.subscribe(received()) - verify(a, times(0)).received(1); - verify(a, times(1)).received(2); - verify(a, times(1)).received(3); + verify(a, times(0)).received(1) + verify(a, times(1)).received(2) + verify(a, times(1)).received(3) } - @Test fun testLast() { assertEquals("three", listOf("one", "two", "three").toObservable().blockingLast()) } @@ -79,7 +78,6 @@ class ExtensionTests : KotlinTests() { verify(a, times(0)).error(any(Exception::class.java)) } - @Test fun testMerge() { listOf(listOf(1, 2, 3).toObservable(), listOf(Observable.just(6), @@ -110,7 +108,6 @@ class ExtensionTests : KotlinTests() { verify(a, times(1)).received("hello_2") } - @Test fun testFromWithIterable() { assertEquals(5, listOf(1, 2, 3, 4, 5).toObservable().count().blockingGet()) } @@ -123,45 +120,64 @@ class ExtensionTests : KotlinTests() { } @Test fun testScriptWithOnNext() { - TestFactory().observable.subscribe(received()) + TestFactory().observable + .subscribe(received()) verify(a, times(1)).received("hello_1") } @Test fun testSkipTake() { - listOf(1, 2, 3).toObservable().skip(1).take(1).subscribe(received()) + listOf(1, 2, 3) + .toObservable() + .skip(1) + .take(1) + .subscribe(received()) verify(a, times(0)).received(1) verify(a, times(1)).received(2) verify(a, times(0)).received(3) } @Test fun testSkip() { - listOf(1, 2, 3).toObservable().skip(2).subscribe(received()) + listOf(1, 2, 3) + .toObservable() + .skip(2) + .subscribe(received()) verify(a, times(0)).received(1) verify(a, times(0)).received(2) verify(a, times(1)).received(3) } @Test fun testTake() { - listOf(1, 2, 3).toObservable().take(2).subscribe(received()) + listOf(1, 2, 3) + .toObservable() + .take(2) + .subscribe(received()) verify(a, times(1)).received(1) verify(a, times(1)).received(2) verify(a, times(0)).received(3) } @Test fun testTakeLast() { - TestFactory().observable.takeLast(1).subscribe(received()) + TestFactory().observable + .takeLast(1) + .subscribe(received()) verify(a, times(1)).received("hello_1") } @Test fun testTakeWhile() { - listOf(1, 2, 3).toObservable().takeWhile { x -> x < 3 }.subscribe(received()) + listOf(1, 2, 3) + .toObservable() + .takeWhile { x -> x < 3 } + .subscribe(received()) verify(a, times(1)).received(1) verify(a, times(1)).received(2) verify(a, times(0)).received(3) } @Test fun testTakeWhileWithIndex() { - listOf(1, 2, 3).toObservable().takeWhile { x -> x < 3 }.zipWith((0..Integer.MAX_VALUE).toObservable(), BiFunction { x, i -> x }).subscribe(received()) + listOf(1, 2, 3).toObservable() + .takeWhile { x -> x < 3 } + .zipWith((0..Integer.MAX_VALUE).toObservable(), BiFunction { x: Int, i: Int -> x }) + .subscribe(received()) verify(a, times(1)).received(1) verify(a, times(1)).received(2) verify(a, times(0)).received(3) @@ -260,6 +276,23 @@ class ExtensionTests : KotlinTests() { } } + @Test + fun testJoinToString1() { + Observable.range(1, 5) + .joinToString(separator = ",") + .test() + .await() + .assertResult("1,2,3,4,5") + } + + @Test + fun testJoinToString2() { + Observable.range(1, 5) + .joinToString(separator = ",", prefix = "(", postfix = ")") + .test() + .await() + .assertResult("(1,2,3,4,5)") + } inner class TestFactory { var counter = 1 @@ -272,6 +305,5 @@ class ExtensionTests : KotlinTests() { val observable: Observable get() = observable(onSubscribe) - } }