Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1aa3555
Dependencies updated
stepango Sep 6, 2016
3f2e99a
Initial implementation of rxKotlin for rxJava2
stepango Jan 7, 2017
0f2ee08
Code cleaning.
stepango Jan 8, 2017
54b37af
Whitespaces removed.
stepango Jan 8, 2017
ef24fa4
gitignore update
stepango Jan 8, 2017
2adeec2
subscribers replaced by named args extension
stepango Jan 13, 2017
5478cec
subscribeBy non-null params
stepango Jan 13, 2017
06f3f03
empty* methods removed
stepango Jan 24, 2017
8b1bbd6
dependencies updated
stepango Feb 12, 2017
02492c2
subscribeBy method for Flowable
stepango Feb 12, 2017
484cbab
tests updated
stepango Feb 12, 2017
e117048
minor tests refactoring
stepango Feb 12, 2017
564b043
* JoinToString method and tests ported from rxKotlin 1.0
stepango Mar 7, 2017
061899f
Merge branch '2.x' of https://github.com/ReactiveX/RxKotlin into 2.x
stepango Mar 7, 2017
48c0570
Minor formatting fix
stepango Mar 7, 2017
b677898
More formatting fixes
stepango Mar 7, 2017
c5d5aa6
Refactor "subscription" references to "disposable"
thomasnield Mar 8, 2017
816c227
refactor `flowable.kt` and `observable.kt` to match 1.x changes
thomasnield Mar 8, 2017
d8881f7
add `Flowable` support for operators
thomasnield Mar 8, 2017
307b228
update `single` to match 1.x changes
thomasnield Mar 8, 2017
2ca695a
rid Subject functions
thomasnield Mar 8, 2017
c5837a1
update readme to reflect `onComplete`
thomasnield Mar 8, 2017
657274b
Merge branch '2.x' into 2.x-dev
thomasnield Mar 8, 2017
1d81d6f
Fix 2.x extension tests
thomasnield Mar 8, 2017
6d62302
add Array<T>.toFlowable()
thomasnield Mar 8, 2017
2310f73
update and fix compile errors for tests in 2.x
thomasnield Mar 8, 2017
15d9e01
move swtichOnNext()
thomasnield Mar 8, 2017
3b623a9
git commit -m 'refactor package domain to io.reactivex.rxkotlin'
thomasnield Mar 11, 2017
ff43277
optimize imports
thomasnield Mar 11, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ fun main(args: Array<String>) {
.subscribeBy( // named arguments for lambda Subscribers
onNext = { println(it) },
onError = { it.printStackTrace() },
onCompleted = { println("Done!") }
onComplete = { println("Done!") }
)

}
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
buildscript {
ext.kotlin_version = '1.0.6'
ext.kotlin_version = '1.1.0'
repositories { jcenter() }
dependencies {
classpath 'com.netflix.nebula:gradle-rxjava-project-plugin:4.+',
Expand Down
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#Sun Feb 12 16:33:43 SGT 2017
#Sun Mar 05 07:47:21 SGT 2017
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-2.14-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-2.14-all.zip
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package rx.lang.kotlin.examples
package io.reactivex.rxkotlin.examples

import io.reactivex.Observable
import io.reactivex.disposables.CompositeDisposable
import rx.lang.kotlin.addTo
import rx.lang.kotlin.combineLatest
import rx.lang.kotlin.observable
import rx.lang.kotlin.plusAssign
import rx.lang.kotlin.subscribeBy
import rx.lang.kotlin.toObservable
import rx.lang.kotlin.zip
import io.reactivex.rxkotlin.*
import io.reactivex.rxkotlin.combineLatest
import io.reactivex.rxkotlin.subscribeBy
import io.reactivex.rxkotlin.toObservable
import io.reactivex.rxkotlin.zip
import java.net.URL
import java.util.Scanner
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -50,37 +48,37 @@ fun main(args: Array<String>) {
addToCompositeSubscription()
}

private fun URL.toScannerObservable() = observable<String> { s ->
private fun URL.toScannerObservable() = Observable.create<String> { s ->
this.openStream().use { stream ->
Scanner(stream).useDelimiter("\\A")
.toObservable()
.subscribe { s.onNext(it) }
}
}

fun syncObservable(): Observable<String> = observable { subscriber ->
fun syncObservable(): Observable<String> = Observable.create { subscriber ->
(0..75).toObservable()
.map { "Sync value_$it" }
.subscribe { subscriber.onNext(it) }
}

fun asyncObservable(): Observable<String> = observable { subscriber ->
fun asyncObservable(): Observable<String> = Observable.create { subscriber ->
thread {
(0..75).toObservable()
.map { "Async value_$it" }
.subscribe { subscriber.onNext(it) }
}
}

fun asyncWiki(vararg articleNames: String): Observable<String> = observable { subscriber ->
fun asyncWiki(vararg articleNames: String): Observable<String> = Observable.create { subscriber ->
thread {
articleNames.toObservable()
.flatMapMaybe { name -> URL("http://en.wikipedia.org/wiki/$name").toScannerObservable().firstElement() }
.subscribe { subscriber.onNext(it) }
}
}

fun asyncWikiWithErrorHandling(vararg articleNames: String): Observable<String> = observable { subscriber ->
fun asyncWikiWithErrorHandling(vararg articleNames: String): Observable<String> = Observable.create { subscriber ->
thread {
articleNames.toObservable()
.flatMapMaybe { name -> URL("http://en.wikipedia.org/wiki/$name").toScannerObservable().firstElement() }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package rx.lang.kotlin.examples.retrofit
package io.reactivex.rxkotlin.examples.retrofit

import io.reactivex.Observable
import retrofit.RestAdapter
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package rx.lang.kotlin
package io.reactivex.rxkotlin

import io.reactivex.Completable
import io.reactivex.functions.Action
Expand Down
19 changes: 19 additions & 0 deletions src/main/kotlin/io/reactivex/rxkotlin/disposable.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.reactivex.rxkotlin

import io.reactivex.disposables.CompositeDisposable
import io.reactivex.disposables.Disposable

/**
* disposable += observable.subscribe()
*/
operator fun CompositeDisposable.plusAssign(disposable: Disposable) {
add(disposable)
}

/**
* Add the subscription to a CompositeSubscription.
* @param compositeDisposable CompositeDisposable to add this subscription to
* @return this instance
*/
fun Disposable.addTo(compositeDisposable: CompositeDisposable): Disposable
= apply { compositeDisposable.add(this) }
Original file line number Diff line number Diff line change
@@ -1,27 +1,16 @@
package rx.lang.kotlin
package io.reactivex.rxkotlin

import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable
import io.reactivex.FlowableEmitter
import io.reactivex.Single
import io.reactivex.functions.BiFunction

fun <T : Any> flowable(
strategy: BackpressureStrategy = BackpressureStrategy.BUFFER,
body: (FlowableEmitter<in T>) -> Unit
): Flowable<T> = Flowable.create(body, strategy)

private fun <T : Any> Iterator<T>.toIterable() = object : Iterable<T> {
override fun iterator(): Iterator<T> = this@toIterable
}

fun BooleanArray.toFlowable(): Flowable<Boolean> = Flowable.fromArray(*this.toTypedArray())
fun ByteArray.toFlowable(): Flowable<Byte> = Flowable.fromArray(*this.toTypedArray())
fun ShortArray.toFlowable(): Flowable<Short> = Flowable.fromArray(*this.toTypedArray())
fun IntArray.toFlowable(): Flowable<Int> = Flowable.fromArray(*this.toTypedArray())
fun LongArray.toFlowable(): Flowable<Long> = Flowable.fromArray(*this.toTypedArray())
fun FloatArray.toFlowable(): Flowable<Float> = Flowable.fromArray(*this.toTypedArray())
fun DoubleArray.toFlowable(): Flowable<Double> = Flowable.fromArray(*this.toTypedArray())
fun BooleanArray.toFlowable(): Flowable<Boolean> = this.asIterable().toFlowable()
fun ByteArray.toFlowable(): Flowable<Byte> = this.asIterable().toFlowable()
fun ShortArray.toFlowable(): Flowable<Short> = this.asIterable().toFlowable()
fun IntArray.toFlowable(): Flowable<Int> = this.asIterable().toFlowable()
fun LongArray.toFlowable(): Flowable<Long> = this.asIterable().toFlowable()
fun FloatArray.toFlowable(): Flowable<Float> = this.asIterable().toFlowable()
fun DoubleArray.toFlowable(): Flowable<Double> = this.asIterable().toFlowable()
fun <T : Any> Array<T>.toFlowable(): Flowable<T> = Flowable.fromArray(*this)

fun IntProgression.toFlowable(): Flowable<Int> =
Expand All @@ -30,14 +19,11 @@ fun IntProgression.toFlowable(): Flowable<Int> =

fun <T : Any> Iterator<T>.toFlowable(): Flowable<T> = toIterable().toFlowable()
fun <T : Any> Iterable<T>.toFlowable(): Flowable<T> = Flowable.fromIterable(this)
fun <T : Any> Sequence<T>.toFlowable(): Flowable<T> = Flowable.fromIterable(iterator().toIterable())
fun <T : Any> Sequence<T>.toFlowable(): Flowable<T> = asIterable().toFlowable()

fun <T : Any> Iterable<Flowable<out T>>.merge(): Flowable<T> = Flowable.merge(this.toFlowable())
fun <T : Any> Iterable<Flowable<out T>>.mergeDelayError(): Flowable<T> = Flowable.mergeDelayError(this.toFlowable())

inline fun <T : Any, R : Any> Flowable<T>.fold(initial: R, crossinline body: (R, T) -> R): Single<R>
= reduce(initial) { a, e -> body(a, e) }

/**
* Returns Flowable that wrap all values into [IndexedValue] and populates corresponding index value.
* Works similar to [kotlin.withIndex]
Expand All @@ -56,20 +42,19 @@ fun <T : Any> Flowable<T>.withIndex(): Flowable<IndexedValue<T>>
inline fun <T : Any, R : Any> Flowable<T>.flatMapSequence(crossinline body: (T) -> Sequence<R>): Flowable<R>
= flatMap { body(it).toFlowable() }

fun <T : Any> Flowable<Flowable<T>>.switchOnNext(): Flowable<T> = Flowable.switchOnNext(this)

/**
* Flowable.combineLatest(List<? extends Flowable<? extends T>> sources, FuncN<? extends R> combineFunction)
*/
@Suppress("UNCHECKED_CAST")
inline fun <T, R> List<Flowable<T>>.combineLatest(crossinline combineFunction: (args: List<T>) -> R): Flowable<R>
inline fun <T, R> Iterable<Flowable<T>>.combineLatest(crossinline combineFunction: (args: List<T>) -> R): Flowable<R>
= Flowable.combineLatest(this) { combineFunction(it.asList().map { it as T }) }

/**
* Flowable.zip(List<? extends Flowable<? extends T>> sources, FuncN<? extends R> combineFunction)
*/
@Suppress("UNCHECKED_CAST")
inline fun <T, R> List<Flowable<T>>.zip(crossinline zipFunction: (args: List<T>) -> R): Flowable<R>
inline fun <T, R> Iterable<Flowable<T>>.zip(crossinline zipFunction: (args: List<T>) -> R): Flowable<R>
= Flowable.zip(this) { zipFunction(it.asList().map { it as T }) }

/**
Expand All @@ -78,6 +63,10 @@ inline fun <T, R> List<Flowable<T>>.zip(crossinline zipFunction: (args: List<T>)
inline fun <reified R : Any> Flowable<*>.cast(): Flowable<R> = cast(R::class.java)

/**
* Filters the items emitted by an Observable, only emitting those of the specified type.
* Filters the items emitted by an Flowable, only emitting those of the specified type.
*/
inline fun <reified R : Any> Flowable<*>.ofType(): Flowable<R> = ofType(R::class.java)
inline fun <reified R : Any> Flowable<*>.ofType(): Flowable<R> = ofType(R::class.java)

private fun <T : Any> Iterator<T>.toIterable() = object : Iterable<T> {
override fun iterator(): Iterator<T> = this@toIterable
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package rx.lang.kotlin
package io.reactivex.rxkotlin

import io.reactivex.Maybe
import java.util.concurrent.Callable
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,16 @@
package rx.lang.kotlin
package io.reactivex.rxkotlin

import io.reactivex.Observable
import io.reactivex.ObservableEmitter
import io.reactivex.Single
import io.reactivex.functions.BiFunction

fun <T : Any> observable(body: (ObservableEmitter<in T>) -> Unit): Observable<T> = Observable.create(body)

private fun <T : Any> Iterator<T>.toIterable() = object : Iterable<T> {
override fun iterator(): Iterator<T> = this@toIterable
}

fun BooleanArray.toObservable(): Observable<Boolean> = Observable.fromArray(*this.toTypedArray())
fun ByteArray.toObservable(): Observable<Byte> = Observable.fromArray(*this.toTypedArray())
fun ShortArray.toObservable(): Observable<Short> = Observable.fromArray(*this.toTypedArray())
fun IntArray.toObservable(): Observable<Int> = Observable.fromArray(*this.toTypedArray())
fun LongArray.toObservable(): Observable<Long> = Observable.fromArray(*this.toTypedArray())
fun FloatArray.toObservable(): Observable<Float> = Observable.fromArray(*this.toTypedArray())
fun DoubleArray.toObservable(): Observable<Double> = Observable.fromArray(*this.toTypedArray())
fun BooleanArray.toObservable(): Observable<Boolean> = this.asIterable().toObservable()
fun ByteArray.toObservable(): Observable<Byte> = this.asIterable().toObservable()
fun ShortArray.toObservable(): Observable<Short> = this.asIterable().toObservable()
fun IntArray.toObservable(): Observable<Int> = this.asIterable().toObservable()
fun LongArray.toObservable(): Observable<Long> = this.asIterable().toObservable()
fun FloatArray.toObservable(): Observable<Float> = this.asIterable().toObservable()
fun DoubleArray.toObservable(): Observable<Double> = this.asIterable().toObservable()
fun <T : Any> Array<T>.toObservable(): Observable<T> = Observable.fromArray(*this)

fun IntProgression.toObservable(): Observable<Int> =
Expand All @@ -26,14 +19,11 @@ fun IntProgression.toObservable(): Observable<Int> =

fun <T : Any> Iterator<T>.toObservable(): Observable<T> = toIterable().toObservable()
fun <T : Any> Iterable<T>.toObservable(): Observable<T> = Observable.fromIterable(this)
fun <T : Any> Sequence<T>.toObservable(): Observable<T> = Observable.fromIterable(iterator().toIterable())
fun <T : Any> Sequence<T>.toObservable(): Observable<T> = asIterable().toObservable()

fun <T : Any> Iterable<Observable<out T>>.merge(): Observable<T> = Observable.merge(this.toObservable())
fun <T : Any> Iterable<Observable<out T>>.mergeDelayError(): Observable<T> = Observable.mergeDelayError(this.toObservable())

inline fun <T : Any, R : Any> Observable<T>.fold(initial: R, crossinline body: (R, T) -> R): Single<R>
= reduce(initial) { a, e -> body(a, e) }

/**
* Returns Observable that wrap all values into [IndexedValue] and populates corresponding index value.
* Works similar to [kotlin.withIndex]
Expand All @@ -52,20 +42,19 @@ fun <T : Any> Observable<T>.withIndex(): Observable<IndexedValue<T>>
inline fun <T : Any, R : Any> Observable<T>.flatMapSequence(crossinline body: (T) -> Sequence<R>): Observable<R>
= flatMap { body(it).toObservable() }

fun <T : Any> Observable<Observable<T>>.switchOnNext(): Observable<T> = Observable.switchOnNext(this)

/**
* Observable.combineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction)
*/
@Suppress("UNCHECKED_CAST")
inline fun <T, R> List<Observable<T>>.combineLatest(crossinline combineFunction: (args: List<T>) -> R): Observable<R>
inline fun <T, R> Iterable<Observable<T>>.combineLatest(crossinline combineFunction: (args: List<T>) -> R): Observable<R>
= Observable.combineLatest(this) { combineFunction(it.asList().map { it as T }) }

/**
* Observable.zip(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction)
*/
@Suppress("UNCHECKED_CAST")
inline fun <T, R> List<Observable<T>>.zip(crossinline zipFunction: (args: List<T>) -> R): Observable<R>
inline fun <T, R> Iterable<Observable<T>>.zip(crossinline zipFunction: (args: List<T>) -> R): Observable<R>
= Observable.zip(this) { zipFunction(it.asList().map { it as T }) }

/**
Expand All @@ -77,3 +66,7 @@ inline fun <reified R : Any> Observable<*>.cast(): Observable<R> = cast(R::class
* Filters the items emitted by an Observable, only emitting those of the specified type.
*/
inline fun <reified R : Any> Observable<*>.ofType(): Observable<R> = ofType(R::class.java)

private fun <T : Any> Iterator<T>.toIterable() = object : Iterable<T> {
override fun iterator(): Iterator<T> = this@toIterable
}
79 changes: 79 additions & 0 deletions src/main/kotlin/io/reactivex/rxkotlin/operators.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package io.reactivex.rxkotlin

import io.reactivex.Flowable
import io.reactivex.Observable
import io.reactivex.Single

/**
* Merges the emissions of an Observable<Observable<T>>. Same as calling `flatMap { it }`.
*/
fun <T : Any> Observable<Observable<T>>.mergeAll() = flatMap { it }

/**
* Merges the emissions of a Flowable<Flowable<T>>. Same as calling `flatMap { it }`.
*/
fun <T : Any> Flowable<Flowable<T>>.mergeAll() = flatMap { it }


/**
* Concatenates the emissions of an Observable<Observable<T>>. Same as calling `concatMap { it }`.
*/
fun <T : Any> Observable<Observable<T>>.concatAll() = concatMap { it }

/**
* Concatenates the emissions of an Flowable<Flowable<T>>. Same as calling `concatMap { it }`.
*/
fun <T : Any> Flowable<Flowable<T>>.concatAll() = concatMap { it }


fun <T : Any> Observable<Observable<T>>.switchOnNext(): Observable<T> = Observable.switchOnNext(this)


fun <T : Any> Flowable<Flowable<T>>.switchOnNext(): Flowable<T> = Flowable.switchOnNext(this)


/**
* Emits the latest `Observable<T>` emitted through an `Observable<Observable<T>>`. Same as calling `switchMap { it }`.
*/
fun <T : Any> Observable<Observable<T>>.switchLatest() = switchMap { it }


/**
* Emits the latest `Flowable<T>` emitted through an `Flowable<Flowable<T>>`. Same as calling `switchMap { it }`.
*/
fun <T : Any> Flowable<Flowable<T>>.switchLatest() = switchMap { it }


/**
* Joins the emissions of a finite `Observable` into a `String`.
*
* @param separator is the dividing character(s) between each element in the concatenated `String`
*
* @param prefix is the preceding `String` before the concatenated elements (optional)
*
* @param postfix is the succeeding `String` after the concatenated elements (optional)
*/
fun <T : Any> Observable<T>.joinToString(separator: String? = null,
prefix: String? = null,
postfix: String? = null
): Single<String> = collect({ StringBuilder(prefix ?: "") }) { builder: StringBuilder, next: T ->
builder.append(if (builder.length == prefix?.length ?: 0) "" else separator ?: "").append(next)
}.map { it.append(postfix ?: "").toString() }



/**
* Joins the emissions of a finite `Flowable` into a `String`.
*
* @param separator is the dividing character(s) between each element in the concatenated `String`
*
* @param prefix is the preceding `String` before the concatenated elements (optional)
*
* @param postfix is the succeeding `String` after the concatenated elements (optional)
*/
fun <T : Any> Flowable<T>.joinToString(separator: String? = null,
prefix: String? = null,
postfix: String? = null
): Single<String> = collect({ StringBuilder(prefix ?: "") }) { builder: StringBuilder, next: T ->
builder.append(if (builder.length == prefix?.length ?: 0) "" else separator ?: "").append(next)
}.map { it.append(postfix ?: "").toString() }
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package rx.lang.kotlin
package io.reactivex.rxkotlin

import io.reactivex.Single
import io.reactivex.SingleEmitter
import java.util.concurrent.Callable
import java.util.concurrent.Future

inline fun <T : Any> single(crossinline body: (s: SingleEmitter<in T>) -> Unit): Single<T> = Single.create { body(it) }
fun <T : Any> T.toSingle(): Single<T> = Single.just(this)
fun <T : Any> Future<T>.toSingle(): Single<T> = Single.fromFuture(this)
fun <T : Any> Callable<T>.toSingle(): Single<T> = Single.fromCallable(this)
Expand Down
Loading