Skip to content

Commit 24fdd34

Browse files
committed
feat(switch): add promise, iterable and array support
- switch will now work with an observable of mixed Observables, observables, arrays, promises and iterables
1 parent d249895 commit 24fdd34

File tree

4 files changed

+41
-25
lines changed

4 files changed

+41
-25
lines changed

spec/operators/switch-spec.js

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
/* expect, it, describe, expectObserable, hot, cold */
22
var Rx = require('../../dist/cjs/Rx');
3+
var Promise = require('promise');
34

45
var Observable = Rx.Observable;
56
var immediateScheduler = Rx.Scheduler.immediate;
67

7-
describe('Observable.prototype.switch()', function(){
8+
fdescribe('Observable.prototype.switch()', function(){
89
it("should switch to each immediately-scheduled inner Observable", function (done) {
910
var a = Observable.of(1, 2, 3, immediateScheduler);
1011
var b = Observable.of(4, 5, 6, immediateScheduler);
@@ -51,4 +52,33 @@ describe('Observable.prototype.switch()', function(){
5152
var expected = '--------a---b----d--e---f---|';
5253
expectObservable(e1.switch()).toBe(expected);
5354
});
55+
56+
it('should handle an observable of promises', function(done){
57+
var expected = [3];
58+
59+
Observable.of(Promise.resolve(1), Promise.resolve(2), Promise.resolve(3))
60+
.switch()
61+
.subscribe(function(x) {
62+
expect(x).toBe(expected.shift());
63+
}, null, function(){
64+
expect(expected.length).toBe(0);
65+
done();
66+
});
67+
});
68+
69+
it('should handle an observable with Arrays in it', function() {
70+
var expected = [1,2,3,4];
71+
var completed = false;
72+
73+
Observable.of(Observable.never(), Observable.never(), [1,2,3,4])
74+
.switch()
75+
.subscribe(function(x) {
76+
expect(x).toBe(expected.shift());
77+
}, null, function() {
78+
completed = true;
79+
expect(expected.length).toBe(0);
80+
});
81+
82+
expect(completed).toBe(true);
83+
})
5484
});

src/operators/switch.ts

Lines changed: 7 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ import Observer from '../Observer';
33
import Observable from '../Observable';
44
import Subscriber from '../Subscriber';
55
import Subscription from '../Subscription';
6+
import OuterSubscriber from '../OuterSubscriber';
7+
import subscribeToResult from '../util/subscribeToResult';
68

79
export default function _switch<T>(): Observable<T> {
810
return this.lift(new SwitchOperator());
@@ -18,7 +20,7 @@ class SwitchOperator<T, R> implements Operator<T, R> {
1820
}
1921
}
2022

21-
class SwitchSubscriber<T> extends Subscriber<T> {
23+
class SwitchSubscriber<T, R> extends OuterSubscriber<T, R> {
2224
private active: number = 0;
2325
private hasCompleted: boolean = false;
2426
innerSubscription: Subscription<T>;
@@ -28,9 +30,9 @@ class SwitchSubscriber<T> extends Subscriber<T> {
2830
}
2931

3032
_next(value: any) {
31-
this.active++;
3233
this.unsubscribeInner();
33-
this.add(this.innerSubscription = value.subscribe(new InnerSwitchSubscriber(this)));
34+
this.active++;
35+
this.add(this.innerSubscription = subscribeToResult(this, value));
3436
}
3537

3638
_complete() {
@@ -41,15 +43,15 @@ class SwitchSubscriber<T> extends Subscriber<T> {
4143
}
4244

4345
unsubscribeInner() {
46+
this.active = this.active > 0 ? this.active - 1 : 0;
4447
const innerSubscription = this.innerSubscription;
4548
if(innerSubscription) {
46-
this.active--;
4749
innerSubscription.unsubscribe();
4850
this.remove(innerSubscription);
4951
}
5052
}
5153

52-
notifyNext(value: T) {
54+
notifyNext(value: any) {
5355
this.destination.next(value);
5456
}
5557

@@ -65,21 +67,3 @@ class SwitchSubscriber<T> extends Subscriber<T> {
6567
}
6668
}
6769

68-
class InnerSwitchSubscriber<T> extends Subscriber<T> {
69-
constructor(private parent: SwitchSubscriber<T>) {
70-
super();
71-
}
72-
73-
_next(value: T) {
74-
this.parent.notifyNext(value);
75-
}
76-
77-
_error(err: any) {
78-
this.parent.notifyError(err);
79-
}
80-
81-
_complete() {
82-
this.parent.notifyComplete();
83-
}
84-
}
85-

src/operators/switchMap.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ export default function switchMap<T, R, R2>(project: (value: T, index: number) =
1414
return this.lift(new SwitchMapOperator(project, resultSelector));
1515
}
1616

17+
1718
class SwitchMapOperator<T, R, R2> implements Operator<T, R> {
1819
constructor(private project: (value: T, index: number) => Observable<R>,
1920
private resultSelector?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2) {

src/util/subscribeToResult.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,11 @@ export default function subscribeToResult<T, R, R2>(outerSubscriber: OuterSubscr
3939
} else if (typeof result.then === 'function') {
4040
result.then(x => {
4141
if(!destination.isUnsubscribed) {
42-
destination.next(result);
42+
destination.next(x);
4343
destination.complete();
4444
}
4545
}, err => destination.error(err));
46+
return destination;
4647
} else if (typeof result[$$iterator] === 'function') {
4748
for(let item of result) {
4849
destination.next(item);

0 commit comments

Comments
 (0)