Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bufferTime and bufferCount will mutate current buffer if a source is a subject that emits in the subscription for that buffer #3001

Open
MorleyDev opened this issue Oct 26, 2017 · 6 comments
Labels
bug Confirmed bug

Comments

@MorleyDev
Copy link

MorleyDev commented Oct 26, 2017

RxJS version:
5.5.2

Code to reproduce:
rxfiddle.net

const x = Rx.Observable.interval(1000);

const z = new Rx.Subject();
Rx.Observable.merge(x, z)
	.bufferTime(5000)
	.subscribe((value) => {
  		console.log("Before", value)
                z.next("HELLO")
  		console.log("After", value)
	});

rxfiddle.net

const x = Rx.Observable.interval(1000);

const z = new Rx.Subject();
Rx.Observable.merge(x, z)
	.bufferCount(3)
	.subscribe((value) => {
  		console.log("Before", value)
                z.next("HELLO")
  		console.log("After", value)
	});

Expected behavior:
That "HELLO" would be logged out as one of the buffered values for every console.log(value) barring the first, and the array of value would remain a constant set through each individual subscribe.

Actual behavior:
The value of "HELLO" is pushed onto the end of the current buffer array, mutating it.

Additional information:

This came up whilst attempting to use rxjs to develop simple physics simulations with a Redux-like scan/reducer. A step for that scan is to emit any follow-up events to be processed in the next 'frame', and I discovered those events were being silently thrown away by the bufferTime operator. Since discovered they aren't so 'thrown away', but 'mutated onto the current buffer'. Which isn't useful if emitting them is the last part of the next step.

Whilst there are better ways to architect the code that avoid doing a subject emit like this, which is what I did after I confirmed this was the current behaviour, this still seems like either a bug or a 'gotcha' which I could not find documented.

@MorleyDev MorleyDev changed the title Buffers will silently swallow values emitted to a subject that also feeds into that buffer bufferTime will silently swallow values emitted to a subject that also feeds into that buffer Oct 26, 2017
@MorleyDev MorleyDev changed the title bufferTime will silently swallow values emitted to a subject that also feeds into that buffer bufferTime will silently ignore values emitted to a subject that also feeds into that buffer Oct 26, 2017
@Jerry-Hong
Copy link

@MorleyDev

because you execute the subject's next('Hello') is synchronous

you can try this code to see it

const x = Rx.Observable.interval(1000);

const z = new Rx.Subject();
Rx.Observable.merge(x, z)
	.bufferTime(5000)
	.subscribe((value) => {
                console.log(JSON.stringify(value)) // [0,1,2,3]
  		z.next("HELLO")
  		console.log(JSON.stringify(value)) // [0,1,2,3, 'HELLO']
	});

maybe this code as below is what you want

const x = Rx.Observable.interval(1000);

const z = new Rx.Subject().observeOn(Rx.Scheduler.async);
Rx.Observable.merge(x, z)
	.bufferTime(5000)
	.subscribe((value) => {
                console.log(JSON.stringify(value))
  		z.next("HELLO")
  		console.log(JSON.stringify(value))
	});

@MorleyDev
Copy link
Author

MorleyDev commented Oct 27, 2017

So if a synchronous update happens inside the buffered observables subscription, it will mutate that buffer? That does not feel right to me. Is this really the desired and specified behaviour for rx?

I'd expect when the array gets past the observable and into my subscription, that array isn't going to be touched anymore by Rx and any emitted values would go into the next buffer.

Mutating the buffer seems to break the 'functional' part of 'functional reactive programming', and considering a buffer emits upon the end of it's time window, I'd expect any new values to come in as considered a part of the next time window instead of a mutation of that emitted time window, which I would have thought would be considered as 'finished' at the point it is emitted.

A key reason to use FRP and RX is to avoid that kind of stateful behaviour.

I've written my actual code to recursive instead of emitting for the next 'frame' (which I think is cleaner, anyway), but this definitely seems like a 'gotcha situation' to me that goes against what I think people would expect the behaviour of the buffer operators to be.

I've updated the title and my original post to reflect.

@MorleyDev MorleyDev changed the title bufferTime will silently ignore values emitted to a subject that also feeds into that buffer bufferTime will mutate current buffer if a source is a subject that also feeds into that buffer Oct 27, 2017
@MorleyDev MorleyDev changed the title bufferTime will mutate current buffer if a source is a subject that also feeds into that buffer bufferTime and bufferCount will mutate current buffer if a source is a subject that also feeds into that buffer Oct 27, 2017
@MorleyDev MorleyDev changed the title bufferTime and bufferCount will mutate current buffer if a source is a subject that also feeds into that buffer bufferTime and bufferCount will mutate current buffer if a source is a subject that emits in the subscription for that buffer Oct 27, 2017
@martinsik
Copy link
Contributor

I personally think this is expected behavior and it's a tradeoff to ensure performance consistency.

Of course, this doesn't matter if you have an array of 5 items but if you had one million items it would mean to create a copy of this entire array on every emission from bufferCount/bufferTime. This would degrade performance very quickly and you wouldn't be able to do anything about it. However when it's passing just an array reference it'll be still as fast as with 5 items.

Note that you can actually "fix" this for yourself by manually creating a copy of the buffer:

Rx.Observable.merge(x, z)
  .bufferCount(3)
  .map(buffer => Array.from(buffer))  
  .subscribe((value) => { ... });

@MorleyDev
Copy link
Author

MorleyDev commented Nov 3, 2017

I understand if it's a performance optimisation, though I do think it is something that goes against expectations of a user even if it's the expected behaviour of the people who wrote it. If so, a mention in the docs, at least, I think would be valuable to prevent people tripping up.

I actually ended up writing my own noddy little lettable to do this, though rather than copy the buffer, I 'swap' the current one with an empty one and emit that, that way any next's go onto that empty buffer. So whilst there isn't the performance hit of a copy, you obviously are creating multiple Arrays that need to be garbage collected.

@benlesh benlesh added the bug Confirmed bug label Oct 2, 2019
@benlesh
Copy link
Member

benlesh commented Oct 2, 2019

This is a bug, and is still a bug in 6.x and 7.x alpha. Should be an easy fix.

@jdussouillez
Copy link

Any updates ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Confirmed bug
Projects
None yet
Development

No branches or pull requests

5 participants