Skip to content

Commit ec3eceb

Browse files
committed
feat(bufferWhen): add higher-order lettable version of bufferWhen
1 parent ea1c3ee commit ec3eceb

File tree

3 files changed

+140
-97
lines changed

3 files changed

+140
-97
lines changed

src/operator/bufferWhen.ts

Lines changed: 3 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,6 @@
1-
import { Operator } from '../Operator';
2-
import { Subscriber } from '../Subscriber';
3-
import { Observable } from '../Observable';
4-
import { Subscription } from '../Subscription';
5-
import { tryCatch } from '../util/tryCatch';
6-
import { errorObject } from '../util/errorObject';
71

8-
import { OuterSubscriber } from '../OuterSubscriber';
9-
import { InnerSubscriber } from '../InnerSubscriber';
10-
import { subscribeToResult } from '../util/subscribeToResult';
2+
import { Observable } from '../Observable';
3+
import { bufferWhen as higherOrder } from '../operators';
114

125
/**
136
* Buffers the source Observable values, using a factory function of closing
@@ -43,92 +36,5 @@ import { subscribeToResult } from '../util/subscribeToResult';
4336
* @owner Observable
4437
*/
4538
export function bufferWhen<T>(this: Observable<T>, closingSelector: () => Observable<any>): Observable<T[]> {
46-
return this.lift(new BufferWhenOperator<T>(closingSelector));
47-
}
48-
49-
class BufferWhenOperator<T> implements Operator<T, T[]> {
50-
51-
constructor(private closingSelector: () => Observable<any>) {
52-
}
53-
54-
call(subscriber: Subscriber<T[]>, source: any): any {
55-
return source.subscribe(new BufferWhenSubscriber(subscriber, this.closingSelector));
56-
}
57-
}
58-
59-
/**
60-
* We need this JSDoc comment for affecting ESDoc.
61-
* @ignore
62-
* @extends {Ignored}
63-
*/
64-
class BufferWhenSubscriber<T> extends OuterSubscriber<T, any> {
65-
private buffer: T[];
66-
private subscribing: boolean = false;
67-
private closingSubscription: Subscription;
68-
69-
constructor(destination: Subscriber<T[]>, private closingSelector: () => Observable<any>) {
70-
super(destination);
71-
this.openBuffer();
72-
}
73-
74-
protected _next(value: T) {
75-
this.buffer.push(value);
76-
}
77-
78-
protected _complete() {
79-
const buffer = this.buffer;
80-
if (buffer) {
81-
this.destination.next(buffer);
82-
}
83-
super._complete();
84-
}
85-
86-
protected _unsubscribe() {
87-
this.buffer = null;
88-
this.subscribing = false;
89-
}
90-
91-
notifyNext(outerValue: T, innerValue: any,
92-
outerIndex: number, innerIndex: number,
93-
innerSub: InnerSubscriber<T, any>): void {
94-
this.openBuffer();
95-
}
96-
97-
notifyComplete(): void {
98-
if (this.subscribing) {
99-
this.complete();
100-
} else {
101-
this.openBuffer();
102-
}
103-
}
104-
105-
openBuffer() {
106-
107-
let { closingSubscription } = this;
108-
109-
if (closingSubscription) {
110-
this.remove(closingSubscription);
111-
closingSubscription.unsubscribe();
112-
}
113-
114-
const buffer = this.buffer;
115-
if (this.buffer) {
116-
this.destination.next(buffer);
117-
}
118-
119-
this.buffer = [];
120-
121-
const closingNotifier = tryCatch(this.closingSelector)();
122-
123-
if (closingNotifier === errorObject) {
124-
this.error(errorObject.e);
125-
} else {
126-
closingSubscription = new Subscription();
127-
this.closingSubscription = closingSubscription;
128-
this.add(closingSubscription);
129-
this.subscribing = true;
130-
closingSubscription.add(subscribeToResult(this, closingNotifier));
131-
this.subscribing = false;
132-
}
133-
}
39+
return higherOrder(closingSelector)(this);
13440
}

