Skip to content

Commit fe0eb37

Browse files
committed
feat(mergeScan): support concurrency parameter for mergeScan
- expose concurrency parameter to interface of mergeScan - expand test coverage to test concurrency works closes #868
1 parent d2e6318 commit fe0eb37

File tree

3 files changed

+154
-6
lines changed

3 files changed

+154
-6
lines changed

spec/operators/mergeScan-spec.js

Lines changed: 147 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* globals describe, it, expect, expectObservable, expectSubscriptions, hot, cold */
1+
/* globals describe, it, expect, rxTestScheduler, expectObservable, expectSubscriptions, hot, cold */
22
var Rx = require('../../dist/cjs/Rx');
33
var Observable = Rx.Observable;
44

@@ -225,4 +225,150 @@ describe('Observable.prototype.mergeScan()', function () {
225225
expectObservable(source, sub).toBe(expected, values);
226226
expectSubscriptions(e1.subscriptions).toBe(sub);
227227
});
228+
229+
it('should mergescan projects cold Observable with single concurrency', function () {
230+
var e1 = hot('--a--b--c--|');
231+
var e1subs = '^ !';
232+
233+
var inner = [
234+
cold( '--d--e--f--| '),
235+
cold( '--g--h--i--| '),
236+
cold( '--j--k--l--|')
237+
];
238+
239+
var xsubs = ' ^ !';
240+
var ysubs = ' ^ !';
241+
var zsubs = ' ^ !';
242+
243+
var expected = '--x-d--e--f--f-g--h--i--i-j--k--l--|';
244+
245+
var index = 0;
246+
var source = e1.mergeScan(function (acc, x) {
247+
var value = inner[index++];
248+
return value.startWith(acc);
249+
}, 'x', 1);
250+
251+
expectObservable(source).toBe(expected);
252+
253+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
254+
expectSubscriptions(inner[0].subscriptions).toBe(xsubs);
255+
expectSubscriptions(inner[1].subscriptions).toBe(ysubs);
256+
expectSubscriptions(inner[2].subscriptions).toBe(zsubs);
257+
});
258+
259+
it('should emit accumulator if inner completes without value', function () {
260+
var e1 = hot('--a--^--b--c--d--e--f--g--|');
261+
var e1subs = '^ !';
262+
var expected = '---------------------(x|)';
263+
264+
var source = e1.mergeScan(function (acc, x) {
265+
return Observable.empty();
266+
}, ['1']);
267+
268+
expectObservable(source).toBe(expected, {x: ['1']});
269+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
270+
});
271+
272+
it('should emit accumulator if inner completes without value after source completes', function () {
273+
var e1 = hot('--a--^--b--c--d--e--f--g--|');
274+
var e1subs = '^ !';
275+
var expected = '-----------------------(x|)';
276+
277+
var source = e1.mergeScan(function (acc, x) {
278+
return Observable.empty().delay(50, rxTestScheduler);
279+
}, ['1']);
280+
281+
expectObservable(source).toBe(expected, {x: ['1']});
282+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
283+
});
284+
285+
it('should mergescan projects hot Observable with single concurrency', function () {
286+
var e1 = hot('---a---b---c---|');
287+
var e1subs = '^ !';
288+
289+
var inner = [
290+
hot( '--d--e--f--|'),
291+
hot( '----g----h----i----|'),
292+
hot( '------j------k-------l------|')
293+
];
294+
295+
var xsubs = ' ^ !';
296+
var ysubs = ' ^ !';
297+
var zsubs = ' ^ !';
298+
299+
var expected = '---x-e--f--f--i----i-l------|';
300+
301+
var index = 0;
302+
var source = e1.mergeScan(function (acc, x) {
303+
var value = inner[index++];
304+
return value.startWith(acc);
305+
}, 'x', 1);
306+
307+
expectObservable(source).toBe(expected);
308+
309+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
310+
expectSubscriptions(inner[0].subscriptions).toBe(xsubs);
311+
expectSubscriptions(inner[1].subscriptions).toBe(ysubs);
312+
expectSubscriptions(inner[2].subscriptions).toBe(zsubs);
313+
});
314+
315+
it('should mergescan projects cold Observable with dual concurrency', function () {
316+
var e1 = hot('----a----b----c----|');
317+
var e1subs = '^ !';
318+
319+
var inner = [
320+
cold( '---d---e---f---| '),
321+
cold( '---g---h---i---| '),
322+
cold( '---j---k---l---|')
323+
];
324+
325+
var xsubs = ' ^ !';
326+
var ysubs = ' ^ !';
327+
var zsubs = ' ^ !';
328+
329+
var expected = '----x--d-d-eg--fh--hi-j---k---l---|';
330+
331+
var index = 0;
332+
var source = e1.mergeScan(function (acc, x) {
333+
var value = inner[index++];
334+
return value.startWith(acc);
335+
}, 'x', 2);
336+
337+
expectObservable(source).toBe(expected);
338+
339+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
340+
expectSubscriptions(inner[0].subscriptions).toBe(xsubs);
341+
expectSubscriptions(inner[1].subscriptions).toBe(ysubs);
342+
expectSubscriptions(inner[2].subscriptions).toBe(zsubs);
343+
});
344+
345+
it('should mergescan projects hot Observable with dual concurrency', function () {
346+
var e1 = hot('---a---b---c---|');
347+
var e1subs = '^ !';
348+
349+
var inner = [
350+
hot( '--d--e--f--|'),
351+
hot( '----g----h----i----|'),
352+
hot( '------j------k-------l------|')
353+
];
354+
355+
var xsubs = ' ^ !';
356+
var ysubs = ' ^ !';
357+
var zsubs = ' ^ !';
358+
359+
var expected = '---x-e-efh-h-ki------l------|';
360+
361+
var index = 0;
362+
var source = e1.mergeScan(function (acc, x) {
363+
var value = inner[index++];
364+
return value.startWith(acc);
365+
}, 'x', 2);
366+
367+
expectObservable(source).toBe(expected);
368+
369+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
370+
expectSubscriptions(inner[0].subscriptions).toBe(xsubs);
371+
expectSubscriptions(inner[1].subscriptions).toBe(ysubs);
372+
expectSubscriptions(inner[2].subscriptions).toBe(zsubs);
373+
});
228374
});

