Skip to content

Commit

Permalink
feat(switchAll): add higher-order lettable version of switch
Browse files Browse the repository at this point in the history
- also fixes typings on switch
  • Loading branch information
benlesh committed Jun 16, 2017
1 parent f85c60e commit 2f12572
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 68 deletions.
71 changes: 3 additions & 68 deletions src/operator/switch.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
import { Observable } from '../Observable';
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
import { switchAll as higherOrder } from '../operators';

/**
* Converts a higher-order Observable into a first-order Observable by
Expand Down Expand Up @@ -48,66 +43,6 @@ import { subscribeToResult } from '../util/subscribeToResult';
* @name switch
* @owner Observable
*/
export function _switch<T>(this: Observable<T>): T {
return <any>this.lift<any>(new SwitchOperator());
}

class SwitchOperator<T, R> implements Operator<T, R> {
call(subscriber: Subscriber<R>, source: any): any {
return source.subscribe(new SwitchSubscriber(subscriber));
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class SwitchSubscriber<T, R> extends OuterSubscriber<T, R> {
private active: number = 0;
private hasCompleted: boolean = false;
innerSubscription: Subscription;

constructor(destination: Subscriber<R>) {
super(destination);
}

protected _next(value: T): void {
this.unsubscribeInner();
this.active++;
this.add(this.innerSubscription = subscribeToResult(this, value));
}

protected _complete(): void {
this.hasCompleted = true;
if (this.active === 0) {
this.destination.complete();
}
}

private unsubscribeInner(): void {
this.active = this.active > 0 ? this.active - 1 : 0;
const innerSubscription = this.innerSubscription;
if (innerSubscription) {
innerSubscription.unsubscribe();
this.remove(innerSubscription);
}
}

notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
this.destination.next(innerValue);
}

notifyError(err: any): void {
this.destination.error(err);
}

notifyComplete(): void {
this.unsubscribeInner();
if (this.hasCompleted && this.active === 0) {
this.destination.complete();
}
}
export function _switch<T>(this: Observable<Observable<T>>): Observable<T> {
return higherOrder()(this);
}
1 change: 1 addition & 0 deletions src/operators/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export { multicast } from './multicast';
export { publish } from './publish';
export { reduce } from './reduce';
export { scan } from './scan';
export { switchAll } from './switchAll';
export { switchMap } from './switchMap';
export { takeLast } from './takeLast';
export { tap } from './tap';
8 changes: 8 additions & 0 deletions src/operators/switchAll.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { OperatorFunction } from '../interfaces';
import { Observable } from '../Observable';
import { switchMap } from './switchMap';
import { identity } from '../util/identity';

export function switchAll<T>(): OperatorFunction<Observable<T>, T> {
return switchMap(identity);
}
3 changes: 3 additions & 0 deletions src/util/identity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export function identity<T>(x: T): T {
return x;
}

0 comments on commit 2f12572

Please sign in to comment.