Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simple Scheduler Question #3062

Closed
AlabamaJam opened this issue Jul 3, 2015 · 7 comments
Closed

Simple Scheduler Question #3062

AlabamaJam opened this issue Jul 3, 2015 · 7 comments

Comments

@AlabamaJam
Copy link

Trying to get started with RxJava and my very first attempt is failing. Can someone explain to me why the following code does not print anything out? If I remove the .subscribeOn(...), it works fine, but the goal here was to simulate subscribing to some long-running task on the IO thread and observing it back on the main thread. I tried adding an .observeOn(Schedulers.immediate()), but it didn't seem to matter if I included it or not.

Observable.from(new Integer[]{1, 2, 3, 4, 5})
        .subscribeOn(Schedulers.io())
        .subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() { System.out.println("onCompleted"); }

            @Override
            public void onError(Throwable e) { e.printStackTrace(); }

            @Override
            public void onNext(Integer integer) { System.out.println("onNext: " + integer); }
        });

Can someone point me in the right direction, please? Thanks.

@davidmoten
Copy link
Collaborator

Here's one way using CountDownLatch:

final CountDownLatch latch = new CountDownLatch(1);
Observable.just(1, 2, 3, 4, 5).subscribeOn(Schedulers.io())
        .subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
                latch.countDown();
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
                latch.countDown();
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("onNext: " + integer);
            }
        });
latch.await();

and another way (which uses CountDownLatch under the covers):

Observable.just(1, 2, 3, 4, 5).subscribeOn(Schedulers.io())
    .doOnNext(n -> System.out.println("onNext: " + n))
    .doOnCompleted(() -> System.out.println("onCompleted"))
    .doOnError(e -> e.printStackTrace())
    .count().toBlocking().single();

@davidmoten
Copy link
Collaborator

If you don't have java 8 at your disposal this might be convenient:

Observable.just(1, 2, 3, 4, 5).subscribeOn(Schedulers.io())
        .doOnEach(new Observer<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("onNext: " + integer);
            }
        }).count().toBlocking().single();

@AlabamaJam
Copy link
Author

I appreciate the alternative approaches, but can you tell me why my original code doesn't work? You changed several operators and introduced new external things (latch). How/why did you get to those alternatives from the original?

Thanks.

@davidmoten
Copy link
Collaborator

The effect of subscribeOn in your case is to put all the processing on to one daemon thread created by the Schedulers.io(). If you did Thread.sleep(5000); straight after your subscribe call then you'd see the results but otherwise being a daemon thread it will be killed by the exiting JVM and you won't see anything.

.observeOn(Schedulers.immediate()) would have no affect because it would just ask for observation to happen on the current thread once the observable has subscribed thus would observe on that one daemon thread I mentioned above.

So the catch here is you are trying to observe on the main thread. If rather you were trying to observe on a non-current-thread scheduler (not immediate() or trampoline()) things would be different but the main thread would still have to block so that you saw the results.

@AlabamaJam
Copy link
Author

Ok, that's starting to help a bit. Would my code work if my observable was something (anything) more time intensive than static data? Reading a byte from disk, for example? Is it a race condition issue of sorts?

@davidmoten
Copy link
Collaborator

No, wouldn't help because as soon as you call subscribe a thread is started that does your stuff and if immediately after starting that thread you exit the main method then the thread will be killed regardless of what it is doing (the jvm doesn't block and wait for the thread to complete).

@AlabamaJam
Copy link
Author

Oh! Thank you, that's what I was missing. I think it just clicked. I appreciate your help. 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants