Skip to content

Commit e4dd1fd

Browse files
committed
feat(throttle): add higher-order lettable version of throttle
1 parent 7bb8280 commit e4dd1fd

File tree

3 files changed

+163
-113
lines changed

3 files changed

+163
-113
lines changed

src/operator/throttle.ts

Lines changed: 2 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,5 @@
1-
import { Operator } from '../Operator';
21
import { Observable, SubscribableOrPromise } from '../Observable';
3-
import { Subscriber } from '../Subscriber';
4-
import { Subscription, TeardownLogic } from '../Subscription';
5-
6-
import { OuterSubscriber } from '../OuterSubscriber';
7-
import { InnerSubscriber } from '../InnerSubscriber';
8-
import { subscribeToResult } from '../util/subscribeToResult';
9-
10-
export interface ThrottleConfig {
11-
leading?: boolean;
12-
trailing?: boolean;
13-
}
14-
15-
export const defaultThrottleConfig: ThrottleConfig = {
16-
leading: true,
17-
trailing: false
18-
};
2+
import { throttle as higherOrder, ThrottleConfig, defaultThrottleConfig } from '../operators/throttle';
193

204
/**
215
* Emits a value from the source Observable, then ignores subsequent source
@@ -60,100 +44,5 @@ export const defaultThrottleConfig: ThrottleConfig = {
6044
export function throttle<T>(this: Observable<T>,
6145
durationSelector: (value: T) => SubscribableOrPromise<number>,
6246
config: ThrottleConfig = defaultThrottleConfig): Observable<T> {
63-
return this.lift(new ThrottleOperator(durationSelector, config.leading, config.trailing));
64-
}
65-
66-
class ThrottleOperator<T> implements Operator<T, T> {
67-
constructor(private durationSelector: (value: T) => SubscribableOrPromise<number>,
68-
private leading: boolean,
69-
private trailing: boolean) {
70-
}
71-
72-
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
73-
return source.subscribe(
74-
new ThrottleSubscriber(subscriber, this.durationSelector, this.leading, this.trailing)
75-
);
76-
}
77-
}
78-
79-
/**
80-
* We need this JSDoc comment for affecting ESDoc
81-
* @ignore
82-
* @extends {Ignored}
83-
*/
84-
class ThrottleSubscriber<T, R> extends OuterSubscriber<T, R> {
85-
private throttled: Subscription;
86-
private _trailingValue: T;
87-
private _hasTrailingValue = false;
88-
89-
constructor(protected destination: Subscriber<T>,
90-
private durationSelector: (value: T) => SubscribableOrPromise<number>,
91-
private _leading: boolean,
92-
private _trailing: boolean) {
93-
super(destination);
94-
}
95-
96-
protected _next(value: T): void {
97-
if (this.throttled) {
98-
if (this._trailing) {
99-
this._hasTrailingValue = true;
100-
this._trailingValue = value;
101-
}
102-
} else {
103-
const duration = this.tryDurationSelector(value);
104-
if (duration) {
105-
this.add(this.throttled = subscribeToResult(this, duration));
106-
}
107-
if (this._leading) {
108-
this.destination.next(value);
109-
if (this._trailing) {
110-
this._hasTrailingValue = true;
111-
this._trailingValue = value;
112-
}
113-
}
114-
}
115-
}
116-
117-
private tryDurationSelector(value: T): SubscribableOrPromise<any> {
118-
try {
119-
return this.durationSelector(value);
120-
} catch (err) {
121-
this.destination.error(err);
122-
return null;
123-
}
124-
}
125-
126-
protected _unsubscribe() {
127-
const { throttled, _trailingValue, _hasTrailingValue, _trailing } = this;
128-
129-
this._trailingValue = null;
130-
this._hasTrailingValue = false;
131-
132-
if (throttled) {
133-
this.remove(throttled);
134-
this.throttled = null;
135-
throttled.unsubscribe();
136-
}
137-
}
138-
139-
private _sendTrailing() {
140-
const { destination, throttled, _trailing, _trailingValue, _hasTrailingValue } = this;
141-
if (throttled && _trailing && _hasTrailingValue) {
142-
destination.next(_trailingValue);
143-
this._trailingValue = null;
144-
this._hasTrailingValue = false;
145-
}
146-
}
147-
148-
notifyNext(outerValue: T, innerValue: R,
149-
outerIndex: number, innerIndex: number,
150-
innerSub: InnerSubscriber<T, R>): void {
151-
this._sendTrailing();
152-
this._unsubscribe();
153-
}
154-
155-
notifyComplete(): void {
156-
this._sendTrailing();
157-
this._unsubscribe();
158-
}
47+
return higherOrder(durationSelector, config)(this);
15948
}

src/operators/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ export { switchAll } from './switchAll';
6565
export { switchMap } from './switchMap';
6666
export { takeLast } from './takeLast';
6767
export { tap } from './tap';
68+
export { throttle } from './throttle';
6869
export { timestamp } from './timestamp';
6970
export { toArray } from './toArray';
7071
export { window } from './window';

