-
Notifications
You must be signed in to change notification settings - Fork 42
/
Copy paththrottle.ts
552 lines (501 loc) · 17.2 KB
/
throttle.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
import assert from "assert";
import { createHash } from "crypto";
import { PersistentCache } from "./cache";
import { FaastError, FaastErrorNames } from "./error";
import { deserialize, serialize } from "./serialize";
import { sleep } from "./shared";
/**
 * A promise paired with its `resolve` and `reject` functions, allowing code
 * outside the promise executor to settle the promise's outcome.
 */
export class Deferred<T = void> {
    /** The promise controlled by this deferred. */
    promise: Promise<T>;
    /** Settles {@link Deferred.promise} with a value. */
    resolve!: (arg: T | PromiseLike<T>) => void;
    /** Settles {@link Deferred.promise} with a rejection. */
    reject!: (err?: any) => void;
    constructor() {
        // The executor runs synchronously, so resolve/reject are guaranteed
        // to be assigned before the constructor returns (hence the `!`).
        this.promise = new Promise<T>((res, rej) => {
            this.resolve = res;
            this.reject = rej;
        });
    }
}
/**
 * A {@link Deferred} bound to a worker function. Calling {@link execute}
 * runs the worker and settles the deferred with its outcome, unless the
 * optional `cancel` callback indicates the work should be abandoned.
 */
export class DeferredWorker<T = void> extends Deferred<T> {
    constructor(
        private worker: () => Promise<T>,
        private cancel?: () => string | undefined
    ) {
        super();
    }
    /** Run the worker (or reject with ECANCEL if cancellation is requested). */
    async execute() {
        // A non-empty cancel message means this work item was cancelled
        // before it started; reject without invoking the worker.
        const cancelMessage = this.cancel?.();
        if (cancelMessage) {
            this.reject(new FaastError({ name: FaastErrorNames.ECANCEL }, cancelMessage));
            return;
        }
        try {
            this.resolve(await this.worker());
        } catch (err: any) {
            this.reject(err);
        }
    }
}
/**
 * Remove and return the first element (in insertion order) of `set`, or
 * `undefined` if the set is empty.
 *
 * Fixes a bug in the previous implementation, which tested the element for
 * truthiness before deleting it: falsy elements such as `0`, `""`, or `false`
 * were returned but never removed from the set.
 */
function popFirst<T>(set: Set<T>): T | undefined {
    for (const elem of set) {
        set.delete(elem);
        return elem;
    }
    return undefined;
}
/**
 * Retry policy: a number `N` allows up to `N` retries after the initial
 * attempt; a predicate decides per-failure whether to try again.
 */
export type RetryType = number | ((err: any, retries: number) => boolean);
/**
 * Invoke `fn`, retrying failures according to `retryN` with randomized
 * exponential backoff (capped at 30s between attempts).
 * @param retryN - retry count or predicate; see {@link RetryType}.
 * @param fn - receives the zero-based attempt number.
 */
export async function retryOp<T>(retryN: RetryType, fn: (retries: number) => Promise<T>) {
    // Normalize the numeric form into a predicate.
    const shouldRetry =
        typeof retryN === "function" ? retryN : (_: any, attempt: number) => attempt < retryN;
    let attempt = 0;
    while (true) {
        try {
            return await fn(attempt);
        } catch (err: any) {
            if (!shouldRetry(err, attempt)) {
                throw err;
            }
            // Exponential backoff with jitter, capped at 30 seconds.
            const backoffMs =
                Math.min(30 * 1000, 1000 * (1 + Math.random()) * 2 ** attempt) + Math.random();
            await sleep(backoffMs);
            attempt++;
        }
    }
}
/**
 * Runs asynchronous workers with a cap on how many execute concurrently.
 * Workers start in FIFO order; a `concurrency` of 0 means no limit.
 */
export class Funnel<T = void> {
    protected pendingQueue: Set<DeferredWorker<T>> = new Set();
    protected executingQueue: Set<DeferredWorker<T>> = new Set();
    /** Number of workers that completed successfully. */
    public processed = 0;
    /** Number of workers that rejected. */
    public errors = 0;
    constructor(public concurrency: number = 0, protected shouldRetry?: RetryType) {}
    /**
     * Queue a worker for execution.
     * @param worker - the async work to perform.
     * @param shouldRetry - per-call retry policy; overrides the funnel-wide one.
     * @param cancel - returns a message if the work should be cancelled.
     * @returns a promise for the worker's eventual result.
     */
    push(
        worker: () => Promise<T>,
        shouldRetry?: RetryType,
        cancel?: () => string | undefined
    ) {
        const retryTest = shouldRetry || this.shouldRetry || 0;
        const future = new DeferredWorker(() => retryOp(retryTest, worker), cancel);
        this.pendingQueue.add(future);
        // Defer dispatch so push() returns before any worker begins.
        setImmediate(() => this.doWork());
        return future.promise;
    }
    /** Drop all queued entries. Does not settle their promises. */
    clear() {
        this.pendingQueue.clear();
        this.executingQueue.clear();
    }
    /** Promises for all current workers, executing first, then pending. */
    promises() {
        const entries = [...this.executingQueue, ...this.pendingQueue];
        return entries.map(entry => entry.promise);
    }
    /** Resolves when every current worker settles; rejections are swallowed. */
    all() {
        const settled = this.promises().map(p => p.catch(_ => {}));
        return Promise.all(settled);
    }
    size() {
        return this.executingQueue.size + this.pendingQueue.size;
    }
    setMaxConcurrency(maxConcurrency: number) {
        this.concurrency = maxConcurrency;
    }
    getConcurrency() {
        return this.executingQueue.size;
    }
    /** Start pending workers until the concurrency limit is reached. */
    protected doWork() {
        while (
            this.pendingQueue.size > 0 &&
            (!this.concurrency || this.executingQueue.size < this.concurrency)
        ) {
            const next = popFirst(this.pendingQueue)!;
            this.executingQueue.add(next);
            next.promise
                .then(_ => this.processed++)
                .catch(_ => this.errors++)
                .then(_ => {
                    // Free the slot and refill from the pending queue.
                    this.executingQueue.delete(next);
                    this.doWork();
                });
            next.execute();
        }
    }
}
/**
 * Options for {@link Pump}.
 * @internal
 */
export interface PumpOptions {
    // Number of concurrent worker invocations the pump keeps in flight.
    concurrency: number;
    // Log worker errors to the console. Default: true (set in Pump's constructor).
    verbose?: boolean;
}
/**
 * A Funnel that continuously replenishes itself with invocations of `worker`,
 * keeping up to `concurrency` invocations in flight until {@link Pump.stop}
 * is called.
 * @internal
 */
export class Pump<T = void> extends Funnel<T | void> {
    stopped: boolean = false;
    constructor(protected options: PumpOptions, protected worker: () => Promise<T>) {
        super(options.concurrency);
        // Default to logging worker errors; see the catch clause in start().
        options.verbose = options.verbose ?? true;
    }
    start() {
        const restart = () => {
            if (this.stopped) {
                return;
            }
            // Top up the funnel so pending + executing reaches the target.
            while (this.executingQueue.size + this.pendingQueue.size < this.concurrency) {
                this.push(async () => {
                    try {
                        return await this.worker();
                    } catch (err: any) {
                        // Errors are swallowed (optionally logged) so one
                        // failing worker doesn't stop the pump.
                        this.options.verbose && console.error(err);
                        return;
                    } finally {
                        // Re-check the fill level after each worker settles.
                        setImmediate(restart);
                    }
                });
            }
        };
        this.stopped = false;
        restart();
    }
    // Stop replenishing; workers already in flight continue to completion.
    stop() {
        this.stopped = true;
    }
    // Stop, then wait for all in-flight workers to settle.
    drain() {
        this.stop();
        return this.all();
    }
    // Change concurrency; resumes replenishment if the pump is running.
    setMaxConcurrency(concurrency: number) {
        super.setMaxConcurrency(concurrency);
        if (!this.stopped) {
            this.start();
        }
    }
}
/**
 * Limits the rate at which workers are started, token-bucket style:
 * `bucket` counts recently-issued requests, decays at
 * `targetRequestsPerSecond`, and up to `burst` requests may be issued
 * back-to-back while capacity remains.
 */
export class RateLimiter<T = void> {
    // Timestamp (ms since epoch) of the last bucket update.
    protected lastTick = 0;
    // Recently-issued request count; decays over time, floored at 0.
    protected bucket = 0;
    protected queue: Set<DeferredWorker<T>> = new Set();
    constructor(protected targetRequestsPerSecond: number, protected burst: number = 1) {
        assert(targetRequestsPerSecond > 0);
        assert(this.burst >= 1);
    }
    push(worker: () => Promise<T>, cancel?: () => string | undefined) {
        this.updateBucket();
        // Fast path: nothing queued ahead of us and burst capacity is
        // available -- run immediately without creating a DeferredWorker.
        if (this.queue.size === 0 && this.bucket <= this.burst - 1) {
            this.bucket++;
            return worker();
        }
        const future = new DeferredWorker(worker, cancel);
        this.queue.add(future);
        // Only the first queued entry kicks off draining; drainQueue
        // re-schedules itself while the queue remains non-empty.
        if (this.queue.size === 1) {
            this.drainQueue();
        }
        return future.promise;
    }
    // Decay the bucket by the elapsed wall-clock time since the last tick.
    protected updateBucket() {
        const now = Date.now();
        const secondsElapsed = (now - this.lastTick) / 1000;
        this.bucket -= secondsElapsed * this.targetRequestsPerSecond;
        this.bucket = Math.max(this.bucket, 0);
        this.lastTick = now;
    }
    protected async drainQueue() {
        // Time until at least one request's worth of burst capacity frees up.
        const requestAmountToDrain = 1 - (this.burst - this.bucket);
        const secondsToDrain = requestAmountToDrain / this.targetRequestsPerSecond;
        if (secondsToDrain > 0) {
            await sleep(Math.ceil(secondsToDrain * 1000));
        }
        this.updateBucket();
        // Issue queued workers while burst capacity remains.
        while (this.bucket <= this.burst - 1) {
            const next = popFirst(this.queue);
            if (!next) {
                break;
            }
            this.bucket++;
            next.execute();
        }
        if (this.queue.size > 0) {
            this.drainQueue();
        }
    }
    // NOTE(review): drops queued workers without settling their promises, so
    // callers awaiting them will never resolve -- appears intended for teardown.
    clear() {
        this.queue.clear();
    }
}
/**
 * Specify {@link throttle} limits. These limits shape the way throttle invokes
 * the underlying function.
 * @public
 */
export interface Limits {
    /**
     * The maximum number of concurrent executions of the underlying function to
     * allow. Must be supplied, there is no default. Specifying `0` or
     * `Infinity` is allowed and means there is no concurrency limit.
     */
    concurrency: number;
    /**
     * The maximum number of calls per second to allow to the underlying
     * function. Default: no rate limit.
     */
    rate?: number;
    /**
     * The maximum number of calls to the underlying function to "burst" -- e.g.
     * the number that can be issued immediately as long as the rate limit is
     * not exceeded. For example, if rate is 5 and burst is 5, and 10 calls are
     * made to the throttled function, 5 calls are made immediately and then
     * after 1 second, another 5 calls are made immediately. Setting burst to 1
     * means calls are issued uniformly every `1/rate` seconds. If `rate` is not
     * specified, then `burst` does not apply. Default: 1.
     */
    burst?: number;
    /**
     * Retry if the throttled function returns a rejected promise. `retry` can
     * be a number or a function. If it is a number `N`, then up to `N`
     * additional attempts are made in addition to the initial call. If retry is
     * a function, it should return `true` if another retry attempt should be
     * made, otherwise `false`. The first argument will be the value of the
     * rejected promise from the previous call attempt, and the second argument
     * will be the number of previous retry attempts (e.g. the first call will
     * have value 0). Default: 0 (no retry attempts).
     */
    retry?: number | ((err: any, retries: number) => boolean);
    /**
     * If `memoize` is `true`, then every call to the throttled function will be
     * saved as an entry in a map from arguments to return value. If same
     * arguments are seen again in a future call, the return value is retrieved
     * from the Map rather than calling the function again. This can be useful
     * for avoiding redundant calls that are expected to return the same results
     * given the same arguments.
     *
     * The arguments will be captured with `JSON.stringify`, therefore types
     * that do not stringify uniquely won't be distinguished from each other.
     * Care must be taken when specifying `memoize` to avoid incorrect results.
     */
    memoize?: boolean;
    /**
     * Similar to `memoize` except the map from function arguments to results is
     * stored in a persistent cache on disk. This is useful to prevent redundant
     * calls to APIs which are expected to return the same results for the same
     * arguments, and which are likely to be called across many faast.js module
     * instantiations. This is used internally by faast.js for caching cloud
     * prices for AWS, and for saving the last garbage collection
     * date for AWS. Persistent cache entries expire after a period of time. See
     * {@link PersistentCache}.
     */
    cache?: PersistentCache;
    /**
     * A promise that, if resolved, causes cancellation of pending throttled
     * invocations. This is typically created using `Deferred`. The idea is to
     * use the resolving of the promise as an asynchronous signal that any
     * pending invocations in this throttled function should be cleared.
     * @internal
     */
    cancel?: Promise<void>;
}
/**
 * Memoize `fn` in an in-memory Map keyed by `JSON.stringify` of the
 * arguments. Argument types that do not stringify uniquely will collide.
 * @param fn - the function to memoize.
 * @param cache - optional pre-populated/shared cache map.
 *
 * Fixes a bug in the previous implementation, which tested the cached value
 * for truthiness: falsy results (`0`, `""`, `false`, `null`) were never
 * served from the cache, causing redundant calls to `fn` on every hit.
 */
