Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Different behavior with Rx 4 and buffer #1610

Closed
matthewwithanm opened this issue Apr 12, 2016 · 18 comments
Closed

Different behavior with Rx 4 and buffer #1610

matthewwithanm opened this issue Apr 12, 2016 · 18 comments
Assignees

Comments

@matthewwithanm
Copy link

RxJS version: 5.0.0-beta5

Code to reproduce:

const a = Observable.of(1, 2, 3, 4).share();
a
  .buffer(a.filter(x => x % 2 === 0))
  .subscribe(x => console.log(x));

Expected behavior:

[1, 2]
[3, 4]
[]

Actual behavior:

[]
[]

Additional information:

The "expected behavior" is what Rx 4 does, but maybe there's a more correct way to do this?

@staltz
Copy link
Member

staltz commented Apr 13, 2016

The default scheduler in RxJS 5 is the null or recursive one, this is the breaking change with most impact when compared to RxJS 4 where the scheduler is trampoline by default. I think if you would change the source Observable from of to interval it would work as expected.

Buffer first subscribes to the closing notifier a.filter, consuming all its synchronous emissions, then it subscribes to the given source a, which happens to emit nothing because everything was emitted already.

If you do this

const a = Rx.Observable.of(1, 2, 3, 4)
  .subscribeOn(Rx.Scheduler.async)
  .share();
a
  .buffer(a.filter(x => x % 2 === 0))
  .subscribe(x => console.log(x));

You'll notice the output

[1]
[2, 3]

Which makes sense if you see that a.filter is subscribed to before a.buffer is. When 2 is emitted, the current buffer with [1] is closed, then 2 is inserted in the buffer.

You can do it with scan though:

Rx = Rx.KitchenSink;
const a = Rx.Observable.of(1, 2, 3, 4)
  .scan(({ready, arr}, i) => {
    if (i % 2 === 0) {
      return {ready: true, arr: arr.concat(i)};
    } else if (ready) {
      return {ready: false, arr: [i]};
    } else {
      return {ready: false, arr: arr.concat(i)}
    }
  }, {ready: false, arr: []})
  .filter(acc => acc.ready)
  .map(acc => acc.arr)
  .subscribe(x => console.log(x));
[1, 2]
[3, 4]

@mattpodwysocki
Copy link
Collaborator

@staltz I would argue that this is a breaking change for no good reason and is in fact a bug

@staltz
Copy link
Member

staltz commented Apr 13, 2016

@trxcllnt ?

ghost pushed a commit to facebookarchive/nuclide that referenced this issue Apr 13, 2016
Summary:This change upgrades us to the RxJS 5 beta. It includes:

