/
queue.ts
318 lines (297 loc) · 12.1 KB
/
queue.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
// Maximal valid value that can be passed to `setTimeout`
const MAX_TIMEOUT_VALUE = ~(1 << 31); // equals (2 ** 31 - 1)
/**
* A drift is an element in a doubly linked list, and it stores a task. A task
* is represented as a Promise. Drifts remove themselves from the queue (they
* _decay_) after their task completes or exceeds the timeout. The timeout is
* the maximum time that drifts are allowed to be contained in the queue.
*
* Drifts are appended to the tail of the queue and _drift_ towards the head as
* older elements are removed, hence the name.
*
* The task of every drift is created by a worker function that operates based
* on a source element. A drift keeps a reference to that source element. In
* case of a timeout, the timeout handlers will be supplied with the source
* element because it is interesting to know for which source elements the
* worker function produced a promise that timed out.
*
* A drift also stores the date at which it was added to the queue.
*
* In the context of `grammy`, each middleware invocation corresponds to a task
* in a drift. Drifts are used to manage concurrent middleware execution.
*/
interface Drift<Y> {
/** Previous drift in the queue. `null` iff this drift is the head element. */
prev: Drift<Y> | null;
/** Next drift in the queue. `null` iff this drift is the tail element. */
next: Drift<Y> | null;
/**
* Task of the drift. Contains logic that removes this drift from the queue as
* soon as the task completes by itself (either resolves or rejects).
*/
task: Promise<void>;
/**
* Timestamp (milliseconds since The Epoch) when this drift was added. This
* may be inspected when starting a new timer that might purge this drift upon
* timeout.
*
* The timestamp will be set to `-1` when the drift is removed from the queue,
* in other words, checking `date > 0` serves as a containment test.
*/
date: number;
/** Reference to the source element that was used to start the task */
elem: Y;
}
/**
* A _decaying deque_ is a special kind of doubly linked list that serves as a
* queue for a special kind of nodes, called _drifts_.
*
* A decaying deque has a worker function that spawns a task for each element
* that is added to the queue. This task then gets wrapped into a drift. The
* drifts are then the actual elements (aka. links) in the queue.
*
* In addition, the decaying deque runs a timer that purges old elements from
* the queue. This period of time is determined by the `taskTimeout`.
*
* When a task completes or exceeds its timeout, the corresponding drift is
* removed from the queue. As a result, only drifts with pending tasks are
* contained in the queue at all times.
*
* When a tasks completes with failure (`reject`s or exceeds the timeout), the
* respective handler (`catchError` or `catchTimeout`) is called.
*
* The decaying deque has its name from the observation that new elements are
* appended to the tail, and the old elements are removed at arbitrary positions
* in the queue whenever a task completes, hence, the queue seems to _decay_.
*/
export class DecayingDeque<Y, R = unknown> {
/**
* Number of drifts in the queue. Equivalent to the number of currently
* pending tasks.
*/
private len = 0;
/** Head element (oldest), `null` iff the queue is empty */
private head: Drift<Y> | null = null;
/** Tail element (newest), `null` iff the queue is empty */
private tail: Drift<Y> | null = null;
/**
* Number of currently pending tasks that we strive for (`add` calls will
* resolve only after the number of pending tasks falls below this value.
*
* In the context of `grammy`, it is possible to `await` calls to `add` to
* determine when to fetch more updates.
*/
public readonly concurrency: number;
/**
* Timer that waits for the head element to time out, will be rescheduled
* whenever the head element changes. It is `undefined` iff the queue is
* empty.
*/
private timer: ReturnType<typeof setTimeout> | undefined;
/**
* List of subscribers that wait for the queue to have capacity again. All
* functions in this array will be called as soon as new capacity is
* available, i.e. the number of pending tasks falls below `concurrency`.
*/
private subscribers: Array<(capacity: number) => void> = [];
private emptySubscribers: Array<() => void> = [];
/**
* Creates a new decaying queue with the given parameters.
*
* @param taskTimeout Max period of time for a task
* @param worker Task generator
* @param concurrency `add` will return only after the number of pending tasks fell below `concurrency`. `false` means `1`, `true` means `Infinity`, numbers below `1` mean `1`
* @param catchError Error handler, receives the error and the source element
* @param catchTimeout Timeout handler, receives the source element and the promise of the task
*/
constructor(
private readonly taskTimeout: number,
private readonly worker: (t: Y) => Promise<void>,
concurrency: boolean | number,
private readonly catchError: (err: R, elem: Y) => void | Promise<void>,
private readonly catchTimeout: (t: Y, task: Promise<void>) => void,
) {
if (concurrency === false) this.concurrency = 1;
else if (concurrency === true) this.concurrency = Infinity;
else this.concurrency = concurrency < 1 ? 1 : concurrency;
}
/**
* Adds the provided elements to the queue and starts tasks for all of them
* immediately. Returns a `Promise` that resolves with `concurrency - length`
* once this value becomes positive.
* @param elems Elements to be added
* @returns `this.capacity()`
*/
add(elems: Y[]): Promise<number> {
const len = elems.length;
this.len += len;
if (len > 0) {
let i = 0;
const now = Date.now();
// emptyness check
if (this.head === null) {
this.head = this.tail = this.toDrift(elems[i++]!, now);
// start timer because head element changed
this.startTimer();
}
let prev = this.tail!;
while (i < len) {
// create drift from source element
const node = this.toDrift(elems[i++]!, now);
// link it to previous element (append operation)
prev.next = node;
node.prev = prev;
prev = node;
}
this.tail = prev;
}
return this.capacity();
}
empty(): Promise<void> {
return new Promise((resolve) => {
if (this.len === 0) resolve();
else this.emptySubscribers.push(resolve);
});
}
/**
* Returns a `Promise` that resolves with `concurrency - length` once this
* value becomes positive. Use `await queue.capacity()` to wait until the
* queue has free space again.
*
* @returns `concurrency - length` once positive
*/
capacity(): Promise<number> {
return new Promise((resolve) => {
const capacity = this.concurrency - this.len;
if (capacity > 0) resolve(capacity);
else this.subscribers.push(resolve);
});
}
/**
* Called when a node completed its lifecycle and should be removed from the
* queue. Effectively wraps the `remove` call and takes care of the timer.
*
* @param node Drift to decay
*/
private decay(node: Drift<Y>): void {
// We only need to restart the timer if we decay the head element of the
// queue, however, if the next element has the same date as `node`, we can
// skip this step, too.
if (this.head === node && node.date !== node.next?.date) {
// Clear previous timeout
if (this.timer !== undefined) clearTimeout(this.timer);
// Emptyness check (do not start if queue is now empty)
if (node.next === null) this.timer = undefined;
// Reschedule timer for the next node's timeout
else {
this.startTimer(node.next.date + this.taskTimeout - Date.now());
}
}
this.remove(node);
}
/**
* Removes an element from the queue. Calls subscribers if there is capacity
* after performing this operation.
*
* @param node Drift to remove
*/
private remove(node: Drift<Y>): void {
// Connecting the links of `prev` and `next` removes `node`
if (this.head === node) this.head = node.next;
else node.prev!.next = node.next;
if (this.tail === node) this.tail = node.prev;
else node.next!.prev = node.prev;
// Mark this drift as no longer contained
node.date = -1;
// Notify subscribers if there is capacity by now
const capacity = this.concurrency - --this.len;
if (capacity > 0) {
this.subscribers.forEach((resolve) => resolve(capacity));
this.subscribers = [];
}
// Notify subscribers if the queue is empty now
if (this.len === 0) {
this.emptySubscribers.forEach((resolve) => resolve());
this.emptySubscribers = [];
}
}
/**
* Takes a source element and starts the task for it by calling the worker
* function. Then wraps this task into a drift. Also makes sure that the drift
* removes itself from the queue once it completes, and that the error handler
* is invoked if it fails (rejects).
*
* @param elem Source element
* @param date Date when this drift is created
* @returns The created drift
*/
private toDrift(elem: Y, date: number): Drift<Y> {
const node: Drift<Y> = {
prev: null,
task: this.worker(elem)
.catch(async (err) => {
// Rethrow iff the drift is no longer contained (timed out)
if (node.date > 0) await this.catchError(err, elem);
else throw err;
})
.finally(() => {
// Decay the node once the task completes (unless the drift was
// removed due to a timeout before)
if (node.date > 0) this.decay(node);
}),
next: null,
date,
elem,
};
return node;
}
/**
* Starts a timer that fires off a timeout after the given period of time.
*
* @param ms Number of milliseconds to wait before the timeout kicks in
*/
private startTimer(ms = this.taskTimeout): void {
this.timer = ms > MAX_TIMEOUT_VALUE
? undefined
: setTimeout(() => this.timeout(), ms);
}
/**
* Performs a timeout event. This removes the head element as well as all
* subsequent drifts with the same date (added in the same millisecond).
*
* The timeout handler is called in sequence for every removed drift.
*/
private timeout(): void {
// Rare cases of the event ordering might fire a timeout even though the
// head element has just decayed.
if (this.head === null) return;
while (this.head.date === this.head.next?.date) {
this.catchTimeout(this.head.elem, this.head.task);
// No need to restart timer here, we'll modify head again anyway
this.remove(this.head);
}
this.catchTimeout(this.head.elem, this.head.task);
this.decay(this.head);
}
/**
* Number of pending tasks in the queue. Equivalent to
* `this.pendingTasks().length` (but much more efficient).
*/
get length() {
return this.len;
}
/**
* Creates a snapshot of the queue by computing a list of those elements that
* are currently being processed.
*/
pendingTasks(): Y[] {
const len = this.len;
const snapshot: Y[] = Array(len);
let drift = this.head!;
for (let i = 0; i < len; i++) {
snapshot[i] = drift.elem;
drift = drift.next!;
}
return snapshot;
}
}