This repository has been archived by the owner on Jan 8, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 30
/
SequentialHandler.ts
219 lines (198 loc) · 7.85 KB
/
SequentialHandler.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
import { sleep } from '@discordjs/core';
import { AsyncQueue } from '@sapphire/async-queue';
import AbortController from 'abort-controller';
import fetch, { RequestInit, Response } from 'node-fetch';
import { DiscordAPIError, DiscordErrorData } from '../errors/DiscordAPIError';
import { HTTPError } from '../errors/HTTPError';
import type { RequestManager, RouteData } from '../RequestManager';
import { RESTEvents } from '../utils/constants';
import { parseResponse } from '../utils/utils';
/**
 * The structure used to handle requests for a given bucket.
 *
 * Requests queued on one handler are run strictly one after another: each call to
 * {@link SequentialHandler.queueRequest} waits for its turn on an internal async queue,
 * waits out any known global or bucket rate limit, and only then performs the HTTP call.
 */
export class SequentialHandler {
	/**
	 * The unique ID of the handler, built as `hash:majorParameter`
	 */
	public readonly id: string;

	/**
	 * The time (epoch milliseconds) this rate limit bucket will reset.
	 * Stays at -1 until the first response has been processed.
	 */
	private reset = -1;

	/**
	 * The remaining requests that can be made before we are rate limited
	 */
	private remaining = 1;

	/**
	 * The total number of requests that can be made before we are rate limited
	 */
	private limit = Infinity;

	/**
	 * The interface used to sequence async requests sequentially
	 */
	// eslint-disable-next-line @typescript-eslint/explicit-member-accessibility
	#asyncQueue = new AsyncQueue();

	/**
	 * @param manager The request manager
	 * @param hash The hash that this RequestHandler handles
	 * @param majorParameter The major parameter for this handler
	 */
	public constructor(
		private readonly manager: RequestManager,
		private readonly hash: string,
		private readonly majorParameter: string,
	) {
		this.id = `${hash}:${majorParameter}`;
	}

	/**
	 * If the bucket is currently inactive (no pending requests and not rate limited)
	 */
	public get inactive(): boolean {
		return this.#asyncQueue.remaining === 0 && !this.limited;
	}

	/**
	 * If the rate limit bucket is currently limited
	 * (no requests remaining and the reset time has not been reached yet)
	 */
	private get limited(): boolean {
		return this.remaining <= 0 && Date.now() < this.reset;
	}

	/**
	 * The time in milliseconds until queued requests can continue.
	 * This is computed against the current clock, so every read yields a different value.
	 */
	private get timeToReset(): number {
		return this.reset - Date.now();
	}

	/**
	 * Emits a debug message on the manager, prefixed with this handler's id
	 * @param message The message to debug
	 */
	private debug(message: string): void {
		this.manager.emit(RESTEvents.Debug, `[REST ${this.id}] ${message}`);
	}

	/**
	 * Queues a request to be sent
	 * @param routeID The generalized api route with literal ids for major parameters
	 * @param url The url to do the request on
	 * @param options All the information needed to make a request
	 * @returns Whatever {@link SequentialHandler.runRequest} resolves with for this request
	 */
	public async queueRequest(routeID: RouteData, url: string, options: RequestInit): Promise<unknown> {
		// Wait for any previous requests to be completed before this one is run
		await this.#asyncQueue.wait();
		try {
			// Wait for any global rate limits to pass before continuing to process requests
			await this.manager.globalTimeout;
			// Check if this request handler is currently rate limited
			if (this.limited) {
				// Read the clock-dependent getter once so that the event payload, the debug
				// message and the actual wait all agree on the same duration
				const timeToReset = this.timeToReset;
				// Let library users know they have hit a rate limit
				this.manager.emit(RESTEvents.RateLimited, {
					timeToReset,
					limit: this.limit,
					method: options.method,
					hash: this.hash,
					route: routeID.bucketRoute,
					majorParameter: this.majorParameter,
				});
				this.debug(`Waiting ${timeToReset}ms for rate limit to pass`);
				// Wait the remaining time left before the rate limit resets
				await sleep(timeToReset);
			}
			// Make the request, and return the results.
			// The await is required so the queue is only released once the request has settled
			return await this.runRequest(routeID, url, options);
		} finally {
			// Allow the next request to fire
			this.#asyncQueue.shift();
		}
	}

