Skip to content

Commit 2a68b72

Browse files
committed
Add timeouts
1 parent 8e4772e commit 2a68b72

File tree

1 file changed

+122
-0
lines changed

1 file changed

+122
-0
lines changed

JavaScript/2-timeouts.js

+122
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
'use strict';
2+
3+
class Queue {
4+
constructor(concurrency) {
5+
this.concurrency = concurrency;
6+
this.count = 0;
7+
this.waiting = [];
8+
this.waitingStart = [];
9+
this.onProcess = null;
10+
this.onDone = null;
11+
this.onSuccess = null;
12+
this.onFailure = null;
13+
this.onDrain = null;
14+
this.waitTimeout = Infinity;
15+
this.processTimeout = Infinity;
16+
}
17+
static channels(concurrency) {
18+
return new Queue(concurrency);
19+
}
20+
wait(msec) {
21+
this.waitTimeout = msec;
22+
return this;
23+
}
24+
timeout(msec) {
25+
this.processTimeout = msec;
26+
return this;
27+
}
28+
add(task) {
29+
const hasChannel = this.count < this.concurrency;
30+
if (hasChannel) {
31+
this.next(task);
32+
return;
33+
}
34+
this.waiting.push(task);
35+
this.waitingStart.push(Date.now());
36+
}
37+
next(task) {
38+
this.count++;
39+
let timer = null;
40+
let finished = false;
41+
const { processTimeout, onProcess } = this;
42+
const finish = (err, res) => {
43+
if (finished) return;
44+
finished = true;
45+
if (timer) clearTimeout(timer);
46+
this.count--;
47+
this.finish(err, res);
48+
if (this.waiting.length > 0) this.takeNext();
49+
};
50+
if (processTimeout !== Infinity) {
51+
const err = new Error('Process timed out');
52+
timer = setTimeout(finish, processTimeout, err, task);
53+
}
54+
onProcess(task, finish);
55+
}
56+
takeNext() {
57+
const { waiting, waitingStart, waitTimeout } = this;
58+
const task = waiting.shift();
59+
const start = waitingStart.shift();
60+
if (waitTimeout !== Infinity) {
61+
const delay = Date.now() - start;
62+
if (delay > waitTimeout) {
63+
const err = new Error('Waiting timed out');
64+
this.finish(err, task);
65+
if (waiting.length > 0) this.takeNext();
66+
return;
67+
}
68+
}
69+
this.next(task);
70+
return;
71+
}
72+
finish(err, res) {
73+
const { onFailure, onSuccess, onDone, onDrain } = this;
74+
if (err) {
75+
if (onFailure) onFailure(err, res);
76+
} else if (onSuccess) {
77+
onSuccess(res);
78+
}
79+
if (onDone) onDone(err, res);
80+
if (this.count === 0 && onDrain) onDrain();
81+
}
82+
process(listener) {
83+
this.onProcess = listener;
84+
return this;
85+
}
86+
done(listener) {
87+
this.onDone = listener;
88+
return this;
89+
}
90+
success(listener) {
91+
this.onSuccess = listener;
92+
return this;
93+
}
94+
failure(listener) {
95+
this.onFailure = listener;
96+
return this;
97+
}
98+
drain(listener) {
99+
this.onDrain = listener;
100+
return this;
101+
}
102+
}
103+
104+
// Usage
105+
106+
const job = (task, next) => {
107+
console.log(`Process: ${task.name}`);
108+
setTimeout(next, task.interval, null, task).unref();
109+
};
110+
111+
const queue = Queue.channels(3)
112+
.wait(4000)
113+
.timeout(5000)
114+
.process(job)
115+
.done((err, task) => console.log(`Done: ${task.name}`))
116+
.success(task => console.log(`Success: ${task.name}`))
117+
.failure((err, task) => console.log(`Failure: ${err} ${task.name}`))
118+
.drain(() => console.log('Queue drain'));
119+
120+
for (let i = 0; i < 10; i++) {
121+
queue.add({ name: `Task${i}`, interval: i * 1000 });
122+
}

0 commit comments

Comments
 (0)