Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed buglet in join binding, simplified types #781

Merged
merged 2 commits into from
Feb 6, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ object Olympics {
// So we don't use this:
// Observable.interval(fourYears).take(1).map(i => neverUsedDummyMedal).filter(m => false)
// But we just return empty, which completes immediately
Observable.empty[Medal]
Observable.empty
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,10 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def groupByUntilExample() {
val numbers = Observable.interval(250 millis) take 14
val grouped = numbers.groupByUntil[Long, Long](
{case x => x % 2},
{case (key, obs) => obs filter {case x => x == 7}}
)
val sequenced = (grouped map {case (key, obs) => obs.toSeq}).flatten
sequenced subscribe {x => println(s"Emitted group: $x")}
val numbers = Observable.interval(250 millis).take(14)
val grouped = numbers.groupByUntil[Long](x => x % 2, {case (key, obs) => obs.filter(x => x == 7)})
val sequenced = (grouped.map({ case (key, obs) => obs.toSeq })).flatten
sequenced.subscribe(x => println(s"Emitted group: $x"))
}


Expand Down Expand Up @@ -312,7 +309,7 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def averageExample() {
println(doubleAverage(Observable.empty[Double]).toBlockingObservable.single)
println(doubleAverage(Observable.empty).toBlockingObservable.single)
println(doubleAverage(List(0.0).toObservable).toBlockingObservable.single)
println(doubleAverage(List(4.44).toObservable).toBlockingObservable.single)
println(doubleAverage(List(1, 2, 3.5).toObservable).toBlockingObservable.single)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,10 +352,10 @@ trait Observable[+T]
* @return
* An [[rx.lang.scala.Observable]] which produces buffers which are created and emitted when the specified [[rx.lang.scala.Observable]]s publish certain objects.
*/
def buffer[Opening, Closing](openings: Observable[Opening], closings: Opening => Observable[Closing]): Observable[Seq[T]] = {
def buffer[Opening](openings: Observable[Opening], closings: Opening => Observable[Any]): Observable[Seq[T]] = {
val opening: rx.Observable[_ <: Opening] = openings.asJavaObservable
val closing: Func1[_ >: Opening, _ <: rx.Observable[_ <: Closing]] = (o: Opening) => closings(o).asJavaObservable
val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer[Opening, Closing](opening, closing)
val closing: Func1[_ >: Opening, _ <: rx.Observable[_ <: Any]] = (o: Opening) => closings(o).asJavaObservable
val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer[Opening, Any](opening, closing)
Observable.jObsOfListToScObsOfSeq(jObs.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
}

Expand Down Expand Up @@ -539,9 +539,9 @@ trait Observable[+T]
* An [[rx.lang.scala.Observable]] which produces connected non-overlapping windows, which are emitted
* when the current [[rx.lang.scala.Observable]] created with the function argument produces an object.
*/
def window[Closing](closings: () => Observable[Closing]): Observable[Observable[T]] = {
val func : Func0[_ <: rx.Observable[_ <: Closing]] = closings().asJavaObservable
val o1: rx.Observable[_ <: rx.Observable[_]] = asJavaObservable.window[Closing](func)
def window(closings: () => Observable[Any]): Observable[Observable[T]] = {
val func : Func0[_ <: rx.Observable[_ <: Any]] = closings().asJavaObservable
val o1: rx.Observable[_ <: rx.Observable[_]] = asJavaObservable.window[Any](func)
val o2 = Observable.items(o1).map((x: rx.Observable[_]) => {
val x2 = x.asInstanceOf[rx.Observable[_ <: T]]
toScalaObservable[T](x2)
Expand All @@ -565,9 +565,9 @@ trait Observable[+T]
* @return
* An [[rx.lang.scala.Observable]] which produces windows which are created and emitted when the specified [[rx.lang.scala.Observable]]s publish certain objects.
*/
def window[Opening, Closing](openings: Observable[Opening], closings: Opening => Observable[Closing]) = {
def window[Opening](openings: Observable[Opening], closings: Opening => Observable[Any]) = {
Observable.jObsOfJObsToScObsOfScObs(
asJavaObservable.window[Opening, Closing](openings.asJavaObservable, (op: Opening) => closings(op).asJavaObservable))
asJavaObservable.window[Opening, Any](openings.asJavaObservable, (op: Opening) => closings(op).asJavaObservable))
: Observable[Observable[T]] // SI-7818
}

Expand Down Expand Up @@ -1335,19 +1335,61 @@ trait Observable[+T]
* an observable that emits a single Closing when the group should be closed.
* @tparam K
* the type of the keys returned by the discriminator function.
* @tparam Closing
* the type of the element emitted from the closings observable.
* @return an Observable that emits `(key, observable)` pairs, where `observable`
* contains all items for which `f` returned `key` before `closings` emits a value.
*/
def groupByUntil[K, Closing](f: T => K, closings: (K, Observable[T])=>Observable[Closing]): Observable[(K, Observable[T])] = {
val fclosing: Func1[_ >: rx.observables.GroupedObservable[K, _ <: T], _ <: rx.Observable[_ <: Closing]] =
def groupByUntil[K](f: T => K, closings: (K, Observable[T])=>Observable[Any]): Observable[(K, Observable[T])] = {
val fclosing: Func1[_ >: rx.observables.GroupedObservable[K, _ <: T], _ <: rx.Observable[_ <: Any]] =
(jGrObs: rx.observables.GroupedObservable[K, _ <: T]) => closings(jGrObs.getKey, toScalaObservable[T](jGrObs)).asJavaObservable
val o1 = asJavaObservable.groupByUntil[K, Closing](f, fclosing) : rx.Observable[_ <: rx.observables.GroupedObservable[K, _ <: T]]
val o1 = asJavaObservable.groupByUntil[K, Any](f, fclosing) : rx.Observable[_ <: rx.observables.GroupedObservable[K, _ <: T]]
val func = (o: rx.observables.GroupedObservable[K, _ <: T]) => (o.getKey, toScalaObservable[T](o))
toScalaObservable[(K, Observable[T])](o1.map[(K, Observable[T])](func))
}

/**
* Correlates the items emitted by two Observables based on overlapping durations.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/join_.png">
*
* @param other
* the second Observable to join items from
* @param leftDurationSelector
* a function to select a duration for each item emitted by the source Observable,
* used to determine overlap
* @param rightDurationSelector
* a function to select a duration for each item emitted by the inner Observable,
* used to determine overlap
* @param resultSelector
* a function that computes an item to be emitted by the resulting Observable for any
* two overlapping items emitted by the two Observables
* @return
* an Observable that emits items correlating to items emitted by the source Observables
* that have overlapping durations
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#join">RxJava Wiki: join()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229750.aspx">MSDN: Observable.Join</a>
*/
def join[S, R] (
other: Observable[S],
leftDurationSelector: T => Observable[Any],
rightDurationSelector: S => Observable[Any],
resultSelector: (T,S) => R
): Observable[R] = {

val outer : rx.Observable[_ <: T] = this.asJavaObservable
val inner : rx.Observable[_ <: S] = other.asJavaObservable
val left: Func1[_ >: T, _<: rx.Observable[_ <: Any]] = (t: T) => leftDurationSelector(t).asJavaObservable
val right: Func1[_ >: S, _<: rx.Observable[_ <: Any]] = (s: S) => rightDurationSelector(s).asJavaObservable
val f: Func2[_>: T, _ >: S, _ <: R] = resultSelector

toScalaObservable[R](
outer.asInstanceOf[rx.Observable[T]].join[S, Any, Any, R](
inner.asInstanceOf[rx.Observable[S]],
left. asInstanceOf[Func1[T, rx.Observable[Any]]],
right.asInstanceOf[Func1[S, rx.Observable[Any]]],
f.asInstanceOf[Func2[T,S,R]])
)
}

/**
* Given an Observable that emits Observables, creates a single Observable that
* emits the items emitted by the most recently published of those Observables.
Expand Down Expand Up @@ -2135,18 +2177,15 @@ object Observable {
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/empty.s.png">
*
* @param scheduler the scheduler to call the
[[rx.lang.scala.Observer#onCompleted onCompleted]] method
* @param T the type of the items (ostensibly) emitted by the Observable
* @return an Observable that returns no data to the [[rx.lang.scala.Observer]] and
* immediately invokes the [[rx.lang.scala.Observer]]r's
* [[rx.lang.scala.Observer#onCompleted onCompleted]] method with the
* specified scheduler
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#empty-error-and-never">RxJava Wiki: empty()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229066.aspx">MSDN: Observable.Empty Method (IScheduler)</a>
*/
def empty[T]: Observable[T] = {
toScalaObservable(rx.Observable.empty[T]())
def empty: Observable[Nothing] = {
toScalaObservable(rx.Observable.empty[Nothing]())
}

/**
Expand All @@ -2158,16 +2197,15 @@ object Observable {
*
* @param scheduler the scheduler to call the
[[rx.lang.scala.Observer#onCompleted onCompleted]] method
* @param T the type of the items (ostensibly) emitted by the Observable
* @return an Observable that returns no data to the [[rx.lang.scala.Observer]] and
* immediately invokes the [[rx.lang.scala.Observer]]r's
* [[rx.lang.scala.Observer#onCompleted onCompleted]] method with the
* specified scheduler
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#empty-error-and-never">RxJava Wiki: empty()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229066.aspx">MSDN: Observable.Empty Method (IScheduler)</a>
*/
def empty[T](scheduler: Scheduler): Observable[T] = {
toScalaObservable(rx.Observable.empty[T](scalaSchedulerToJavaScheduler(scheduler)))
def empty(scheduler: Scheduler): Observable[Nothing] = {
toScalaObservable(rx.Observable.empty[Nothing](scalaSchedulerToJavaScheduler(scheduler)))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ class ObservableTests extends JUnitSuite {
assertEquals(6, o.toBlockingObservable.single)
}

@Test def testJoin() {
val xs = Observable.items(1,2,3)
val ys = Observable.items("a")
val zs = xs.join[String,String](ys, x => Observable.never, y => Observable.never, (x,y) => y+x)
assertEquals(List("a1", "a2", "a3"),zs.toBlockingObservable.toList)
}

/*
@Test def testHead() {
val observer = mock(classOf[Observer[Int]])
Expand Down