-
Notifications
You must be signed in to change notification settings - Fork 899
/
executor.ts
88 lines (78 loc) · 2.27 KB
/
executor.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import { Queue } from "../../../throttler/queue";
import { ThrottlerOptions } from "../../../throttler/throttler";
/**
 * An Executor runs lambdas (which may be async).
 *
 * Implementations decide how the function is invoked; callers only deal
 * with the promise returned by `run`.
 */
export interface Executor {
/**
 * Runs `func` and returns a promise for its result.
 *
 * @param func the operation to run.
 * @param opts optional per-call settings (see `RunOptions`).
 */
run<T>(func: () => Promise<T>, opts?: RunOptions): Promise<T>;
}
/**
 * Per-call options accepted by `Executor.run`.
 */
export interface RunOptions {
// Error codes for which a failed call is eligible for retry. When omitted,
// QueueExecutor.run falls back to DEFAULT_RETRY_CODES.
retryCodes?: number[];
}
/**
 * A unit of work handed to the queue, together with the slots the queue
 * handler uses to report the outcome back to `QueueExecutor.run`.
 *
 * The outcome fields are typed `unknown` rather than `any` so that readers
 * must narrow or assert explicitly before using them.
 */
interface Operation {
  /** The caller-supplied function to execute. */
  func: () => Promise<unknown>;
  /** Error codes for which a failure is thrown back to the queue for retry. */
  retryCodes: number[];
  /** Value `func` resolved with; written by the handler on success. */
  result?: unknown;
  /** Non-retryable failure recorded by the handler and rethrown by `run`. */
  error?: unknown;
}
/**
 * Error codes that are retried when a caller does not supply
 * `RunOptions.retryCodes`.
 */
export const DEFAULT_RETRY_CODES = [
  429, // Too Many Requests
  409, // Conflict
  503, // Service Unavailable
];
/**
 * Extracts an error/status code from a thrown value by probing, in priority
 * order, the places where one may be stored:
 *
 *   1. `err.status`
 *   2. `err.code`
 *   3. `err.context.response.statusCode`
 *   4. `err.original.code`
 *   5. `err.original.context.response.statusCode`
 *
 * The first truthy candidate wins (so `0` is skipped like a missing value)
 * and is returned as found, without any coercion. When no candidate is
 * truthy the value of the last probe is returned, which is `undefined`
 * whenever that path is absent.
 *
 * `null` and `undefined` inputs are tolerated and yield `undefined`, so a
 * promise rejected without a reason does not crash error classification.
 *
 * @param err the value caught from a failed operation.
 * @returns the first truthy code found, otherwise a falsy value (normally
 *   `undefined`).
 */
function parseErrorCode(err: any): number {
  return (
    err?.status ||
    err?.code ||
    err?.context?.response?.statusCode ||
    err?.original?.code ||
    err?.original?.context?.response?.statusCode
  );
}
/**
 * Queue handler: runs one operation and records its outcome on `op`.
 *
 * - Success: the resolved value is stored in `op.result`.
 * - Failure whose code (see `parseErrorCode`) is listed in `op.retryCodes`:
 *   the error is thrown back to the queue, where it is expected to be
 *   retried with backoff.
 * - Any other failure: recorded in `op.error` and the handler resolves
 *   normally, so the failure is reported to the caller instead of retried.
 *
 * @param op the operation to execute; mutated in place.
 * @throws the original error when its code is one of `op.retryCodes`.
 */
async function handler(op: Operation): Promise<void> {
  try {
    op.result = await op.func();
  } catch (err: any) {
    const isObjectLike =
      (typeof err === "object" && err !== null) || typeof err === "function";
    if (!isObjectLike) {
      // A primitive (or null/undefined) rejection carries no code, and
      // assigning a property to it would throw a TypeError in strict mode.
      // That TypeError would escape to the queue as if the operation were
      // retryable. Wrap the value in an Error instead, which also keeps
      // `op.error` truthy for callers that test it that way.
      op.error = new Error(String(err));
      return;
    }

    // Cast a wide net for the error code: it can sit directly on a raw
    // error, be stashed under `context.response.statusCode`, or live on the
    // `original` error wrapped by another error object.
    const code = parseErrorCode(err);
    if (op.retryCodes.includes(code)) {
      throw err;
    }

    // Stamp the normalized code on the error. Reflect.set returns false
    // rather than throwing when `code` is not writable (frozen objects,
    // getter-only accessors), so an unwritable error is still recorded
    // rather than surfacing as a handler failure.
    Reflect.set(err, "code", code);
    op.error = err;
  }
}
/**
 * A QueueExecutor implements the Executor interface on top of a throttler
 * queue.
 *
 * Failures whose error code is in the call's retry codes
 * (`RunOptions.retryCodes`, defaulting to `DEFAULT_RETRY_CODES`) are thrown
 * back to the queue to be retried within the ThrottlerOptions parameters.
 * Every other failure is rethrown to the caller of `run`.
 */
export class QueueExecutor implements Executor {
  private readonly queue: Queue<Operation, void>;

  /**
   * @param options throttler settings for the underlying queue; the handler
   *   is supplied by this class and therefore cannot be overridden.
   */
  constructor(options: Omit<ThrottlerOptions<Operation, void>, "handler">) {
    this.queue = new Queue({ ...options, handler });
  }

  /**
   * Enqueues `func` and waits for the queue to finish processing it.
   *
   * @param func the operation to run.
   * @param opts optional overrides; `retryCodes` replaces the default set.
   * @returns the value `func` resolved with.
   * @throws the error recorded for the operation, if any.
   */
  async run<T>(func: () => Promise<T>, opts?: RunOptions): Promise<T> {
    const op: Operation = {
      func,
      retryCodes: opts?.retryCodes || DEFAULT_RETRY_CODES,
    };

    await this.queue.run(op);

    if (!op.error) {
      return op.result as T;
    }
    throw op.error;
  }
}
/**
 * An executor with no queueing: functions are invoked right away.
 * Useful for testing.
 */
export class InlineExecutor {
  /**
   * Calls `func` immediately and returns the promise it produced, unchanged.
   *
   * @param func the operation to run.
   * @returns the promise returned by `func`.
   */
  run<T>(func: () => Promise<T>): Promise<T> {
    const pending = func();
    return pending;
  }
}