Skip to content

Commit

Permalink
refactor(delayWhen): use subscribeToResult with outerSubscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
kwonoj committed Feb 5, 2016
1 parent 142d99a commit 621ec68
Showing 1 changed file with 40 additions and 35 deletions.
75 changes: 40 additions & 35 deletions src/operator/delayWhen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ import {Subscriber} from '../Subscriber';
import {Observable} from '../Observable';
import {Subscription} from '../Subscription';

import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';
import {subscribeToResult} from '../util/subscribeToResult';

/**
* Returns an Observable that delays the emission of items from the source Observable
* by a subscription delay and a delay selector function for each element.
Expand All @@ -29,31 +33,41 @@ class DelayWhenOperator<T> implements Operator<T, T> {
}
}

class DelayWhenSubscriber<T> extends Subscriber<T> {
class DelayWhenSubscriber<T, R> extends OuterSubscriber<T, R> {
private completed: boolean = false;
private delayNotifierSubscriptions: Array<Subscription> = [];
private values: Array<T> = [];

constructor(destination: Subscriber<T>,
private delayDurationSelector: (value: T) => Observable<any>) {
super(destination);
}

notifyNext(value: T, subscription: Subscription): void {
this.destination.next(value);
notifyNext(outerValue: T, innerValue: any,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
this.destination.next(outerValue);
this.removeSubscription(innerSub);
this.tryComplete();
}

const subscriptionIdx = this.delayNotifierSubscriptions.indexOf(subscription);
if (subscriptionIdx !== -1) {
this.delayNotifierSubscriptions.splice(subscriptionIdx, 1);
}
notifyError(error: any, innerSub: InnerSubscriber<T, R>): void {
this._error(error);
}

notifyComplete(innerSub: InnerSubscriber<T, R>): void {
const value = this.removeSubscription(innerSub);
if (value) {
this.destination.next(value);
}
this.tryComplete();
}

protected _next(value: T): void {
try {
const delayNotifier = this.delayDurationSelector(value);
if (delayNotifier) {
this._tryNext(delayNotifier, value);
this.tryDelay(delayNotifier, value);
}
} catch (err) {
this.destination.error(err);
Expand All @@ -65,41 +79,32 @@ class DelayWhenSubscriber<T> extends Subscriber<T> {
this.tryComplete();
}

private _tryNext(delayNotifier: Observable<any>, value: T): void {
const notifierSubscription = new Subscription();
notifierSubscription.add(delayNotifier.subscribe(new DelayNotifierSubscriber(this, value, notifierSubscription)));
this.add(notifierSubscription);
this.delayNotifierSubscriptions.push(notifierSubscription);
}
private removeSubscription(subscription: InnerSubscriber<T, R>): T {
subscription.unsubscribe();

private tryComplete(): void {
if (this.completed && this.delayNotifierSubscriptions.length === 0) {
this.destination.complete();
}
}
}
const subscriptionIdx = this.delayNotifierSubscriptions.indexOf(subscription);
let value: T = null;

class DelayNotifierSubscriber<T> extends Subscriber<T> {
constructor(private parent: DelayWhenSubscriber<T>, private value: T, private subscription: Subscription) {
super();
}
if (subscriptionIdx !== -1) {
value = this.values[subscriptionIdx];
this.delayNotifierSubscriptions.splice(subscriptionIdx, 1);
this.values.splice(subscriptionIdx, 1);
}

protected _next(unused: any) {
this.emitValue();
return value;
}

protected _error(err: any) {
this.parent.error(err);
}
private tryDelay(delayNotifier: Observable<any>, value: T): void {
const notifierSubscription = subscribeToResult(this, delayNotifier, value);
this.add(notifierSubscription);

protected _complete() {
this.emitValue();
this.delayNotifierSubscriptions.push(notifierSubscription);
this.values.push(value);
}

private emitValue(): void {
if (!this.isUnsubscribed) {
this.unsubscribe();
this.parent.notifyNext(this.value, this.subscription);
private tryComplete(): void {
if (this.completed && this.delayNotifierSubscriptions.length === 0) {
this.destination.complete();
}
}
}
Expand Down

0 comments on commit 621ec68

Please sign in to comment.