	/**
	 * The method that actually makes the request to the api, and updates info about the bucket accordingly
	 * @param routeID The generalized api route with literal ids for major parameters
	 * @param url The fully resolved url to make the request to
	 * @param options The node-fetch options needed to make the request
	 * @param retries The number of retries this request has already attempted (recursion)
	 * @returns The parsed response body on success, or `null` for a non-ok status outside the 4xx/5xx ranges
	 * @throws HTTPError when a 5xx response persists after all retries are used up
	 * @throws DiscordAPIError for any 4xx response other than 429
	 */
	private async runRequest(routeID: RouteData, url: string, options: RequestInit, retries = 0): Promise<unknown> {
		const controller = new AbortController();
		// Abort the request if it takes longer than the configured timeout
		const timeout = setTimeout(() => controller.abort(), this.manager.options.timeout);
		let res: Response;
		try {
			res = await fetch(url, { ...options, signal: controller.signal });
		} catch (error) {
			// The caught value is not guaranteed to be an object (and is typed `unknown` under
			// strict compiler settings), so narrow it before looking at its name
			const aborted =
				typeof error === 'object' && error !== null && (error as { name?: unknown }).name === 'AbortError';
			// Retry the specified number of times for possible timed out requests
			if (aborted && retries !== this.manager.options.retries) {
				return this.runRequest(routeID, url, options, retries + 1);
			}
			throw error;
		} finally {
			clearTimeout(timeout);
		}

		let retryAfter = 0;
		const method = options.method ?? 'get';
		const limit = res.headers.get('X-RateLimit-Limit');
		const remaining = res.headers.get('X-RateLimit-Remaining');
		const reset = res.headers.get('X-RateLimit-Reset-After');
		const hash = res.headers.get('X-RateLimit-Bucket');
		const retry = res.headers.get('Retry-After');

		// Update the total number of requests that can be made before the rate limit resets
		this.limit = limit ? Number(limit) : Infinity;
		// Update the number of remaining requests that can be made before the rate limit resets
		this.remaining = remaining ? Number(remaining) : 1;
		// Update the time when this rate limit resets (reset-after is in seconds)
		this.reset = reset ? Number(reset) * 1000 + Date.now() + this.manager.options.offset : Date.now();
		// Amount of time in milliseconds until we should retry if rate limited (globally or otherwise)
		if (retry) retryAfter = Number(retry) * 1000 + this.manager.options.offset;

		// Handle buckets via the hash header retroactively
		if (hash && hash !== this.hash) {
			// Let library users know when rate limit buckets have been updated
			this.debug(['Received bucket hash update', ` Old Hash : ${this.hash}`, ` New Hash : ${hash}`].join('\n'));
			// This queue will eventually be eliminated via attrition
			this.manager.hashes.set(`${method}:${routeID.bucketRoute}`, hash);
		}

		// Handle global rate limit
		if (res.headers.get('X-RateLimit-Global')) {
			this.debug(`We are globally rate limited, blocking all requests for ${retryAfter}ms`);
			// Set the manager's global timeout as the promise for other requests to "wait"
			this.manager.globalTimeout = sleep(retryAfter).then(() => {
				// After the timer is up, clear the promise
				this.manager.globalTimeout = null;
			});
		}

		if (res.ok) {
			return parseResponse(res);
		} else if (res.status === 429) {
			// A rate limit was hit - this may happen if the route isn't associated with an official bucket hash yet, or when first globally rate limited
			this.debug(
				[
					'Encountered unexpected 429 rate limit',
					` Bucket : ${routeID.bucketRoute}`,
					` Major parameter: ${routeID.majorParameter}`,
					` Hash : ${this.hash}`,
					` Retry After : ${retryAfter}ms`,
				].join('\n'),
			);
			// Wait the retryAfter amount of time before retrying the request
			await sleep(retryAfter);
			// Since this is not a server side issue, the next request should pass, so we don't bump the retries counter
			return this.runRequest(routeID, url, options, retries);
		} else if (res.status >= 500 && res.status < 600) {
			// Retry the specified number of times for possible server side issues
			if (retries !== this.manager.options.retries) {
				return this.runRequest(routeID, url, options, retries + 1);
			}
			// We are out of retries, throw an error
			throw new HTTPError(res.statusText, res.constructor.name, res.status, method, url);
		} else {
			// Handle possible malformed requests
			if (res.status >= 400 && res.status < 500) {
				// If we receive this status code, it means the token we had is no longer valid.
				if (res.status === 401) {
					this.manager.setToken(null!);
				}
				// The request will not succeed for some reason, parse the error returned from the api
				const data = (await parseResponse(res)) as DiscordErrorData;
				// throw the API error
				throw new DiscordAPIError(data, data.code, res.status, method, url);
			}
			// Any other non-ok status is not treated as an error and yields no data
			return null;
		}
	}
}