src/Rx.KitchenSink.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ export interface KitchenSinkOperators<T> extends CoreOperators<T> {
1414
max?: <T, R>(comparer?: (x: R, y: T) => R) => Observable<R>;
1515
min?: <T, R>(comparer?: (x: R, y: T) => R) => Observable<R>;
1616
timeInterval?: <T>(scheduler?: IScheduler) => Observable<T>;
17-
mergeScan?: <T, R>(project: (acc: R, x: T) => Observable<R>, seed: R) => Observable<R>;
17+
mergeScan?: <T, R>(project: (acc: R, x: T) => Observable<R>, seed: R, concurrent?: number) => Observable<R>;
1818
switchFirst?: () => Observable<T>;
1919
switchMapFirst?: <R>(project: ((x: T, ix: number) => Observable<any>),
2020
projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;

src/operator/extended/mergeScan.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,16 @@ import {errorObject} from '../../util/errorObject';
77
import {subscribeToResult} from '../../util/subscribeToResult';
88
import {OuterSubscriber} from '../../OuterSubscriber';
99

10-
export function mergeScan<T, R>(project: (acc: R, x: T) => Observable<R>, seed: R) {
11-
return this.lift(new MergeScanOperator(project, seed));
10+
export function mergeScan<T, R>(project: (acc: R, x: T) => Observable<R>,
11+
seed: R,
12+
concurrent: number = Number.POSITIVE_INFINITY) {
13+
return this.lift(new MergeScanOperator(project, seed, concurrent));
1214
}
1315

1416
export class MergeScanOperator<T, R> implements Operator<T, R> {
1517
constructor(private project: (acc: R, x: T) => Observable<R>,
1618
private seed: R,
17-
private concurrent: number = Number.POSITIVE_INFINITY) {
19+
private concurrent: number) {
1820
}
1921

2022
call(subscriber: Subscriber<R>): Subscriber<T> {
@@ -34,7 +36,7 @@ export class MergeScanSubscriber<T, R> extends OuterSubscriber<T, R> {
3436
constructor(destination: Subscriber<R>,
3537
private project: (acc: R, x: T) => Observable<R>,
3638
private acc: R,
37-
private concurrent: number = Number.POSITIVE_INFINITY) {
39+
private concurrent: number) {
3840
super(destination);
3941
}
4042

0 commit comments

Comments
 (0)