Skip to content

Commit 94eac56

Browse files
timursevimlitshemsedinov
authored andcommitted
Add promise queue
1 parent c99aede commit 94eac56

File tree

1 file changed

+106
-0
lines changed

1 file changed

+106
-0
lines changed

JavaScript/8-promise.js

+106
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
'use strict';
2+
3+
class Queue {
4+
constructor(concurrency) {
5+
this.concurrency = concurrency;
6+
this.count = 0;
7+
this.waiting = [];
8+
this.promises = [];
9+
this.onProcess = null;
10+
this.onDone = null;
11+
this.onSuccess = null;
12+
this.onFailure = null;
13+
this.onDrain = null;
14+
}
15+
16+
static channels(concurrency) {
17+
return new Queue(concurrency);
18+
}
19+
20+
add(task) {
21+
this.waiting.push(task);
22+
const hasChannel = this.count < this.concurrency;
23+
if (hasChannel) this.next();
24+
}
25+
26+
next() {
27+
const emptyChannels = this.concurrency - this.count;
28+
let launchCount = Math.min(emptyChannels, this.waiting.length);
29+
while (launchCount-- > 0) {
30+
this.count++;
31+
const task = this.waiting.shift();
32+
this.onProcess(task)
33+
.then(
34+
(res) => {
35+
if (this.onSuccess) this.onSuccess(res);
36+
if (this.onDone) this.onDone(null, res);
37+
},
38+
(err) => {
39+
if (this.onFailure) this.onFailure(err);
40+
if (this.onDone) this.onDone(err);
41+
}
42+
)
43+
.finally(() => {
44+
this.count--;
45+
if (this.count === 0 && this.waiting.length === 0) {
46+
if (this.onDrain) this.onDrain();
47+
}
48+
this.next();
49+
});
50+
}
51+
}
52+
53+
process(listener) {
54+
this.onProcess = listener;
55+
return this;
56+
}
57+
58+
done(listener) {
59+
this.onDone = listener;
60+
return this;
61+
}
62+
63+
success(listener) {
64+
this.onSuccess = listener;
65+
return this;
66+
}
67+
68+
failure(listener) {
69+
this.onFailure = listener;
70+
return this;
71+
}
72+
73+
drain(listener) {
74+
this.onDrain = listener;
75+
return this;
76+
}
77+
}
78+
79+
// Usage
80+
81+
const job = ({ name, interval }) =>
82+
new Promise((resolve, reject) => {
83+
if (interval === 1200) {
84+
setTimeout(reject, interval, new Error('Big error!'));
85+
} else {
86+
setTimeout(resolve, interval, name);
87+
}
88+
});
89+
90+
const queue = Queue.channels(3)
91+
.process(job)
92+
.done((err, res) => {
93+
const { count } = queue;
94+
const waiting = queue.waiting.length;
95+
console.log(
96+
`Done | res: ${res}, err: ${err}, count:${count}, waiting: ${waiting}`
97+
);
98+
})
99+
// .success((res) => void console.log(`Success: ${res}`))
100+
// .failure((err) => void console.log(`Failure: ${err}`))
101+
.drain(() => void console.log('Queue drain'));
102+
103+
for (let i = 0; i < 20; i++) {
104+
if (i < 10) queue.add({ name: `Task${i}`, interval: 1000 });
105+
else queue.add({ name: `Task${i}`, interval: i * 100 });
106+
}

0 commit comments

Comments
 (0)