NullPointerException in merge #2542

Closed
abersnaze opened this issue Jan 28, 2015 · 3 comments

@abersnaze
Contributor

This may be related to #1845, but @neerajrj and I were able to create a unit test that causes an NPE in merge.

Exception in thread "RxComputationThreadPool-1" java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:52)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: rx.exceptions.OnErrorNotImplementedException
    at rx.Observable$31.onError(Observable.java:7126)
    at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:154)
    at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:111)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.innerError(OperatorMerge.java:463)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.access$6(OperatorMerge.java:442)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onError(OperatorMerge.java:586)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.innerError(OperatorMerge.java:463)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.access$6(OperatorMerge.java:442)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onError(OperatorMerge.java:586)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:672)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:579)
    at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
    at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:100)
    at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45)
    at rx.internal.operators.OperatorTakeLast$1.onCompleted(OperatorTakeLast.java:59)
    at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:121)
    at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140)
    at rx.internal.operators.BufferUntilSubscriber.emit(BufferUntilSubscriber.java:151)
    at rx.internal.operators.BufferUntilSubscriber.onCompleted(BufferUntilSubscriber.java:164)
    at rx.internal.operators.OperatorWindowWithTime$ExactSubscriber.replaceSubject(OperatorWindowWithTime.java:200)
    at rx.internal.operators.OperatorWindowWithTime$ExactSubscriber.nextWindow(OperatorWindowWithTime.java:308)
    at rx.internal.operators.OperatorWindowWithTime$ExactSubscriber$1.call(OperatorWindowWithTime.java:282)
    at rx.Scheduler$Worker$1.call(Scheduler.java:120)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
    ... 7 more
Caused by: java.lang.NullPointerException
    at rx.internal.util.RxRingBuffer.poll(RxRingBuffer.java:397)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.drainAll(OperatorMerge.java:754)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.drainQueue(OperatorMerge.java:775)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.access$1(OperatorMerge.java:771)
    at rx.internal.operators.OperatorMerge$MergeSubscriber$1.call(OperatorMerge.java:420)
    at rx.internal.operators.OperatorMerge$MergeSubscriber$1.call(OperatorMerge.java:1)
    at rx.internal.util.IndexedRingBuffer.forEach(IndexedRingBuffer.java:471)
    at rx.internal.util.IndexedRingBuffer.forEach(IndexedRingBuffer.java:437)
    at rx.internal.util.SubscriptionIndexedRingBuffer.forEach(SubscriptionIndexedRingBuffer.java:131)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.drainChildrenQueues(OperatorMerge.java:376)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.drainQueuesIfNeeded(OperatorMerge.java:351)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.access$2(OperatorMerge.java:344)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:705)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:579)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:669)
    ... 21 more
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: java.lang.Long.class
    at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:98)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:672)
    ... 21 more

Unit test that causes the error:

    @Test
    public void test() {
        Observable<Long> src = Observable.create(new OnSubscribe<Long>() {
            @Override
            public void call(Subscriber<? super Long> t1) {
                Random rand = new Random();
                while (true) {
                    t1.onNext(rand.nextLong());
                }
            }
        });

        Observable<Long> x = src.groupBy(new Func1<Long, Long>() {
            @Override
            public Long call(Long t1) {
                return t1 % 100000;
            }
        }).flatMap(new Func1<GroupedObservable<Long, Long>, Observable<Long>>() {
            @Override
            public Observable<Long> call(GroupedObservable<Long, Long> group) {
                return group.observeOn(Schedulers.computation())
                        .timeout(1, TimeUnit.SECONDS, Observable.empty())
                        .window(100, 100, TimeUnit.MILLISECONDS)
                        .flatMap(new Func1<Observable<Long>, Observable<Long>>() {
                    @Override
                    public Observable<Long> call(Observable<Long> window) {
                        return window.countLong();
                    }
                });
            }
        });

        x.subscribe(new Action1<Long>() {
            @Override
            public void call(Long t1) {
                System.out.println(t1);
            }
        });
    }

@akarnokd
Member

This is the usual concurrent unsubscription bug with the current RxRingBuffer. There is a fix (#2333), but it degrades some perf numbers (the cost of correctness) and is unlikely to be merged.
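
For readers unfamiliar with this class of bug, here is a minimal sketch of a check-then-act race between `poll()` and a concurrent release on unsubscribe. `SketchBuffer`, `pollUnsafe`, `pollSafe`, and the `betweenCheckAndUse` hook are hypothetical names invented for illustration; this is not the actual RxRingBuffer code and not necessarily how #2333 fixes it. The hook stands in for another thread running `release()` at the worst possible moment, so the outcome is deterministic.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class RingBufferRaceSketch {

    /** Hypothetical stand-in for a releasable buffer; NOT the real RxRingBuffer. */
    static class SketchBuffer {
        private volatile Queue<Object> queue = new ConcurrentLinkedQueue<>();

        void offer(Object o) {
            Queue<Object> q = queue;
            if (q != null) {
                q.offer(o);
            }
        }

        /** Simulates unsubscription dropping the underlying queue. */
        void release() {
            queue = null;
        }

        /** Check-then-act: the field is read twice, so it can become null in between. */
        Object pollUnsafe(Runnable betweenCheckAndUse) {
            if (queue == null) {
                return null;
            }
            betweenCheckAndUse.run(); // stands in for a concurrent release()
            return queue.poll();      // throws NullPointerException if released meanwhile
        }

        /** Reads the field once into a local, so a later release() cannot null it out. */
        Object pollSafe(Runnable betweenCheckAndUse) {
            Queue<Object> q = queue;
            if (q == null) {
                return null;
            }
            betweenCheckAndUse.run(); // same interleaving as above
            return q.poll();
        }
    }

    /** Returns true if the unsafe variant throws an NPE under the simulated interleaving. */
    static boolean unsafePollThrowsNpe() {
        SketchBuffer b = new SketchBuffer();
        b.offer(1L);
        try {
            b.pollUnsafe(b::release);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    /** Returns what the safe variant polls under the same interleaving. */
    static Object safePollAfterRelease() {
        SketchBuffer b = new SketchBuffer();
        b.offer(42L);
        return b.pollSafe(b::release);
    }

    public static void main(String[] args) {
        System.out.println("unsafe poll threw NPE: " + unsafePollThrowsNpe());
        System.out.println("safe poll returned: " + safePollAfterRelease());
    }
}
```

Reading the field once is only one possible guard, and it still lets a caller drain a queue that has already been released. Whether that is acceptable depends on the buffer's pooling and reuse rules, which this sketch does not model.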

@akarnokd added the Bug label Jan 28, 2015
@abersnaze
Contributor Author

I rebased the test on top of the latest 1.x branch, and the unit test is no longer failing.

@benjchristensen
Member

Excellent! This is a good validation of #2553.
