-
-
Notifications
You must be signed in to change notification settings - Fork 97
/
processGlobalQueue.ts
181 lines (158 loc) · 6.56 KB
/
processGlobalQueue.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import { RestManager } from "../bot.ts";
import { HTTPResponseCodes } from "../types/shared.ts";
export async function processGlobalQueue(rest: RestManager) {
// IF QUEUE IS EMPTY EXIT
if (!rest.globalQueue.length) return;
// IF QUEUE IS ALREADY RUNNING EXIT
if (rest.globalQueueProcessing) return;
// SET AS TRUE SO OTHER QUEUES DON'T START
rest.globalQueueProcessing = true;
while (rest.globalQueue.length) {
// IF THE BOT IS GLOBALLY RATE LIMITED TRY AGAIN
if (rest.globallyRateLimited) {
setTimeout(() => {
rest.debug(`[REST - processGlobalQueue] Globally rate limited, running setTimeout.`);
rest.processGlobalQueue(rest);
}, 1000);
// BREAK WHILE LOOP
break;
}
if (rest.invalidRequests === rest.maxInvalidRequests - rest.invalidRequestsSafetyAmount) {
setTimeout(() => {
const time = rest.invalidRequestsInterval - (Date.now() - rest.invalidRequestFrozenAt);
rest.debug(
`[REST - processGlobalQueue] Freeze global queue because of invalid requests. Time Remaining: ${
time / 1000
} seconds.`,
);
rest.processGlobalQueue(rest);
}, 1000);
// BREAK WHILE LOOP
break;
}
const request = rest.globalQueue.shift();
// REMOVES ANY POTENTIAL INVALID CONFLICTS
if (!request) continue;
// CHECK RATE LIMITS FOR 429 REPEATS
// IF THIS URL IS STILL RATE LIMITED, TRY AGAIN
const urlResetIn = rest.checkRateLimits(rest, request.basicURL);
// IF A BUCKET EXISTS, CHECK THE BUCKET'S RATE LIMITS
const bucketResetIn = request.payload.bucketId ? rest.checkRateLimits(rest, request.payload.bucketId) : false;
if (urlResetIn || bucketResetIn) {
// ONLY ADD TIMEOUT IF ANOTHER QUEUE IS NOT PENDING
setTimeout(() => {
rest.debug(`[REST - processGlobalQueue] rate limited, running setTimeout.`);
// THIS REST IS RATE LIMITED, SO PUSH BACK TO START
rest.globalQueue.unshift(request);
// START QUEUE IF NOT STARTED
rest.processGlobalQueue(rest);
}, urlResetIn || (bucketResetIn as number));
continue;
}
try {
// CUSTOM HANDLER FOR USER TO LOG OR WHATEVER WHENEVER A FETCH IS MADE
rest.debug(`[REST - fetching] URL: ${request.urlToUse} | ${JSON.stringify(request.payload)}`);
const response = await fetch(request.urlToUse, rest.createRequestBody(rest, request));
rest.debug(`[REST - fetched] URL: ${request.urlToUse} | ${JSON.stringify(request.payload)}`);
const bucketIdFromHeaders = rest.processRequestHeaders(rest, request.basicURL, response.headers);
// SET THE BUCKET Id IF IT WAS PRESENT
if (bucketIdFromHeaders) {
request.payload.bucketId = bucketIdFromHeaders;
}
if (response.status < 200 || response.status >= 400) {
rest.debug(
`[REST - httpError] Payload: ${JSON.stringify(request.payload)} | Response: ${JSON.stringify(response)}`,
);
let error = "REQUEST_UNKNOWN_ERROR";
switch (response.status) {
case HTTPResponseCodes.BadRequest:
error = "The request was improperly formatted, or the server couldn't understand it.";
break;
case HTTPResponseCodes.Unauthorized:
error = "The Authorization header was missing or invalid.";
break;
case HTTPResponseCodes.Forbidden:
error = "The Authorization token you passed did not have permission to the resource.";
break;
case HTTPResponseCodes.NotFound:
error = "The resource at the location specified doesn't exist.";
break;
case HTTPResponseCodes.MethodNotAllowed:
error = "The HTTP method used is not valid for the location specified.";
break;
case HTTPResponseCodes.GatewayUnavailable:
error = "There was not a gateway available to process your request. Wait a bit and retry.";
break;
}
if (
rest.invalidRequestErrorStatuses.includes(response.status) &&
!(response.status === 429 && response.headers.get("X-RateLimit-Scope"))
) {
// INCREMENT CURRENT INVALID REQUESTS
++rest.invalidRequests;
if (!rest.invalidRequestsTimeoutId) {
rest.invalidRequestsTimeoutId = setTimeout(() => {
rest.debug(`[REST - processGlobalQueue] Resetting invalid requests counter in setTimeout.`);
rest.invalidRequests = 0;
rest.invalidRequestsTimeoutId = 0;
}, rest.invalidRequestsInterval);
}
}
// If NOT rate limited remove from queue
if (response.status !== 429) {
let json = undefined;
if (response.type) {
json = JSON.stringify(await response.json());
}
request.request.reject({
ok: false,
status: response.status,
error,
body: json,
});
} else {
if (request.payload.retryCount++ >= rest.maxRetryCount) {
rest.debug(`[REST - RetriesMaxed] ${JSON.stringify(request.payload)}`);
// REMOVE ITEM FROM QUEUE TO PREVENT RETRY
request.request.reject({
ok: false,
status: response.status,
error: "The request was rate limited and it maxed out the retries limit.",
});
continue;
}
// WAS RATE LIMITED. PUSH TO END OF GLOBAL QUEUE, SO WE DON'T BLOCK OTHER REQUESTS.
rest.globalQueue.push(request);
}
continue;
}
// SOMETIMES DISCORD RETURNS AN EMPTY 204 RESPONSE THAT CAN'T BE MADE TO JSON
if (response.status === 204) {
rest.debug(`[REST - FetchSuccess] URL: ${request.urlToUse} | ${JSON.stringify(request.payload)}`);
request.request.respond({
ok: true,
status: 204,
});
} else {
// CONVERT THE RESPONSE TO JSON
const json = JSON.stringify(await response.json());
rest.debug(`[REST - fetchSuccess] ${JSON.stringify(request.payload)}`);
request.request.respond({
ok: true,
status: 200,
body: json,
});
}
} catch (error) {
// SOMETHING WENT WRONG, LOG AND RESPOND WITH ERROR
rest.debug(`[REST - fetchFailed] Payload: ${JSON.stringify(request.payload)} | Error: ${error}`);
request.request.reject({
ok: false,
status: 599,
error: "Internal Proxy Error",
});
}
}
// ALLOW OTHER QUEUES TO START WHEN NEW REQUEST IS MADE
rest.globalQueueProcessing = false;
}