export function memoizeFn<A extends any[], R>(
    fn: (...args: A) => R,
    cache: Map<string, R> = new Map()
) {
    return (...args: A) => {
        const key = JSON.stringify(args);
        // Use has() rather than a truthiness test so falsy cached values hit.
        if (cache.has(key)) {
            return cache.get(key)!;
        }
        const value = fn(...args);
        cache.set(key, value);
        return value;
    };
}
/**
 * Wrap an async function with a persistent on-disk cache, keyed by a SHA-256
 * hash of the serialized arguments. See {@link Limits.cache}.
 * @param cache - the persistent cache to read/write entries from.
 * @param fn - the async function whose results should be cached.
 */
export function cacheFn<A extends any[], R>(
    cache: PersistentCache,
    fn: (...args: A) => Promise<R>
) {
    return async (...args: A) => {
        // Hash the serialized arguments to get a fixed-length, filesystem-safe key.
        const key = serialize(args, true);
        const hasher = createHash("sha256");
        hasher.update(key);
        const cacheKey = hasher.digest("hex");
        const prev = await cache.get(cacheKey);
        if (prev) {
            const str = prev.toString();
            // An `undefined` result is stored as the literal string
            // "undefined"; recover it rather than deserializing.
            if (str === "undefined") {
                return undefined;
            }
            return deserialize(str);
        }
        // Cache miss: invoke the function and persist its serialized result.
        const value = await fn(...args);
        await cache.set(cacheKey, serialize(value, true));
        return value;
    };
}
/**
* A decorator for rate limiting, concurrency limiting, retry, memoization, and
* on-disk caching. See {@link Limits}.
* @remarks
* When programming against cloud services, databases, and other resources, it
* is often necessary to control the rate of request issuance to avoid
* overwhelming the service provider. In many cases the provider has built-in
* safeguards against abuse, which automatically fail requests if they are
* coming in too fast. Some systems don't have safeguards and precipitously
* degrade their service level or fail outright when faced with excessive load.
*
* With faast.js it becomes very easy to (accidentally) generate requests from
* thousands of cloud functions. The `throttle` function can help manage request
* flow without resorting to setting up a separate service. This is in keeping
* with faast.js' zero-ops philosophy.
*
* Usage is simple:
*
* ```typescript
* async function operation() { ... }
* const throttledOperation = throttle({ concurrency: 10, rate: 5 }, operation);
* for(let i = 0; i < 100; i++) {
* // at most 10 concurrent executions at a rate of 5 invocations per second.
* throttledOperation();
* }
* ```
*
 * Note that each invocation to `throttle` creates a separate function with
 * separate limits. Therefore it is likely that you want to use `throttle` in a
* global context, not within a dynamic context:
*
* ```typescript
* async function operation() { ... }
* for(let i = 0; i < 100; i++) {
* // WRONG - each iteration creates a separate throttled function that's only called once.
* const throttledOperation = throttle({ concurrency: 10, rate: 5 }, operation);
* throttledOperation();
* }
* ```
*
* A better way to use throttle avoids creating a named `operation` function
* altogether, ensuring it cannot be accidentally called without throttling:
*
* ```typescript
* const operation = throttle({ concurrency: 10, rate: 5 }, async () => {
* ...
* });
* ```
*
 * Throttle supports functions with arguments and automatically infers the
 * correct type for the returned function:
*
* ```typescript
* // `operation` inferred to have type (str: string) => Promise<string>
* const operation = throttle({ concurrency: 10, rate: 5 }, async (str: string) => {
 * return str;
* });
* ```
*
* In addition to limiting concurrency and invocation rate, `throttle` also
* supports retrying failed invocations, memoizing calls, and on-disk caching.
* See {@link Limits} for details.
*
* @param limits - see {@link Limits}.
* @param fn - The function to throttle. It can take any arguments, but must
* return a Promise (which includes `async` functions).
* @returns Returns a throttled function with the same signature as the argument
* `fn`.
* @public
*/
export function throttle<A extends any[], R>(
    limits: Limits,
    fn: (...args: A) => Promise<R>
): (...args: A) => Promise<R> {
    const { concurrency, retry, rate, burst, memoize, cache, cancel } = limits;
    // Concurrency limiting and retry are both provided by a Funnel.
    const funnel = new Funnel<R>(concurrency, retry);
    // Cleanup actions to run if/when `cancel` resolves.
    const cancellationQueue = [() => funnel.clear()];
    let conditionedFunc: (...args: A) => Promise<R>;
    if (rate) {
        const rateLimiter = new RateLimiter<R>(rate, burst);
        cancellationQueue.push(() => rateLimiter.clear());
        // The rate limiter runs inside the funnel, so a call occupies a
        // concurrency slot while it waits for rate capacity.
        conditionedFunc = (...args: A) =>
            funnel.push(() => rateLimiter.push(() => fn(...args)));
    } else {
        conditionedFunc = (...args: A) => funnel.push(() => fn(...args));
    }
    // Layering order (outermost last): memoize wraps cache wraps
    // rate/concurrency. A memoized hit therefore bypasses the disk cache,
    // and a disk-cache hit bypasses the funnel and rate limiter.
    if (cache) {
        conditionedFunc = cacheFn(cache, conditionedFunc);
    }
    if (memoize) {
        const mcache = new Map<string, Promise<R>>();
        cancellationQueue.push(() => mcache.clear());
        conditionedFunc = memoizeFn(conditionedFunc, mcache);
    }
    cancel?.then(() => cancellationQueue.forEach(cleanupFn => cleanupFn()));
    return conditionedFunc;
}
/** Wrap a value (or promise of one) as a non-terminal IteratorResult. */
async function iteratorResult<T>(value: T | Promise<T>) {
    const v = await value;
    return { done: false, value: v } as const;
}
/** Shared terminal IteratorResult used to signal end of iteration. */
const done = Promise.resolve({ done: true, value: undefined } as const);
/**
 * An unbounded async producer/consumer queue. `enqueue` hands a value to a
 * waiting consumer if one exists, otherwise buffers it; `next` returns a
 * buffered value if available, otherwise a promise for a future one.
 */
