Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
buildscript {
ext.kotlin_version = '1.0.6'
ext.kotlin_version = '1.1.0'
repositories { jcenter() }
dependencies {
classpath 'com.netflix.nebula:gradle-rxjava-project-plugin:4.+',
Expand Down
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#Sun Feb 12 16:33:43 SGT 2017
#Sun Mar 05 07:47:21 SGT 2017
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-2.14-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-2.14-all.zip
12 changes: 6 additions & 6 deletions src/main/kotlin/rx/lang/kotlin/disposable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ import io.reactivex.disposables.CompositeDisposable
import io.reactivex.disposables.Disposable

/**
* subscription += observable.subscribe()
* disposable += observable.subscribe()
*/
operator fun CompositeDisposable.plusAssign(subscription: Disposable) {
add(subscription)
operator fun CompositeDisposable.plusAssign(disposable: Disposable) {
add(disposable)
}

/**
* Add the subscription to a CompositeSubscription.
* @param compositeSubscription CompositeSubscription to add this subscription to
* @param compositeDisposable CompositeDisposable to add this subscription to
* @return this instance
*/
fun Disposable.addTo(compositeSubscription: CompositeDisposable): Disposable
= apply { compositeSubscription.add(this) }
fun Disposable.addTo(compositeDisposable: CompositeDisposable): Disposable
= apply { compositeDisposable.add(this) }
43 changes: 16 additions & 27 deletions src/main/kotlin/rx/lang/kotlin/flowable.kt
Original file line number Diff line number Diff line change
@@ -1,43 +1,28 @@
package rx.lang.kotlin

import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable
import io.reactivex.FlowableEmitter
import io.reactivex.Single
import io.reactivex.functions.BiFunction

fun <T : Any> flowable(
strategy: BackpressureStrategy = BackpressureStrategy.BUFFER,
body: (FlowableEmitter<in T>) -> Unit
): Flowable<T> = Flowable.create(body, strategy)

private fun <T : Any> Iterator<T>.toIterable() = object : Iterable<T> {
override fun iterator(): Iterator<T> = this@toIterable
}

fun BooleanArray.toFlowable(): Flowable<Boolean> = Flowable.fromArray(*this.toTypedArray())
fun ByteArray.toFlowable(): Flowable<Byte> = Flowable.fromArray(*this.toTypedArray())
fun ShortArray.toFlowable(): Flowable<Short> = Flowable.fromArray(*this.toTypedArray())
fun IntArray.toFlowable(): Flowable<Int> = Flowable.fromArray(*this.toTypedArray())
fun LongArray.toFlowable(): Flowable<Long> = Flowable.fromArray(*this.toTypedArray())
fun FloatArray.toFlowable(): Flowable<Float> = Flowable.fromArray(*this.toTypedArray())
fun DoubleArray.toFlowable(): Flowable<Double> = Flowable.fromArray(*this.toTypedArray())
fun <T : Any> Array<T>.toFlowable(): Flowable<T> = Flowable.fromArray(*this)
fun BooleanArray.toFlowable(): Flowable<Boolean> = this.asIterable().toFlowable()
fun ByteArray.toFlowable(): Flowable<Byte> = this.asIterable().toFlowable()
fun ShortArray.toFlowable(): Flowable<Short> = this.asIterable().toFlowable()
fun IntArray.toFlowable(): Flowable<Int> = this.asIterable().toFlowable()
fun LongArray.toFlowable(): Flowable<Long> = this.asIterable().toFlowable()
fun FloatArray.toFlowable(): Flowable<Float> = this.asIterable().toFlowable()
fun DoubleArray.toFlowable(): Flowable<Double> = this.asIterable().toFlowable()

fun IntProgression.toFlowable(): Flowable<Int> =
if (step == 1 && last.toLong() - first < Integer.MAX_VALUE) Flowable.range(first, Math.max(0, last - first + 1))
else Flowable.fromIterable(this)

fun <T : Any> Iterator<T>.toFlowable(): Flowable<T> = toIterable().toFlowable()
fun <T : Any> Iterable<T>.toFlowable(): Flowable<T> = Flowable.fromIterable(this)
fun <T : Any> Sequence<T>.toFlowable(): Flowable<T> = Flowable.fromIterable(iterator().toIterable())
fun <T : Any> Sequence<T>.toFlowable(): Flowable<T> = asIterable().toFlowable()

fun <T : Any> Iterable<Flowable<out T>>.merge(): Flowable<T> = Flowable.merge(this.toFlowable())
fun <T : Any> Iterable<Flowable<out T>>.mergeDelayError(): Flowable<T> = Flowable.mergeDelayError(this.toFlowable())

inline fun <T : Any, R : Any> Flowable<T>.fold(initial: R, crossinline body: (R, T) -> R): Single<R>
= reduce(initial) { a, e -> body(a, e) }

/**
* Returns Flowable that wrap all values into [IndexedValue] and populates corresponding index value.
* Works similar to [kotlin.withIndex]
Expand All @@ -62,14 +47,14 @@ fun <T : Any> Flowable<Flowable<T>>.switchOnNext(): Flowable<T> = Flowable.switc
* Flowable.combineLatest(List<? extends Flowable<? extends T>> sources, FuncN<? extends R> combineFunction)
*/
@Suppress("UNCHECKED_CAST")
inline fun <T, R> List<Flowable<T>>.combineLatest(crossinline combineFunction: (args: List<T>) -> R): Flowable<R>
inline fun <T, R> Iterable<Flowable<T>>.combineLatest(crossinline combineFunction: (args: List<T>) -> R): Flowable<R>
= Flowable.combineLatest(this) { combineFunction(it.asList().map { it as T }) }

/**
* Flowable.zip(List<? extends Flowable<? extends T>> sources, FuncN<? extends R> combineFunction)
*/
@Suppress("UNCHECKED_CAST")
inline fun <T, R> List<Flowable<T>>.zip(crossinline zipFunction: (args: List<T>) -> R): Flowable<R>
inline fun <T, R> Iterable<Flowable<T>>.zip(crossinline zipFunction: (args: List<T>) -> R): Flowable<R>
= Flowable.zip(this) { zipFunction(it.asList().map { it as T }) }

/**
Expand All @@ -78,6 +63,10 @@ inline fun <T, R> List<Flowable<T>>.zip(crossinline zipFunction: (args: List<T>)
inline fun <reified R : Any> Flowable<*>.cast(): Flowable<R> = cast(R::class.java)

/**
* Filters the items emitted by an Observable, only emitting those of the specified type.
* Filters the items emitted by an Flowable, only emitting those of the specified type.
*/
inline fun <reified R : Any> Flowable<*>.ofType(): Flowable<R> = ofType(R::class.java)
inline fun <reified R : Any> Flowable<*>.ofType(): Flowable<R> = ofType(R::class.java)

private fun <T : Any> Iterator<T>.toIterable() = object : Iterable<T> {
override fun iterator(): Iterator<T> = this@toIterable
}
32 changes: 14 additions & 18 deletions src/main/kotlin/rx/lang/kotlin/observable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,14 @@ import io.reactivex.ObservableEmitter
import io.reactivex.Single
import io.reactivex.functions.BiFunction

fun <T : Any> observable(body: (ObservableEmitter<in T>) -> Unit): Observable<T> = Observable.create(body)

private fun <T : Any> Iterator<T>.toIterable() = object : Iterable<T> {
override fun iterator(): Iterator<T> = this@toIterable
}

fun BooleanArray.toObservable(): Observable<Boolean> = Observable.fromArray(*this.toTypedArray())
fun ByteArray.toObservable(): Observable<Byte> = Observable.fromArray(*this.toTypedArray())
fun ShortArray.toObservable(): Observable<Short> = Observable.fromArray(*this.toTypedArray())
fun IntArray.toObservable(): Observable<Int> = Observable.fromArray(*this.toTypedArray())
fun LongArray.toObservable(): Observable<Long> = Observable.fromArray(*this.toTypedArray())
fun FloatArray.toObservable(): Observable<Float> = Observable.fromArray(*this.toTypedArray())
fun DoubleArray.toObservable(): Observable<Double> = Observable.fromArray(*this.toTypedArray())
fun BooleanArray.toObservable(): Observable<Boolean> = this.asIterable().toObservable()
fun ByteArray.toObservable(): Observable<Byte> = this.asIterable().toObservable()
fun ShortArray.toObservable(): Observable<Short> = this.asIterable().toObservable()
fun IntArray.toObservable(): Observable<Int> = this.asIterable().toObservable()
fun LongArray.toObservable(): Observable<Long> = this.asIterable().toObservable()
fun FloatArray.toObservable(): Observable<Float> = this.asIterable().toObservable()
fun DoubleArray.toObservable(): Observable<Double> = this.asIterable().toObservable()
fun <T : Any> Array<T>.toObservable(): Observable<T> = Observable.fromArray(*this)

fun IntProgression.toObservable(): Observable<Int> =
Expand All @@ -26,14 +21,11 @@ fun IntProgression.toObservable(): Observable<Int> =

fun <T : Any> Iterator<T>.toObservable(): Observable<T> = toIterable().toObservable()
fun <T : Any> Iterable<T>.toObservable(): Observable<T> = Observable.fromIterable(this)
fun <T : Any> Sequence<T>.toObservable(): Observable<T> = Observable.fromIterable(iterator().toIterable())
fun <T : Any> Sequence<T>.toObservable(): Observable<T> = asIterable().toObservable()

fun <T : Any> Iterable<Observable<out T>>.merge(): Observable<T> = Observable.merge(this.toObservable())
fun <T : Any> Iterable<Observable<out T>>.mergeDelayError(): Observable<T> = Observable.mergeDelayError(this.toObservable())

inline fun <T : Any, R : Any> Observable<T>.fold(initial: R, crossinline body: (R, T) -> R): Single<R>
= reduce(initial) { a, e -> body(a, e) }

/**
* Returns Observable that wrap all values into [IndexedValue] and populates corresponding index value.
* Works similar to [kotlin.withIndex]
Expand All @@ -58,14 +50,14 @@ fun <T : Any> Observable<Observable<T>>.switchOnNext(): Observable<T> = Observab
* Observable.combineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction)
*/
@Suppress("UNCHECKED_CAST")
inline fun <T, R> List<Observable<T>>.combineLatest(crossinline combineFunction: (args: List<T>) -> R): Observable<R>
inline fun <T, R> Iterable<Observable<T>>.combineLatest(crossinline combineFunction: (args: List<T>) -> R): Observable<R>
= Observable.combineLatest(this) { combineFunction(it.asList().map { it as T }) }

/**
* Observable.zip(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction)
*/
@Suppress("UNCHECKED_CAST")
inline fun <T, R> List<Observable<T>>.zip(crossinline zipFunction: (args: List<T>) -> R): Observable<R>
inline fun <T, R> Iterable<Observable<T>>.zip(crossinline zipFunction: (args: List<T>) -> R): Observable<R>
= Observable.zip(this) { zipFunction(it.asList().map { it as T }) }

/**
Expand All @@ -77,3 +69,7 @@ inline fun <reified R : Any> Observable<*>.cast(): Observable<R> = cast(R::class
* Filters the items emitted by an Observable, only emitting those of the specified type.
*/
inline fun <reified R : Any> Observable<*>.ofType(): Observable<R> = ofType(R::class.java)

private fun <T : Any> Iterator<T>.toIterable() = object : Iterable<T> {
override fun iterator(): Iterator<T> = this@toIterable
}
73 changes: 73 additions & 0 deletions src/main/kotlin/rx/lang/kotlin/operators.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package rx.lang.kotlin

import io.reactivex.Flowable
import io.reactivex.Observable
import io.reactivex.Single

/**
* Merges the emissions of an Observable<Observable<T>>. Same as calling `flatMap { it }`.
*/
fun <T : Any> Observable<Observable<T>>.mergeAll() = flatMap { it }

/**
* Merges the emissions of a Flowable<Flowable<T>>. Same as calling `flatMap { it }`.
*/
fun <T : Any> Flowable<Flowable<T>>.mergeAll() = flatMap { it }


/**
* Concatenates the emissions of an Observable<Observable<T>>. Same as calling `concatMap { it }`.
*/
fun <T : Any> Observable<Observable<T>>.concatAll() = concatMap { it }

/**
* Concatenates the emissions of an Flowable<Flowable<T>>. Same as calling `concatMap { it }`.
*/
fun <T : Any> Flowable<Flowable<T>>.concatAll() = concatMap { it }


/**
* Emits the latest `Observable<T>` emitted through an `Observable<Observable<T>>`. Same as calling `switchMap { it }`.
*/
fun <T : Any> Observable<Observable<T>>.switchLatest() = switchMap { it }


/**
* Emits the latest `Flowable<T>` emitted through an `Flowable<Flowable<T>>`. Same as calling `switchMap { it }`.
*/
fun <T : Any> Flowable<Flowable<T>>.switchLatest() = switchMap { it }


/**
* Joins the emissions of a finite `Observable` into a `String`.
*
* @param separator is the dividing character(s) between each element in the concatenated `String`
*
* @param prefix is the preceding `String` before the concatenated elements (optional)
*
* @param postfix is the succeeding `String` after the concatenated elements (optional)
*/
fun <T : Any> Observable<T>.joinToString(separator: String? = null,
prefix: String? = null,
postfix: String? = null
): Single<String> = collect({ StringBuilder(prefix ?: "") }) { builder: StringBuilder, next: T ->
builder.append(if (builder.length == prefix?.length ?: 0) "" else separator ?: "").append(next)
}.map { it.append(postfix ?: "").toString() }



/**
* Joins the emissions of a finite `Flowable` into a `String`.
*
* @param separator is the dividing character(s) between each element in the concatenated `String`
*
* @param prefix is the preceding `String` before the concatenated elements (optional)
*
* @param postfix is the succeeding `String` after the concatenated elements (optional)
*/
fun <T : Any> Flowable<T>.joinToString(separator: String? = null,
prefix: String? = null,
postfix: String? = null
): Single<String> = collect({ StringBuilder(prefix ?: "") }) { builder: StringBuilder, next: T ->
builder.append(if (builder.length == prefix?.length ?: 0) "" else separator ?: "").append(next)
}.map { it.append(postfix ?: "").toString() }
1 change: 0 additions & 1 deletion src/main/kotlin/rx/lang/kotlin/single.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import io.reactivex.SingleEmitter
import java.util.concurrent.Callable
import java.util.concurrent.Future

inline fun <T : Any> single(crossinline body: (s: SingleEmitter<in T>) -> Unit): Single<T> = Single.create { body(it) }
fun <T : Any> T.toSingle(): Single<T> = Single.just(this)
fun <T : Any> Future<T>.toSingle(): Single<T> = Single.fromFuture(this)
fun <T : Any> Callable<T>.toSingle(): Single<T> = Single.fromCallable(this)
Expand Down
13 changes: 0 additions & 13 deletions src/main/kotlin/rx/lang/kotlin/subject.kt

This file was deleted.

Loading