Skip to content
This repository was archived by the owner on Sep 16, 2024. It is now read-only.

Commit 288c3f2

Browse files
committed
Seperate code out and properly deal with rejection
1 parent 810e726 commit 288c3f2

File tree

1 file changed

+17
-8
lines changed

1 file changed

+17
-8
lines changed

index.ts

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import Deque = require("double-ended-queue");
2424
interface ItemType<T, RT> {
2525
item: T;
2626
resolver: (result: RT) => void;
27+
reject: (reason?: any) => void;
2728
}
2829

2930
/**
@@ -63,9 +64,9 @@ export default class ConcurrentPriorityWorkerQueue<T, RT> {
6364
this._length.set(priority, 0);
6465
}
6566
this._maxPriority = Math.max(this._maxPriority, priority);
66-
return new Promise((resolve) => {
67+
return new Promise((resolve, reject) => {
6768
// The reason we do this is that we know it exists due to the check above.
68-
this._map.get(priority)!.push({ item, resolver: resolve });
69+
this._map.get(priority)!.push({ item, resolver: resolve, reject });
6970
this._length.set(priority, this._length.get(priority)! + 1);
7071
this._loop();
7172
});
@@ -108,6 +109,19 @@ export default class ConcurrentPriorityWorkerQueue<T, RT> {
108109
return this._running >= this.limit;
109110
}
110111

112+
private async processItem(item: ItemType<T, RT>){
113+
this._running++;
114+
try {
115+
const result = await this.worker(item.item);
116+
item.resolver(result);
117+
} catch (error) {
118+
item.reject(error);
119+
} finally {
120+
this._running--;
121+
this._loop();
122+
}
123+
}
124+
111125
private _loop(): void {
112126
if (this._running < this.limit && !this.isEmpty()) {
113127
const highestPriorityQueue = this._map.get(this._maxPriority)!;
@@ -121,12 +135,7 @@ export default class ConcurrentPriorityWorkerQueue<T, RT> {
121135
this._maxPriority = Math.max(...Array.from(this._length.keys())
122136
.filter((value) => value < this._maxPriority))
123137
}
124-
this._running++;
125-
this.worker(item.item).then((result: RT) => {
126-
item.resolver(result);
127-
this._running--;
128-
this._loop();
129-
});
138+
this.processItem(item);
130139
}
131140
}
132141
}

0 commit comments

Comments
 (0)