Skip to content

Commit d13dbb4

Browse files
benleshjayphelps
authored andcommitted
fix(bufferCount): will behave as expected when startBufferEvery is less than bufferSize (#2076)
- fixed issue where internal `buffers` store was keeping an additional buffer for no good reason - improved logic and performance around updating internal `buffers` list - adds a test to ensure proper behavior fixes #2062
1 parent 0271fab commit d13dbb4

File tree

2 files changed

+28
-15
lines changed

2 files changed

+28
-15
lines changed

spec/operators/bufferCount-spec.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import * as Rx from '../../dist/cjs/Rx';
2+
import { expect } from 'chai';
23
declare const {hot, asDiagram, expectObservable, expectSubscriptions};
34

45
const Observable = Rx.Observable;
@@ -31,6 +32,26 @@ describe('Observable.prototype.bufferCount', () => {
3132
expectObservable(e1.bufferCount(2)).toBe(expected, values);
3233
});
3334

35+
it('should buffer properly (issue #2062)', () => {
36+
const item$ = new Rx.Subject();
37+
const results = [];
38+
item$
39+
.bufferCount(3, 1)
40+
.subscribe(value => {
41+
results.push(value);
42+
43+
if (value.join() === '1,2,3') {
44+
item$.next(4);
45+
}
46+
});
47+
48+
item$.next(1);
49+
item$.next(2);
50+
item$.next(3);
51+
52+
expect(results).to.deep.equal([[1, 2, 3], [2, 3, 4]]);
53+
});
54+
3455
it('should emit partial buffers if source completes before reaching specified buffer count', () => {
3556
const e1 = hot('--a--b--c--d--|');
3657
const expected = '--------------(x|)';

src/operator/bufferCount.ts

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -62,38 +62,30 @@ class BufferCountOperator<T> implements Operator<T, T[]> {
6262
* @extends {Ignored}
6363
*/
6464
class BufferCountSubscriber<T> extends Subscriber<T> {
65-
private buffers: Array<T[]> = [[]];
65+
private buffers: Array<T[]> = [];
6666
private count: number = 0;
6767

6868
constructor(destination: Subscriber<T[]>, private bufferSize: number, private startBufferEvery: number) {
6969
super(destination);
7070
}
7171

7272
protected _next(value: T) {
73-
const count = (this.count += 1);
74-
const destination = this.destination;
75-
const bufferSize = this.bufferSize;
76-
const startBufferEvery = (this.startBufferEvery == null) ? bufferSize : this.startBufferEvery;
77-
const buffers = this.buffers;
78-
const len = buffers.length;
79-
let remove = -1;
73+
const count = this.count++;
74+
const { destination, bufferSize, startBufferEvery, buffers } = this;
75+
const startOn = (startBufferEvery == null) ? bufferSize : startBufferEvery;
8076

81-
if (count % startBufferEvery === 0) {
77+
if (count % startOn === 0) {
8278
buffers.push([]);
8379
}
8480

85-
for (let i = 0; i < len; i++) {
81+
for (let i = buffers.length; i--; ) {
8682
const buffer = buffers[i];
8783
buffer.push(value);
8884
if (buffer.length === bufferSize) {
89-
remove = i;
85+
buffers.splice(i, 1);
9086
destination.next(buffer);
9187
}
9288
}
93-
94-
if (remove !== -1) {
95-
buffers.splice(remove, 1);
96-
}
9789
}
9890

9991
protected _complete() {

0 commit comments

Comments
 (0)