Skip to content

Commit

Permalink
fix(skipUntil): properly manages notifier subscription
Browse files Browse the repository at this point in the history
- No longer waits until notifier is complete to complete resulting observable
- Unsubs from notifier after first notification
- Updates tests to be correct
- Corrects some grammar in test descriptions

fixes #1886
  • Loading branch information
benlesh committed Apr 2, 2018
1 parent 11c0496 commit 889f84a
Showing 1 changed file with 10 additions and 20 deletions.
30 changes: 10 additions & 20 deletions src/internal/operators/skipUntil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import { Observable } from '../Observable';
import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
import { MonoTypeOperatorFunction, TeardownLogic } from '../types';
import { MonoTypeOperatorFunction, TeardownLogic, ObservableInput } from '../types';
import { Subscription } from '../Subscription';

/**
* Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item.
Expand All @@ -26,8 +27,8 @@ class SkipUntilOperator<T> implements Operator<T, T> {
constructor(private notifier: Observable<any>) {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new SkipUntilSubscriber(subscriber, this.notifier));
call(destination: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new SkipUntilSubscriber(destination, this.notifier));
}
}

Expand All @@ -39,12 +40,11 @@ class SkipUntilOperator<T> implements Operator<T, T> {
class SkipUntilSubscriber<T, R> extends OuterSubscriber<T, R> {

private hasValue: boolean = false;
private isInnerStopped: boolean = false;
private innerSubscription: Subscription;

constructor(destination: Subscriber<any>,
notifier: Observable<any>) {
constructor(destination: Subscriber<R>, notifier: ObservableInput<any>) {
super(destination);
this.add(subscribeToResult(this, notifier));
this.add(this.innerSubscription = subscribeToResult(this, notifier));
}

protected _next(value: T) {
Expand All @@ -53,24 +53,14 @@ class SkipUntilSubscriber<T, R> extends OuterSubscriber<T, R> {
}
}

protected _complete() {
if (this.isInnerStopped) {
super._complete();
} else {
this.unsubscribe();
}
}

notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
this.hasValue = true;
this.innerSubscription.unsubscribe();
}

notifyComplete(): void {
this.isInnerStopped = true;
if (this.isStopped) {
super._complete();
}
notifyComplete() {
/* do nothing */
}
}

0 comments on commit 889f84a

Please sign in to comment.