Skip to content

Commit 43b63cc

Browse files
committed
fix(mergeAll): merge all will properly handle async observables
1 parent c2e2d29 commit 43b63cc

File tree

2 files changed

+73
-3
lines changed

2 files changed

+73
-3
lines changed

spec/operators/merge-all-spec.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,12 @@ describe('mergeAll', function () {
3333
done();
3434
});
3535
});
36+
37+
it('should handle merging a hot observable of observables', function (){
38+
var x = cold( 'a---b---c---|');
39+
var y = cold( 'd---e---f---|');
40+
var e1 = hot('--x--y--|', { x: x, y: y });
41+
var expected = '--a--db--ec--f---|';
42+
expectObservable(e1.mergeAll()).toBe(expected);
43+
});
3644
});

src/operators/mergeAll.ts

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,68 @@
11
import Observable from '../Observable';
2-
import { MergeOperator } from './merge-support';
2+
import Operator from '../Operator';
3+
import Subscriber from '../Subscriber';
4+
import Observer from '../Observer';
5+
import Subscription from '../Subscription';
36

4-
export default function mergeAll<R>(concurrent?: any): Observable<R> {
5-
return this.lift(new MergeOperator(concurrent));
7+
export default function mergeAll<R>(concurrent: number = Number.POSITIVE_INFINITY): Observable<R> {
8+
return this.lift(new MergeAllOperator(concurrent));
69
}
10+
11+
class MergeAllOperator<T, R> implements Operator<T, R> {
12+
constructor(private concurrent: number) {
13+
14+
}
15+
16+
call(observer: Observer<T>) {
17+
return new MergeAllSubscriber(observer, this.concurrent);
18+
}
19+
}
20+
21+
class MergeAllSubscriber<T> extends Subscriber<T> {
22+
private hasCompleted: boolean = false;
23+
private buffer: Observable<any>[] = [];
24+
private active: number = 0;
25+
constructor(destination: Observer<T>, private concurrent:number) {
26+
super(destination);
27+
}
28+
29+
_next(value: any) {
30+
if(this.active < this.concurrent) {
31+
const innerSub = new Subscription();
32+
this.add(innerSub);
33+
this.active++;
34+
innerSub.add(value.subscribe(new MergeAllInnerSubscriber(this.destination, this, innerSub)));
35+
} else {
36+
this.buffer.push(value);
37+
}
38+
}
39+
40+
_complete() {
41+
this.hasCompleted = true;
42+
if(this.active === 0 && this.buffer.length === 0) {
43+
this.destination.complete();
44+
}
45+
}
46+
47+
notifyComplete(innerSub: Subscription<T>) {
48+
const buffer = this.buffer;
49+
this.remove(innerSub);
50+
this.active--;
51+
if(buffer.length > 0) {
52+
this._next(buffer.shift());
53+
} else if (this.active === 0 && this.hasCompleted) {
54+
this.destination.complete();
55+
}
56+
}
57+
}
58+
59+
class MergeAllInnerSubscriber<T> extends Subscriber<T> {
60+
constructor(destination: Observer<T>, private parent: MergeAllSubscriber<T>,
61+
private innerSub: Subscription<T> ) {
62+
super(destination);
63+
}
64+
65+
_complete() {
66+
this.parent.notifyComplete(this.innerSub);
67+
}
68+
}

0 commit comments

Comments
 (0)