Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FlatMap and subscribeOn #2925

Closed
vekexasia opened this issue Apr 29, 2015 · 8 comments
Closed

FlatMap and subscribeOn #2925

vekexasia opened this issue Apr 29, 2015 · 8 comments
Labels

Comments

@vekexasia
Copy link

Hello there. Consider this observable

Observable.just(1)
  .flatMap( i -> {
    Timber.d("FLATMAP PRE-1: %d - %s", Thread.currentThread().getId(), Thread.currentThread().getName());
    return Observable.create(s -> {
       Timber.d("FLATMAP PRE-1-CALL: %d - %s", Thread.currentThread().getId(), Thread.currentThread().getName());
       s.onNext(i);
       s.onCompleted();
    })
  .subscribeOn(Schedulers.io())
  .flatMap( i -> {
    Timber.d("FLATMAP POST-1: %d - %s", Thread.currentThread().getId(), Thread.currentThread().getName());
    return Observable.create(s -> {
       Timber.d("FLATMAP POST-1-CALL: %d - %s", Thread.currentThread().getId(), Thread.currentThread().getName());
       s.onNext(i);
       s.onCompleted();
    })
    .subscribe( i -> {
        Timber.d("Subscribe: Thread: %d - %s", Thread.currentThread().getId(), Thread.currentThread().getName());
    });

This will emit the following:

FLATMAP PRE-1: 1 - main
FLATMAP PRE-1-CALL: 1 - main
FLATMAP POST-1: 1 - main
FLATMAP POST-1-CALL: 1 - main
Subscribe: Thread: 1 - main

This means that it subscribeOn is "lost" when flatMap is called?

@benjchristensen
Copy link
Member

I don't understand this code as it doesn't compile as it is pasted. Here is my attempt to refactor it to work:

import rx.Observable;
import rx.schedulers.Schedulers;

public class FlatMapSubscribeOn {

    public static void main(String... args) {

        Observable.just(1)
                .flatMap(i -> {
                    Timber.d("FLATMAP PRE-1: %d - %s", Thread.currentThread());
                    return Observable.create(s -> {
                        Timber.d("FLATMAP PRE-1-CALL: %d - %s", Thread.currentThread());
                        s.onNext(i);
                        s.onCompleted();
                    });
                })
                .subscribeOn(Schedulers.io())
                .flatMap(i -> {
                    Timber.d("FLATMAP POST-1: %d - %s", Thread.currentThread());
                    return Observable.create(s -> {
                        Timber.d("FLATMAP POST-1-CALL: %d - %s", Thread.currentThread());
                        s.onNext(i);
                        s.onCompleted();
                    });
                }).toBlocking().forEach(i -> {
                    Timber.d("Subscribe: Thread: %d - %s", Thread.currentThread());
                });
        ;
    }

    public static class Timber {
        public static void d(String log, Thread thread) {
            System.out.printf(log + "\n", thread.getId(), thread.getName());
        }
    }
}

This outputs the following:

FLATMAP PRE-1: 12 - RxCachedThreadScheduler-1
FLATMAP PRE-1-CALL: 12 - RxCachedThreadScheduler-1
FLATMAP POST-1: 12 - RxCachedThreadScheduler-1
FLATMAP POST-1-CALL: 12 - RxCachedThreadScheduler-1
Subscribe: Thread: 12 - RxCachedThreadScheduler-1

@vekexasia
Copy link
Author

Exactly what i meant. Isn't it suppoused to run flatMap Observable code within another thread?
so that PRE-1-CALL and possibly (POST-1-CALL) are on another thread?

Did i misunderstand something?

@dwursteisen
Copy link

subscribeOn will run the code which perform the subscription into the schedculer given in subscribeOn.

So Observable.just(...) will executed in the io thread pool. And all sub sequent call too, if you don't give another Scheduler using observeOn.

ie:

Observable.just(1) // 1 will be emited in the IO thread pool
                  .subscribeOn(Schedulers.io())
                  .flatMap(...) // will be in the IO thread pool
                  .observeOn(Schedulers.computation())
                  .flatMap(...) // will be executed in the computation thread pool
                  .observeOn(AndroidSchedulers.mainThread())
                  .subscribe(); // will be executed in the Android main thread (if you're running your code on Android)

@nschwermann
Copy link

@dwursteisen thanks for your explanation I found this very helpful

@vekexasia
Copy link
Author

@dwursteisen does it change anything is if I move the .subscribeOn in your example just above the .subscribe call?

@dwursteisen
Copy link

It won't change anything.

Le mer. 20 mai 2015 11:11, Andrea Baccega notifications@github.com a
écrit :

@dwursteisen https://github.com/dwursteisen does it change anything is
if I move the .subscribeOn in your example just above the .subscribe call?


Reply to this email directly or view it on GitHub
#2925 (comment).

@esoxjem
Copy link

esoxjem commented Jul 1, 2016

Thanks @dwursteisen for the explanation

@soshial
Copy link
Contributor

soshial commented Oct 8, 2018

Important thing to note though, that if Observables inside flatMap would subscribe on some other thread, then these flatMaps would override initial thread. E.g.:

    public void main() {
        Observable.just(1)
                  .subscribeOn(Schedulers.io())
                  .observeOn(Schedulers.io()) // this is put just for demonstration
                  .doOnNext(this::printThread) // IO
                  .flatMap(i -> incrementOnIoThread1(i)) // or incrementOnIoThread2(i)
                  .subscribe(this::printThread // computation
                             , throwable -> {});
    }

    public Observable<Integer> incrementOnIoThread1(int i) {
        return Observable.just(i + 1)
                .observeOn(Schedulers.computation());
    }

    public Observable<Integer> incrementOnIoThread2(int i) {
        return Observable.just(i + 1)
                .subscribeOn(Schedulers.computation());
    }

The only thing we can to alleviate this, is to add observeOn after flatMap, as Karnok said here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

7 participants