ReplaySubject Race Condition #1147

Closed
benjchristensen opened this issue May 5, 2014 · 7 comments

@benjchristensen
Member

A race condition in ReplaySubject was found while debugging a Hystrix bug report: Netflix/Hystrix#257

It first appeared in v0.17 and still exists on the master branch.

I have tracked it down to this code in ReplaySubject.java:

    private static <T> int replayObserverFromIndex(History<T> history, Integer l, SubjectObserver<? super T> observer) {
        while (l < history.index.get()) {
            observer.onNext(history.list.get(l));
            l++;
        }
        if (history.terminalValue.get() != null) {
            if (l == 0) {
                // debug output: reaching the terminal event with l == 0 means no value was replayed
                System.out.println(">>>>>>>>>>>>> terminating without emitting value: ");
            }
            history.terminalValue.get().accept(observer);
        }

        return l;
    }

The terminal event is being delivered before any onNext, even though in the Hystrix case a value is emitted. I'm working on a stand-alone test case for ReplaySubject.

@benjchristensen
Member Author

It only happens very occasionally, but I reproduced it with this:

    @Test
    public void testReplayAsCache() {
        final List<Integer> expected = Arrays.asList(1);
        // the race is rare, so repeat many times
        for (int i = 0; i < 1000000; i++) {
            TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
            // just(1) emits on a computation thread while cache() replays to ts
            Observable.just(1).subscribeOn(Schedulers.computation()).cache().subscribe(ts);
            ts.awaitTerminalEvent();
            // ts must see the value 1 before the terminal event
            ts.assertReceivedOnNext(expected);
            ts.assertTerminalEvent();
        }
    }

@akarnokd
Member

akarnokd commented May 5, 2014

I think I found the issue:

    private static <T> int replayObserverFromIndex(History<T> history, int l, SubjectObserver<? super T> observer) {
        while (l < history.index.get()) {
            observer.onNext(history.list.get(l));
            l++;
        }
        // --> if the emission and termination events happen here, the client won't receive the value; it only sees the termination event.
        if (history.terminalValue.get() != null) {
            if (l == 0) {
                new IllegalStateException("Value expected here! Index: " + history.index.get()).printStackTrace();
            }
            history.terminalValue.get().accept(observer);
        }

        return l;
    }
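To see the interleaving described in the comment without waiting for a real thread race, the sketch below forces it deterministically. It is a simplified, hypothetical model (the `SplitHistory` class, its fields, and the `window` callback are assumptions, not RxJava API): values and the terminal event live in two separate fields, and a callback runs the producer's onNext and onCompleted exactly at the point between the replay loop and the terminal check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class SplitHistoryRace {
    // Hypothetical stand-in for History: values and terminal event in separate fields.
    static final class SplitHistory<T> {
        final List<T> list = new CopyOnWriteArrayList<>();
        final AtomicInteger index = new AtomicInteger();
        final AtomicReference<String> terminal = new AtomicReference<>();

        void next(T value) {
            list.add(value);         // store the value first...
            index.incrementAndGet(); // ...then publish it via the index
        }

        void complete() {
            terminal.set("completed");
        }
    }

    // Same shape as replayObserverFromIndex; 'window' runs at the racy point.
    static <T> int replay(SplitHistory<T> h, int l, List<Object> out, Runnable window) {
        while (l < h.index.get()) {
            out.add(h.list.get(l));
            l++;
        }
        window.run(); // the producer emits a value and terminates right here
        if (h.terminal.get() != null) {
            out.add(h.terminal.get());
        }
        return l;
    }

    static List<Object> run() {
        SplitHistory<Integer> h = new SplitHistory<>();
        List<Object> out = new ArrayList<>();
        replay(h, 0, out, () -> {
            h.next(1);
            h.complete();
        });
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [completed]: the value 1 was never replayed
    }
}
```

Because the loop has already exited with `l == 0`, the observer receives only the completion, which matches the symptom seen in the Hystrix report.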

@akarnokd
Member

akarnokd commented May 5, 2014

Here is a ReplaySubject that did not fail in 100M iterations. It appends the terminal event to the same list as the values. I haven't checked whether performance is affected.
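The idea of keeping the terminal event in the same list as the values can be sketched as follows. This is a hypothetical, simplified model, not the attached ReplaySubject (`UnifiedHistory`, `COMPLETED`, and the two-pass replay are assumptions): because the completion marker is appended after every value, a replayer cannot reach it without first passing every value stored before it, and anything that arrives after a pass is simply picked up by the next pass.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class UnifiedHistoryDemo {
    // Hypothetical completion marker stored in the same list as the values.
    static final Object COMPLETED = new Object();

    // Single list: values, eventually followed by the terminal marker.
    static final class UnifiedHistory {
        final List<Object> list = new CopyOnWriteArrayList<>();

        void next(Object value) { list.add(value); }

        void complete() { list.add(COMPLETED); }

        // Replays entries from index l into out; returns the next unread index.
        // The marker sits after every earlier value, so it cannot overtake them.
        int replay(int l, List<Object> out) {
            while (l < list.size()) {
                Object o = list.get(l++);
                out.add(o == COMPLETED ? "completed" : o);
            }
            return l;
        }
    }

    static List<Object> run() {
        UnifiedHistory h = new UnifiedHistory();
        List<Object> out = new ArrayList<>();
        int l = h.replay(0, out); // first pass: nothing stored yet
        h.next(1);                // producer emits and completes between passes
        h.complete();
        h.replay(l, out);         // next pass sees 1, then the completion
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, completed]
    }
}
```

Even when the producer emits and completes between two passes, the second pass still delivers `1` before the completion; there is no separate terminal field to race against.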

@benjchristensen
Member Author

Awesome, thank you @akarnokd. You got a fix before I even had time to start digging in!

It does indeed seem to have fixed it. I'm going to review it more closely and do performance testing, and if all looks well will move forward with this. Do you want to submit a PR with it so it's your merge?

@benjchristensen
Member Author

Done reviewing; the code looks good. Using the NotificationLite mechanism means there is now a single list reference instead of a list plus a separate terminalValue, which eliminates the race between the two.
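The reason one list suffices is that a NotificationLite-style encoding stores every kind of event as a plain `Object`. The sketch below is a hypothetical, minimal re-creation of that approach, not RxJava's actual NotificationLite API (all class and method names here are illustrative): values are stored as-is, `null` is replaced by a sentinel, completion is a singleton marker, and an error is wrapped in a small box. Decoding dispatches to the matching callback and reports whether the entry was terminal.

```java
import java.util.ArrayList;
import java.util.List;

class LiteEncodingDemo {
    // Hypothetical sentinels, loosely modeled on the NotificationLite idea.
    private static final Object COMPLETED = new Object();
    private static final Object NULL_VALUE = new Object();

    // Wraps a Throwable so it is never mistaken for an onNext value.
    private static final class ErrorBox {
        final Throwable e;
        ErrorBox(Throwable e) { this.e = e; }
    }

    static Object next(Object v) { return v == null ? NULL_VALUE : v; }
    static Object completed() { return COMPLETED; }
    static Object error(Throwable e) { return new ErrorBox(e); }

    interface Sink {
        void onNext(Object v);
        void onError(Throwable e);
        void onCompleted();
    }

    // Decodes one stored entry and dispatches it; returns true if it was terminal.
    static boolean accept(Sink s, Object o) {
        if (o == COMPLETED) { s.onCompleted(); return true; }
        if (o instanceof ErrorBox) { s.onError(((ErrorBox) o).e); return true; }
        s.onNext(o == NULL_VALUE ? null : o);
        return false;
    }

    static List<String> run() {
        List<Object> history = new ArrayList<>(); // one list for all event kinds
        history.add(next(1));
        history.add(next(null));
        history.add(error(new IllegalStateException("boom")));
        List<String> log = new ArrayList<>();
        Sink sink = new Sink() {
            public void onNext(Object v) { log.add("next:" + v); }
            public void onError(Throwable e) { log.add("error:" + e.getMessage()); }
            public void onCompleted() { log.add("completed"); }
        };
        for (Object o : history) {
            if (accept(sink, o)) break; // stop after the terminal entry
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [next:1, next:null, error:boom]
    }
}
```

The sentinel for `null` matters because a plain `null` in the list could otherwise be confused with "no entry yet"; the error box keeps a Throwable that is legitimately emitted as a value distinguishable from an onError.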

With this fix in place I can no longer see the Hystrix bug and the Rx unit test now always passes.

Performance

I tested performance using the already existing JMH tests:

../gradlew benchmarks '-Pjmh=-f 1 -tu ns -bm avgt -wi 5 -i 5 -r 1 .*ReplaySubjectPerf.*'

Before
Result : 30371290.196 ±(99.9%) 2404998.858 ns/op
  Statistics: (min, avg, max) = (29923647.059, 30371290.196, 31414333.333), stdev = 624570.566
  Confidence interval (99.9%): [27966291.338, 32776289.055]


Benchmark                                     (nextRuns)   Mode   Samples         Mean   Mean error    Units
r.s.ReplaySubjectPerf.subscribeAfterEvents             1   avgt         5      465.227        3.719    ns/op
r.s.ReplaySubjectPerf.subscribeAfterEvents           512   avgt         5    13806.833      254.181    ns/op
r.s.ReplaySubjectPerf.subscribeAfterEvents          1024   avgt         5    26448.178      654.800    ns/op
r.s.ReplaySubjectPerf.subscribeAfterEvents       1048576   avgt         5 28268927.778   869942.963    ns/op
r.s.ReplaySubjectPerf.subscribeBeforeEvents            1   avgt         5      648.489       12.827    ns/op
r.s.ReplaySubjectPerf.subscribeBeforeEvents          512   avgt         5    13053.491      214.858    ns/op
r.s.ReplaySubjectPerf.subscribeBeforeEvents         1024   avgt         5    25706.752      401.549    ns/op
r.s.ReplaySubjectPerf.subscribeBeforeEvents      1048576   avgt         5 30371290.196  2404998.858    ns/op
After
Result : 24400896.190 ±(99.9%) 2129385.605 ns/op
  Statistics: (min, avg, max) = (24041904.762, 24400896.190, 25358100.000), stdev = 552994.679
  Confidence interval (99.9%): [22271510.586, 26530281.795]


Benchmark                                     (nextRuns)   Mode   Samples         Mean   Mean error    Units
r.s.ReplaySubjectPerf.subscribeAfterEvents             1   avgt         5      461.561       11.843    ns/op
r.s.ReplaySubjectPerf.subscribeAfterEvents           512   avgt         5    16507.871     2031.269    ns/op
r.s.ReplaySubjectPerf.subscribeAfterEvents          1024   avgt         5    31841.435      470.984    ns/op
r.s.ReplaySubjectPerf.subscribeAfterEvents       1048576   avgt         5 29828781.599  3150737.584    ns/op
r.s.ReplaySubjectPerf.subscribeBeforeEvents            1   avgt         5      675.099       70.664    ns/op
r.s.ReplaySubjectPerf.subscribeBeforeEvents          512   avgt         5    11538.548      348.024    ns/op
r.s.ReplaySubjectPerf.subscribeBeforeEvents         1024   avgt         5    22572.613      203.016    ns/op
r.s.ReplaySubjectPerf.subscribeBeforeEvents      1048576   avgt         5 24400896.190  2129385.605    ns/op

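The changes affect performance in both directions: subscribeAfterEvents mostly got slower and subscribeBeforeEvents mostly got faster. Not really sure why, but the differences are small, so I'm going to proceed with this fix.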
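Since the Before and After tables list the same benchmarks, the relative change per row can be computed directly from the Mean column. The short program below does that arithmetic with the numbers copied from those tables; a positive value means slower after the fix. The class and method names are illustrative only.

```java
import java.util.Locale;

class BenchDelta {
    // Benchmark rows (name, nextRuns) in table order.
    static final String[] ROWS = {
        "subscribeAfterEvents 1", "subscribeAfterEvents 512",
        "subscribeAfterEvents 1024", "subscribeAfterEvents 1048576",
        "subscribeBeforeEvents 1", "subscribeBeforeEvents 512",
        "subscribeBeforeEvents 1024", "subscribeBeforeEvents 1048576"
    };

    // Mean ns/op copied from the Before and After tables: {before, after}.
    static final double[][] MEANS = {
        {465.227, 461.561},
        {13806.833, 16507.871},
        {26448.178, 31841.435},
        {28268927.778, 29828781.599},
        {648.489, 675.099},
        {13053.491, 11538.548},
        {25706.752, 22572.613},
        {30371290.196, 24400896.190}
    };

    // Relative change in percent; positive means slower after the fix.
    static double percentChange(double before, double after) {
        return (after - before) / before * 100.0;
    }

    public static void main(String[] args) {
        for (int i = 0; i < ROWS.length; i++) {
            double pct = percentChange(MEANS[i][0], MEANS[i][1]);
            System.out.printf(Locale.ROOT, "%-30s %+6.1f%%%n", ROWS[i], pct);
        }
    }
}
```

With these inputs, subscribeAfterEvents at 512 and 1024 comes out roughly 20% slower, while subscribeBeforeEvents at 512, 1024 and 1048576 comes out roughly 12% to 20% faster; the remaining rows change by about 5% or less. With a single fork (`-f 1`) and five measurement iterations, several of these differences are of the same order as the reported Mean error.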

@akarnokd
Member

akarnokd commented May 5, 2014

I guess it is the small overhead from NotificationLite. Perhaps adding final to History would enable some JVM optimizations; I'll have a look tomorrow.

@benjchristensen
Member Author

Thanks for your help on this, it made it possible to quickly release a fix for Hystrix: https://github.com/Netflix/Hystrix/releases/tag/1.3.16
