Skip to content

Commit 6cc2cd6

Browse files
committed
feat(skipUntil): add higher-order lettable version of skipUntil
1 parent 6e1ff3c commit 6cc2cd6

File tree

3 files changed

+80
-60
lines changed

3 files changed

+80
-60
lines changed

src/operator/skipUntil.ts

Lines changed: 2 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,5 @@
1-
import { Operator } from '../Operator';
2-
import { Subscriber } from '../Subscriber';
31
import { Observable } from '../Observable';
4-
import { TeardownLogic } from '../Subscription';
5-
import { OuterSubscriber } from '../OuterSubscriber';
6-
import { InnerSubscriber } from '../InnerSubscriber';
7-
import { subscribeToResult } from '../util/subscribeToResult';
2+
import { skipUntil as higherOrder } from '../operators/skipUntil';
83

94
/**
105
* Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item.
@@ -19,58 +14,5 @@ import { subscribeToResult } from '../util/subscribeToResult';
1914
* @owner Observable
2015
*/
2116
export function skipUntil<T>(this: Observable<T>, notifier: Observable<any>): Observable<T> {
22-
return this.lift(new SkipUntilOperator(notifier));
23-
}
24-
25-
class SkipUntilOperator<T> implements Operator<T, T> {
26-
constructor(private notifier: Observable<any>) {
27-
}
28-
29-
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
30-
return source.subscribe(new SkipUntilSubscriber(subscriber, this.notifier));
31-
}
32-
}
33-
34-
/**
35-
* We need this JSDoc comment for affecting ESDoc.
36-
* @ignore
37-
* @extends {Ignored}
38-
*/
39-
class SkipUntilSubscriber<T, R> extends OuterSubscriber<T, R> {
40-
41-
private hasValue: boolean = false;
42-
private isInnerStopped: boolean = false;
43-
44-
constructor(destination: Subscriber<any>,
45-
notifier: Observable<any>) {
46-
super(destination);
47-
this.add(subscribeToResult(this, notifier));
48-
}
49-
50-
protected _next(value: T) {
51-
if (this.hasValue) {
52-
super._next(value);
53-
}
54-
}
55-
56-
protected _complete() {
57-
if (this.isInnerStopped) {
58-
super._complete();
59-
} else {
60-
this.unsubscribe();
61-
}
62-
}
63-
64-
notifyNext(outerValue: T, innerValue: R,
65-
outerIndex: number, innerIndex: number,
66-
innerSub: InnerSubscriber<T, R>): void {
67-
this.hasValue = true;
68-
}
69-
70-
notifyComplete(): void {
71-
this.isInnerStopped = true;
72-
if (this.isStopped) {
73-
super._complete();
74-
}
75-
}
17+
return higherOrder(notifier)(this);
7618
}

src/operators/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ export { refCount } from './refCount';
6262
export { scan } from './scan';
6363
export { skip } from './skip';
6464
export { skipLast } from './skipLast';
65+
export { skipUntil } from './skipUntil';
6566
export { subscribeOn } from './subscribeOn';
6667
export { switchAll } from './switchAll';
6768
export { switchMap } from './switchMap';

src/operators/skipUntil.ts

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import { Operator } from '../Operator';
2+
import { Subscriber } from '../Subscriber';
3+
import { Observable } from '../Observable';
4+
import { TeardownLogic } from '../Subscription';
5+
import { OuterSubscriber } from '../OuterSubscriber';
6+
import { InnerSubscriber } from '../InnerSubscriber';
7+
import { subscribeToResult } from '../util/subscribeToResult';
8+
import { MonoTypeOperatorFunction } from '../interfaces';
9+
10+
/**
11+
* Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item.
12+
*
13+
* <img src="./img/skipUntil.png" width="100%">
14+
*
15+
* @param {Observable} notifier - The second Observable that has to emit an item before the source Observable's elements begin to
16+
* be mirrored by the resulting Observable.
17+
* @return {Observable<T>} An Observable that skips items from the source Observable until the second Observable emits
18+
* an item, then emits the remaining items.
19+
* @method skipUntil
20+
* @owner Observable
21+
*/
22+
export function skipUntil<T>(notifier: Observable<any>): MonoTypeOperatorFunction<T> {
23+
return (source: Observable<T>) => source.lift(new SkipUntilOperator(notifier));
24+
}
25+
26+
class SkipUntilOperator<T> implements Operator<T, T> {
27+
constructor(private notifier: Observable<any>) {
28+
}
29+
30+
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
31+
return source.subscribe(new SkipUntilSubscriber(subscriber, this.notifier));
32+
}
33+
}
34+
35+
/**
36+
* We need this JSDoc comment for affecting ESDoc.
37+
* @ignore
38+
* @extends {Ignored}
39+
*/
40+
class SkipUntilSubscriber<T, R> extends OuterSubscriber<T, R> {
41+
42+
private hasValue: boolean = false;
43+
private isInnerStopped: boolean = false;
44+
45+
constructor(destination: Subscriber<any>,
46+
notifier: Observable<any>) {
47+
super(destination);
48+
this.add(subscribeToResult(this, notifier));
49+
}
50+
51+
protected _next(value: T) {
52+
if (this.hasValue) {
53+
super._next(value);
54+
}
55+
}
56+
57+
protected _complete() {
58+
if (this.isInnerStopped) {
59+
super._complete();
60+
} else {
61+
this.unsubscribe();
62+
}
63+
}
64+
65+
notifyNext(outerValue: T, innerValue: R,
66+
outerIndex: number, innerIndex: number,
67+
innerSub: InnerSubscriber<T, R>): void {
68+
this.hasValue = true;
69+
}
70+
71+
notifyComplete(): void {
72+
this.isInnerStopped = true;
73+
if (this.isStopped) {
74+
super._complete();
75+
}
76+
}
77+
}

0 commit comments

Comments
 (0)