
Wrong behavior of buffer( timespan, count ) ? #756

Closed
Acardiac opened this issue Jan 15, 2014 · 5 comments

Comments

@Acardiac
Contributor

According to the marble diagram, when the buffer is full, all subsequent items are ignored until the time interval has elapsed.
(green item -> https://github.com/Netflix/RxJava/wiki/Transforming-Observables#buffer )

But according to the description:

This version of buffer( ) emits a new bundle of items for every count items emitted by the source Observable, or, if timespan has elapsed SINCE ITS LAST BUNDLE EMISSION, it emits a bundle of however many items the source Observable has emitted in that span, even if this is less than count.

IMHO, both the timespan timer and the counter should be restarted immediately after a buffer is emitted, so that no items are dropped.
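The restart semantics proposed here can be sketched as a deterministic, single-threaded model. This is not RxJava code: `RestartingBufferModel` and its `buffer(times, values, count, timespan)` method are hypothetical names, time is an abstract tick counter, and a timeout that coincides with an item's arrival is assumed to fire before the item is added.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not RxJava code: emit when `count` items are buffered or
// `timespan` ticks have passed since the previous emission, restarting both the
// counter and the timer after every emission so that no item is dropped.
class RestartingBufferModel {

    // times must be non-decreasing and timespan > 0; values[i] arrives at tick times[i].
    static List<List<Integer>> buffer(long[] times, int[] values, int count, long timespan) {
        List<List<Integer>> out = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long lastEmission = 0; // the timer starts at subscription (tick 0)
        for (int i = 0; i < times.length; i++) {
            long t = times[i];
            // Fire every timeout that elapsed before this item; a timeout at
            // exactly tick t is assumed to fire before the item is added.
            while (t >= lastEmission + timespan) {
                out.add(current); // may be empty, as a timer-driven emission can be
                current = new ArrayList<>();
                lastEmission += timespan;
            }
            current.add(values[i]);
            if (current.size() == count) {
                out.add(current); // full: emit now and restart the timer from this tick
                current = new ArrayList<>();
                lastEmission = t;
            }
        }
        if (!current.isEmpty()) {
            out.add(current); // completion flushes the partial buffer
        }
        return out;
    }

    public static void main(String[] args) {
        long[] times = {0, 1, 2, 3, 4, 5, 9, 12};
        int[] values = {1, 2, 3, 4, 5, 6, 7, 8};
        // prints [[1, 2, 3], [4, 5, 6], [7], [8]]
        System.out.println(buffer(times, values, 3, 5));
    }
}
```

In this model every input value lands in exactly one buffer, because each item is appended once and every buffer is eventually emitted by the count, the timer, or completion.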

Compare with Lee Campbell's Introduction to Rx:
http://www.introtorx.com/Content/v1.0.10621.0/13_TimeShiftedSequences.html#Buffer

Some systems may have a sweet spot for the size of a batch they can process, but also have a time constraint to ensure that data is not stale. In this case buffering by both time and count would be suitable.....
We never get a buffer containing more than COUNT elements, and we never wait more than TIMESPAN seconds.....
The variants we have looked at so far do not overlap and have no gaps between buffers, i.e. ALL VALUES FROM THE SOURCE ARE PROPAGATED THROUGH.
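The two properties quoted above can be checked mechanically once the buffers have been collected. A minimal sketch, assuming the source items and the emitted buffers are available as plain lists; `BufferInvariants.propagatesAllValues` is a hypothetical helper, and it cannot check the "never wait more than TIMESPAN" half because the buffers carry no timestamps.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical checker: no buffer may exceed `count` items, and the buffers must
// neither overlap nor leave gaps, i.e. concatenating them reproduces the source.
class BufferInvariants {

    static boolean propagatesAllValues(List<Integer> source, List<List<Integer>> buffers, int count) {
        List<Integer> flattened = new ArrayList<>();
        for (List<Integer> b : buffers) {
            if (b.size() > count) {
                return false; // a buffer exceeded the count limit
            }
            flattened.addAll(b);
        }
        // Equal lists: same items in the same order, none dropped or duplicated.
        return flattened.equals(source);
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5);
        // prints true: every value appears once, no buffer larger than 2
        System.out.println(propagatesAllValues(source, List.of(List.of(1, 2), List.of(3, 4), List.of(5)), 2));
        // prints false: 3 and 4 were dropped
        System.out.println(propagatesAllValues(source, List.of(List.of(1, 2), List.of(5)), 2));
    }
}
```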

See also:
http://msdn.microsoft.com/en-us/library/hh229200.aspx

Indicates EACH element of an observable sequence into a buffer that’s sent out when either it’s full or a given amount of time has elapsed.

Bug or feature?

@Acardiac
Contributor Author

OK, the marble diagram has been updated, but the operator still drops items.
git SHA ID: 10f172d

    public static void main(final String[] args) throws InterruptedException {

        final int maxRounds = 1000;
        final int maxCount = 1000;
        final Scheduler scheduler = Schedulers.computation();

        int round = 0;
        boolean pass = true;
        final AtomicInteger count = new AtomicInteger(0);
        do {
            ++round;
            count.set(0);
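            // Map 1..maxCount on the computation scheduler via parallel(), then buffer
            // by 42 µs or 42 items, whichever comes first, and drop empty buffers.
            Observable<List<Integer>> observable = Observable.range(1, maxCount)
                    .parallel(o -> o.<Integer>map(val -> Integer.parseInt(val.toString())), scheduler)
                    .buffer(42, TimeUnit.MICROSECONDS, 42, scheduler)
                    .filter(list -> !list.isEmpty());

            final CountDownLatch lock = new CountDownLatch(1);

            // Sum the sizes of all emitted buffers; release the latch on error or completion.
            observable
                    .subscribeOn(Schedulers.newThread())
                    .subscribe( //
                            val -> count.addAndGet(val.size()), //
                            err -> {
                                System.err.println(err.getMessage());
                                lock.countDown();
                            },
                            lock::countDown
                    );
            lock.await();
            // The round passes only if every source item ended up in some buffer.
            pass = maxCount == count.get();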
            if (pass)
                System.out.println("round: " + round + " passed");
        } while (pass && round < maxRounds);

        System.out.println("in: " + maxCount + " out:" + count.get() + " round: " + round);
    }

Output:

in: 1000 out:954 round: 1

@benjchristensen
Member

Can you refactor that into a unit test with the necessary assertions so that it can go into https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/test/java/rx/operators/OperationBufferTest.java ?

@benjchristensen
Member

Unit tests provided by @Acardiac in #761 demonstrating the issue:

    @Test
    public void bufferTimeSpanEmitAllItems() throws InterruptedException {

        final int maxRounds = 100;
        final int maxCount = 1000;

        final int bufferTimeSpan = 42; // µs

        int round = 0;
        final AtomicInteger count = new AtomicInteger(0);
        do {
            ++round;
            count.set(0);

            Observable<List<Integer>> observable = Observable //
                    .range(1, maxCount) //
                    .buffer(bufferTimeSpan, TimeUnit.MICROSECONDS);

            final CountDownLatch lock = new CountDownLatch(1);

            observable.subscribe( // onNext
                    new Action1<List<Integer>>() {
                        @Override
                        public void call(List<Integer> val) {
                            count.addAndGet(val.size());
                        }
                    }, // onError
                    new Action1<Throwable>() {
                        @Override
                        public void call(Throwable err) {
                            lock.countDown();
                        }
                    }, // onComplete
                    new Action0() {
                        @Override
                        public void call() {
                            lock.countDown();
                        }
                    }
            );
            lock.await();
        } while ((maxCount == count.get()) && (round < maxRounds));

        assertEquals(maxCount, count.get());
        assertEquals(maxRounds, round);
    }

    @Test
    public void bufferCountEmitAllItems() throws InterruptedException {

        final int maxRounds = 100;
        final int maxCount = 1000;

        final int bufferCount = 42;

        int round = 0;
        final AtomicInteger count = new AtomicInteger(0);
        do {
            ++round;
            count.set(0);

            Observable<List<Integer>> observable = Observable //
                    .range(1, maxCount) //
                    .buffer(bufferCount);

            final CountDownLatch lock = new CountDownLatch(1);

            observable.subscribe( // onNext
                    new Action1<List<Integer>>() {
                        @Override
                        public void call(List<Integer> val) {
                            count.addAndGet(val.size());
                        }
                    }, // onError
                    new Action1<Throwable>() {
                        @Override
                        public void call(Throwable err) {
                            lock.countDown();
                        }
                    }, // onComplete
                    new Action0() {
                        @Override
                        public void call() {
                            lock.countDown();
                        }
                    }
            );
            lock.await();
        } while ((maxCount == count.get()) && (round < maxRounds));

        assertEquals(maxCount, count.get());
        assertEquals(maxRounds, round);
    }

@akarnokd
Member

akarnokd commented May 8, 2014

The rewritten exact buffer with count + timespan starts a new buffer as soon as either the count or the timespan is reached. The inexact timed version emits the full buffer but does not start a new window until the timespan elapses. Do we need to ensure that no values are left out?
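The dropping behavior of the inexact variant can be illustrated with a toy model, under stated assumptions: windows start at fixed multiples of `timespan`, a buffer that reaches `count` is emitted at once, and any later item in the same window is discarded. `DroppingBufferModel` is a hypothetical name; this is a sketch of the described behavior, not the RxJava implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not RxJava code: fixed windows of `timespan` ticks; once a
// window's buffer is full it is emitted, and further items in that window are lost.
class DroppingBufferModel {

    // times must be non-decreasing and timespan > 0; values[i] arrives at tick times[i].
    static List<List<Integer>> buffer(long[] times, int[] values, int count, long timespan) {
        List<List<Integer>> out = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        boolean full = false;      // true once this window's buffer has been emitted
        long windowEnd = timespan; // windows do not restart when the count is reached
        for (int i = 0; i < times.length; i++) {
            while (times[i] >= windowEnd) {
                if (!full) {
                    out.add(current); // timer emits the partial buffer
                }
                current = new ArrayList<>();
                full = false;
                windowEnd += timespan;
            }
            if (full) {
                continue; // the item is silently discarded
            }
            current.add(values[i]);
            if (current.size() == count) {
                out.add(current);
                full = true;
            }
        }
        if (!full && !current.isEmpty()) {
            out.add(current); // completion flushes the partial buffer
        }
        return out;
    }

    public static void main(String[] args) {
        long[] times = {0, 1, 2, 3, 4, 5, 9, 12};
        int[] values = {1, 2, 3, 4, 5, 6, 7, 8};
        List<List<Integer>> result = buffer(times, values, 3, 5);
        int emitted = result.stream().mapToInt(List::size).sum();
        // prints [[1, 2, 3], [6, 7], [8]] emitted 6 of 8
        System.out.println(result + " emitted " + emitted + " of " + values.length);
    }
}
```

With count 3 and timespan 5, items 4 and 5 arrive after the first window's buffer is already full and are lost, the same kind of shortfall as the `in: 1000 out:954` result reported earlier in this thread.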

@benjchristensen
Member

Both of those unit tests now pass for me (v0.18.3+), whereas at least one of them was still failing as of 0.17.6.

jihoonson pushed a commit to jihoonson/RxJava that referenced this issue Mar 6, 2020

3 participants