Skip to content
This repository was archived by the owner on Sep 16, 2024. It is now read-only.

Commit 6bf916a

Browse files
committed
Make concurrent priority queue
1 parent 0c555fd commit 6bf916a

File tree

3 files changed

+156
-0
lines changed

3 files changed

+156
-0
lines changed

index.ts

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/**
2+
* Copyright (C) 2021 PythonCoderAS
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU General Public License as published by
6+
* the Free Software Foundation, either version 3 of the License, or
7+
* (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU General Public License
15+
* along with this program. If not, see <https://www.gnu.org/licenses/>.
16+
*/
17+
18+
import Deque = require("double-ended-queue");
19+
20+
/**
21+
* The item type.
22+
* @private
23+
*/
24+
interface ItemType<T, RT> {
25+
item: T;
26+
resolver: (result: RT) => void;
27+
}
28+
29+
/**
30+
* Implements a priority worker queue.
31+
*
32+
* All priorities are supposed to be >=0. This queue will pop the item at the highest priority **first**.
33+
*
34+
* The queue has a limit on the number of concurrent operations that can be ran.
35+
*
36+
* @property {(item) => Promise} worker The worker function. Must take an item and return a promise.
37+
* @property {number} concurrency The number of concurrent operations that can be ran.
38+
*/
39+
export default class ConcurrentPriorityWorkerQueue<T, RT> {
40+
private _map: Map<number, Deque<ItemType<T, RT>>> = new Map();
41+
private _length: Map<number, number> = new Map();
42+
private _maxPriority = 0;
43+
private _running = 0;
44+
public readonly worker: (item: T) => Promise<RT>;
45+
public readonly limit: number;
46+
47+
public constructor(options: {
48+
worker: (item: T) => Promise<RT>;
49+
limit?: number;
50+
}) {
51+
this.worker = options.worker;
52+
this.limit = options.limit || 1;
53+
}
54+
55+
/**
56+
* Enqueues an item. Returns a promise with the result.
57+
* @param item The item to process.
58+
* @param priority The priority to process at.
59+
*/
60+
public enqueue(item: T, priority: number): Promise<RT> {
61+
if (!this._map.has(priority)) {
62+
this._map.set(priority, new Deque());
63+
this._length.set(priority, 0);
64+
}
65+
this._maxPriority = Math.max(this._maxPriority, priority);
66+
return new Promise((resolve) => {
67+
// The reason we do this is that we know it exists due to the check above.
68+
this._map.get(priority)!.push({ item, resolver: resolve });
69+
this._length.set(priority, this._length.get(priority)! + 1);
70+
this._loop();
71+
});
72+
}
73+
74+
public get highestPriority() {
75+
return this._maxPriority;
76+
}
77+
78+
/**
79+
* Gets whether the queue is empty.
80+
*/
81+
public isEmpty(): boolean {
82+
return Array.from(this._length.values()).reduce((a, b) => a + b, 0) === 0;
83+
}
84+
85+
public determineNextPosition(priority: number): number {
86+
if (this._running < this.limit) {
87+
return 0;
88+
} else {
89+
// To get the next position, it will be processed after all the items with the same priority or higher
90+
// priority than the specified number.
91+
return (
92+
Array.from(this._length.entries())
93+
// Filter out all the items with lower priority
94+
.filter((value) => value[0] >= priority)
95+
// Map the key-value pair to the value array's length
96+
.map((value) => value[1])
97+
// Sum up all the lengths and add 1 to get the next position
98+
.reduce((a, b) => a + b, 0) + 1
99+
);
100+
}
101+
}
102+
103+
public willQueue(): boolean {
104+
return this._running >= this.limit;
105+
}
106+
107+
private _loop(): void {
108+
if (this._running < this.limit && !this.isEmpty()) {
109+
const highestPriorityQueue = this._map.get(this._maxPriority)!;
110+
const item: ItemType<T, RT> | undefined = highestPriorityQueue.shift()
111+
if (item === undefined) {
112+
return;
113+
}
114+
const newLength = this._length.get(this._maxPriority)! - 1
115+
this._length.set(this._maxPriority, newLength);
116+
if (newLength === 0){
117+
this._maxPriority = Math.max(...Array.from(this._length.keys())
118+
.filter((value) => value < this._maxPriority))
119+
}
120+
this._running++;
121+
this.worker(item.item).then((result: RT) => {
122+
item.resolver(result);
123+
this._running--;
124+
this._loop();
125+
});
126+
}
127+
}
128+
}

package-lock.json

Lines changed: 24 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,9 @@
3232
"chai": "^4.3.4",
3333
"mocha": "^9.1.3",
3434
"typescript": "^4.5.4"
35+
},
36+
"dependencies": {
37+
"@types/double-ended-queue": "^2.1.1",
38+
"double-ended-queue": "^2.1.0-0"
3539
}
3640
}

0 commit comments

Comments
 (0)