export class AsyncQueue<T> {
    // Consumers waiting for values (next() called before enqueue()).
    protected deferred: Array<Deferred<T>> = [];
    // Values waiting for consumers (enqueue() called before next()).
    protected enqueued: Promise<T>[] = [];
    enqueue(value: T | Promise<T>) {
        const waiter = this.deferred.shift();
        if (waiter) {
            waiter.resolve(value);
        } else {
            this.enqueued.push(Promise.resolve(value));
        }
    }
    next(): Promise<T> {
        const ready = this.enqueued.shift();
        if (ready) {
            return ready;
        }
        const d = new Deferred<T>();
        this.deferred.push(d);
        return d.promise;
    }
    /** Drop buffered values and pending waiters (waiters never settle). */
    clear() {
        this.deferred = [];
        this.enqueued = [];
    }
}
// An AsyncQueue exposed as an async iterable: pushed values become
// `{ done: false, value }` iterator results, and done() enqueues the shared
// terminal result to end iteration.
export class AsyncIterableQueue<T> extends AsyncQueue<IteratorResult<T>> {
    push(value: T | Promise<T>) {
        super.enqueue(iteratorResult(value));
    }
    // Enqueue the terminal result; consumers receiving it see { done: true }.
    done() {
        super.enqueue(done);
    }
    [Symbol.asyncIterator]() {
        return this;
    }
}
/**
 * An AsyncQueue that releases values in sequence-number order: a value with
 * sequence `n` is delivered only after all values with sequence `< n` have
 * been delivered. Out-of-order arrivals are held until their turn.
 */
export class AsyncOrderedQueue<T> {
    protected queue = new AsyncQueue<T>();
    // Values that arrived ahead of their turn, keyed by sequence number.
    protected arrived: Map<number, Promise<T>> = new Map();
    // Next sequence number eligible for delivery.
    protected current = 0;
    push(value: T | Promise<T>, sequence: number) {
        this.enqueue(Promise.resolve(value), sequence);
    }
    /** Bypass ordering and deliver a value immediately. */
    pushImmediate(value: T | Promise<T>) {
        this.queue.enqueue(value);
    }
    enqueue(value: Promise<T>, sequence: number) {
        // Ignore sequence numbers that have already been delivered.
        if (sequence < this.current) {
            return;
        }
        // First arrival for a sequence number wins; duplicates are dropped.
        if (!this.arrived.has(sequence)) {
            this.arrived.set(sequence, value);
        }
        // Release the consecutive run of values starting at `current`.
        let next = this.arrived.get(this.current);
        while (next !== undefined) {
            this.queue.enqueue(next);
            this.arrived.delete(this.current);
            this.current++;
            next = this.arrived.get(this.current);
        }
    }
    next(): Promise<T> {
        return this.queue.next();
    }
    clear() {
        this.arrived.clear();
        this.queue.clear();
        this.current = 0;
    }
}