Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug in zipWithIndex and set zip(that, selector) public in RxScala #1226

Merged
merged 2 commits into from
May 20, 2014

Conversation

zsxwing
Copy link
Member

@zsxwing zsxwing commented May 20, 2014

  • The function used in zipWithIndex is mutable, which make the Observable cannot be reused. Fixed it using zip((0 until Int.MaxValue).toObservable).
  • Make zip(that, selector) public and rename to zipWith. RxScala zip #1189

/cc @headinthebox, @samuelgruetter

@cloudbees-pull-request-builder

RxJava-pull-requests #1132 SUCCESS
This pull request looks good

benjchristensen added a commit that referenced this pull request May 20, 2014
Fix bug in `zipWithIndex` and set `zip(that, selector)` public in RxScala
@benjchristensen benjchristensen merged commit b56f0a5 into ReactiveX:master May 20, 2014
@samuelgruetter
Copy link
Contributor

Here are some tests that I did on commit 1e07ccc :

  // passes
  @Test def testZipWithIndex2() {
    val o = Observable.interval(100 millis).map(_ * 100).take(3).zipWithIndex.map(_._2)
    assertEquals(List(0, 1, 2), o.toBlockingObservable.toList)
    assertEquals(List(0, 1, 2), o.toBlockingObservable.toList)
  }

  // 100% CPU usage on all cores until I abort the test
  @Test def testZipWithIndex3() {
    val o = Observable.interval(100 millis).map(_ * 100).take(3).doOnEach(println(_)).zipWithIndex.map(_._2)
    assertEquals(List(0, 1, 2), o.toBlockingObservable.toList)
    assertEquals(List(0, 1, 2), o.toBlockingObservable.toList)
  }

  // fails (as expected)
  // java.lang.AssertionError: expected:<List(0, 1, 2)> but was:<List(0, 100, 200)>
  @Test def testZipWithIndex4() {
    val o = Observable.interval(100 millis).map(_ * 100).take(3).doOnEach(println(_))
    assertEquals(List(0, 1, 2), o.toBlockingObservable.toList)
    assertEquals(List(0, 1, 2), o.toBlockingObservable.toList)
  }

I don't know exactly what's happening, but it looks like something weird's going on with testZipWithIndex3. @zsxwing can you reproduce this behavior? If yes, could you have a look what's going on here?

@zsxwing
Copy link
Member Author

zsxwing commented May 21, 2014

Weird. testZipWithIndex3 passed on my machine both in IDEA and gradle CLI. But I noticed a weird thing. When I first ran testZipWithIndex2, IDEA was frozen about 140 seconds. However, the test still passed. So I guess maybe some issue in scalac or IDEA causes 100% CPU usage.

Can you try to run tests in gradle CLI?

@zsxwing
Copy link
Member Author

zsxwing commented May 21, 2014

My test was on 566e892

@samuelgruetter
Copy link
Contributor

The problem is that as soon as we have several threads, everything becomes non-deterministic... I made another test:

  @Test def testZipWithIndex5() {
    for (i <- 1 to 10) {
      val start = System.currentTimeMillis()
      val o = Observable.interval(100 millis).map(_ * 100).take(3).zipWithIndex.map(_._2)
      assertEquals(List(0, 1, 2), o.subscribeOn(NewThreadScheduler()).toBlockingObservable.toList)
      println(s"iteration took ${System.currentTimeMillis() - start} ms")
    }
  }

And I ran it on 566e892, in gradle CLI using the command

./gradlew :language-adaptors:rxjava-scala:test --info

The output I got was this:

[ant:scalatest] iteration took 349 ms
[ant:scalatest] iteration took 300 ms
[ant:scalatest] iteration took 1482 ms
[ant:scalatest] iteration took 300 ms
[ant:scalatest] iteration took 301 ms
[ant:scalatest] iteration took 301 ms
[ant:scalatest] iteration took 300 ms
[ant:scalatest] iteration took 301 ms
[ant:scalatest] iteration took 3438 ms
[ant:scalatest] iteration took 300 ms

As you can see, usually it takes about 3 * 100 millis, as expected, but sometimes it takes much longer. I imagine this might be because the implementation of zipWithIndex feeds a lot of unused numbers to the zip Observer before the thread emitting onComplete and unsubscribing from the (0 until Int.MaxValue) Observable can do its job.

And if you replace the body of your zipWithIndex implementation by

zip((0 until Int.MaxValue).toObservable.doOnEach(println(_)))

you can see that this is indeed the case (I get thousands of numbers printed).

@zsxwing
Copy link
Member Author

zsxwing commented May 21, 2014

I imagine this might be because the implementation of zipWithIndex feeds a lot of unused numbers to the zip Observer before the thread emitting onComplete and unsubscribing from the (0 until Int.MaxValue) Observable can do its job.

Agreed. Maybe the best solution is getting mapWithIndex back to RxJava? @benjchristensen

@samuelgruetter
Copy link
Contributor

Or maybe zipWithIterable, such that the next item of the Iterable is produced only on-demand instead of eagerly. Or a zip operator with backpressure, but afair, this was considered impossible...

@samuelgruetter
Copy link
Contributor

Or add

def zip[U](other: Iterable[U]): Observable[(T, U)]

implemented using

public final <T2, R> Observable<R> zip(Iterable<? extends T2> other, Func2<? super T, ? super T2, ? extends R> zipFunction)

and then use it to implement zipWithIndex.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants