-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
executable file
·242 lines (216 loc) · 7.08 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
/* eslint-disable no-promise-executor-return */
/* eslint-disable no-await-in-loop */
/* eslint-disable no-plusplus */
const Joi = require('joi');
const StoppedError = require('./StoppedError');
/**
* @description Schema for TaskQueue's constructor options
*/
const optionsSchema = Joi.object({
name: Joi.string().description('The name of the queue. It is logged.'),
size: Joi.number()
.integer()
.min(1)
.description(`push() resolves when the task queue's length is less than this value`),
workers: Joi.number()
.integer()
.min(1)
.description('The maximum number of tasks that can execute simultaneously. Defaults to size.'),
logger: Joi.object(),
});
/**
* All log entries use this tag
* @ignore
* @private
*/
const logTag = 'taskQueue';
/**
* A queue that enforces a maximum number of simultaneously executing tasks
*/
class TaskQueue {
/**
* Properties:
* {boolean} stopped
* {object} logger
* {number} taskCount The number of currently executing tasks
* {function[]} doneListeners
* {function[]} waitListeners
*/
/**
* Constructor. There is no need to call start() after creating a new object.
* @param {object} options
* @param {number} options.size The maximum number of functions that can execute at a time
*/
constructor(options) {
const validation = optionsSchema.validate(options);
if (validation.error) throw new Error(validation.error.message);
Object.assign(this, validation.value);
// eslint-disable-next-line no-multi-assign
if (!this.workers && !this.size) this.workers = this.size = 1;
else if (!this.workers) this.workers = this.size;
else if (!this.size) this.size = this.workers;
if (this.logger && !this.logger.isLevelEnabled(logTag)) delete this.logger;
this.taskCount = 0;
this.doneListeners = [];
this.waitListeners = [];
}
/**
* Called when a task finishes
* @private
* @ignore
*/
taskFinished() {
const newTasks = this.taskCount - 1;
if (this.logger) {
this.logger.log(logTag, {
message: `Task finished for '${this.name}'. Tasks: ${newTasks}`,
name: this.name,
taskCount: newTasks,
});
}
if (newTasks < 0) {
const message = `Task counter is negative for '${this.name}'`;
if (this.logger) {
this.logger.log(['error', logTag], {
message,
name: this.name,
});
}
} else {
this.taskCount = newTasks;
}
const { length: doneListenerCount } = this.doneListeners;
// Calling resolve() on a doneListener doesn't cause a new task to be started
if (doneListenerCount) {
this.doneListeners.shift()();
} else if (!newTasks) {
const { waitListeners } = this;
// =========================
// Release callers to wait()
if (waitListeners.length) {
this.waitListeners = [];
waitListeners.forEach((resolve) => resolve());
}
}
}
/**
* Same as push() but without the check for this.stopping
* @param {function} task
* @return {Promise}
* @private
* @ignore
*/
async pushInternal(task) {
// Wait for an available slot in the queue
// eslint-disable-next-line no-await-in-loop
for (;;) {
if (this.taskCount < this.workers) break;
if (this.full) {
// ============
// Size reached
await new Promise((resolve) => this.doneListeners.push(resolve));
} else {
// ===============
// Workers reached
const promise = new Promise((resolve, reject) =>
this.doneListeners.push(() =>
this.pushInternal(task).then((ret) => {
// ret.promise is from the task. Forward to the Promise returned by this method.
ret.promise.then(resolve, reject);
})
)
);
return { promise };
}
}
let fret;
let err;
try {
fret = task(); // this could throw an exception if it's not explicitly marked async, so increment
} catch (error) {
err = error;
}
// Increment taskCount here, not earlier
const taskCount = ++this.taskCount;
if (this.logger) {
this.logger.log(logTag, {
message: `Task started for '${this.name}'. Tasks: ${taskCount}`,
name: this.name,
taskCount,
});
}
let promise;
if (fret && Object.prototype.toString.call(fret) === '[object Promise]') {
promise = new Promise((resolve, reject) => {
fret.then(
(value) => {
this.taskFinished();
resolve(value);
},
(error) => {
this.taskFinished();
reject(error);
}
);
});
} else {
promise = err ? Promise.reject(err) : Promise.resolve(fret);
this.taskFinished();
}
// If bare 'promise' was returned, the caller would wait for fret to resolve. Instead, callers should only wait for
// for task to be called. Therefore, return an object that contains a 'promise' key. Awaiting on this method
// therefore waits for an empty slot in the queue and returns a Promise that immediately resolves to an object.
return { promise };
}
/**
* Starts a task. If the queue's maximum size has been reached, this method waits for a task to finish
* before invoking task().
* @param {function} task A function to call. It can return a Promise, throw an exception, or return a value.
* @throws {StoppingError} If stop() has been called
* @return {Promise} Does not reject. Resolves to an object with the property 'promise'
* containing either the Promise returned by task or a new Promise that resolves to the value returned by task or
* rejects using the exception thrown by it. Therefore, it is not only possible to wait for the task to start, it is
* also possible to wait for it to finish.
*
* For example:
* // Wait for an open slot in the queue
* const ret = await queue.push(()=>new Promise(resolve=>setTimeout(()=>resolve('Hello'), 5000)));
* // Wait for 5 seconds and output Hello
* console.log(await ret.promise);
*/
push(task) {
if (this.stopped) throw new StoppedError('Stopped');
return this.pushInternal(task);
}
/**
* Is the queue full?
* @return {boolean} true if the maximum number of tasks are queued
*/
get full() {
return this.taskCount + this.doneListeners.length >= this.size;
}
/**
* Waits for running tasks to complete. Callers are not prevented callers from calling push(); thus
* there is no guarantee that when the returned Promise resolves, the queue will have an available slot.
* @return {Promise}
*/
wait() {
// eslint-disable-next-line no-await-in-loop
if (this.taskCount) return new Promise((resolve) => this.waitListeners.push(resolve));
return undefined;
}
/**
* Waits for running tasks to complete. Prevents additional calls to push().
*/
stop() {
this.stopped = true;
return this.wait();
}
/**
* Undo method for stop(). There is no need to invoke start() after creating a new object.
*/
start() {
this.stopped = false;
}
}
module.exports = TaskQueue;