2.x: Single.flatMapObservable does not respect observeOn #5550

Closed
nhaarman opened this issue Aug 11, 2017 · 20 comments

@nhaarman

RxJava version 2.1.2.

The following code:

Single.just("Test")
      .subscribeOn(Schedulers.computation())
      .flatMapObservable(
            s -> {
                System.out.println("1: " + Thread.currentThread());

                return Observable.just(1)
                      .observeOn(Schedulers.io())
                      //.doOnNext(o -> System.out.println("2: " + Thread.currentThread()))
                 ;
            }
      )
      .subscribe(o -> {
          System.out.println("3: " + Thread.currentThread());
      });

...produces the following output:

1: Thread[RxComputationThreadPool-1,5,main]
3: Thread[RxComputationThreadPool-1,5,main]

Expected output here is:

1: Thread[RxComputationThreadPool-1,5,main]
3: Thread[RxCachedThreadScheduler-1,5,main]

However, when the line containing doOnNext is uncommented, this is the output:

1: Thread[RxComputationThreadPool-1,5,main]
2: Thread[RxCachedThreadScheduler-1,5,main]
3: Thread[RxCachedThreadScheduler-1,5,main]

It looks like the thread is not switched in the first case.

@akarnokd
Member

FlatMap merges the sources on one of the emitting threads, which sometimes means that one thread emits a value produced by another. That thread can also be the one on which the inner sources are generated, if the inner source completes fast enough. The inner doOnNext clearly shows that the 1 is processed on the desired scheduler.
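
The merging behavior described above can be modeled without RxJava. The following is a minimal plain-Java sketch of a queue-drain loop, assuming a simplified design: `QueueDrainDemo`, `Merger`, `offer`, and the thread names are hypothetical, and this is not RxJava's actual flatMap implementation. It only shows how a value produced on one thread ("inner") can end up being delivered by another thread ("upstream") that already owns the drain loop. The latches exist solely to force that interleaving deterministically.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class QueueDrainDemo {

    /** Hypothetical stand-in for a merging operator: many producers, one serialized consumer. */
    static final class Merger<T> {
        private final Queue<T> queue = new ConcurrentLinkedQueue<>();
        private final AtomicInteger wip = new AtomicInteger(); // "work in progress" counter
        private final Consumer<T> downstream;

        Merger(Consumer<T> downstream) {
            this.downstream = downstream;
        }

        void offer(T value) {
            queue.offer(value);
            drain();
        }

        private void drain() {
            // Only the thread that moves wip from 0 to 1 emits; others leave their item queued.
            if (wip.getAndIncrement() != 0) {
                return;
            }
            int missed = 1;
            do {
                T v;
                while ((v = queue.poll()) != null) {
                    downstream.accept(v);
                }
                // Subtract the work accounted for; loop again if others signalled meanwhile.
                missed = wip.addAndGet(-missed);
            } while (missed != 0);
        }
    }

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch upstreamEmitting = new CountDownLatch(1);
        CountDownLatch innerOffered = new CountDownLatch(1);

        Merger<String> merger = new Merger<>(v -> {
            log.add(v + "@" + Thread.currentThread().getName());
            if (v.equals("a")) {
                // Hold the drain loop open until the other thread has queued its value.
                upstreamEmitting.countDown();
                await(innerOffered);
            }
        });

        Thread upstream = new Thread(() -> merger.offer("a"), "upstream");
        Thread inner = new Thread(() -> merger.offer("b"), "inner");
        try {
            upstream.start();
            upstreamEmitting.await();
            inner.start();
            inner.join(); // "b" is queued, but "inner" did not win the drain
            innerOffered.countDown();
            upstream.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // [a@upstream, b@upstream]
    }
}
```

In this model, "b" is produced on the "inner" thread but delivered on "upstream", because "upstream" was already draining when "b" arrived.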

@akarnokd
Member

There is a flatMapAsync extensions transformer that collects and reemits values on a specific scheduler.

@artem-zinnatullin
Contributor

flatMap can indeed emit a downstream value on the upstream's thread if the upstream completes (or errors) and the value from the inner observable arrives fast enough. This is caused by the drain() call in the upstream's onComplete() callback.

@akarnokd is there any particular reason why we need to run drain when upstream completes? Threading is very important sometimes, more determinism here would be great.

@nhaarman your example can be rewritten with Observable.create() instead of Single.just(). Removing or delaying the upstream observable's completion event makes flatMap emit on the inner observable's thread.

        TestObserver<Object> ts = new TestObserver<Object>();

        Observable
                .create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> e) throws Exception {
                        e.onNext("Test");
                        // e.onComplete(); // Uncomment to reproduce emission on computation thread.
                    }
                })
                .subscribeOn(Schedulers.computation())
                .flatMap(
                        new Function<String, ObservableSource<?>>() {
                            @Override
                            public ObservableSource<?> apply(String s) throws Exception {
                                System.out.println("1: " + Thread.currentThread());

                                return Observable
                                        .just(1)
                                        .observeOn(Schedulers.io());
                                        
                            }
                        }
                )
                .doOnNext(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        System.out.println("3: " + Thread.currentThread());
                    }
                })
                .subscribe(ts);

        ts.awaitTerminalEvent();

@artem-zinnatullin
Contributor

Ah, I see: we need to complete the downstream once the upstream and all inner observables have completed, which is why drain() is called.

Well, I guess it's possible to rewrite flatMap to only "drain" complete events when upstream completes, but that can complicate operator implementation.

@nhaarman
Author

I'm not sure whether this is working as intended or is a bug. The somewhat non-deterministic behavior is quite confusing, and adding logging to find out what's happening actually fixes the 'issue', which is reminiscent of a Heisenbug.
Especially on Android, this behavior may lead to crashes if the value is not emitted on the main thread.

@akarnokd
Member

Use observeOn to route events to the desired thread, for example, before subscribe().

@nhaarman
Author

My real-world scenario currently requires observeOn to happen inside the flatMapObservable, due to abstractions.

@akarnokd
Member

> Especially on Android, this behavior may lead to crashes if the value is not emitted on the main thread

That's why people, by reflex, apply .observeOn(AndroidSchedulers.mainThread()) to route the results back to the main thread.

@JakeWharton
Member

The problem with that is you end up delaying frames and breaking sources and emit synchronously. The general recommendation is to push observeOn as far "up" into the streams as possible so the fact that they may not be honored is a bit disturbing.

@artem-zinnatullin
Contributor

> That's why people, by reflex, apply .observeOn(mainThread()) to route the results back to the main thread.

And it's fine, because it feels kinda wrong for the downstream to rely on scheduling applied in inner observable of the flat(switch)Map.

> The problem with that is you end up delaying frames and breaking sources and emit synchronously.

Delaying is understandable and usually fine since upstream is async anyways and we're talking milliseconds here, but can you please elaborate on "breaking sources and emit synchronously"?

> The general recommendation is to push observeOn as far "up" into the streams as possible so the fact that they may not be honored is a bit disturbing.

I would say that you need to apply observeOn(scheduler) as soon as you actually need to notify downstream/consumer on that particular scheduler, which is kinda opposite to "as far up into the streams as possible".

This applies especially to the main thread, because you usually want to perform as little work on that thread as possible.

So I would say that this issue is actually not a problem, but more something people should know about.

It also looks like there is no alternative way to implement the flat(switch)Map operator so that it emits only on the scheduler of the inner observable(s) while keeping all of the operator's existing properties, because the operator's behavior relies on upstream events (completion and error in particular). @akarnokd, please correct me if I'm wrong.

@JakeWharton
Member

If you bind data, delaying a frame will cause a noticeable UI problem.

@akarnokd
Member

Well said @artem-zinnatullin.

@artem-zinnatullin
Contributor

@JakeWharton you can bind like that:

upstream
 .observeOn(mainScheduler)
 .startWith(initialValueOrObservable)
 .subscribe { }

This works if you can guarantee that subscribe is called from the main thread (which is usually true for UI-related code) and that the observable in startWith() doesn't switch threads.

This should be possible to implement pretty elegantly with Kotlin type system in a way, that'll remove startWith on consumer side.

Another option is to create a "fast-path" version of AndroidSchedulers.mainScheduler() that runs the action without scheduling if execution is already happening on the main thread.

However @akarnokd and others had good arguments against it. ReactiveX/RxAndroid#228
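
The "fast-path" idea can be illustrated with a plain `java.util.concurrent.Executor` instead of an Android scheduler. This is a minimal sketch under stated assumptions: `FastPathExecutor` and the thread name "ui" are hypothetical, and nothing here is RxAndroid's API. It shows the one property in question: a task submitted from the target thread runs inline rather than being queued, so it executes before the remainder of the task that submitted it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class FastPathExecutorDemo {

    /** Hypothetical executor: runs inline when already on its own thread, otherwise posts. */
    static final class FastPathExecutor implements Executor {
        private final ExecutorService loop;
        private volatile Thread loopThread;

        FastPathExecutor(String name) {
            loop = Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r, name);
                loopThread = t; // remember the single worker thread
                return t;
            });
        }

        @Override
        public void execute(Runnable task) {
            if (Thread.currentThread() == loopThread) {
                task.run();         // fast path: no scheduling, no delay
            } else {
                loop.execute(task); // slow path: hand over to the target thread
            }
        }

        void shutdownAndWait() {
            loop.shutdown();
            try {
                loop.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        FastPathExecutor ui = new FastPathExecutor("ui");

        // Submitted from a foreign thread, so this task is posted to "ui".
        ui.execute(() -> {
            log.add("outer start");
            // Submitted from "ui" itself, so this runs inline, before "outer end".
            ui.execute(() -> log.add("nested on " + Thread.currentThread().getName()));
            log.add("outer end");
        });

        ui.shutdownAndWait();
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run()); // [outer start, nested on ui, outer end]
    }
}
```

An executor that always posts would record "outer end" before the nested task. That difference in ordering is the trade-off of this sketch: inline execution avoids the delay, but the order of execution then depends on which thread the caller happens to be on.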

But generally speaking skipping one frame during initial value binding doesn't look like a big issue, especially with transition animations (maybe I'm wrong). And whenever it's a big issue — Rx is probably not a good solution for precise frame by frame rendering.

@JakeWharton
Member

JakeWharton commented Aug 15, 2017 via email

@akarnokd
Member

flatMap is an asynchronous boundary operator and as such, there is no guarantee what thread it will emit on, especially if its main input and inner sources have their own scheduler. That's why, in general, when there is doubt, apply observeOn to ensure the desired thread does the emission.

@artem-zinnatullin

> is there any particular reason why we need to run drain when upstream completes?

Because otherwise items would remain in the internal buffers indefinitely and your stream would hang. If backpressure is involved, that's worse, as the downstream won't even request more elements until it has received the previously requested amount, which livelocks the stream.

> Threading is very important sometimes, more determinism here would be great.

That's why you need observeOn (or flatMapAsync) to give that deterministic emission thread.
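
The answer about draining on completion can be illustrated with a single-threaded plain-Java model. This is a minimal sketch, assuming a simplified operator in which the terminal event is delivered only from inside the drain routine: `DrainOnCompleteDemo`, `Merger`, and the `drainOnComplete` flag are hypothetical and do not reproduce RxJava's source. It covers only the completion half of the argument: if onComplete() merely sets a flag and nothing drains afterwards, the downstream never learns that the stream has ended.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class DrainOnCompleteDemo {

    /** Hypothetical merging operator whose terminal event is emitted by drain() only. */
    static final class Merger {
        private final Queue<String> queue = new ArrayDeque<>();
        private final List<String> downstream = new ArrayList<>();
        private final boolean drainOnComplete;
        private boolean done;       // upstream has completed
        private boolean terminated; // downstream has been told

        Merger(boolean drainOnComplete) {
            this.drainOnComplete = drainOnComplete;
        }

        void onNext(String value) {
            queue.add(value);
            drain();
        }

        void onComplete() {
            done = true;
            if (drainOnComplete) {
                drain();
            }
        }

        private void drain() {
            String v;
            while ((v = queue.poll()) != null) {
                downstream.add("onNext(" + v + ")");
            }
            // Completion is signalled only once the queue is empty and upstream is done.
            if (done && !terminated) {
                terminated = true;
                downstream.add("onComplete");
            }
        }

        List<String> events() {
            return downstream;
        }
    }

    static List<String> run(boolean drainOnComplete) {
        Merger merger = new Merger(drainOnComplete);
        merger.onNext("1");
        merger.onComplete();
        return merger.events();
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // [onNext(1), onComplete]
        System.out.println(run(false)); // [onNext(1)]
    }
}
```

In this model, the thread that calls onComplete() has to enter the drain routine, which is also why it may end up emitting values that other threads queued.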

@artem-zinnatullin
Contributor

> If you bind data, delaying a frame will cause a noticeable UI problem.

The issue is that flatMap() can emit on upstream thread instead of inner.

@JakeWharton if you "bind data", in what scenario does this particular "feature" of flatMap() cause a delay that would not occur if flatMap() guaranteed emission on the inner thread?

Pseudo-code:

upstream
  .flatMap { inner }
  .subscribe { bind(it) }

> You are wrong, yes

Ah, my favorite passive-aggressive open source communication style. Definitely encourages people to collaborate and look for solutions :trollface: 😽

@JakeWharton
Member

I don't have time to pick up this thread right now, but just wanted to clarify there's nothing passive about that comment. You postulated that you might be wrong, and I did you the service of clarifying.

@JakeWharton
Member

Sorry one more thing and I'll stop being off-topic. There's a beautiful irony in your comment as it's thus far the only thing that's passive-aggressive.

@artem-zinnatullin
Contributor

> there's nothing passive about that comment

So… just "aggressive" instead of "passive-aggressive"? Okay then :trollface:

> There's a beautiful irony in your comment as it's thus far the only thing that's passive-aggressive

You were obviously reading this on device that doesn't render emojis, there is :kissing_cat: at the end!

And there is a slight, but noticeable difference between being sarcastic and passive-aggressive. (like this dot at the end here or "I did you the service of clarifying.")

😽 (:kissing_cat:)


But seriously, I wasn't able to find an upstream.flatMap { inner }.bind() scenario that fits the possible problem you described and that could be fixed by guaranteed emission on the inner thread.

@akarnokd
Member

Looks like this question has been answered. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.

4 participants