OperatorMerge does not request enough #1941

@davidmoten

Merging of async sources using the backpressure path can result in a stalled stream because not enough items are requested of the InnerSubscribers.

The test below fails in 6 out of 10 runs.

The situation is this:

Given a merge of two slowish asynchronous sources (one emission per ms) followed downstream by a take(N) where N > 256, in 6 out of 10 runs OperatorMerge requests 128 (RxRingBuffer.SIZE) emissions from each source, but the operator then fails to request anything beyond that total of 256 and the stream stalls.

    @Test
    public void testMergeKeepsRequesting() throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);
        Observable.range(1, 2)
        // produce many integers per second
                .flatMap(new Func1<Integer, Observable<Integer>>() {
                    @Override
                    public Observable<Integer> call(final Integer number) {
                        return Observable.range(1, Integer.MAX_VALUE)
                        // pause a bit
                                .doOnNext(pauseForMs(1))
                                // buffer on backpressure
                                .onBackpressureBuffer()
                                // do in parallel
                                .subscribeOn(Schedulers.computation());
                    }

                })
                // take a number bigger than 2* RxRingBuffer.SIZE (used by
                // OperatorMerge)
                .take(RxRingBuffer.SIZE * 2 + 1)
                // log count
                .doOnNext(printCount())
                // release latch
                .doOnCompleted(new Action0() {
                    @Override
                    public void call() {
                        latch.countDown();
                    }
                }).subscribe();
        assertTrue(latch.await(1, TimeUnit.SECONDS));
    }

    private static Action1<Integer> printCount() {
        return new Action1<Integer>() {
            long count;

            @Override
            public void call(Integer t1) {
                count++;
                System.out.println("count=" + count);
            }
        };
    }

    private static Action1<Integer> pauseForMs(final long time) {
        return new Action1<Integer>() {
            @Override
            public void call(Integer s) {
                try {
                    Thread.sleep(time);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }
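
The arithmetic behind the stall can be shown without RxJava. The sketch below is only a toy model of the request accounting described above, not the OperatorMerge code: the class `MergeStallSketch`, the method `delivered` and the `replenish` flag are hypothetical names introduced for illustration. It assumes each inner source is granted 128 items (RxRingBuffer.SIZE, as stated above) up front and compares what reaches downstream when no further request is ever made against what happens when an exhausted grant is renewed.

```java
import java.util.Arrays;

public class MergeStallSketch {
    // From the issue text: RxRingBuffer.SIZE is 128.
    static final int RING_BUFFER_SIZE = 128;

    /**
     * Counts how many items reach downstream when each of `sources` inner
     * subscribers is granted RING_BUFFER_SIZE items up front and, if
     * `replenish` is true, another batch whenever its grant is used up.
     * Hypothetical model only; it does not mirror OperatorMerge internals.
     */
    static int delivered(int sources, int wanted, boolean replenish) {
        int[] remaining = new int[sources];
        Arrays.fill(remaining, RING_BUFFER_SIZE);
        int count = 0;
        boolean progressed = true;
        // Stop when downstream has enough, or when no source can emit (a stall).
        while (count < wanted && progressed) {
            progressed = false;
            for (int i = 0; i < sources && count < wanted; i++) {
                if (remaining[i] == 0 && replenish) {
                    remaining[i] = RING_BUFFER_SIZE; // request another batch
                }
                if (remaining[i] > 0) {
                    remaining[i]--;
                    count++;
                    progressed = true;
                }
            }
        }
        return count;
    }

    public static void main(String[] args) {
        int wanted = RING_BUFFER_SIZE * 2 + 1; // same N as take(...) in the test
        System.out.println("without re-request: " + delivered(2, wanted, false));
        System.out.println("with re-request:    " + delivered(2, wanted, true));
    }
}
```

In this model the run without re-requesting stops at 256 items, one short of the 257 that take(RxRingBuffer.SIZE * 2 + 1) needs, so doOnCompleted would never fire and the latch would time out. The run with re-requesting reaches 257.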
