Skip to content

Commit bb2ddaa

Browse files
committed
feat(takeUntil): add higher-order lettable version of takeUntil
1 parent 089a5a6 commit bb2ddaa

File tree

3 files changed

+83
-41
lines changed

3 files changed

+83
-41
lines changed

src/operator/takeUntil.ts

Lines changed: 2 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,5 @@
1-
import { Operator } from '../Operator';
21
import { Observable } from '../Observable';
3-
import { Subscriber } from '../Subscriber';
4-
import { TeardownLogic } from '../Subscription';
5-
6-
import { OuterSubscriber } from '../OuterSubscriber';
7-
import { InnerSubscriber } from '../InnerSubscriber';
8-
import { subscribeToResult } from '../util/subscribeToResult';
2+
import { takeUntil as higherOrder } from '../operators/takeUntil';
93

104
/**
115
* Emits the values emitted by the source Observable until a `notifier`
@@ -41,38 +35,5 @@ import { subscribeToResult } from '../util/subscribeToResult';
4135
* @owner Observable
4236
*/
4337
export function takeUntil<T>(this: Observable<T>, notifier: Observable<any>): Observable<T> {
44-
return this.lift(new TakeUntilOperator(notifier));
45-
}
46-
47-
class TakeUntilOperator<T> implements Operator<T, T> {
48-
constructor(private notifier: Observable<any>) {
49-
}
50-
51-
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
52-
return source.subscribe(new TakeUntilSubscriber(subscriber, this.notifier));
53-
}
54-
}
55-
56-
/**
57-
* We need this JSDoc comment for affecting ESDoc.
58-
* @ignore
59-
* @extends {Ignored}
60-
*/
61-
class TakeUntilSubscriber<T, R> extends OuterSubscriber<T, R> {
62-
63-
constructor(destination: Subscriber<any>,
64-
private notifier: Observable<any>) {
65-
super(destination);
66-
this.add(subscribeToResult(this, notifier));
67-
}
68-
69-
notifyNext(outerValue: T, innerValue: R,
70-
outerIndex: number, innerIndex: number,
71-
innerSub: InnerSubscriber<T, R>): void {
72-
this.complete();
73-
}
74-
75-
notifyComplete(): void {
76-
// noop
77-
}
38+
return higherOrder(notifier)(this);
7839
}

src/operators/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ export { switchMap } from './switchMap';
7171
export { switchMapTo } from './switchMapTo';
7272
export { take } from './take';
7373
export { takeLast } from './takeLast';
74+
export { takeUntil } from './takeUntil';
7475
export { tap } from './tap';
7576
export { timestamp } from './timestamp';
7677
export { toArray } from './toArray';

src/operators/takeUntil.ts

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
import { Operator } from '../Operator';
2+
import { Observable } from '../Observable';
3+
import { Subscriber } from '../Subscriber';
4+
import { TeardownLogic } from '../Subscription';
5+
6+
import { OuterSubscriber } from '../OuterSubscriber';
7+
import { InnerSubscriber } from '../InnerSubscriber';
8+
import { subscribeToResult } from '../util/subscribeToResult';
9+
10+
import { MonoTypeOperatorFunction } from '../interfaces';
11+
12+
/**
13+
* Emits the values emitted by the source Observable until a `notifier`
14+
* Observable emits a value.
15+
*
16+
* <span class="informal">Lets values pass until a second Observable,
17+
* `notifier`, emits something. Then, it completes.</span>
18+
*
19+
* <img src="./img/takeUntil.png" width="100%">
20+
*
21+
* `takeUntil` subscribes and begins mirroring the source Observable. It also
22+
* monitors a second Observable, `notifier` that you provide. If the `notifier`
23+
* emits a value or a complete notification, the output Observable stops
24+
* mirroring the source Observable and completes.
25+
*
26+
* @example <caption>Tick every second until the first click happens</caption>
27+
* var interval = Rx.Observable.interval(1000);
28+
* var clicks = Rx.Observable.fromEvent(document, 'click');
29+
* var result = interval.takeUntil(clicks);
30+
* result.subscribe(x => console.log(x));
31+
*
32+
* @see {@link take}
33+
* @see {@link takeLast}
34+
* @see {@link takeWhile}
35+
* @see {@link skip}
36+
*
37+
* @param {Observable} notifier The Observable whose first emitted value will
38+
* cause the output Observable of `takeUntil` to stop emitting values from the
39+
* source Observable.
40+
* @return {Observable<T>} An Observable that emits the values from the source
41+
* Observable until such time as `notifier` emits its first value.
42+
* @method takeUntil
43+
* @owner Observable
44+
*/
45+
export function takeUntil<T>(notifier: Observable<any>): MonoTypeOperatorFunction<T> {
46+
return (source: Observable<T>) => source.lift(new TakeUntilOperator(notifier));
47+
}
48+
49+
class TakeUntilOperator<T> implements Operator<T, T> {
50+
constructor(private notifier: Observable<any>) {
51+
}
52+
53+
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
54+
return source.subscribe(new TakeUntilSubscriber(subscriber, this.notifier));
55+
}
56+
}
57+
58+
/**
59+
* We need this JSDoc comment for affecting ESDoc.
60+
* @ignore
61+
* @extends {Ignored}
62+
*/
63+
class TakeUntilSubscriber<T, R> extends OuterSubscriber<T, R> {
64+
65+
constructor(destination: Subscriber<any>,
66+
private notifier: Observable<any>) {
67+
super(destination);
68+
this.add(subscribeToResult(this, notifier));
69+
}
70+
71+
notifyNext(outerValue: T, innerValue: R,
72+
outerIndex: number, innerIndex: number,
73+
innerSub: InnerSubscriber<T, R>): void {
74+
this.complete();
75+
}
76+
77+
notifyComplete(): void {
78+
// noop
79+
}
80+
}

0 commit comments

Comments
 (0)