Skip to content

Commit 889f84a

Browse files
committed
fix(skipUntil): properly manages notifier subscription
- No longer waits until notifier is complete to complete resulting observable - Unsubs from notifier after first notification - Updates tests to be correct - Corrects some grammar in test descriptions fixes #1886
1 parent 11c0496 commit 889f84a

File tree

1 file changed

+10
-20
lines changed

1 file changed

+10
-20
lines changed

src/internal/operators/skipUntil.ts

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import { Observable } from '../Observable';
44
import { OuterSubscriber } from '../OuterSubscriber';
55
import { InnerSubscriber } from '../InnerSubscriber';
66
import { subscribeToResult } from '../util/subscribeToResult';
7-
import { MonoTypeOperatorFunction, TeardownLogic } from '../types';
7+
import { MonoTypeOperatorFunction, TeardownLogic, ObservableInput } from '../types';
8+
import { Subscription } from '../Subscription';
89

910
/**
1011
* Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item.
@@ -26,8 +27,8 @@ class SkipUntilOperator<T> implements Operator<T, T> {
2627
constructor(private notifier: Observable<any>) {
2728
}
2829

29-
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
30-
return source.subscribe(new SkipUntilSubscriber(subscriber, this.notifier));
30+
call(destination: Subscriber<T>, source: any): TeardownLogic {
31+
return source.subscribe(new SkipUntilSubscriber(destination, this.notifier));
3132
}
3233
}
3334

@@ -39,12 +40,11 @@ class SkipUntilOperator<T> implements Operator<T, T> {
3940
class SkipUntilSubscriber<T, R> extends OuterSubscriber<T, R> {
4041

4142
private hasValue: boolean = false;
42-
private isInnerStopped: boolean = false;
43+
private innerSubscription: Subscription;
4344

44-
constructor(destination: Subscriber<any>,
45-
notifier: Observable<any>) {
45+
constructor(destination: Subscriber<R>, notifier: ObservableInput<any>) {
4646
super(destination);
47-
this.add(subscribeToResult(this, notifier));
47+
this.add(this.innerSubscription = subscribeToResult(this, notifier));
4848
}
4949

5050
protected _next(value: T) {
@@ -53,24 +53,14 @@ class SkipUntilSubscriber<T, R> extends OuterSubscriber<T, R> {
5353
}
5454
}
5555

56-
protected _complete() {
57-
if (this.isInnerStopped) {
58-
super._complete();
59-
} else {
60-
this.unsubscribe();
61-
}
62-
}
63-
6456
notifyNext(outerValue: T, innerValue: R,
6557
outerIndex: number, innerIndex: number,
6658
innerSub: InnerSubscriber<T, R>): void {
6759
this.hasValue = true;
60+
this.innerSubscription.unsubscribe();
6861
}
6962

70-
notifyComplete(): void {
71-
this.isInnerStopped = true;
72-
if (this.isStopped) {
73-
super._complete();
74-
}
63+
notifyComplete() {
64+
/* do nothing */
7565
}
7666
}

0 commit comments

Comments
 (0)