Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions language-adaptors/rxjava-kotlin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
Kotlin has support for SAM (Single Abstract Method) Interfaces as Functions (i.e. Java 8 Lambdas). So you could use Kotlin in RxJava whitout this adaptor

```kotlin
Observable.create<String>{ observer ->
observer!!.onNext("Hello")
observer.onCompleted()
Observable.create(OnSubscribeFunc<String> {
it!!.onNext("Hello")
it.onCompleted()
Subscriptions.empty()
}!!.subscribe { result ->
})!!.subscribe { result ->
a!!.received(result)
}
```
Expand All @@ -21,7 +21,7 @@ import rx.lang.kotlin.*
observer.onNext("Hello")
observer.onCompleted()
Subscriptions.empty()!!
}.asObservable().subscribe { result ->
}.asObservableFunc().subscribe { result ->
a!!.received(result)
}
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,23 @@

package rx.lang.kotlin

import rx.Subscription
import rx.Observer
import rx.Observable
import rx.Observable.OnSubscribe
import rx.Subscription
import rx.Observable.OnSubscribeFunc


public fun<T> Function1<Observer<in T>, Unit>.asObservable(): Observable<T> {
return Observable.create(OnSubscribe<T>{ t1 ->
this(t1!!)
})!!
}

public fun<T> Function1<Observer<in T>, Subscription>.asObservable(): Observable<T> {
return Observable.create { this(it!!) }!!
public fun<T> Function1<Observer<in T>, Subscription>.asObservableFunc(): Observable<T> {
return Observable.create(OnSubscribeFunc<T>{ op ->
this(op!!)
})!!
}

public fun<T> Function0<Observable<out T>>.defer(): Observable<T> {
Expand All @@ -41,11 +52,11 @@ public fun<T> Throwable.asObservable(): Observable<T> {
}

public fun<T> Pair<T, T>.asObservable(): Observable<T> {
return Observable.from(this.component1(), this.component2())!!
return Observable.from(listOf(this.component1(), this.component2()))!!
}

public fun<T> Triple<T, T, T>.asObservable(): Observable<T> {
return Observable.from(this.component1(), this.component2(), this.component3())!!
return Observable.from(listOf(this.component1(), this.component2(), this.component3()))!!
}

public fun<T> Pair<Observable<T>, Observable<T>>.merge(): Observable<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,18 @@ import rx.lang.kotlin.BasicKotlinTests.AsyncObservable
/**
* This class use plain Kotlin without extensions from the language adaptor
*/
public class BasicKotlinTests {
public class BasicKotlinTests:KotlinTests() {

[Mock] var a: ScriptAssertion? = null
[Mock] var w: Observable<Int>? = null

[Before]
public fun before() {
MockitoAnnotations.initMocks(this)
}

fun received<T>(): (T?) -> Unit {
return {(result: T?) -> a!!.received(result) }
}

[Test]
public fun testCreate() {
Observable.create<String>{

Observable.create(OnSubscribeFunc<String> {
it!!.onNext("Hello")
it.onCompleted()
Subscriptions.empty()
}!!.subscribe { result ->
})!!.subscribe { result ->
a!!.received(result)
}

Expand All @@ -64,20 +55,20 @@ public class BasicKotlinTests {

[Test]
public fun testFilter() {
Observable.from(1, 2, 3)!!.filter { it!! >= 2 }!!.subscribe(received())
Observable.from(listOf(1, 2, 3))!!.filter { it!! >= 2 }!!.subscribe(received())
verify(a, times(0))!!.received(1);
verify(a, times(1))!!.received(2);
verify(a, times(1))!!.received(3);
}

[Test]
public fun testLast() {
assertEquals("three", Observable.from("one", "two", "three")!!.toBlockingObservable()!!.last())
assertEquals("three", Observable.from(listOf("one", "two", "three"))!!.toBlockingObservable()!!.last())
}

[Test]
public fun testLastWithPredicate() {
assertEquals("two", Observable.from("one", "two", "three")!!.toBlockingObservable()!!.last { x -> x!!.length == 3 })
assertEquals("two", Observable.from(listOf("one", "two", "three"))!!.toBlockingObservable()!!.last { x -> x!!.length == 3 })
}

[Test]
Expand All @@ -88,29 +79,29 @@ public class BasicKotlinTests {

[Test]
public fun testMap2() {
Observable.from(1, 2, 3)!!.map { v -> "hello_$v" }!!.subscribe((received()))
Observable.from(listOf(1, 2, 3))!!.map { v -> "hello_$v" }!!.subscribe((received()))
verify(a, times(1))!!.received("hello_1")
verify(a, times(1))!!.received("hello_2")
verify(a, times(1))!!.received("hello_3")
}

[Test]
public fun testMaterialize() {
Observable.from(1, 2, 3)!!.materialize()!!.subscribe((received()))
Observable.from(listOf(1, 2, 3))!!.materialize()!!.subscribe((received()))
verify(a, times(4))!!.received(any(javaClass<Notification<Int>>()))
verify(a, times(0))!!.error(any(javaClass<Exception>()))
}

[Test]
public fun testMergeDelayError() {
Observable.mergeDelayError(
Observable.from(1, 2, 3),
Observable.from(listOf(1, 2, 3)),
Observable.merge(
Observable.from(6),
Observable.error(NullPointerException()),
Observable.from(7)
),
Observable.from(4, 5)
Observable.from(listOf(4, 5))
)!!.subscribe(received(), { e -> a!!.error(e) })
verify(a, times(1))!!.received(1)
verify(a, times(1))!!.received(2)
Expand All @@ -125,13 +116,13 @@ public class BasicKotlinTests {
[Test]
public fun testMerge() {
Observable.merge(
Observable.from(1, 2, 3),
Observable.from(listOf(1, 2, 3)),
Observable.merge(
Observable.from(6),
Observable.error(NullPointerException()),
Observable.from(7)
),
Observable.from(4, 5)
Observable.from(listOf(4, 5))
)!!.subscribe(received(), { e -> a!!.error(e) })
verify(a, times(1))!!.received(1)
verify(a, times(1))!!.received(2)
Expand Down Expand Up @@ -166,7 +157,7 @@ public class BasicKotlinTests {
[Test]
public fun testFromWithObjects() {
val list = listOf(1, 2, 3, 4, 5)
assertEquals(2, Observable.from(list, 6)!!.count()!!.toBlockingObservable()!!.single())
assertEquals(2, Observable.from(listOf(list, 6))!!.count()!!.toBlockingObservable()!!.single())
}

[Test]
Expand All @@ -185,23 +176,23 @@ public class BasicKotlinTests {

[Test]
public fun testSkipTake() {
Observable.from(1, 2, 3)!!.skip(1)!!.take(1)!!.subscribe(received())
Observable.from(listOf(1, 2, 3))!!.skip(1)!!.take(1)!!.subscribe(received())
verify(a, times(0))!!.received(1)
verify(a, times(1))!!.received(2)
verify(a, times(0))!!.received(3)
}

[Test]
public fun testSkip() {
Observable.from(1, 2, 3)!!.skip(2)!!.subscribe(received())
Observable.from(listOf(1, 2, 3))!!.skip(2)!!.subscribe(received())
verify(a, times(0))!!.received(1)
verify(a, times(0))!!.received(2)
verify(a, times(1))!!.received(3)
}

[Test]
public fun testTake() {
Observable.from(1, 2, 3)!!.take(2)!!.subscribe(received())
Observable.from(listOf(1, 2, 3))!!.take(2)!!.subscribe(received())
verify(a, times(1))!!.received(1)
verify(a, times(1))!!.received(2)
verify(a, times(0))!!.received(3)
Expand All @@ -215,15 +206,15 @@ public class BasicKotlinTests {

[Test]
public fun testTakeWhile() {
Observable.from(1, 2, 3)!!.takeWhile { x -> x!! < 3 }!!.subscribe(received())
Observable.from(listOf(1, 2, 3))!!.takeWhile { x -> x!! < 3 }!!.subscribe(received())
verify(a, times(1))!!.received(1)
verify(a, times(1))!!.received(2)
verify(a, times(0))!!.received(3)
}

[Test]
public fun testTakeWhileWithIndex() {
Observable.from(1, 2, 3)!!.takeWhileWithIndex { x, i -> i!! < 2 }!!.subscribe(received())
Observable.from(listOf(1, 2, 3))!!.takeWhileWithIndex { x, i -> i!! < 2 }!!.subscribe(received())
verify(a, times(1))!!.received(1)
verify(a, times(1))!!.received(2)
verify(a, times(0))!!.received(3)
Expand Down Expand Up @@ -251,35 +242,35 @@ public class BasicKotlinTests {

[Test]
public fun testLastOrDefault() {
assertEquals("two", Observable.from("one", "two")!!.toBlockingObservable()!!.lastOrDefault("default") { x -> x!!.length == 3 })
assertEquals("default", Observable.from("one", "two")!!.toBlockingObservable()!!.lastOrDefault("default") { x -> x!!.length > 3 })
assertEquals("two", Observable.from(listOf("one", "two"))!!.toBlockingObservable()!!.lastOrDefault("default") { x -> x!!.length == 3 })
assertEquals("default", Observable.from(listOf("one", "two"))!!.toBlockingObservable()!!.lastOrDefault("default") { x -> x!!.length > 3 })
}

[Test(expected = javaClass<IllegalArgumentException>())]
public fun testSingle() {
assertEquals("one", Observable.from("one")!!.toBlockingObservable()!!.single { x -> x!!.length == 3 })
Observable.from("one", "two")!!.toBlockingObservable()!!.single { x -> x!!.length == 3 }
Observable.from(listOf("one", "two"))!!.toBlockingObservable()!!.single { x -> x!!.length == 3 }
fail()
}

[Test]
public fun testDefer() {
Observable.defer { Observable.from(1, 2) }!!.subscribe(received())
Observable.defer { Observable.from(listOf(1, 2)) }!!.subscribe(received())
verify(a, times(1))!!.received(1)
verify(a, times(1))!!.received(2)
}

[Test]
public fun testAll() {
Observable.from(1, 2, 3)!!.all { x -> x!! > 0 }!!.subscribe(received())
Observable.from(listOf(1, 2, 3))!!.all { x -> x!! > 0 }!!.subscribe(received())
verify(a, times(1))!!.received(true)
}

[Test]
public fun testZip() {
val o1 = Observable.from(1, 2, 3)!!
val o2 = Observable.from(4, 5, 6)!!
val o3 = Observable.from(7, 8, 9)!!
val o1 = Observable.from(listOf(1, 2, 3))!!
val o2 = Observable.from(listOf(4, 5, 6))!!
val o3 = Observable.from(listOf(7, 8, 9))!!

val values = Observable.zip(o1, o2, o3) { a, b, c -> listOf(a, b, c) }!!.toList()!!.toBlockingObservable()!!.single()!!
assertEquals(listOf(1, 4, 7), values[0])
Expand All @@ -289,9 +280,9 @@ public class BasicKotlinTests {

[Test]
public fun testZipWithIterable() {
val o1 = Observable.from(1, 2, 3)!!
val o2 = Observable.from(4, 5, 6)!!
val o3 = Observable.from(7, 8, 9)!!
val o1 = Observable.from(listOf(1, 2, 3))!!
val o2 = Observable.from(listOf(4, 5, 6))!!
val o3 = Observable.from(listOf(7, 8, 9))!!

val values = Observable.zip(listOf(o1, o2, o3)) { args -> listOf(*args) }!!.toList()!!.toBlockingObservable()!!.single()!!
assertEquals(listOf(1, 4, 7), values[0])
Expand All @@ -303,33 +294,28 @@ public class BasicKotlinTests {
public fun testGroupBy() {
var count = 0

Observable.from("one", "two", "three", "four", "five", "six")!!
Observable.from(listOf("one", "two", "three", "four", "five", "six"))!!
.groupBy { s -> s!!.length }!!
.mapMany { groupObervable ->
.flatMap { groupObervable ->
groupObervable!!.map { s ->
"Value: $s Group ${groupObervable.getKey()}"
}
}!!
.toBlockingObservable()!!.forEach { s ->
}!!.toBlockingObservable()!!.forEach { s ->
println(s)
count++
}

assertEquals(6, count)
}

public trait ScriptAssertion{
fun error(e: Throwable?)

fun received(e: Any?)
}

public class TestFactory(){
var counter = 1

val numbers: Observable<Int>
get(){
return Observable.from(1, 3, 2, 5, 4)!!
return Observable.from(listOf(1, 3, 2, 5, 4))!!
}

val onSubscribe: TestOnSubscribe
Expand All @@ -345,22 +331,22 @@ public class BasicKotlinTests {
}

class AsyncObservable : OnSubscribeFunc<Int>{
override fun onSubscribe(t1: Observer<in Int>?): Subscription? {
override fun onSubscribe(op: Observer<in Int>?): Subscription? {
thread {
Thread.sleep(50)
t1!!.onNext(1)
t1.onNext(2)
t1.onNext(3)
t1.onCompleted()
op!!.onNext(1)
op.onNext(2)
op.onNext(3)
op.onCompleted()
}
return Subscriptions.empty()
}
}

class TestOnSubscribe(val count: Int) : OnSubscribeFunc<String>{
override fun onSubscribe(t1: Observer<in String>?): Subscription? {
t1!!.onNext("hello_$count")
t1.onCompleted()
override fun onSubscribe(op: Observer<in String>?): Subscription? {
op!!.onNext("hello_$count")
op.onCompleted()
return Subscription { }
}

Expand Down
Loading