src/operators/bufferWhen.ts

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
import { Operator } from '../Operator';
2+
import { Subscriber } from '../Subscriber';
3+
import { Observable } from '../Observable';
4+
import { Subscription } from '../Subscription';
5+
import { tryCatch } from '../util/tryCatch';
6+
import { errorObject } from '../util/errorObject';
7+
import { OuterSubscriber } from '../OuterSubscriber';
8+
import { InnerSubscriber } from '../InnerSubscriber';
9+
import { subscribeToResult } from '../util/subscribeToResult';
10+
import { OperatorFunction } from '../interfaces';
11+
12+
/**
13+
* Buffers the source Observable values, using a factory function of closing
14+
* Observables to determine when to close, emit, and reset the buffer.
15+
*
16+
* <span class="informal">Collects values from the past as an array. When it
17+
* starts collecting values, it calls a function that returns an Observable that
18+
* tells when to close the buffer and restart collecting.</span>
19+
*
20+
* <img src="./img/bufferWhen.png" width="100%">
21+
*
22+
* Opens a buffer immediately, then closes the buffer when the observable
23+
* returned by calling `closingSelector` function emits a value. When it closes
24+
* the buffer, it immediately opens a new buffer and repeats the process.
25+
*
26+
* @example <caption>Emit an array of the last clicks every [1-5] random seconds</caption>
27+
* var clicks = Rx.Observable.fromEvent(document, 'click');
28+
* var buffered = clicks.bufferWhen(() =>
29+
* Rx.Observable.interval(1000 + Math.random() * 4000)
30+
* );
31+
* buffered.subscribe(x => console.log(x));
32+
*
33+
* @see {@link buffer}
34+
* @see {@link bufferCount}
35+
* @see {@link bufferTime}
36+
* @see {@link bufferToggle}
37+
* @see {@link windowWhen}
38+
*
39+
* @param {function(): Observable} closingSelector A function that takes no
40+
* arguments and returns an Observable that signals buffer closure.
41+
* @return {Observable<T[]>} An observable of arrays of buffered values.
42+
* @method bufferWhen
43+
* @owner Observable
44+
*/
45+
export function bufferWhen<T>(closingSelector: () => Observable<any>): OperatorFunction<T, T[]> {
46+
return function (source: Observable<T>) {
47+
return source.lift(new BufferWhenOperator(closingSelector));
48+
};
49+
}
50+
51+
class BufferWhenOperator<T> implements Operator<T, T[]> {
52+
53+
constructor(private closingSelector: () => Observable<any>) {
54+
}
55+
56+
call(subscriber: Subscriber<T[]>, source: any): any {
57+
return source.subscribe(new BufferWhenSubscriber(subscriber, this.closingSelector));
58+
}
59+
}
60+
61+
/**
62+
* We need this JSDoc comment for affecting ESDoc.
63+
* @ignore
64+
* @extends {Ignored}
65+
*/
66+
class BufferWhenSubscriber<T> extends OuterSubscriber<T, any> {
67+
private buffer: T[];
68+
private subscribing: boolean = false;
69+
private closingSubscription: Subscription;
70+
71+
constructor(destination: Subscriber<T[]>, private closingSelector: () => Observable<any>) {
72+
super(destination);
73+
this.openBuffer();
74+
}
75+
76+
protected _next(value: T) {
77+
this.buffer.push(value);
78+
}
79+
80+
protected _complete() {
81+
const buffer = this.buffer;
82+
if (buffer) {
83+
this.destination.next(buffer);
84+
}
85+
super._complete();
86+
}
87+
88+
protected _unsubscribe() {
89+
this.buffer = null;
90+
this.subscribing = false;
91+
}
92+
93+
notifyNext(outerValue: T, innerValue: any,
94+
outerIndex: number, innerIndex: number,
95+
innerSub: InnerSubscriber<T, any>): void {
96+
this.openBuffer();
97+
}
98+
99+
notifyComplete(): void {
100+
if (this.subscribing) {
101+
this.complete();
102+
} else {
103+
this.openBuffer();
104+
}
105+
}
106+
107+
openBuffer() {
108+
109+
let { closingSubscription } = this;
110+
111+
if (closingSubscription) {
112+
this.remove(closingSubscription);
113+
closingSubscription.unsubscribe();
114+
}
115+
116+
const buffer = this.buffer;
117+
if (this.buffer) {
118+
this.destination.next(buffer);
119+
}
120+
121+
this.buffer = [];
122+
123+
const closingNotifier = tryCatch(this.closingSelector)();
124+
125+
if (closingNotifier === errorObject) {
126+
this.error(errorObject.e);
127+
} else {
128+
closingSubscription = new Subscription();
129+
this.closingSubscription = closingSubscription;
130+
this.add(closingSubscription);
131+
this.subscribing = true;
132+
closingSubscription.add(subscribeToResult(this, closingNotifier));
133+
this.subscribing = false;
134+
}
135+
}
136+
}

src/operators/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ export { buffer } from './buffer';
44
export { bufferCount } from './bufferCount';
55
export { bufferTime } from './bufferTime';
66
export { bufferToggle } from './bufferToggle';
7+
export { bufferWhen } from './bufferWhen';
78
export { catchError } from './catchError';
89
export { concat } from './concat';
910
export { concatAll } from './concatAll';

0 commit comments

Comments
 (0)