Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rxscala improvement #1159

Merged
merged 7 commits into from
May 6, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import scala.language.implicitConversions

import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Assert.assertFalse
import org.junit.Ignore
import org.junit.Test
import org.scalatest.junit.JUnitSuite
Expand Down Expand Up @@ -296,6 +297,28 @@ class RxScalaDemo extends JUnitSuite {
shared.connect
}

@Test def exampleWithPublish2() {
val unshared = Observable.from(1 to 4)
val shared = unshared.publish(0)
shared.subscribe(n => println(s"subscriber 1 gets $n"))
shared.subscribe(n => println(s"subscriber 2 gets $n"))
shared.connect
}

@Test def exampleWithPublish3() {
val o = Observable.interval(100 millis).take(5).publish((o: Observable[Long]) => o.map(_ * 2))
o.subscribe(n => println(s"subscriber 1 gets $n"))
o.subscribe(n => println(s"subscriber 2 gets $n"))
Thread.sleep(1000)
}

@Test def exampleWithPublish4() {
val o = Observable.interval(100 millis).take(5).publish((o: Observable[Long]) => o.map(_ * 2), -1L)
o.subscribe(n => println(s"subscriber 1 gets $n"))
o.subscribe(n => println(s"subscriber 2 gets $n"))
Thread.sleep(1000)
}

def doLater(waitTime: Duration, action: () => Unit): Unit = {
Observable.interval(waitTime).take(1).subscribe(_ => action())
}
Expand Down Expand Up @@ -426,6 +449,15 @@ class RxScalaDemo extends JUnitSuite {
)
}

@Test def dropUntilExample() {
val o = List("Alice", "Bob", "Carlos").toObservable.zip(
Observable.interval(700 millis, IOScheduler())).map(_._1) // emit every 700 millis
val other = List(1).toObservable.delay(1 seconds)
println(
o.dropUntil(other).toBlockingObservable.toList // output List("Bob", "Carlos")
)
}

def square(x: Int): Int = {
println(s"$x*$x is being calculated on thread ${Thread.currentThread().getId}")
Thread.sleep(100) // calculating a square is heavy work :)
Expand Down Expand Up @@ -550,6 +582,26 @@ class RxScalaDemo extends JUnitSuite {
obs.toBlockingObservable.toIterable.last
}

@Test def doOnTerminateExample(): Unit = {
val o = List("red", "green", "blue").toObservable.doOnTerminate(() => println("terminate"))
o.subscribe(v => println(v), e => e.printStackTrace, () => println("onCompleted"))
// red
// green
// blud
// terminate
// onCompleted
}

@Test def finallyDoExample(): Unit = {
val o = List("red", "green", "blue").toObservable.finallyDo(() => println("finally"))
o.subscribe(v => println(v), e => e.printStackTrace, () => println("onCompleted"))
// red
// green
// blud
// onCompleted
// finally
}

@Test def timeoutExample(): Unit = {
val other = List(100L, 200L, 300L).toObservable
val result = Observable.interval(100 millis).timeout(50 millis, other).toBlockingObservable.toList
Expand Down Expand Up @@ -636,6 +688,24 @@ class RxScalaDemo extends JUnitSuite {
println(m.toBlockingObservable.single)
}

@Test def containsExample(): Unit = {
val o1 = List(1, 2, 3).toObservable.contains(2)
assertTrue(o1.toBlockingObservable.single)

val o2 = List(1, 2, 3).toObservable.contains(4)
assertFalse(o2.toBlockingObservable.single)
}

@Test def repeatExample1(): Unit = {
val o : Observable[String] = List("alice", "bob", "carol").toObservable.repeat().take(6)
assertEquals(List("alice", "bob", "carol", "alice", "bob", "carol"), o.toBlockingObservable.toList)
}

@Test def repeatExample2(): Unit = {
val o : Observable[String] = List("alice", "bob", "carol").toObservable.repeat(2)
assertEquals(List("alice", "bob", "carol", "alice", "bob", "carol"), o.toBlockingObservable.toList)
}

@Test def retryExample1(): Unit = {
val o : Observable[String] = List("alice", "bob", "carol").toObservable
assertEquals(List("alice", "bob", "carol"), o.retry.toBlockingObservable.toList)
Expand Down Expand Up @@ -708,4 +778,27 @@ class RxScalaDemo extends JUnitSuite {
case e: IllegalArgumentException => println("IllegalArgumentException from skipWithException")
}
}

@Test def startWithExample1(): Unit = {
val o1 = List(3, 4).toObservable
val o2 = 1 :: 2 :: o1
assertEquals(List(1, 2, 3, 4), o2.toBlockingObservable.toList)
}

@Test def startWithExample2(): Unit = {
val prepended = List(2, 4).toObservable
val o = List(5, 6, 7, 8).toObservable.filter(_ % 2 == 0).startWith(prepended)
assertEquals(List(2, 4, 6, 8), o.toBlockingObservable.toList)
}

@Test def startWithExample3(): Unit = {
val o = List(5, 6, 7, 8).toObservable.filter(_ % 2 == 0).startWith(List(2, 4))
assertEquals(List(2, 4, 6, 8), o.toBlockingObservable.toList)
}

@Test def startWithExample4(): Unit = {
val o = List(5, 6, 7, 8).toObservable.filter(_ % 2 == 0).startWith(Array(2, 4))
assertEquals(List(2, 4, 6, 8), o.toBlockingObservable.toList)
}

}
Loading