Skip to content

Commit dddde56

Browse files
committed
Implement pause/resume
1 parent 437d8f0 commit dddde56

File tree

1 file changed

+157
-0
lines changed

1 file changed

+157
-0
lines changed

JavaScript/3-pause.js

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
'use strict';
2+
3+
class Queue {
4+
constructor(concurrency) {
5+
this.paused = false;
6+
this.concurrency = concurrency;
7+
this.count = 0;
8+
this.waiting = [];
9+
this.onProcess = null;
10+
this.onDone = null;
11+
this.onSuccess = null;
12+
this.onFailure = null;
13+
this.onDrain = null;
14+
this.waitTimeout = Infinity;
15+
this.processTimeout = Infinity;
16+
}
17+
static channels(concurrency) {
18+
return new Queue(concurrency);
19+
}
20+
wait(msec) {
21+
this.waitTimeout = msec;
22+
return this;
23+
}
24+
timeout(msec) {
25+
this.processTimeout = msec;
26+
return this;
27+
}
28+
add(task) {
29+
if (!this.paused) {
30+
const hasChannel = this.count < this.concurrency;
31+
if (hasChannel) {
32+
this.next(task);
33+
return;
34+
}
35+
}
36+
this.waiting.push({ task, start: Date.now() });
37+
}
38+
next(task) {
39+
this.count++;
40+
let timer = null;
41+
let finished = false;
42+
const { processTimeout, onProcess } = this;
43+
const finish = (err, res) => {
44+
if (finished) return;
45+
finished = true;
46+
if (timer) clearTimeout(timer);
47+
this.count--;
48+
this.finish(err, res);
49+
if (!this.paused && this.waiting.length > 0) this.takeNext();
50+
};
51+
if (processTimeout !== Infinity) {
52+
const err = new Error('Process timed out');
53+
timer = setTimeout(finish, processTimeout, err, task);
54+
}
55+
onProcess(task, finish);
56+
}
57+
takeNext() {
58+
const { waiting, waitTimeout } = this;
59+
const { task, start } = waiting.shift();
60+
if (waitTimeout !== Infinity) {
61+
const delay = Date.now() - start;
62+
if (delay > waitTimeout) {
63+
const err = new Error('Waiting timed out');
64+
this.finish(err, task);
65+
if (waiting.length > 0) {
66+
setTimeout(() => {
67+
if (!this.paused) this.takeNext();
68+
}, 0);
69+
}
70+
return;
71+
}
72+
}
73+
const hasChannel = this.count < this.concurrency;
74+
if (hasChannel) this.next(task);
75+
return;
76+
}
77+
finish(err, res) {
78+
const { onFailure, onSuccess, onDone, onDrain } = this;
79+
if (err) {
80+
if (onFailure) onFailure(err, res);
81+
} else if (onSuccess) {
82+
onSuccess(res);
83+
}
84+
if (onDone) onDone(err, res);
85+
if (this.count === 0 && onDrain) onDrain();
86+
}
87+
process(listener) {
88+
this.onProcess = listener;
89+
return this;
90+
}
91+
done(listener) {
92+
this.onDone = listener;
93+
return this;
94+
}
95+
success(listener) {
96+
this.onSuccess = listener;
97+
return this;
98+
}
99+
failure(listener) {
100+
this.onFailure = listener;
101+
return this;
102+
}
103+
drain(listener) {
104+
this.onDrain = listener;
105+
return this;
106+
}
107+
pause() {
108+
this.paused = true;
109+
return this;
110+
}
111+
resume() {
112+
this.paused = false;
113+
if (this.waiting.length > 0) {
114+
const hasChannel = this.count < this.concurrency;
115+
if (hasChannel) this.takeNext();
116+
}
117+
return this;
118+
}
119+
}
120+
121+
// Usage
122+
123+
const job = (task, next) => {
124+
console.log(`Process: ${task.name}`);
125+
setTimeout(next, task.interval, null, task).unref();
126+
};
127+
128+
const queue = Queue.channels(3)
129+
.wait(4000)
130+
.timeout(5000)
131+
.process(job)
132+
.done((err, task) => console.log(`Done: ${task.name}`))
133+
.success(task => console.log(`Success: ${task.name}`))
134+
.failure((err, task) => console.log(`Failure: ${err} ${task.name}`))
135+
.drain(() => console.log('Queue drain'))
136+
.pause();
137+
138+
for (let i = 0; i < 10; i++) {
139+
queue.add({ name: `Task${i}`, interval: i * 1000 });
140+
}
141+
142+
console.log('Start paused');
143+
144+
setTimeout(() => {
145+
console.log('Resume');
146+
queue.resume();
147+
}, 3000);
148+
149+
setTimeout(() => {
150+
console.log('Pause');
151+
queue.pause();
152+
}, 4000);
153+
154+
setTimeout(() => {
155+
console.log('Resume');
156+
queue.resume();
157+
}, 5000);

0 commit comments

Comments
 (0)