-
Notifications
You must be signed in to change notification settings - Fork 2
/
buffer.ts
55 lines (49 loc) · 1.34 KB
/
buffer.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import { Source, Operator, Observer } from '../index';
import { createSource } from '../sources';
import subscribe from '../utils/subscribe';
/**
 * Buffers the values emitted by the source and emits them as a single array
 * each time `closeNotifier` emits. Any values still buffered when the source
 * completes are emitted before completion is forwarded.
 *
 * Empty buffers are never emitted: if no value was collected since the last
 * flush, a notifier emission (or source completion) produces no output.
 *
 * @param closeNotifier - callbag source that signals the buffer to be emitted on the output source
 * @returns callbag operator
 *
 * @public
 */
function buffer<I>(closeNotifier: Source<any>): Operator<I, I[]> {
  return source => {
    return createSource((next, complete, error) => {
      // Values collected from the source since the last flush.
      let valuesBuffer: I[] = [];

      // Emits the collected values (if any) and starts a fresh buffer.
      // The guard must inspect `valuesBuffer`: `buffer.length` would be the
      // declared parameter count of this function (always 1), not the number
      // of buffered values.
      const flush = (): void => {
        if (valuesBuffer.length > 0) {
          next(valuesBuffer);
          valuesBuffer = [];
        }
      };

      const notifierObserver: Observer<any> = {
        // The notifier's value is ignored; only the emission itself matters.
        next: () => {
          flush();
        },
      };

      const observer: Observer<I> = {
        next: value => {
          valuesBuffer.push(value);
        },
        error: err => {
          // Buffered values are dropped on error; only the error is forwarded.
          unsubscribeNotifier();
          error(err);
        },
        complete: () => {
          // Deliver whatever is still pending before signalling completion.
          flush();
          unsubscribeNotifier();
          complete();
        },
      };

      // The notifier is subscribed first so `unsubscribeNotifier` is already
      // initialised if the source errors or completes during its subscription.
      const unsubscribeNotifier = subscribe(closeNotifier)(notifierObserver);
      const unsubscribe = subscribe(source)(observer);

      // Teardown handed back to createSource: releases both subscriptions.
      return () => {
        unsubscribeNotifier();
        unsubscribe();
      };
    });
  };
}
export default buffer;