Skip to content

Commit 47b178b

Browse files
committed
feat(operator): Add expand operator.
1 parent 526e4c9 commit 47b178b

File tree

4 files changed

+104
-0
lines changed

4 files changed

+104
-0
lines changed

spec/operators/expand-spec.js

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
var Rx = require('../../dist/cjs/Rx');
2+
var Observable = Rx.Observable;
3+
4+
describe('Observable.prototype.expand()', function () {
5+
it('should map and recursively flatten', function (done) {
6+
var expected = [1, 2, 3, 4, 5];
7+
Observable.of(0).expand(function (x) {
8+
if (x > 4) {
9+
return Observable.empty();
10+
}
11+
return Observable.of(x + 1);
12+
})
13+
.subscribe(function (x) {
14+
expect(x).toBe(expected.shift());
15+
}, null, done);
16+
});
17+
it('should map and recursively flatten with ScalarObservables', function (done) {
18+
var expected = [1, 2, 3, 4, 5];
19+
Observable.return(0).expand(function (x) {
20+
if (x > 4) {
21+
return Observable.empty();
22+
}
23+
return Observable.return(x + 1);
24+
})
25+
.subscribe(function (x) {
26+
expect(x).toBe(expected.shift());
27+
}, null, done);
28+
});
29+
});

src/Observable.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ export default class Observable<T> {
107107
projectResult?: (x: T, y: any, ix: number, iy: number) => R,
108108
concurrent?: number) => Observable<R>;
109109

110+
expand: (project: (x: T, ix: number) => Observable<any>) => Observable<any>;
111+
110112
switchAll: <R>() => Observable<R>;
111113
switchLatest: <R>(project: ((x: T, ix: number) => Observable<any>),
112114
projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;

src/Rx.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ import flatMapTo from './operators/flatMapTo';
6363
import switchAll from './operators/switchAll';
6464
import switchLatest from './operators/switchLatest';
6565
import switchLatestTo from './operators/switchLatestTo';
66+
import expand from './operators/expand';
6667

6768
Observable.merge = merge;
6869
observableProto.merge = merge;
@@ -72,6 +73,7 @@ observableProto.flatMapTo = flatMapTo;
7273
observableProto.switchAll = switchAll;
7374
observableProto.switchLatest = switchLatest;
7475
observableProto.switchLatestTo = switchLatestTo;
76+
observableProto.expand = expand;
7577

7678
import map from './operators/map';
7779
import mapTo from './operators/mapTo';

src/operators/expand.ts

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import Operator from '../Operator';
2+
import Observer from '../Observer';
3+
import Observable from '../Observable';
4+
import Subscriber from '../Subscriber';
5+
6+
import {MergeSubscriber, MergeInnerSubscriber} from './merge';
7+
import EmptyObservable from '../observables/EmptyObservable';
8+
import ScalarObservable from '../observables/ScalarObservable';
9+
10+
import tryCatch from '../util/tryCatch';
11+
import {errorObject} from '../util/errorObject';
12+
13+
export default function expand<T>(project: (x: T, ix: number) => Observable<any>): Observable<any> {
14+
return this.lift(new ExpandOperator(project));
15+
}
16+
17+
export class ExpandOperator<T, R> extends Operator<T, R> {
18+
19+
project: (x: T, ix: number) => Observable<any>;
20+
21+
constructor(project: (x: T, ix: number) => Observable<any>) {
22+
super();
23+
this.project = project;
24+
}
25+
26+
call(observer: Observer<R>): Observer<T> {
27+
return new ExpandSubscriber(observer, this.project);
28+
}
29+
}
30+
31+
export class ExpandSubscriber<T, R> extends MergeSubscriber<T, R> {
32+
33+
project: (x: T, ix: number) => Observable<any>;
34+
35+
constructor(destination: Observer<R>,
36+
project: (x: T, ix: number) => Observable<any>) {
37+
super(destination, Number.POSITIVE_INFINITY);
38+
this.project = project;
39+
}
40+
41+
_project(value, index) {
42+
const observable = tryCatch(this.project).call(this, value, index);
43+
if (observable === errorObject) {
44+
this.error(errorObject.e);
45+
return null;
46+
}
47+
return observable;
48+
}
49+
50+
_subscribeInner(observable, value, index) {
51+
if(observable instanceof ScalarObservable) {
52+
this.destination.next((<ScalarObservable<T>> observable).value);
53+
this._innerComplete();
54+
this._next((<ScalarObservable<T>> observable).value);
55+
} else if(observable instanceof EmptyObservable) {
56+
this._innerComplete();
57+
} else {
58+
return observable.subscribe(new ExpandInnerSubscriber(this));
59+
}
60+
}
61+
}
62+
63+
export class ExpandInnerSubscriber<T, R> extends MergeInnerSubscriber<T, R> {
64+
constructor(parent: ExpandSubscriber<T, R>) {
65+
super(parent);
66+
}
67+
_next(value) {
68+
this.destination.next(value);
69+
this.parent.next(value);
70+
}
71+
}

0 commit comments

Comments
 (0)