From e761a0813724efd36b28860bff1ee0e5e6e9e51f Mon Sep 17 00:00:00 2001 From: Laimonas Turauskas Date: Mon, 19 Oct 2015 14:59:56 +0300 Subject: [PATCH 1/2] Added extension method for static Observable.switchOnNext method --- src/main/kotlin/rx/lang/kotlin/observables.kt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/kotlin/rx/lang/kotlin/observables.kt b/src/main/kotlin/rx/lang/kotlin/observables.kt index 917e889..85f535f 100644 --- a/src/main/kotlin/rx/lang/kotlin/observables.kt +++ b/src/main/kotlin/rx/lang/kotlin/observables.kt @@ -106,3 +106,5 @@ public inline fun Observable.subscribeWith( body : FunctionSubscriberModi modifier.body() return subscribe(modifier.subscriber) } + +public fun Observable>.switchOnNext(): Observable = Observable.switchOnNext(this) From 5334bbf0896056b3b4009bb6f762516372d7771b Mon Sep 17 00:00:00 2001 From: Laimonas Turauskas Date: Tue, 20 Oct 2015 12:58:35 +0300 Subject: [PATCH 2/2] Added a unit test for Observable.switchOnNext() extension method --- .../kotlin/rx/lang/kotlin/ExtensionTests.kt | 41 +++++++++++++++++-- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/src/test/kotlin/rx/lang/kotlin/ExtensionTests.kt b/src/test/kotlin/rx/lang/kotlin/ExtensionTests.kt index 8f22c6f..fed5801 100644 --- a/src/test/kotlin/rx/lang/kotlin/ExtensionTests.kt +++ b/src/test/kotlin/rx/lang/kotlin/ExtensionTests.kt @@ -16,15 +16,15 @@ package rx.lang.kotlin -import rx.Observable import org.junit.Test import org.mockito.Mockito.* import org.mockito.Matchers.* import org.junit.Assert.* -import rx.Notification import kotlin.concurrent.thread -import rx.Subscriber import org.funktionale.partials.* +import rx.* +import rx.schedulers.TestScheduler +import java.util.concurrent.TimeUnit /** * This class contains tests using the extension functions provided by the language adaptor. @@ -236,6 +236,41 @@ public class ExtensionTests : KotlinTests() { assertEquals(listOf(3, 6, 9), values[2]) } + @Test + public fun testSwitchOnNext() { + val testScheduler = TestScheduler() + val worker = testScheduler.createWorker() + + val observable = observable> { s -> + fun at(delay: Long, func : () -> Unit){ + worker.schedule({ + func() + }, delay, TimeUnit.MILLISECONDS) + } + + val first = Observable.interval(5, TimeUnit.MILLISECONDS, testScheduler).take(3) + at(0, { s.onNext(first) }) + + val second = Observable.interval(5, TimeUnit.MILLISECONDS, testScheduler).take(3) + at(11, { s.onNext(second) }) + + at(40, { s.onCompleted() }) + } + + observable.switchOnNext().subscribe(received) + + val inOrder = inOrder(a) + testScheduler.advanceTimeTo(10, TimeUnit.MILLISECONDS) + inOrder.verify(a, times(1)).received(0L) + inOrder.verify(a, times(1)).received(1L) + + testScheduler.advanceTimeTo(40, TimeUnit.MILLISECONDS) + inOrder.verify(a, times(1)).received(0L) + inOrder.verify(a, times(1)).received(1L) + inOrder.verify(a, times(1)).received(2L) + inOrder.verifyNoMoreInteractions() + } + val funOnSubscribe: (Int, Subscriber) -> Unit = { counter, subscriber -> subscriber.onNext("hello_$counter") subscriber.onCompleted()