Skip to content

Commit fc304e4

Browse files
committed
fix: ports could be awaited infinitely after closure
#2
1 parent 5217257 commit fc304e4

File tree

1 file changed

+152
-8
lines changed

1 file changed

+152
-8
lines changed

src/thunderbird/util/promisifying_queue.ts

Lines changed: 152 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,26 @@
22
* A utility class to help implementation of {@see IMessagePort}s.
33
* @template TMessage The type of messages queued
44
*/
5-
export class PromisifyingQueue<TResponse> {
6-
private readonly receiveQueue = [] as TResponse[]
5+
export class PromisifyingQueue<TMessage> {
6+
private readonly receiveQueue = [] as TMessage[]
77
private readonly pending = [] as PromiseWithResolvers<void>[]
88
private disconnectError: Error | undefined
99

10+
/**
11+
* Checks if the queue connection is still open.
12+
*
13+
* @returns true if the connection is open, false if closed
14+
*/
1015
isOpen(): boolean {
1116
return !this.disconnectError
1217
}
1318

14-
pushReceived(msg: TResponse): void {
19+
/**
20+
* Adds a received message to the queue. Resolves all pending consumers.
21+
*
22+
* @param msg The message to add to the queue
23+
*/
24+
pushReceived(msg: TMessage): void {
1525
this.receiveQueue.push(msg)
1626

1727
for (let resolvers of this.pending) {
@@ -20,31 +30,64 @@ export class PromisifyingQueue<TResponse> {
2030
this.pending.length = 0
2131
}
2232

33+
/**
34+
* Close the queue. Rejects all pending and subsequent consumers.
35+
*
36+
* @param err Reason to close used to reject awaiting consumers
37+
*/
2338
notifyClosed(err: Error): void {
2439
this.disconnectError = err
2540

2641
for (let resolvers of this.pending) {
27-
resolvers.reject()
42+
resolvers.reject(err)
2843
}
2944
this.pending.length = 0
3045
}
3146

47+
/**
48+
* Wait for a new message to be dequeued.
49+
*
50+
* @returns resolves when messages are ready to be dequeued
51+
* @throws The disconnect error if the connection is closed
52+
*/
3253
waitReady(): Promise<void> {
54+
if (this.disconnectError) {
55+
return Promise.reject(this.disconnectError)
56+
}
57+
3358
if (this.receiveQueue.length) {
3459
return Promise.resolve()
3560
}
36-
// Wait for next message
61+
3762
let resolvers = Promise.withResolvers<void>()
3863
this.pending.push(resolvers)
39-
4064
return resolvers.promise
4165
}
4266

43-
dequeueReceived(): TResponse | undefined {
67+
/**
68+
* Removes and returns the first message from the queue.
69+
*
70+
* @returns The first message in the queue, or undefined if the queue is empty
71+
* @throws The disconnect error if the connection is closed
72+
*/
73+
dequeueReceived(): TMessage | undefined {
74+
if (this.disconnectError) {
75+
throw this.disconnectError
76+
}
4477
return this.receiveQueue.shift()
4578
}
4679

47-
clearReceived(): TResponse | undefined {
80+
/**
81+
* Clears all messages from the queue and returns the last message that was queued.
82+
*
83+
* @returns The last message that was in the queue, or undefined if the queue was empty
84+
* @throws The disconnect error if the connection is closed
85+
*/
86+
clearReceived(): TMessage | undefined {
87+
if (this.disconnectError) {
88+
throw this.disconnectError
89+
}
90+
4891
let len = this.receiveQueue.length
4992
if (len === 0) {
5093
return
@@ -55,3 +98,104 @@ export class PromisifyingQueue<TResponse> {
5598
return last
5699
}
57100
}
101+
102+
if (import.meta.vitest) {
103+
const { describe, expect, it } = import.meta.vitest
104+
105+
describe(PromisifyingQueue, () => {
106+
it("should be open by default", () => {
107+
const sut = new PromisifyingQueue<string>()
108+
109+
expect(sut.isOpen()).toBe(true)
110+
})
111+
112+
it("should be closed after notifyClosed", () => {
113+
const sut = new PromisifyingQueue<string>()
114+
const error = new Error("Connection closed")
115+
116+
sut.notifyClosed(error)
117+
118+
expect(sut.isOpen()).toBe(false)
119+
})
120+
121+
it("should allow dequeuing before closure", () => {
122+
const sut = new PromisifyingQueue<string>()
123+
const error = new Error("Connection closed")
124+
125+
sut.pushReceived("hello")
126+
sut.pushReceived("world")
127+
128+
expect(sut.dequeueReceived()).toBe("hello")
129+
expect(sut.clearReceived()).toBe("world")
130+
131+
sut.notifyClosed(error)
132+
133+
expect(() => sut.dequeueReceived()).toThrow(error)
134+
expect(() => sut.clearReceived()).toThrow(error)
135+
})
136+
137+
it("should notify existing consumers when pushed", async () => {
138+
const sut = new PromisifyingQueue<string>()
139+
140+
let wait = sut.waitReady()
141+
142+
expect(sut.dequeueReceived()).toBe(undefined)
143+
await expect(isPending(wait)).resolves.toBe(true)
144+
145+
sut.pushReceived("hello")
146+
sut.pushReceived("cruel")
147+
sut.pushReceived("world")
148+
149+
await expect(isPending(wait)).resolves.toBe(false)
150+
151+
expect(sut.dequeueReceived()).toBe("hello")
152+
expect(sut.clearReceived()).toBe("world")
153+
})
154+
155+
it("should reject pending consumers when closed", async () => {
156+
const sut = new PromisifyingQueue<string>()
157+
const error = new Error("Connection closed")
158+
159+
const wait = sut.waitReady()
160+
161+
sut.notifyClosed(error)
162+
163+
await expect(wait).rejects.toBe(error)
164+
})
165+
166+
it("should reject waitReady immediately when already closed", async () => {
167+
const sut = new PromisifyingQueue<string>()
168+
const error = new Error("Connection closed")
169+
170+
sut.notifyClosed(error)
171+
172+
await expect(sut.waitReady()).rejects.toBe(error)
173+
})
174+
175+
it("should throw when dequeueReceived called on closed queue", () => {
176+
const sut = new PromisifyingQueue<string>()
177+
const error = new Error("Connection closed")
178+
179+
sut.pushReceived("message")
180+
sut.notifyClosed(error)
181+
182+
expect(() => sut.dequeueReceived()).toThrow(error)
183+
})
184+
185+
it("should throw when clearReceived called on closed queue", () => {
186+
const sut = new PromisifyingQueue<string>()
187+
const error = new Error("Connection closed")
188+
189+
sut.pushReceived("message")
190+
sut.notifyClosed(error)
191+
192+
expect(() => sut.clearReceived()).toThrow(error)
193+
})
194+
})
195+
196+
const PENDING = Symbol("pending")
197+
async function isPending(promise: Promise<unknown>): Promise<boolean> {
198+
let raced = await Promise.race([promise, Promise.resolve(PENDING)])
199+
return raced === PENDING
200+
}
201+
}

0 commit comments

Comments
 (0)