src/operators/throttle.ts

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
import { Operator } from '../Operator';
2+
import { Observable, SubscribableOrPromise } from '../Observable';
3+
import { Subscriber } from '../Subscriber';
4+
import { Subscription, TeardownLogic } from '../Subscription';
5+
6+
import { OuterSubscriber } from '../OuterSubscriber';
7+
import { InnerSubscriber } from '../InnerSubscriber';
8+
import { subscribeToResult } from '../util/subscribeToResult';
9+
10+
import { MonoTypeOperatorFunction } from '../interfaces';
11+
12+
export interface ThrottleConfig {
13+
leading?: boolean;
14+
trailing?: boolean;
15+
}
16+
17+
export const defaultThrottleConfig: ThrottleConfig = {
18+
leading: true,
19+
trailing: false
20+
};
21+
22+
/**
23+
* Emits a value from the source Observable, then ignores subsequent source
24+
* values for a duration determined by another Observable, then repeats this
25+
* process.
26+
*
27+
* <span class="informal">It's like {@link throttleTime}, but the silencing
28+
* duration is determined by a second Observable.</span>
29+
*
30+
* <img src="./img/throttle.png" width="100%">
31+
*
32+
* `throttle` emits the source Observable values on the output Observable
33+
* when its internal timer is disabled, and ignores source values when the timer
34+
* is enabled. Initially, the timer is disabled. As soon as the first source
35+
* value arrives, it is forwarded to the output Observable, and then the timer
36+
* is enabled by calling the `durationSelector` function with the source value,
37+
* which returns the "duration" Observable. When the duration Observable emits a
38+
* value or completes, the timer is disabled, and this process repeats for the
39+
* next source value.
40+
*
41+
* @example <caption>Emit clicks at a rate of at most one click per second</caption>
42+
* var clicks = Rx.Observable.fromEvent(document, 'click');
43+
* var result = clicks.throttle(ev => Rx.Observable.interval(1000));
44+
* result.subscribe(x => console.log(x));
45+
*
46+
* @see {@link audit}
47+
* @see {@link debounce}
48+
* @see {@link delayWhen}
49+
* @see {@link sample}
50+
* @see {@link throttleTime}
51+
*
52+
* @param {function(value: T): SubscribableOrPromise} durationSelector A function
53+
* that receives a value from the source Observable, for computing the silencing
54+
* duration for each source value, returned as an Observable or a Promise.
55+
* @param {Object} config a configuration object to define `leading` and `trailing` behavior. Defaults
56+
* to `{ leading: true, trailing: false }`.
57+
* @return {Observable<T>} An Observable that performs the throttle operation to
58+
* limit the rate of emissions from the source.
59+
* @method throttle
60+
* @owner Observable
61+
*/
62+
export function throttle<T>(durationSelector: (value: T) => SubscribableOrPromise<number>,
63+
config: ThrottleConfig = defaultThrottleConfig): MonoTypeOperatorFunction<T> {
64+
return (source: Observable<T>) => source.lift(new ThrottleOperator(durationSelector, config.leading, config.trailing));
65+
}
66+
67+
class ThrottleOperator<T> implements Operator<T, T> {
68+
constructor(private durationSelector: (value: T) => SubscribableOrPromise<number>,
69+
private leading: boolean,
70+
private trailing: boolean) {
71+
}
72+
73+
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
74+
return source.subscribe(
75+
new ThrottleSubscriber(subscriber, this.durationSelector, this.leading, this.trailing)
76+
);
77+
}
78+
}
79+
80+
/**
81+
* We need this JSDoc comment for affecting ESDoc
82+
* @ignore
83+
* @extends {Ignored}
84+
*/
85+
class ThrottleSubscriber<T, R> extends OuterSubscriber<T, R> {
86+
private throttled: Subscription;
87+
private _trailingValue: T;
88+
private _hasTrailingValue = false;
89+
90+
constructor(protected destination: Subscriber<T>,
91+
private durationSelector: (value: T) => SubscribableOrPromise<number>,
92+
private _leading: boolean,
93+
private _trailing: boolean) {
94+
super(destination);
95+
}
96+
97+
protected _next(value: T): void {
98+
if (this.throttled) {
99+
if (this._trailing) {
100+
this._hasTrailingValue = true;
101+
this._trailingValue = value;
102+
}
103+
} else {
104+
const duration = this.tryDurationSelector(value);
105+
if (duration) {
106+
this.add(this.throttled = subscribeToResult(this, duration));
107+
}
108+
if (this._leading) {
109+
this.destination.next(value);
110+
if (this._trailing) {
111+
this._hasTrailingValue = true;
112+
this._trailingValue = value;
113+
}
114+
}
115+
}
116+
}
117+
118+
private tryDurationSelector(value: T): SubscribableOrPromise<any> {
119+
try {
120+
return this.durationSelector(value);
121+
} catch (err) {
122+
this.destination.error(err);
123+
return null;
124+
}
125+
}
126+
127+
protected _unsubscribe() {
128+
const { throttled, _trailingValue, _hasTrailingValue, _trailing } = this;
129+
130+
this._trailingValue = null;
131+
this._hasTrailingValue = false;
132+
133+
if (throttled) {
134+
this.remove(throttled);
135+
this.throttled = null;
136+
throttled.unsubscribe();
137+
}
138+
}
139+
140+
private _sendTrailing() {
141+
const { destination, throttled, _trailing, _trailingValue, _hasTrailingValue } = this;
142+
if (throttled && _trailing && _hasTrailingValue) {
143+
destination.next(_trailingValue);
144+
this._trailingValue = null;
145+
this._hasTrailingValue = false;
146+
}
147+
}
148+
149+
notifyNext(outerValue: T, innerValue: R,
150+
outerIndex: number, innerIndex: number,
151+
innerSub: InnerSubscriber<T, R>): void {
152+
this._sendTrailing();
153+
this._unsubscribe();
154+
}
155+
156+
notifyComplete(): void {
157+
this._sendTrailing();
158+
this._unsubscribe();
159+
}
160+
}

0 commit comments

Comments
 (0)