Skip to content

Commit fde7205

Browse files
committed
feat(mergeScan): add higher-order lettable version of mergeScan
1 parent 653b47a commit fde7205

File tree

3 files changed

+137
-95
lines changed

3 files changed

+137
-95
lines changed

src/operator/mergeScan.ts

Lines changed: 3 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,6 @@
1-
import { Operator } from '../Operator';
1+
22
import { Observable } from '../Observable';
3-
import { Subscriber } from '../Subscriber';
4-
import { Subscription } from '../Subscription';
5-
import { tryCatch } from '../util/tryCatch';
6-
import { errorObject } from '../util/errorObject';
7-
import { subscribeToResult } from '../util/subscribeToResult';
8-
import { OuterSubscriber } from '../OuterSubscriber';
9-
import { InnerSubscriber } from '../InnerSubscriber';
3+
import { mergeScan as higherOrder } from '../operators/mergeScan';
104

115
/**
126
* Applies an accumulator function over the source Observable where the
@@ -43,91 +37,5 @@ export function mergeScan<T, R>(this: Observable<T>,
4337
accumulator: (acc: R, value: T) => Observable<R>,
4438
seed: R,
4539
concurrent: number = Number.POSITIVE_INFINITY): Observable<R> {
46-
return this.lift(new MergeScanOperator(accumulator, seed, concurrent));
47-
}
48-
49-
export class MergeScanOperator<T, R> implements Operator<T, R> {
50-
constructor(private accumulator: (acc: R, value: T) => Observable<R>,
51-
private seed: R,
52-
private concurrent: number) {
53-
}
54-
55-
call(subscriber: Subscriber<R>, source: any): any {
56-
return source.subscribe(new MergeScanSubscriber(
57-
subscriber, this.accumulator, this.seed, this.concurrent
58-
));
59-
}
60-
}
61-
62-
/**
63-
* We need this JSDoc comment for affecting ESDoc.
64-
* @ignore
65-
* @extends {Ignored}
66-
*/
67-
export class MergeScanSubscriber<T, R> extends OuterSubscriber<T, R> {
68-
private hasValue: boolean = false;
69-
private hasCompleted: boolean = false;
70-
private buffer: Observable<any>[] = [];
71-
private active: number = 0;
72-
protected index: number = 0;
73-
74-
constructor(destination: Subscriber<R>,
75-
private accumulator: (acc: R, value: T) => Observable<R>,
76-
private acc: R,
77-
private concurrent: number) {
78-
super(destination);
79-
}
80-
81-
protected _next(value: any): void {
82-
if (this.active < this.concurrent) {
83-
const index = this.index++;
84-
const ish = tryCatch(this.accumulator)(this.acc, value);
85-
const destination = this.destination;
86-
if (ish === errorObject) {
87-
destination.error(errorObject.e);
88-
} else {
89-
this.active++;
90-
this._innerSub(ish, value, index);
91-
}
92-
} else {
93-
this.buffer.push(value);
94-
}
95-
}
96-
97-
private _innerSub(ish: any, value: T, index: number): void {
98-
this.add(subscribeToResult<T, R>(this, ish, value, index));
99-
}
100-
101-
protected _complete(): void {
102-
this.hasCompleted = true;
103-
if (this.active === 0 && this.buffer.length === 0) {
104-
if (this.hasValue === false) {
105-
this.destination.next(this.acc);
106-
}
107-
this.destination.complete();
108-
}
109-
}
110-
111-
notifyNext(outerValue: T, innerValue: R,
112-
outerIndex: number, innerIndex: number,
113-
innerSub: InnerSubscriber<T, R>): void {
114-
const { destination } = this;
115-
this.acc = innerValue;
116-
this.hasValue = true;
117-
destination.next(innerValue);
118-
}
119-
120-
notifyComplete(innerSub: Subscription): void {
121-
const buffer = this.buffer;
122-
this.remove(innerSub);
123-
this.active--;
124-
if (buffer.length > 0) {
125-
this._next(buffer.shift());
126-
} else if (this.active === 0 && this.hasCompleted) {
127-
if (this.hasValue === false) {
128-
this.destination.next(this.acc);
129-
}
130-
this.destination.complete();
131-
}
132-
}
40+
return higherOrder(accumulator, seed, concurrent)(this);
13341
}

src/operators/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ export { merge } from './merge';
4141
export { mergeAll } from './mergeAll';
4242
export { mergeMap } from './mergeMap';
4343
export { mergeMapTo } from './mergeMapTo';
44+
export { mergeScan } from './mergeScan';
4445
export { min } from './min';
4546
export { multicast } from './multicast';
4647
export { observeOn } from './observeOn';

src/operators/mergeScan.ts

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
import { Operator } from '../Operator';
2+
import { Observable } from '../Observable';
3+
import { Subscriber } from '../Subscriber';
4+
import { Subscription } from '../Subscription';
5+
import { tryCatch } from '../util/tryCatch';
6+
import { errorObject } from '../util/errorObject';
7+
import { subscribeToResult } from '../util/subscribeToResult';
8+
import { OuterSubscriber } from '../OuterSubscriber';
9+
import { InnerSubscriber } from '../InnerSubscriber';
10+
import { OperatorFunction } from '../interfaces';
11+
12+
/**
13+
* Applies an accumulator function over the source Observable where the
14+
* accumulator function itself returns an Observable, then each intermediate
15+
* Observable returned is merged into the output Observable.
16+
*
17+
* <span class="informal">It's like {@link scan}, but the Observables returned
18+
* by the accumulator are merged into the outer Observable.</span>
19+
*
20+
* @example <caption>Count the number of click events</caption>
21+
* const click$ = Rx.Observable.fromEvent(document, 'click');
22+
* const one$ = click$.mapTo(1);
23+
* const seed = 0;
24+
* const count$ = one$.mergeScan((acc, one) => Rx.Observable.of(acc + one), seed);
25+
* count$.subscribe(x => console.log(x));
26+
*
27+
* // Results:
28+
* 1
29+
* 2
30+
* 3
31+
* 4
32+
* // ...and so on for each click
33+
*
34+
* @param {function(acc: R, value: T): Observable<R>} accumulator
35+
* The accumulator function called on each source value.
36+
* @param seed The initial accumulation value.
37+
* @param {number} [concurrent=Number.POSITIVE_INFINITY] Maximum number of
38+
* input Observables being subscribed to concurrently.
39+
* @return {Observable<R>} An observable of the accumulated values.
40+
* @method mergeScan
41+
* @owner Observable
42+
*/
43+
export function mergeScan<T, R>(accumulator: (acc: R, value: T) => Observable<R>,
44+
seed: R,
45+
concurrent: number = Number.POSITIVE_INFINITY): OperatorFunction<T, R> {
46+
return (source: Observable<T>) => source.lift(new MergeScanOperator(accumulator, seed, concurrent));
47+
}
48+
49+
export class MergeScanOperator<T, R> implements Operator<T, R> {
50+
constructor(private accumulator: (acc: R, value: T) => Observable<R>,
51+
private seed: R,
52+
private concurrent: number) {
53+
}
54+
55+
call(subscriber: Subscriber<R>, source: any): any {
56+
return source.subscribe(new MergeScanSubscriber(
57+
subscriber, this.accumulator, this.seed, this.concurrent
58+
));
59+
}
60+
}
61+
62+
/**
63+
* We need this JSDoc comment for affecting ESDoc.
64+
* @ignore
65+
* @extends {Ignored}
66+
*/
67+
export class MergeScanSubscriber<T, R> extends OuterSubscriber<T, R> {
68+
private hasValue: boolean = false;
69+
private hasCompleted: boolean = false;
70+
private buffer: Observable<any>[] = [];
71+
private active: number = 0;
72+
protected index: number = 0;
73+
74+
constructor(destination: Subscriber<R>,
75+
private accumulator: (acc: R, value: T) => Observable<R>,
76+
private acc: R,
77+
private concurrent: number) {
78+
super(destination);
79+
}
80+
81+
protected _next(value: any): void {
82+
if (this.active < this.concurrent) {
83+
const index = this.index++;
84+
const ish = tryCatch(this.accumulator)(this.acc, value);
85+
const destination = this.destination;
86+
if (ish === errorObject) {
87+
destination.error(errorObject.e);
88+
} else {
89+
this.active++;
90+
this._innerSub(ish, value, index);
91+
}
92+
} else {
93+
this.buffer.push(value);
94+
}
95+
}
96+
97+
private _innerSub(ish: any, value: T, index: number): void {
98+
this.add(subscribeToResult<T, R>(this, ish, value, index));
99+
}
100+
101+
protected _complete(): void {
102+
this.hasCompleted = true;
103+
if (this.active === 0 && this.buffer.length === 0) {
104+
if (this.hasValue === false) {
105+
this.destination.next(this.acc);
106+
}
107+
this.destination.complete();
108+
}
109+
}
110+
111+
notifyNext(outerValue: T, innerValue: R,
112+
outerIndex: number, innerIndex: number,
113+
innerSub: InnerSubscriber<T, R>): void {
114+
const { destination } = this;
115+
this.acc = innerValue;
116+
this.hasValue = true;
117+
destination.next(innerValue);
118+
}
119+
120+
notifyComplete(innerSub: Subscription): void {
121+
const buffer = this.buffer;
122+
this.remove(innerSub);
123+
this.active--;
124+
if (buffer.length > 0) {
125+
this._next(buffer.shift());
126+
} else if (this.active === 0 && this.hasCompleted) {
127+
if (this.hasValue === false) {
128+
this.destination.next(this.acc);
129+
}
130+
this.destination.complete();
131+
}
132+
}
133+
}

0 commit comments

Comments
 (0)