Skip to content

Commit 4ef41b0

Browse files
committed
fix(bufferTime): inner intervals will now clean up properly
- adds marble tests around bufferTime - adds `maxFrames` property to `VirtualTimeScheduler` that will limit the execution of tests
1 parent 97ce36c commit 4ef41b0

File tree

3 files changed

+90
-32
lines changed

3 files changed

+90
-32
lines changed

spec/operators/bufferTime-spec.js

Lines changed: 75 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,81 @@
1-
/* globals describe, it, expect */
1+
/* globals describe, it, expect, hot, cold, rxTestScheduler, expectObservable */
22
var Rx = require('../../dist/cjs/Rx');
33
var Observable = Rx.Observable;
44

5-
describe('Observable.prototype.bufferTime', function () {
6-
it('should emit buffers at intervals', function (done) {
7-
var expected = [
8-
[0, 1, 2],
9-
[3, 4, 5],
10-
[6, 7, 8]
11-
];
12-
Observable.interval(100)
13-
.bufferTime(320)
14-
.take(3)
15-
.subscribe(function (w) {
16-
expect(w).toEqual(expected.shift())
17-
}, null, done);
18-
}, 2000);
5+
describe('Observable.prototype.bufferTime', function () {
6+
it('should emit buffers at intervals', function (){
7+
var values = {
8+
w: ['a','b'],
9+
x: ['c','d','e'],
10+
y: ['f', 'g'],
11+
z: []
12+
};
13+
var e1 = hot('---a---b---c---d---e---f---g---|');
14+
var expected = '----------w---------x---------y(z|)';
15+
16+
expectObservable(e1.bufferTime(100, null, rxTestScheduler)).toBe(expected, values);
17+
});
1918

19+
it('should emit buffers at intervals test 2', function() {
20+
var e1 = hot('---------a---------b---------c---------d---------e---------g--------|')
21+
var expected = '--------------------------------x-------------------------------y---(z|)';
22+
23+
expectObservable(e1.bufferTime(320, null, rxTestScheduler)).toBe(expected, { x: ['a','b','c'], y: ['d', 'e', 'g'], z: []});
24+
});
2025

21-
it('should emit buffers that have been created at intervals and close after the specified delay', function (done) {
22-
var expected = [
23-
[0, 1, 2, 3, 4],
24-
[2, 3, 4, 5, 6],
25-
[4, 5, 6, 7, 8]
26-
];
27-
Observable.interval(100)
28-
.bufferTime(520, 220)
29-
.take(3)
30-
.subscribe(function (w) {
31-
expect(w).toEqual(expected.shift())
32-
}, null, done);
33-
}, 2000);
26+
it('should emit buffers that have been created at intervals and close after the specified delay', function (){
27+
var e1 = hot('---a---b---c----d----e----f----g----h----i----(k|)');
28+
// --------------------*--------------------*---- start interval
29+
// ---------------------| timespans
30+
// ---------------------|
31+
// -----|
32+
var expected = '---------------------x-------------------y----(z|)';
33+
var values = {
34+
x: ['a', 'b', 'c', 'd', 'e'],
35+
y: ['e', 'f', 'g', 'h', 'i'],
36+
z: ['i', 'k']
37+
};
38+
expectObservable(e1.bufferTime(210, 200, rxTestScheduler)).toBe(expected, values);
39+
});
40+
41+
it('should handle empty', function (){
42+
var e1 = Observable.empty();
43+
expectObservable(e1.bufferTime(100, null, rxTestScheduler)).toBe('(a|)', { a: [] });
44+
});
45+
46+
it('should handle never', function () {
47+
var e1 = Observable.never();
48+
var expected = '----------a---------a---------a---------a---------a---------a---------a-----'; // 750 frame limit
49+
expectObservable(e1.bufferTime(100, null, rxTestScheduler)).toBe(expected, { a: [] });
50+
});
51+
52+
it('should handle throw', function (){
53+
var e1 = Observable.throw(new Error('haha'));
54+
var expected = '#';
55+
expectObservable(e1.bufferTime(100, null, rxTestScheduler)).toBe(expected, undefined, new Error('haha'));
56+
});
57+
58+
it('should handle errors', function () {
59+
var values = {
60+
w: ['a','b']
61+
};
62+
var e1 = hot('---a---b---c---#---e---f---g---|');
63+
var expected = '----------w----#';
64+
65+
expectObservable(e1.bufferTime(100, null, rxTestScheduler)).toBe(expected, values);
66+
});
67+
68+
it('should emit buffers that have been created at intervals and close after the specified delay with errors', function (){
69+
var e1 = hot('---a---b---c----d----e----f----g----h----i--#');
70+
// --------------------*--------------------*---- start interval
71+
// ---------------------| timespans
72+
// ---------------------|
73+
// -----|
74+
var expected = '---------------------x-------------------y--#';
75+
var values = {
76+
x: ['a', 'b', 'c', 'd', 'e'],
77+
y: ['e', 'f', 'g', 'h', 'i']
78+
};
79+
expectObservable(e1.bufferTime(210, 200, rxTestScheduler)).toBe(expected, values);
80+
});
3481
});

src/operators/bufferTime.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,15 +82,19 @@ function dispatchBufferTimeSpanOnly(state) {
8282
}
8383

8484
state.buffer = subscriber.openBuffer();
85-
(<any>this).schedule(state, state.bufferTimeSpan);
85+
if(!subscriber.isUnsubscribed) {
86+
(<any>this).schedule(state, state.bufferTimeSpan);
87+
}
8688
}
8789

8890
function dispatchBufferCreation(state) {
8991
let { bufferCreationInterval, bufferTimeSpan, subscriber, scheduler } = state;
9092
let buffer = subscriber.openBuffer();
9193
var action = <Action>this;
92-
action.add(scheduler.schedule(dispatchBufferClose, bufferTimeSpan, { subscriber, buffer }));
93-
action.schedule(state, bufferCreationInterval);
94+
if(!subscriber.isUnsubscribed) {
95+
action.add(scheduler.schedule(dispatchBufferClose, bufferTimeSpan, { subscriber, buffer }));
96+
action.schedule(state, bufferCreationInterval);
97+
}
9498
}
9599

96100
function dispatchBufferClose({ subscriber, buffer }) {

src/schedulers/VirtualTimeScheduler.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,25 @@ export default class VirtualTimeScheduler implements Scheduler {
99
index: number = 0;
1010
sorted: boolean = false;
1111
frame: number = 0;
12+
maxFrames: number = 750;
1213

1314
now() {
1415
return 0;
1516
}
1617

1718
flush() {
1819
const actions = this.actions;
20+
const maxFrames = this.maxFrames;
1921
while (actions.length > 0) {
2022
let action = actions.shift();
2123
this.frame = action.delay;
22-
action.execute();
24+
if(this.frame <= maxFrames) {
25+
action.execute();
26+
} else {
27+
break;
28+
}
2329
}
30+
actions.length = 0;
2431
this.frame = 0;
2532
}
2633

0 commit comments

Comments
 (0)