Skip to content

Commit 4ec2f56

Browse files
jdaltonclaude
andcommitted
Add PromiseQueue utility for controlled concurrency
Implements a promise queue with configurable concurrency limits based on patterns from coana-package-manager. Features include: - Configurable max concurrency - Optional queue size limits to prevent memory buildup - Progress tracking (activeCount, pendingCount) - Idle detection with onIdle() - Proper error propagation This utility enables better resource management and rate limiting for async operations in the SDK. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 1705fc6 commit 4ec2f56

File tree

1 file changed

+109
-0
lines changed

1 file changed

+109
-0
lines changed

src/promise-queue.ts

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/**
2+
* A promise queue that limits concurrent execution of async tasks.
3+
* Based on patterns from coana-package-manager for resource-aware async operations.
4+
*/
5+
6+
type QueuedTask<T> = {
7+
fn: () => Promise<T>
8+
resolve: (value: T) => void
9+
reject: (error: unknown) => void
10+
}
11+
12+
export class PromiseQueue {
13+
private queue: Array<QueuedTask<unknown>> = []
14+
private running = 0
15+
16+
private readonly maxConcurrency: number
17+
private readonly maxQueueLength: number | undefined
18+
19+
/**
20+
* Creates a new PromiseQueue
21+
* @param maxConcurrency - Maximum number of promises that can run concurrently
22+
* @param maxQueueLength - Maximum queue size (older tasks are dropped if exceeded)
23+
*/
24+
constructor(maxConcurrency: number, maxQueueLength?: number | undefined) {
25+
this.maxConcurrency = maxConcurrency
26+
this.maxQueueLength = maxQueueLength
27+
if (maxConcurrency < 1) {
28+
throw new Error('maxConcurrency must be at least 1')
29+
}
30+
}
31+
32+
/**
33+
* Add a task to the queue
34+
* @param fn - Async function to execute
35+
* @returns Promise that resolves with the function's result
36+
*/
37+
async add<T>(fn: () => Promise<T>): Promise<T> {
38+
return await new Promise<T>((resolve, reject) => {
39+
const task: QueuedTask<T> = { fn, resolve, reject }
40+
41+
if (this.maxQueueLength && this.queue.length >= this.maxQueueLength) {
42+
// Drop oldest task to prevent memory buildup
43+
this.queue.shift()
44+
}
45+
46+
this.queue.push(task as QueuedTask<unknown>)
47+
this.runNext()
48+
})
49+
}
50+
51+
private runNext(): void {
52+
if (this.running >= this.maxConcurrency || this.queue.length === 0) {
53+
return
54+
}
55+
56+
const task = this.queue.shift()
57+
if (!task) {
58+
return
59+
}
60+
61+
this.running++
62+
63+
task
64+
.fn()
65+
.then(task.resolve)
66+
.catch(task.reject)
67+
.finally(() => {
68+
this.running--
69+
this.runNext()
70+
})
71+
}
72+
73+
/**
74+
* Wait for all queued and running tasks to complete
75+
*/
76+
async onIdle(): Promise<void> {
77+
return await new Promise<void>(resolve => {
78+
const check = () => {
79+
if (this.running === 0 && this.queue.length === 0) {
80+
resolve()
81+
} else {
82+
setImmediate(check)
83+
}
84+
}
85+
check()
86+
})
87+
}
88+
89+
/**
90+
* Get the number of tasks currently running
91+
*/
92+
get activeCount(): number {
93+
return this.running
94+
}
95+
96+
/**
97+
* Get the number of tasks waiting in the queue
98+
*/
99+
get pendingCount(): number {
100+
return this.queue.length
101+
}
102+
103+
/**
104+
* Clear all pending tasks from the queue (does not affect running tasks)
105+
*/
106+
clear(): void {
107+
this.queue = []
108+
}
109+
}

0 commit comments

Comments
 (0)