Skip to content

Commit

Permalink
Renamed apply(items: T*) to items.
Browse files Browse the repository at this point in the history
Fixed all tests.
  • Loading branch information
AppliedDuality committed Dec 10, 2013
1 parent 83b0760 commit 335ec87
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 66 deletions.
16 changes: 12 additions & 4 deletions language-adaptors/rxjava-scala/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,21 @@ object Observable {

The major changes in `Observable` are wrt to the factory methods where too libral use of overloading of the `apply`
method hindered type inference and made Scala code look unnecessarily different than that in other language bindings.
In fact the only occurrence left of `apply` is for the varargs case. All other factory methods now have their own name.
All factory methods now have their own name corresponding to the Java and .NET operators
(plus overloads that take a `Scheduler`).

* `def apply[T](items: T*): Observable[T]`
* `def from[T](f: Future[T]): Observable[T]`
* `def from[T](future: Future[T]): Observable[T]`
* `def from[T](iterable: Iterable[T]): Observable[T]`
* `def create[T](subscribe: Observer[T] => Subscription): Observable[T]`
* `def error[T](exception: Throwable): Observable[T]`
* `def empty[T]: Observable[T]`
* `def items[T](items: T*): Observable[T]

In the *pre-release* of this version, we expose both `apply` and `create` for the mother of all creation functions.
We would like to solicit feedback which of these two names is preferred
(or both, but there is a high probability that only one will be chosen).

* `def apply[T](subscribe: Observer[T]=>Subscription): Observable[T]`
* `def create[T](subscribe: Observer[T] => Subscription): Observable[T]`

Subject
-------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import scala.concurrent.duration._
object Olympics {
case class Medal(val year: Int, val games: String, val discipline: String, val medal: String, val athlete: String, val country: String)

def mountainBikeMedals: Observable[Medal] = Observable(
Observable(
def mountainBikeMedals: Observable[Medal] = Observable.items(
Observable.items(
Medal(1996, "Atlanta 1996", "cross-country men", "Gold", "Bart BRENTJENS", "Netherlands"),
Medal(1996, "Atlanta 1996", "cross-country women", "Gold", "Paola PEZZO", "Italy"),
Medal(1996, "Atlanta 1996", "cross-country men", "Silver", "Thomas FRISCHKNECHT", "Switzerland"),
Expand All @@ -31,7 +31,7 @@ object Olympics {
Medal(1996, "Atlanta 1996", "cross-country women", "Bronze", "Susan DEMATTEI", "United States of America")
),
fourYearsEmpty,
Observable(
Observable.items(
Medal(2000, "Sydney 2000", "cross-country women", "Gold", "Paola PEZZO", "Italy"),
Medal(2000, "Sydney 2000", "cross-country women", "Silver", "Barbara BLATTER", "Switzerland"),
Medal(2000, "Sydney 2000", "cross-country women", "Bronze", "Marga FULLANA", "Spain"),
Expand All @@ -40,7 +40,7 @@ object Olympics {
Medal(2000, "Sydney 2000", "cross-country men", "Bronze", "Christoph SAUSER", "Switzerland")
),
fourYearsEmpty,
Observable(
Observable.items(
Medal(2004, "Athens 2004", "cross-country men", "Gold", "Julien ABSALON", "France"),
Medal(2004, "Athens 2004", "cross-country men", "Silver", "Jose Antonio HERMIDA RAMOS", "Spain"),
Medal(2004, "Athens 2004", "cross-country men", "Bronze", "Bart BRENTJENS", "Netherlands"),
Expand All @@ -49,7 +49,7 @@ object Olympics {
Medal(2004, "Athens 2004", "cross-country women", "Bronze", "Sabine SPITZ", "Germany")
),
fourYearsEmpty,
Observable(
Observable.items(
Medal(2008, "Beijing 2008", "cross-country women", "Gold", "Sabine SPITZ", "Germany"),
Medal(2008, "Beijing 2008", "cross-country women", "Silver", "Maja WLOSZCZOWSKA", "Poland"),
Medal(2008, "Beijing 2008", "cross-country women", "Bronze", "Irina KALENTYEVA", "Russian Federation"),
Expand All @@ -58,7 +58,7 @@ object Olympics {
Medal(2008, "Beijing 2008", "cross-country men", "Bronze", "Nino SCHURTER", "Switzerland")
),
fourYearsEmpty,
Observable(
Observable.items(
Medal(2012, "London 2012", "cross-country men", "Gold", "Jaroslav KULHAVY", "Czech Republic"),
Medal(2012, "London 2012", "cross-country men", "Silver", "Nino SCHURTER", "Switzerland"),
Medal(2012, "London 2012", "cross-country men", "Bronze", "Marco Aurelio FONTANA", "Italy"),
Expand All @@ -80,7 +80,7 @@ object Olympics {
// So we don't use this:
// Observable.interval(fourYears).take(1).map(i => neverUsedDummyMedal).filter(m => false)
// But we just return empty, which completes immediately
Observable()
Observable.empty[Medal]
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def testObservableComparison() {
val first = Observable(10, 11, 12)
val second = Observable(10, 11, 12)
val first = Observable.from(List(10, 11, 12))
val second = Observable.from(List(10, 11, 12))

val b1 = (first zip second) map (p => p._1 == p._2) forall (b => b)

Expand All @@ -88,8 +88,8 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def testObservableComparisonWithForComprehension() {
val first = Observable(10, 11, 12)
val second = Observable(10, 11, 12)
val first = Observable.from(List(10, 11, 12))
val second = Observable.from(List(10, 11, 12))

val booleans = for ((n1, n2) <- (first zip second)) yield (n1 == n2)

Expand All @@ -99,8 +99,8 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def testStartWithIsUnnecessary() {
val before = Observable(-2, -1, 0)
val source = Observable(1, 2, 3)
val before = List(-2, -1, 0).toObservable
val source = List(1, 2, 3).toObservable
println((before ++ source).toBlockingObservable.toList)
}

Expand All @@ -124,11 +124,11 @@ class RxScalaDemo extends JUnitSuite {

@Test def fattenSomeExample() {
// To merge some observables which are all known already:
Observable(
List(
Observable.interval(200 millis),
Observable.interval(400 millis),
Observable.interval(800 millis)
).flatten.take(12).toBlockingObservable.foreach(println(_))
).toObservable.flatten.take(12).toBlockingObservable.foreach(println(_))
}

@Test def rangeAndBufferExample() {
Expand All @@ -143,7 +143,7 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def testReduce() {
assertEquals(10, Observable(1, 2, 3, 4).reduce(_ + _).toBlockingObservable.single)
assertEquals(10, List(1, 2, 3, 4).toObservable.reduce(_ + _).toBlockingObservable.single)
}

@Test def testForeach() {
Expand All @@ -157,7 +157,7 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def testForComprehension() {
val observables = Observable(Observable(1, 2, 3), Observable(10, 20, 30))
val observables = List(List(1, 2, 3).toObservable, List(10, 20, 30).toObservable).toObservable
val squares = (for (o <- observables; i <- o if i % 2 == 0) yield i*i)
assertEquals(squares.toBlockingObservable.toList, List(4, 100, 400, 900))
}
Expand Down Expand Up @@ -185,14 +185,14 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def testGroupByThenFlatMap() {
val m = Observable(1, 2, 3, 4)
val m = List(1, 2, 3, 4).toObservable
val g = m.groupBy(i => i % 2)
val t = g.flatMap((p: (Int, Observable[Int])) => p._2)
assertEquals(List(1, 2, 3, 4), t.toBlockingObservable.toList)
}

@Test def testGroupByThenFlatMapByForComprehension() {
val m = Observable(1, 2, 3, 4)
val m = List(1, 2, 3, 4).toObservable
val g = m.groupBy(i => i % 2)
val t = for ((i, o) <- g; n <- o) yield n
assertEquals(List(1, 2, 3, 4), t.toBlockingObservable.toList)
Expand Down Expand Up @@ -250,13 +250,13 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def exampleWithoutPublish() {
val unshared = Observable(1 to 4)
val unshared = List(1 to 4).toObservable
unshared.subscribe(n => println(s"subscriber 1 gets $n"))
unshared.subscribe(n => println(s"subscriber 2 gets $n"))
}

@Test def exampleWithPublish() {
val unshared = Observable(1 to 4)
val unshared = List(1 to 4).toObservable
val (startFunc, shared) = unshared.publish
shared.subscribe(n => println(s"subscriber 1 gets $n"))
shared.subscribe(n => println(s"subscriber 2 gets $n"))
Expand Down Expand Up @@ -288,9 +288,9 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def testSingleOption() {
assertEquals(None, Observable(1, 2).toBlockingObservable.singleOption)
assertEquals(Some(1), Observable(1) .toBlockingObservable.singleOption)
assertEquals(None, Observable() .toBlockingObservable.singleOption)
assertEquals(None, List(1, 2).toObservable.toBlockingObservable.singleOption)
assertEquals(Some(1), List(1).toObservable.toBlockingObservable.singleOption)
assertEquals(None, List().toObservable.toBlockingObservable.singleOption)
}

// We can't put a general average method into Observable.scala, because Scala's Numeric
Expand All @@ -301,58 +301,58 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def averageExample() {
println(doubleAverage(Observable()).toBlockingObservable.single)
println(doubleAverage(Observable(0)).toBlockingObservable.single)
println(doubleAverage(Observable(4.44)).toBlockingObservable.single)
println(doubleAverage(Observable(1, 2, 3.5)).toBlockingObservable.single)
println(doubleAverage(Observable.empty[Double]).toBlockingObservable.single)
println(doubleAverage(List(0.0).toObservable).toBlockingObservable.single)
println(doubleAverage(List(4.44).toObservable).toBlockingObservable.single)
println(doubleAverage(List(1, 2, 3.5).toObservable).toBlockingObservable.single)
}

@Test def testSum() {
assertEquals(10, Observable(1, 2, 3, 4).sum.toBlockingObservable.single)
assertEquals(6, Observable(4, 2).sum.toBlockingObservable.single)
assertEquals(0, Observable[Int]().sum.toBlockingObservable.single)
assertEquals(10, List(1, 2, 3, 4).toObservable.sum.toBlockingObservable.single)
assertEquals(6, List(4, 2).toObservable.sum.toBlockingObservable.single)
assertEquals(0, List[Int]().toObservable.sum.toBlockingObservable.single)
}

@Test def testProduct() {
assertEquals(24, Observable(1, 2, 3, 4).product.toBlockingObservable.single)
assertEquals(8, Observable(4, 2).product.toBlockingObservable.single)
assertEquals(1, Observable[Int]().product.toBlockingObservable.single)
assertEquals(24, List(1, 2, 3, 4).toObservable.product.toBlockingObservable.single)
assertEquals(8, List(4, 2).toObservable.product.toBlockingObservable.single)
assertEquals(1, List[Int]().toObservable.product.toBlockingObservable.single)
}

@Test def mapWithIndexExample() {
// We don't need mapWithIndex because we already have zipWithIndex, which we can easily
// combine with map:
Observable("a", "b", "c").zipWithIndex.map(pair => pair._1 + " has index " + pair._2)
List("a", "b", "c").toObservable.zipWithIndex.map(pair => pair._1 + " has index " + pair._2)
.toBlockingObservable.foreach(println(_))

// Or even nicer with for-comprehension syntax:
(for ((letter, index) <- Observable("a", "b", "c").zipWithIndex) yield letter + " has index " + index)
(for ((letter, index) <- List("a", "b", "c").toObservable.zipWithIndex) yield letter + " has index " + index)
.toBlockingObservable.foreach(println(_))
}

// source Observables are all known:
@Test def zip3Example() {
val o = Observable.zip(Observable(1, 2), Observable(10, 20), Observable(100, 200))
val o = Observable.zip(List(1, 2).toObservable, List(10, 20).toObservable, List(100, 200).toObservable)
(for ((n1, n2, n3) <- o) yield s"$n1, $n2 and $n3")
.toBlockingObservable.foreach(println(_))
}

// source Observables are in an Observable:
@Test def zipManyObservableExample() {
val observables = Observable(Observable(1, 2), Observable(10, 20), Observable(100, 200))
val observables = List(List(1, 2).toObservable, List(10, 20).toObservable, List(100, 200).toObservable).toObservable
(for (seq <- Observable.zip(observables)) yield seq.mkString("(", ", ", ")"))
.toBlockingObservable.foreach(println(_))
}

@Test def takeFirstWithCondition() {
val condition: Int => Boolean = _ >= 3
assertEquals(3, Observable(1, 2, 3, 4).filter(condition).first.toBlockingObservable.single)
assertEquals(3, List(1, 2, 3, 4).toObservable.filter(condition).first.toBlockingObservable.single)
}

@Test def firstOrDefaultWithCondition() {
val condition: Int => Boolean = _ >= 3
assertEquals(3, Observable(1, 2, 3, 4).filter(condition).firstOrElse(10).toBlockingObservable.single)
assertEquals(10, Observable(-1, 0, 1).filter(condition).firstOrElse(10).toBlockingObservable.single)
assertEquals(3, List(1, 2, 3, 4).toObservable.filter(condition).firstOrElse(10).toBlockingObservable.single)
assertEquals(10, List(-1, 0, 1).toObservable.filter(condition).firstOrElse(10).toBlockingObservable.single)
}

def square(x: Int): Int = {
Expand All @@ -379,9 +379,9 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def toSortedList() {
assertEquals(Seq(7, 8, 9, 10), Observable(10, 7, 8, 9).toSeq.map(_.sorted).toBlockingObservable.single)
assertEquals(Seq(7, 8, 9, 10), List(10, 7, 8, 9).toObservable.toSeq.map(_.sorted).toBlockingObservable.single)
val f = (a: Int, b: Int) => b < a
assertEquals(Seq(10, 9, 8, 7), Observable(10, 7, 8, 9).toSeq.map(_.sortWith(f)).toBlockingObservable.single)
assertEquals(Seq(10, 9, 8, 7), List(10, 7, 8, 9).toObservable.toSeq.map(_.sortWith(f)).toBlockingObservable.single)
}

@Test def timestampExample() {
Expand Down Expand Up @@ -410,25 +410,25 @@ class RxScalaDemo extends JUnitSuite {

@Test def materializeExample2() {
import Notification._
Observable(1, 2, 3).materialize.subscribe(n => n match {
List(1, 2, 3).toObservable.materialize.subscribe(n => n match {
case OnNext(v) => println("Got value " + v)
case OnCompleted() => println("Completed")
case OnError(err) => println("Error: " + err.getMessage)
})
}

@Test def elementAtReplacement() {
assertEquals("b", Observable("a", "b", "c").drop(1).first.toBlockingObservable.single)
assertEquals("b", List("a", "b", "c").toObservable.drop(1).first.toBlockingObservable.single)
}

@Test def elementAtOrDefaultReplacement() {
assertEquals("b", Observable("a", "b", "c").drop(1).firstOrElse("!").toBlockingObservable.single)
assertEquals("!!", Observable("a", "b", "c").drop(10).firstOrElse("!!").toBlockingObservable.single)
assertEquals("b", List("a", "b", "c").toObservable.drop(1).firstOrElse("!").toBlockingObservable.single)
assertEquals("!!", List("a", "b", "c").toObservable.drop(10).firstOrElse("!!").toBlockingObservable.single)
}

@Test def takeWhileWithIndexAlternative {
val condition = true
Observable("a", "b").zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1)
List("a", "b").toObservable.zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1)
}

@Test def createExample() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ trait Observable[+T]
* @return $subscribeAllReturn
*/
def apply(observer: Observer[T]): Subscription = subscribe(observer)
def apply(): Subscription = subscribe()

/**
* $subscribeCallbacksMainNoNotifications
Expand Down Expand Up @@ -544,7 +543,7 @@ trait Observable[+T]
def window[Closing](closings: () => Observable[Closing]): Observable[Observable[T]] = {
val func : Func0[_ <: rx.Observable[_ <: Closing]] = closings().asJavaObservable
val o1: rx.Observable[_ <: rx.Observable[_]] = asJavaObservable.window[Closing](func)
val o2 = Observable[rx.Observable[_]](o1).map((x: rx.Observable[_]) => {
val o2 = Observable.items(o1).map((x: rx.Observable[_]) => {
val x2 = x.asInstanceOf[rx.Observable[_ <: T]]
toScalaObservable[T](x2)
})
Expand Down Expand Up @@ -1985,6 +1984,48 @@ object Observable {
toScalaObservable[T](rx.Observable.error(exception))
}

/**
* Returns an Observable that emits no data to the [[rx.lang.scala.Observer]] and
* immediately invokes its [[rx.lang.scala.Observer#onCompleted onCompleted]] method
* with the specified scheduler.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/empty.s.png">
*
* @param scheduler the scheduler to call the
[[rx.lang.scala.Observer#onCompleted onCompleted]] method
* @param T the type of the items (ostensibly) emitted by the Observable
* @return an Observable that returns no data to the [[rx.lang.scala.Observer]] and
* immediately invokes the [[rx.lang.scala.Observer]]r's
* [[rx.lang.scala.Observer#onCompleted onCompleted]] method with the
* specified scheduler
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#empty-error-and-never">RxJava Wiki: empty()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229066.aspx">MSDN: Observable.Empty Method (IScheduler)</a>
*/
def empty[T]: Observable[T] = {
toScalaObservable(rx.Observable.empty[T]())
}

/**
* Returns an Observable that emits no data to the [[rx.lang.scala.Observer]] and
* immediately invokes its [[rx.lang.scala.Observer#onCompleted onCompleted]] method
* with the specified scheduler.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/empty.s.png">
*
* @param scheduler the scheduler to call the
[[rx.lang.scala.Observer#onCompleted onCompleted]] method
* @param T the type of the items (ostensibly) emitted by the Observable
* @return an Observable that returns no data to the [[rx.lang.scala.Observer]] and
* immediately invokes the [[rx.lang.scala.Observer]]r's
* [[rx.lang.scala.Observer#onCompleted onCompleted]] method with the
* specified scheduler
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#empty-error-and-never">RxJava Wiki: empty()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229066.aspx">MSDN: Observable.Empty Method (IScheduler)</a>
*/
def empty[T](scheduler: Scheduler): Observable[T] = {
toScalaObservable(rx.Observable.empty[T](scalaSchedulerToJavaScheduler(scheduler)))
}

/**
* Converts a sequence of values into an Observable.
*
Expand All @@ -2001,10 +2042,6 @@ object Observable {
* resulting Observable
* @return an Observable that emits each item in the source Array
*/
def apply[T](items: T*): Observable[T] = {
toScalaObservable[T](rx.Observable.from(items.toIterable.asJava))
}

def items[T](items: T*): Observable[T] = {
toScalaObservable[T](rx.Observable.from(items.toIterable.asJava))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ package object scala {
* Placeholder for extension methods into Observable[T] from other types
*/
implicit class ObservableExtensions[T](val source: Iterable[T]) extends AnyVal {
def toObservable(): Observable[T] = { Observable.from(source) }
def toObservable: Observable[T] = { Observable.from(source) }
def toObservable(scheduler: Scheduler): Observable[T] = { Observable.from(source, scheduler) }
}

Expand Down
Loading

0 comments on commit 335ec87

Please sign in to comment.