Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PublishSubject does not honour subscribeOn #2805

Closed
ashish-tyagi opened this issue Mar 5, 2015 · 5 comments
Closed

PublishSubject does not honour subscribeOn #2805

ashish-tyagi opened this issue Mar 5, 2015 · 5 comments
Labels

Comments

@ashish-tyagi
Copy link

In the code below, the subscriber gets called in main thread itself. If we remove the sleep, subscriber is not even called. Tried version 1.0.7, 1.0.6 and 1.0.4. My understanding is that the subscriber will be called in the passed executor.

  public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Scheduler scheduler = Schedulers.from(executor);

        Subject<Integer, Integer> sub = PublishSubject.create();
        // BufferUntilSubscriber.create();
        sub.subscribeOn(scheduler).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer t1) {
                // This prints 'main'
                System.out.println(Thread.currentThread());
            }
        });

        try {
            Thread.sleep(100);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }

        sub.onNext(1);
    }
@ashish-tyagi
Copy link
Author

If we use BufferUntilSubscriber, the behavior is even more puzzling. If there is no sleep, the subscriber gets called in the executor thread. If we put some sleep, the subscriber gets called in main thread. Should not the behavior be consistent, sleep or no sleep?

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Scheduler scheduler = Schedulers.from(executor);

        Subject<Integer, Integer> sub = BufferUntilSubscriber.create();
        sub.subscribeOn(scheduler).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer t1) {
                System.out.println(Thread.currentThread());
            }
        });

        try {
            Thread.sleep(100);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }

        sub.onNext(1);
    }

@akarnokd
Copy link
Member

akarnokd commented Mar 5, 2015

Your first example subscribes on the given thread but receives values from the same thread your PublishSubject is emitting. You need observeOn to put those values to the desired thread.

@ashish-tyagi
Copy link
Author

Yes, observeOn works fine.

The behavior in second example is still inconsistent.

@akarnokd
Copy link
Member

akarnokd commented Mar 5, 2015

In the second example, there is a race between the main thread emission and when the BufferUntilSubscriber starts to replay any buffered value. If the main thread is slow, the BufferUntilSubscriber wins but is empty at that point and just relays any value. If the main thread is fast, the BufferUntilSubscriber receives the value first, then it is subscribed to on the specified thread and immediately replays this buffered value.

@ashish-tyagi
Copy link
Author

Thanks for the info. I had a misunderstanding on how onSubscribe() should work.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants