Skip to content

Commit

Permalink
Merge pull request #781 from Applied-Duality/join
Browse files Browse the repository at this point in the history
Fixed buglet in join binding, simplified types
  • Loading branch information
benjchristensen committed Feb 6, 2014
2 parents 64b984e + e359f3c commit 29162d7
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ object Olympics {
// So we don't use this:
// Observable.interval(fourYears).take(1).map(i => neverUsedDummyMedal).filter(m => false)
// But we just return empty, which completes immediately
Observable.empty[Medal]
Observable.empty
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,10 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def groupByUntilExample() {
val numbers = Observable.interval(250 millis) take 14
val grouped = numbers.groupByUntil[Long, Long](
{case x => x % 2},
{case (key, obs) => obs filter {case x => x == 7}}
)
val sequenced = (grouped map {case (key, obs) => obs.toSeq}).flatten
sequenced subscribe {x => println(s"Emitted group: $x")}
val numbers = Observable.interval(250 millis).take(14)
val grouped = numbers.groupByUntil[Long](x => x % 2, {case (key, obs) => obs.filter(x => x == 7)})
val sequenced = (grouped.map({ case (key, obs) => obs.toSeq })).flatten
sequenced.subscribe(x => println(s"Emitted group: $x"))
}


Expand Down Expand Up @@ -312,7 +309,7 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def averageExample() {
println(doubleAverage(Observable.empty[Double]).toBlockingObservable.single)
println(doubleAverage(Observable.empty).toBlockingObservable.single)
println(doubleAverage(List(0.0).toObservable).toBlockingObservable.single)
println(doubleAverage(List(4.44).toObservable).toBlockingObservable.single)
println(doubleAverage(List(1, 2, 3.5).toObservable).toBlockingObservable.single)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,10 +353,10 @@ trait Observable[+T]
* @return
* An [[rx.lang.scala.Observable]] which produces buffers which are created and emitted when the specified [[rx.lang.scala.Observable]]s publish certain objects.
*/
def buffer[Opening, Closing](openings: Observable[Opening], closings: Opening => Observable[Closing]): Observable[Seq[T]] = {
def buffer[Opening](openings: Observable[Opening], closings: Opening => Observable[Any]): Observable[Seq[T]] = {
val opening: rx.Observable[_ <: Opening] = openings.asJavaObservable
val closing: Func1[_ >: Opening, _ <: rx.Observable[_ <: Closing]] = (o: Opening) => closings(o).asJavaObservable
val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer[Opening, Closing](opening, closing)
val closing: Func1[_ >: Opening, _ <: rx.Observable[_ <: Any]] = (o: Opening) => closings(o).asJavaObservable
val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer[Opening, Any](opening, closing)
Observable.jObsOfListToScObsOfSeq(jObs.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
}

Expand Down Expand Up @@ -540,9 +540,9 @@ trait Observable[+T]
* An [[rx.lang.scala.Observable]] which produces connected non-overlapping windows, which are emitted
* when the current [[rx.lang.scala.Observable]] created with the function argument produces an object.
*/
def window[Closing](closings: () => Observable[Closing]): Observable[Observable[T]] = {
val func : Func0[_ <: rx.Observable[_ <: Closing]] = closings().asJavaObservable
val o1: rx.Observable[_ <: rx.Observable[_]] = asJavaObservable.window[Closing](func)
def window(closings: () => Observable[Any]): Observable[Observable[T]] = {
val func : Func0[_ <: rx.Observable[_ <: Any]] = closings().asJavaObservable
val o1: rx.Observable[_ <: rx.Observable[_]] = asJavaObservable.window[Any](func)
val o2 = Observable.items(o1).map((x: rx.Observable[_]) => {
val x2 = x.asInstanceOf[rx.Observable[_ <: T]]
toScalaObservable[T](x2)
Expand All @@ -566,9 +566,9 @@ trait Observable[+T]
* @return
* An [[rx.lang.scala.Observable]] which produces windows which are created and emitted when the specified [[rx.lang.scala.Observable]]s publish certain objects.
*/
def window[Opening, Closing](openings: Observable[Opening], closings: Opening => Observable[Closing]) = {
def window[Opening](openings: Observable[Opening], closings: Opening => Observable[Any]) = {
Observable.jObsOfJObsToScObsOfScObs(
asJavaObservable.window[Opening, Closing](openings.asJavaObservable, (op: Opening) => closings(op).asJavaObservable))
asJavaObservable.window[Opening, Any](openings.asJavaObservable, (op: Opening) => closings(op).asJavaObservable))
: Observable[Observable[T]] // SI-7818
}

Expand Down Expand Up @@ -1336,19 +1336,61 @@ trait Observable[+T]
* an observable that emits a single Closing when the group should be closed.
* @tparam K
* the type of the keys returned by the discriminator function.
* @tparam Closing
* the type of the element emitted from the closings observable.
* @return an Observable that emits `(key, observable)` pairs, where `observable`
* contains all items for which `f` returned `key` before `closings` emits a value.
*/
def groupByUntil[K, Closing](f: T => K, closings: (K, Observable[T])=>Observable[Closing]): Observable[(K, Observable[T])] = {
val fclosing: Func1[_ >: rx.observables.GroupedObservable[K, _ <: T], _ <: rx.Observable[_ <: Closing]] =
def groupByUntil[K](f: T => K, closings: (K, Observable[T])=>Observable[Any]): Observable[(K, Observable[T])] = {
val fclosing: Func1[_ >: rx.observables.GroupedObservable[K, _ <: T], _ <: rx.Observable[_ <: Any]] =
(jGrObs: rx.observables.GroupedObservable[K, _ <: T]) => closings(jGrObs.getKey, toScalaObservable[T](jGrObs)).asJavaObservable
val o1 = asJavaObservable.groupByUntil[K, Closing](f, fclosing) : rx.Observable[_ <: rx.observables.GroupedObservable[K, _ <: T]]
val o1 = asJavaObservable.groupByUntil[K, Any](f, fclosing) : rx.Observable[_ <: rx.observables.GroupedObservable[K, _ <: T]]
val func = (o: rx.observables.GroupedObservable[K, _ <: T]) => (o.getKey, toScalaObservable[T](o))
toScalaObservable[(K, Observable[T])](o1.map[(K, Observable[T])](func))
}

/**
* Correlates the items emitted by two Observables based on overlapping durations.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/join_.png">
*
* @param other
* the second Observable to join items from
* @param leftDurationSelector
* a function to select a duration for each item emitted by the source Observable,
* used to determine overlap
* @param rightDurationSelector
* a function to select a duration for each item emitted by the inner Observable,
* used to determine overlap
* @param resultSelector
* a function that computes an item to be emitted by the resulting Observable for any
* two overlapping items emitted by the two Observables
* @return
* an Observable that emits items correlating to items emitted by the source Observables
* that have overlapping durations
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#join">RxJava Wiki: join()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229750.aspx">MSDN: Observable.Join</a>
*/
def join[S, R] (
other: Observable[S],
leftDurationSelector: T => Observable[Any],
rightDurationSelector: S => Observable[Any],
resultSelector: (T,S) => R
): Observable[R] = {

val outer : rx.Observable[_ <: T] = this.asJavaObservable
val inner : rx.Observable[_ <: S] = other.asJavaObservable
val left: Func1[_ >: T, _<: rx.Observable[_ <: Any]] = (t: T) => leftDurationSelector(t).asJavaObservable
val right: Func1[_ >: S, _<: rx.Observable[_ <: Any]] = (s: S) => rightDurationSelector(s).asJavaObservable
val f: Func2[_>: T, _ >: S, _ <: R] = resultSelector

toScalaObservable[R](
outer.asInstanceOf[rx.Observable[T]].join[S, Any, Any, R](
inner.asInstanceOf[rx.Observable[S]],
left. asInstanceOf[Func1[T, rx.Observable[Any]]],
right.asInstanceOf[Func1[S, rx.Observable[Any]]],
f.asInstanceOf[Func2[T,S,R]])
)
}

/**
* Given an Observable that emits Observables, creates a single Observable that
* emits the items emitted by the most recently published of those Observables.
Expand Down Expand Up @@ -2136,18 +2178,15 @@ object Observable {
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/empty.s.png">
*
* @param scheduler the scheduler to call the
[[rx.lang.scala.Observer#onCompleted onCompleted]] method
* @param T the type of the items (ostensibly) emitted by the Observable
* @return an Observable that returns no data to the [[rx.lang.scala.Observer]] and
* immediately invokes the [[rx.lang.scala.Observer]]r's
* [[rx.lang.scala.Observer#onCompleted onCompleted]] method with the
* specified scheduler
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#empty-error-and-never">RxJava Wiki: empty()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229066.aspx">MSDN: Observable.Empty Method (IScheduler)</a>
*/
def empty[T]: Observable[T] = {
toScalaObservable(rx.Observable.empty[T]())
def empty: Observable[Nothing] = {
toScalaObservable(rx.Observable.empty[Nothing]())
}

/**
Expand All @@ -2159,16 +2198,15 @@ object Observable {
*
* @param scheduler the scheduler to call the
[[rx.lang.scala.Observer#onCompleted onCompleted]] method
* @param T the type of the items (ostensibly) emitted by the Observable
* @return an Observable that returns no data to the [[rx.lang.scala.Observer]] and
* immediately invokes the [[rx.lang.scala.Observer]]r's
* [[rx.lang.scala.Observer#onCompleted onCompleted]] method with the
* specified scheduler
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#empty-error-and-never">RxJava Wiki: empty()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229066.aspx">MSDN: Observable.Empty Method (IScheduler)</a>
*/
def empty[T](scheduler: Scheduler): Observable[T] = {
toScalaObservable(rx.Observable.empty[T](scalaSchedulerToJavaScheduler(scheduler)))
def empty(scheduler: Scheduler): Observable[Nothing] = {
toScalaObservable(rx.Observable.empty[Nothing](scalaSchedulerToJavaScheduler(scheduler)))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ class ObservableTests extends JUnitSuite {
assertEquals(6, o.toBlockingObservable.single)
}

@Test def testJoin() {
val xs = Observable.items(1,2,3)
val ys = Observable.items("a")
val zs = xs.join[String,String](ys, x => Observable.never, y => Observable.never, (x,y) => y+x)
assertEquals(List("a1", "a2", "a3"),zs.toBlockingObservable.toList)
}

/*
@Test def testHead() {
val observer = mock(classOf[Observer[Int]])
Expand Down

0 comments on commit 29162d7

Please sign in to comment.