Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PublishSubject with subscribeOn ==> loss of data during onNext calls #5575

Closed
OleksandrKucherenko opened this issue Aug 30, 2017 · 6 comments

Comments

@OleksandrKucherenko
Copy link

OleksandrKucherenko commented Aug 30, 2017

Library version: 1.3.0

Run Test N times (I try at least 10), flaky behavior. Very often subscribers do not receive the value:

    private final Executor mExecutor = Executors.newFixedThreadPool(20);

    @Test
    public void testMultiThreadingAssignment() throws Exception {
        final Subject<Object, Object> subject = PublishSubject.create();

        final CountDownLatch countdown = new CountDownLatch(2 /* subscribers */ * 2 /* values */);
        final List<Subscription> subscriptions = new ArrayList<>();

        final Observable<Object> caller = subject.asObservable()
                .subscribeOn(Schedulers.from(mExecutor))
                .doOnNext(new Action1<Object>() {
                    @Override
                    public void call(final Object lookup) {
                        logTo(_log).info("-- @%s", lookup.hashCode());
                    }
                })
                .observeOn(Schedulers.from(mExecutor))
                ;

        final Action1<Object> onNext = new Action1<Object>() {
            @Override
            public void call(final Object lookup) {
                countdown.countDown();
                logTo(_log).info("%s @%s", lookup, lookup.hashCode());
            }
        };

        // 2 subscriptions
        subscriptions.add(caller.subscribe(onNext));
        subscriptions.add(caller.subscribe(onNext));

        // 2 values
        subject.onNext(new Object());
        subject.onNext(new Object());

        assertThat(countdown.await(1, TimeUnit.SECONDS))
                .describedAs("counter %s", countdown.getCount())
                .isTrue();

        // verify that subscriptions are still in active state
        assertThat(subscriptions.get(0).isUnsubscribed()).isFalse();
        assertThat(subscriptions.get(1).isUnsubscribed()).isFalse();

        // cleanup, free threads
        for (final Subscription s : subscriptions) {
            s.unsubscribe();
        }
    }

I play with test and found that the reason of failure is only one line:

                .subscribeOn(Schedulers.from(mExecutor))

replacement of subscribeOn by observeOn make test working without any problem and return expected result.

@OleksandrKucherenko
Copy link
Author

looks like the same issue: #2805

@OleksandrKucherenko
Copy link
Author

screen shot 2017-08-30 at 10 21 10

@akarnokd
Copy link
Member

akarnokd commented Aug 30, 2017

And the answer is the same. There is a race between PublishSubject getting subscribed to asynchronously and sending the subject values. subscribeOn delays the subscription a tiny bit so probabilistically you are concurrently onNext-ing a subject that has no subscribers.

@OleksandrKucherenko
Copy link
Author

Is it possible to use something similar to PublishSubject but with fixed race condition behavior ?!

@akarnokd
Copy link
Member

Don't use subscribeOn with PublishSubject use ReplaySubject or UnicastSubject to hold onto the data until there is a subscriber.

@OleksandrKucherenko
Copy link
Author

Thanks!

@akarnokd akarnokd closed this as completed Sep 4, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants