Skip to content

Commit 0ebb5bd

Browse files
trxcllntbenlesh
authored andcommitted
feat(mergeScan): add new mergeScan operator.
1 parent 73d743d commit 0ebb5bd

File tree

3 files changed

+201
-0
lines changed

3 files changed

+201
-0
lines changed

spec/operators/mergeScan-spec.js

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/* globals describe, it, expect, expectObservable, hot */
2+
var Rx = require('../../dist/cjs/Rx');
3+
var Observable = Rx.Observable;
4+
5+
describe('Observable.prototype.mergeScan()', function () {
6+
it('should mergeScan things', function () {
7+
var e1 = hot('--a--^--b--c--d--e--f--g--|');
8+
var expected = '---u--v--w--x--y--z--|';
9+
10+
var values = {
11+
u: ['b'],
12+
v: ['b', 'c'],
13+
w: ['b', 'c', 'd'],
14+
x: ['b', 'c', 'd', 'e'],
15+
y: ['b', 'c', 'd', 'e', 'f'],
16+
z: ['b', 'c', 'd', 'e', 'f', 'g']
17+
};
18+
19+
var source = e1.mergeScan(function (acc, x) { return Observable.of(acc.concat(x)); }, []);
20+
21+
expectObservable(source).toBe(expected, values);
22+
});
23+
24+
it('should handle errors', function () {
25+
var e1 = hot('--a--^--b--c--d--#');
26+
var expected = '---u--v--w--#';
27+
28+
var values = {
29+
u: ['b'],
30+
v: ['b', 'c'],
31+
w: ['b', 'c', 'd']
32+
};
33+
34+
var source = e1.mergeScan(function (acc, x) { return Observable.of(acc.concat(x)); }, []);
35+
36+
expectObservable(source).toBe(expected, values);
37+
});
38+
39+
it('should handle errors in the projection function', function () {
40+
var e1 = hot('--a--^--b--c--d--e--f--g--|');
41+
var expected = '---u--v--#';
42+
43+
var values = {
44+
u: ['b'],
45+
v: ['b', 'c']
46+
};
47+
48+
var source = e1.mergeScan(function (acc, x) {
49+
if (x === 'd') {
50+
throw 'bad!';
51+
}
52+
return Observable.of(acc.concat(x));
53+
}, []);
54+
55+
expectObservable(source).toBe(expected, values, 'bad!');
56+
});
57+
58+
it('handle empty', function () {
59+
var e1 = Observable.empty();
60+
var expected = '(u|)';
61+
62+
var values = {
63+
u: []
64+
};
65+
66+
var source = e1.mergeScan(function (acc, x) { return Observable.of(acc.concat(x)); }, []);
67+
68+
expectObservable(source).toBe(expected, values);
69+
});
70+
71+
it('handle never', function () {
72+
var e1 = Observable.never();
73+
var expected = '-';
74+
75+
var source = e1.mergeScan(function (acc, x) { return Observable.of(acc.concat(x)); }, []);
76+
77+
expectObservable(source).toBe(expected);
78+
});
79+
80+
it('handle throw', function () {
81+
var e1 = Observable.throw('bad!');
82+
var expected = '#';
83+
84+
var source = e1.mergeScan(function (acc, x) { return Observable.of(acc.concat(x)); }, []);
85+
86+
expectObservable(source).toBe(expected, undefined, 'bad!');
87+
});
88+
89+
it('should mergeScan unsubscription', function () {
90+
var e1 = hot('--a--^--b--c--d--e--f--g--|');
91+
var expected = '---u--v--w--x--';
92+
var sub = '^ !';
93+
var values = {
94+
u: ['b'],
95+
v: ['b', 'c'],
96+
w: ['b', 'c', 'd'],
97+
x: ['b', 'c', 'd', 'e'],
98+
y: ['b', 'c', 'd', 'e', 'f'],
99+
z: ['b', 'c', 'd', 'e', 'f', 'g']
100+
};
101+
102+
var source = e1.mergeScan(function (acc, x) { return Observable.of(acc.concat(x)); }, []);
103+
104+
expectObservable(source, sub).toBe(expected, values);
105+
});
106+
});

src/Rx.KitchenSink.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ interface KitchenSinkOperators<T> extends CoreOperators<T> {
1212
max?: <T, R>(comparer?: (x: R, y: T) => R) => Observable<R>;
1313
min?: <T, R>(comparer?: (x: R, y: T) => R) => Observable<R>;
1414
timeInterval?: <T>(scheduler?: IScheduler) => Observable<T>;
15+
mergeScan?: <T, R>(project: (acc: R, x: T) => Observable<R>, seed: R) => Observable<R>;
1516
}
1617

1718
// operators
@@ -196,6 +197,9 @@ import mergeMapTo from './operators/mergeMapTo';
196197
observableProto.mergeMapTo = mergeMapTo;
197198
observableProto.flatMapTo = mergeMapTo;
198199

200+
import mergeScan from './operators/extended/mergeScan';
201+
observableProto.mergeScan = mergeScan;
202+
199203
import min from './operators/extended/min';
200204
observableProto.min = min;
201205

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import Operator from '../../Operator';
2+
import Observable from '../../Observable';
3+
import Subscriber from '../../Subscriber';
4+
import Subscription from '../../Subscription';
5+
import tryCatch from '../../util/tryCatch';
6+
import { errorObject } from '../../util/errorObject';
7+
import subscribeToResult from '../../util/subscribeToResult';
8+
import OuterSubscriber from '../../OuterSubscriber';
9+
10+
export default function mergeScan<T, R>(project: (acc: R, x: T) => Observable<R>, seed: R) {
11+
return this.lift(new MergeScanOperator(project, seed));
12+
}
13+
14+
export class MergeScanOperator<T, R> implements Operator<T, R> {
15+
constructor(private project: (acc: R, x: T) => Observable<R>,
16+
private seed: R,
17+
private concurrent: number = Number.POSITIVE_INFINITY) {
18+
}
19+
20+
call(subscriber: Subscriber<R>): Subscriber<T> {
21+
return new MergeScanSubscriber(
22+
subscriber, this.project, this.seed, this.concurrent
23+
);
24+
}
25+
}
26+
27+
export class MergeScanSubscriber<T, R> extends OuterSubscriber<T, R> {
28+
private hasValue: boolean = false;
29+
private hasCompleted: boolean = false;
30+
private buffer: Observable<any>[] = [];
31+
private active: number = 0;
32+
protected index: number = 0;
33+
34+
constructor(destination: Subscriber<R>,
35+
private project: (acc: R, x: T) => Observable<R>,
36+
private acc: R,
37+
private concurrent: number = Number.POSITIVE_INFINITY) {
38+
super(destination);
39+
}
40+
41+
_next(value: any): void {
42+
if (this.active < this.concurrent) {
43+
const index = this.index++;
44+
const ish = tryCatch(this.project)(this.acc, value);
45+
const destination = this.destination;
46+
if (ish === errorObject) {
47+
destination.error(ish.e);
48+
} else {
49+
this.active++;
50+
this._innerSub(ish, value, index);
51+
}
52+
} else {
53+
this.buffer.push(value);
54+
}
55+
}
56+
57+
_innerSub(ish: any, value: T, index: number): void {
58+
this.add(subscribeToResult<T, R>(this, ish, value, index));
59+
}
60+
61+
_complete(): void {
62+
this.hasCompleted = true;
63+
if (this.active === 0 && this.buffer.length === 0) {
64+
if (this.hasValue === false) {
65+
this.destination.next(this.acc);
66+
}
67+
this.destination.complete();
68+
}
69+
}
70+
71+
notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void {
72+
const { destination } = this;
73+
this.acc = innerValue;
74+
this.hasValue = true;
75+
destination.next(innerValue);
76+
}
77+
78+
notifyComplete(innerSub: Subscription<T>): void {
79+
const buffer = this.buffer;
80+
this.remove(innerSub);
81+
this.active--;
82+
if (buffer.length > 0) {
83+
this._next(buffer.shift());
84+
} else if (this.active === 0 && this.hasCompleted) {
85+
if (this.hasValue === false) {
86+
this.destination.next(this.acc);
87+
}
88+
this.destination.complete();
89+
}
90+
}
91+
}

0 commit comments

Comments
 (0)