Does subscribeOn call order matter? #5459

Closed
Joseph82 opened this issue Jul 1, 2017 · 7 comments

@Joseph82

Joseph82 commented Jul 1, 2017

Hi, I am using RxJava 2.1.1. I thought I knew how subscribeOn works, but sometimes I still have some doubts about it.

As far as I know, the position of subscribeOn in the chain doesn't matter.

Assume that I am executing this code from my MAIN THREAD:

Observable.just("")
        .doOnSubscribe(disposable -> { /* runs on the NEW THREAD */ })
        .subscribeOn(Schedulers.newThread())
        .subscribe(value -> { /* runs on the NEW THREAD */ });

As expected, doOnSubscribe() is called on the NEW THREAD because of subscribeOn(Schedulers.newThread()).

But, if I call subscribeOn before doOnSubscribe the result will be different:

Observable.just("")
        .subscribeOn(Schedulers.newThread())
        .doOnSubscribe(disposable -> { /* runs on the MAIN THREAD */ })
        .subscribe(value -> { /* runs on the NEW THREAD */ });

Now doOnSubscribe() seems to be executed on the MAIN THREAD, regardless of subscribeOn(Schedulers.newThread()).

Why is this happening?

@akarnokd
Member

akarnokd commented Jul 1, 2017

subscribeOn moves the subscription side-effects of the upstream to another thread but doesn't affect the downstream's subscription side-effects. Therefore, it matters where you put it if there are operators with subscription side-effects elsewhere in the chain: those before it (upstream) are moved, those after it (downstream) are not.

Let's walk through what happens in the first case. The chain consists of the Just -> DoOnSubscribe -> SubscribeOn -> LambdaObserver operators, but the subscription process walks in reverse:

| Step | Thread | Action |
|------|--------|--------|
| 1 | main | Creation of LambdaObserver |
| 2 | main | SubscribeOn.subscribe(LambdaObserver) |
| 3 | main | SubscribeOn calls onSubscribe on LambdaObserver |
| 4 | main | SubscribeOn schedules a subscribe action on newThread |
| 5 | newthread | DoOnSubscribe.subscribe(SubscribeOnObserver) |
| 6 | newthread | Just.subscribe(DoOnSubscribeObserver) |
| 7 | newthread | Just calls onSubscribe on DoOnSubscribeObserver |
| 8 | newthread | DoOnSubscribe calls the lambda and then onSubscribe on SubscribeOnObserver |
| 9 | newthread | Since SubscribeOnObserver already called onSubscribe, the call returns to Just |
| 10 | newthread | Just emits onNext |

Now if doOnSubscribe is after subscribeOn:

| Step | Thread | Action |
|------|--------|--------|
| 1 | main | Creation of LambdaObserver |
| 2 | main | DoOnSubscribe.subscribe(LambdaObserver) |
| 3 | main | SubscribeOn.subscribe(DoOnSubscribeObserver) |
| 4 | main | SubscribeOn calls onSubscribe on DoOnSubscribeObserver |
| 5 | main | DoOnSubscribe calls the lambda and then onSubscribe on LambdaObserver |
| 6 | main | SubscribeOn schedules a subscribe action on newThread |
| 7 | newthread | Just.subscribe(SubscribeOnObserver) |
| 8 | newthread | Just calls onSubscribe on SubscribeOnObserver |
| 9 | newthread | Since SubscribeOnObserver already called onSubscribe, the call returns to Just |
| 10 | newthread | Just emits onNext |
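
The two walkthroughs above can be reproduced with a small plain-Java model. This is only a sketch under simplifying assumptions, not RxJava's actual implementation: the Source and Observer interfaces, the helper methods, and the thread name "new-thread" are all hypothetical stand-ins, and disposal, errors, and completion are left out. It only shows why a doOnSubscribe callback placed after subscribeOn runs on the subscribing thread.

```java
import java.util.concurrent.CountDownLatch;

public class SubscribeOrderSketch {

    /** Hypothetical minimal observer: onSubscribe is the subscription side-effect hook. */
    interface Observer<T> {
        void onSubscribe();
        void onNext(T value);
    }

    /** Hypothetical minimal source: subscribing walks from the observer up towards the source. */
    interface Source<T> {
        void subscribe(Observer<T> observer);
    }

    /** Stand-in for Observable.just: signals onSubscribe, then emits on the subscribing thread. */
    static <T> Source<T> just(T value) {
        return observer -> {
            observer.onSubscribe();
            observer.onNext(value);
        };
    }

    /** Stand-in for doOnSubscribe: runs the action when the upstream signals onSubscribe. */
    static <T> Source<T> doOnSubscribe(Source<T> upstream, Runnable action) {
        return observer -> upstream.subscribe(new Observer<T>() {
            @Override public void onSubscribe() { action.run(); observer.onSubscribe(); }
            @Override public void onNext(T value) { observer.onNext(value); }
        });
    }

    /** Stand-in for subscribeOn: signals downstream immediately, subscribes upstream on a new thread. */
    static <T> Source<T> subscribeOn(Source<T> upstream, String threadName) {
        return observer -> {
            observer.onSubscribe(); // still on the caller's thread (steps 3 and 4-5 in the tables)
            new Thread(() -> upstream.subscribe(new Observer<T>() {
                @Override public void onSubscribe() { /* downstream was already signalled */ }
                @Override public void onNext(T value) { observer.onNext(value); }
            }), threadName).start();
        };
    }

    /** Builds one of the two chains and returns the name of the thread that ran the doOnSubscribe action. */
    static String run(boolean doOnSubscribeBeforeSubscribeOn) throws InterruptedException {
        String[] seen = new String[1];
        CountDownLatch done = new CountDownLatch(1);
        Runnable action = () -> seen[0] = Thread.currentThread().getName();
        Source<String> chain = doOnSubscribeBeforeSubscribeOn
                ? subscribeOn(doOnSubscribe(just(""), action), "new-thread")
                : doOnSubscribe(subscribeOn(just(""), "new-thread"), action);
        chain.subscribe(new Observer<String>() {
            @Override public void onSubscribe() { }
            @Override public void onNext(String value) { done.countDown(); }
        });
        done.await(); // wait until the value has been emitted
        return seen[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("doOnSubscribe before subscribeOn ran on: " + run(true));
        System.out.println("doOnSubscribe after subscribeOn ran on:  " + run(false));
    }
}
```

In this model, run(true) reports the new thread, while run(false) reports whichever thread called subscribe, which matches step 5 of each table.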

@Joseph82
Author

Joseph82 commented Jul 1, 2017

That's a perfect explanation. Thank you very much.

@Joseph82 Joseph82 changed the title Does subscribeOn call order matter? Does subscribeOn call order matter? Jul 2, 2017
@akarnokd
Member

akarnokd commented Aug 6, 2017

Looks like this question has been answered. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.

@akarnokd akarnokd closed this as completed Aug 6, 2017
@dakshj

dakshj commented Mar 17, 2018

Hi @akarnokd,

I too had doubts about the ordering of subscribeOn when multiple side effects and chains (using a flatMap) are involved, and I wanted to confirm whether I got the logic right. Your help would be appreciated! I've tried to keep the code understandable.

In the code below, the general requirement is that any calls to readingDao should occur on rxSchedulers.database, and any calls to api should occur on rxSchedulers.network:

    Single.fromCallable { readingDao.getNextUploadBatch() }

            // Switch to the database thread, because of the Timber log of readingDao.getCount()
            .subscribeOn(rxSchedulers.database)

            // Run the 2nd task using the 1st task's result
            .flatMapCompletable {
                readings = it

                // If readings is empty, then finish the job, and run an empty Completable
                // so that it directly jumps to onComplete
                if (readings.isEmpty()) {
                    Timber.d("Actual readings count in DB = ${readingDao.getCount()}")
                    jobFinished(job, false)

                    // Return a no-action Completable that immediately completes
                    Completable.complete()
                }

                // Else, upload the readings; run it on the network thread
                else {
                    api.uploadSensorReadings(readings)
                            // Switch to the database thread for the below side effect tasks
                            .subscribeOn(rxSchedulers.database)

                            .doFinally {
                                jobFinished(job, !readingDao.isEmpty())
                            }
                            .doOnComplete {
                                readingDao.delete(*readings.toTypedArray())
                            }

                            // Switch to the network thread for the Completable API call
                            .subscribeOn(rxSchedulers.network)
                }
            }

            // Run the Single on the database thread
            .subscribeOn(rxSchedulers.database)

            // Execute the below observer code on the database thread again
            .observeOn(rxSchedulers.database)
            .subscribe(
                    { /* Already handled in doOnComplete */ },
                    {
                        if (readings.isNotEmpty()) {
                            readingDao.markCurrentReadingsAsNotUploading()
                        }
                    }
            )

The original question (regarding conditionally chaining a Single and a Completable) was answered by you on this StackOverflow question.

Thanks for all your help in assisting me to understand Rx!
Daksh.

@akarnokd
Member

@dakshj Does it work as you expected? -> You are done! Does it behave unexpectedly? -> Isolate the problematic part and post a new question on StackOverflow.

@dakshj

dakshj commented Mar 17, 2018

It is working as expected! However, I'm not sure how to programmatically check what the current Scheduler is, so I wanted to run it by you to confirm that I have indeed understood the logic correctly!
I've also posted a StackOverflow question here. Thanks!
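
On checking the current Scheduler: as far as I know, RxJava has no call that returns "the Scheduler this code is running on", so a common workaround is to look at the current thread's name inside the callback. The sketch below uses only the JDK and assumes, hypothetically, that the database and network schedulers are backed by executors with named threads; the names "database" and "network" and both helper methods are illustrative. In RxJava 2 such an executor can be wrapped with Schedulers.from(executor).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadNameCheck {

    /** Creates a single-threaded executor whose worker thread carries the given name (hypothetical helper). */
    static ExecutorService namedExecutor(String name) {
        return Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true); // do not keep the JVM alive for this sketch
            return thread;
        });
    }

    /** Runs a task on the executor and returns the name of the thread that executed it. */
    static String threadNameOn(ExecutorService executor) throws Exception {
        Callable<String> task = () -> Thread.currentThread().getName();
        return executor.submit(task).get(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService database = namedExecutor("database");
        ExecutorService network = namedExecutor("network");

        // The same Thread.currentThread().getName() call can be logged inside
        // doOnComplete, doFinally, or flatMapCompletable to see where each one runs.
        System.out.println("database task ran on: " + threadNameOn(database));
        System.out.println("network task ran on:  " + threadNameOn(network));

        database.shutdown();
        network.shutdown();
    }
}
```

Logging the thread name in each side-effect callback then shows which named thread actually ran it, which is usually enough to confirm the intended scheduler.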

@leinli03

https://stackoverflow.com/questions/50075574/doonsubscribe-gets-called-on-main-thread
The answer there does not seem to quite match the explanation given here.