* Update Flow definitions to reflect new APIs
* Add `DisposableSubscription` and `CompositeSubscription` for dealing with new Subscription API (See also ReactiveX/rxjs#1583)
* Add `bufferUntil` to replace some `buffer()` usages (See ReactiveX/rxjs#1610)
* Remove rx-dom
* Update package.json
* Update all code to use new APIs

Reviewed By: peterhal

Differential Revision: D3131543

fb-gh-sync-id: 437c7b90d6e1fd8e463a57eb341146bbc4e9bbd3
fbshipit-source-id: 437c7b90d6e1fd8e463a57eb341146bbc4e9bbd3
@benlesh
Copy link
Member

benlesh commented Apr 13, 2016

a breaking change for no good reason

There there were several really good reasons for this breaking change.

  1. 95% of observables are async, so default scheduling is unnecessary and this issue is outlining an edge-case.
  2. It's extremely rare that scheduling is needed to avoid a stack overflow, so default trampoline scheduling is unnecessary.
  3. recursive scheduling is much, much faster than trampoline scheduling. Given 1 & 2 above, recursive was a better choice as the default to insure optimal performance.
  4. To make all scheduling trampoline by default would throw out a lot of the benefit of this rewrite.

Other things:

of and from, per the es-observable spec are supposed to be scheduled on the next micro task. We're not currently doing that, but we should be. That would make this problem go away by itself.

@benlesh
Copy link
Member

benlesh commented Apr 13, 2016

Also, the sample code is really an illustration of an anti-pattern in Rx: shared synchronous observables with share() or refCount(). That's bad juju and a prime use case for publish() and connect().

@mattpodwysocki
Copy link
Collaborator

@trxcllnt I disagree

  1. In my many years of using RxJS since 2010, most of the sources from RxJS are NOT async at all, such as a range of numbers, some static data, etc
  2. The default scheduling with trampolining was two-fold, the first was to keep items in order for recursive scheduling (note not just regular recursive, but passing in a scheduler to schedule more items). The second was to avoid the stack overflow with really large sequences.
    js Rx.Observable.of(1,2,3).subscribe(x => { Rx.Scheduler.queue.schedule(42, (scheduler, state) => scheduler.schedule(null, () => console.log(state)) })
  3. By default using the queued scheduler, you do both, when you did a scheduleRecursive in RxJS v4 and before.
  4. I care more about correctness in this case in which the user's expectations once again were not met.

of and from are not the root causes here, because if I did create with a sync and using the right recursive scheduler, then I'd have a problem here.

@matthewwithanm
Copy link
Author

Setting aside the sync issue for a sec, the results are different for async too:

const a = Observable.interval(1000).take(5).share();
a
  .buffer(a.filter(x => x % 2 === 0))
  .subscribe(x => console.log(x));

4:

[ 0 ]
[ 1, 2 ]
[ 3, 4 ]
[]

5:

[]
[ 0, 1 ]
[ 2, 3 ]

@Igorbek
Copy link
Contributor

Igorbek commented Apr 13, 2016

In v4 buffer first subscribes to the source then to the notifier, in v5 vice versa. What is correct? Is any reason to subscribe to notifier first?

@benlesh
Copy link
Member

benlesh commented Apr 14, 2016

In v4 buffer first subscribes to the source then to the notifier, in v5 vice versa.

This is probably a source of the real issue here.

@benlesh benlesh added the bug Confirmed bug label Apr 14, 2016
@trxcllnt
Copy link
Member

@mattpodwysocki I whole-heartedly agree with you. Now that we've got the operator-subscribes-to-the-source PR merged in, we can fix the real issue, which is:

In v4 buffer first subscribes to the source then to the notifier, in v5 vice versa.

@benlesh
Copy link
Member

benlesh commented Apr 14, 2016

@trxcllnt so you're saying that you agree with @mattpodwysocki's assertion that we should be trampoline scheduling by default? (if I'm understanding his assertions correctly). Because I'm against that. It would be counter to many of the stated goals of this project

@trxcllnt
Copy link
Member

@Blesh oh no, just that this is a bug. the recursive vs. trampoline scheduling was settled a long time ago by RxJava.

@benlesh
Copy link
Member

benlesh commented Apr 15, 2016

Yeah, I think we all agree this is a bug.

@benlesh
Copy link
Member

benlesh commented Feb 21, 2017

Leaving this open, as we'll have this change in the next major version (as it's breaking for some users)

benlesh pushed a commit that referenced this issue Jun 14, 2017
…2195)

In buffer operator subscribe to source observable first, so that when
closingNotifier emits value, all source values emited before land in buffer

Closes #1610
BREAKING CHANGE:
When source and closingNotifier fire at the same time, it is expected
that value emitted by source will first land in buffer and then
closingNotifier will close it. Because of reversed subscription order,
closingNotifier emitted first, so source was not able to put value in
buffer before it was closed. Now source is subscribed before closingNotifier,
so if they fire at the same time, source value is put into buffer and then
closingNotifer closes it.
@Parakleta
Copy link

Just got tripped up by this. Three questions:

  1. Until this is released, is there a better work-around than using a Subject to cause an immediate subscription for the source? i.e.:
source$.publish(stream$ => stream$.buffer(stream$.throttle(100)));

becomes

source$.publish(stream$ => {
    const sub$ = new Subject();
    stream$.subscribe(sub$);
    return sub$.buffer(stream$.throttle(100));
});
  1. Should this also be applied to other operators with notifiers, such as bufferToggle(openings), delayWhen(subscriptionDelay), distinct(flushes), sample(notifier), skipUntil(notifier), takeUntil(notifier), window(windowBoundaries), and windowToggle(openings).
  2. Should this by extension also be applied to all operators that take observables as arguments, for example even combineLatest, onErrorResumeNext, and sequenceEqual.

I wonder however what the expected behaviour is of the notifier in some of these other cases given that it may be intended to be inclusive or exclusive (consider skipUntil with a filter for example). In light of this should there be a way to change the subscription order in the operator, rather than forcing the caller to use a Subject to reverse the order from the default?

Note that merge and zip already have the correct order because they just prepends their source to an array with their arguments and subscribes them in order.

Also, things like sequenceEqual technically don't care about the order unless there is an error, in which case we will get the error from compareTo before the source. This is probably has no chance of biting anyone ever, but it's still an inconsistency.

@benlesh
Copy link
Member

benlesh commented Aug 20, 2020

Related #5654

@benlesh benlesh self-assigned this Aug 20, 2020
@benlesh benlesh removed the help wanted Issues we wouldn't mind assistance with. label Aug 20, 2020
@XeniaSiskaki
Copy link

XeniaSiskaki commented Apr 12, 2023

We are migrating from v4 -> v5 and stumbled upon this behavior change. Is there a workaround for this (since judging by the comments it will most likely not be fixed in v5)?

(I'm sorry if it has already been stated, but it's not very clear to me)

@benlesh
Copy link
Member

benlesh commented Apr 13, 2023

@XeniaSiskaki :) This is just expected behavior and not a bug we'd be fixing. I guess it would really depend on your code and what your goal is.

It might be best to post a question about what you're trying to do on Stackoverflow with some code examples.

In RxJS 7.x you can use the scheduled observable creation function to get the expected outcome:

import { share, buffer, filter, scheduled, asapScheduler } from 'rxjs';

const a = scheduled([1, 2, 3, 4], asapScheduler).pipe(share());
a.pipe(buffer(a.pipe(filter((x) => x % 2 === 0)))).subscribe((x) =>
  console.log(x)
);

// [1, 2]
// [3, 4]
// []

See this Stackblitz to play around with it

(I'm going to close this issue though, because it's super old and not a bug. haha)

@benlesh benlesh closed this as completed Apr 13, 2023
@benlesh benlesh removed the bug Confirmed bug label Apr 13, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

8 participants