-
Notifications
You must be signed in to change notification settings - Fork 5
/
websocket.server.ts
41 lines (40 loc) · 1.64 KB
/
websocket.server.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import { Server } from 'https://deno.land/std@0.61.0/http/server.ts'
import { acceptWebSocket, WebSocket } from 'https://deno.land/std@0.61.0/ws/mod.ts'
import { CallbackBasedChannel } from '../../src/types.ts'
/**
 * Callback-based channel that serves incoming WebSocket connections from a Deno std HTTP `Server`.
 *
 * Every request taken from the server is upgraded to a WebSocket; each text or binary message
 * received on a socket is passed to the callback given to {@link WebSocketChannel.setup}, and a
 * truthy result is sent back on the same socket.
 *
 * Failures never throw out of the channel: they are dispatched on this object as `'error'`
 * events (an `ErrorEvent` whose `error` property carries the original failure).
 */
export class WebSocketChannel extends EventTarget implements CallbackBasedChannel {
    /**
     * @param server The HTTP server whose requests are upgraded. The channel only iterates it;
     * it never closes it.
     */
    constructor(public server: Server) {
        super()
    }

    /**
     * Accept loop: upgrades every incoming request and starts a message loop for the new socket.
     *
     * @param callback Handler invoked for each received message.
     * @param signal Controller whose abort stops the loop and closes the open sockets.
     */
    private async acceptRequest(callback: (data: unknown) => Promise<unknown>, signal: AbortController) {
        for await (const req of this.server) {
            if (signal.signal.aborted) {
                // The request has already been taken off the server, so nobody else can answer it.
                // Reply instead of leaving the client hanging, then stop accepting.
                req.respond({ status: 503 }).catch(this.error)
                return
            }
            let ws: WebSocket
            try {
                const { conn, r: bufReader, w: bufWriter, headers } = req
                ws = await acceptWebSocket({ conn, bufReader, bufWriter, headers })
            } catch (error) {
                // One failed handshake (e.g. a plain HTTP request) must not end the accept loop
                // for every other client: report it, reject this request and keep listening.
                this.error(error)
                req.respond({ status: 400 }).catch(this.error)
                continue
            }
            if (signal.signal.aborted) {
                // Aborted while the handshake was in flight: an abort listener registered now
                // would never fire, so close the fresh socket right away.
                ws.close().catch(this.error)
                return
            }
            // Not awaited on purpose: sockets are served concurrently.
            this.handledWebSocket(ws, callback, signal).catch(this.error)
        }
    }

    /**
     * Message loop of a single socket. Runs until the socket closes or the channel is aborted.
     *
     * @param websocket The accepted socket.
     * @param callback Handler invoked for each received message.
     * @param signal Controller whose abort closes the socket.
     */
    private async handledWebSocket(
        websocket: WebSocket,
        callback: (data: unknown) => Promise<unknown>,
        signal: AbortController,
    ) {
        const closeOnAbort = () => {
            // close() rejects on an already closed socket, so guard it and never leave the
            // returned promise unhandled.
            if (!websocket.isClosed) websocket.close().catch(this.error)
        }
        signal.signal.addEventListener('abort', closeOnAbort, { once: true })
        try {
            for await (const event of websocket) {
                if (signal.signal.aborted || websocket.isClosed) return
                // Besides messages, the socket iterator also yields ping/pong/close control
                // events (see the std ws module documentation); those are not payloads.
                if (typeof event !== 'string' && !(event instanceof Uint8Array)) continue
                // Not awaited on purpose: a slow callback must not block later messages.
                this.answer(websocket, callback, event).catch(this.error)
            }
        } finally {
            // Drop the listener once this socket is done, otherwise every socket ever accepted
            // stays referenced by the abort signal until the channel is torn down.
            signal.signal.removeEventListener('abort', closeOnAbort)
        }
    }

    /**
     * Runs the callback for one message and sends its result back.
     * A falsy result means "no reply". Any failure (callback or send) rejects the returned promise.
     */
    private async answer(
        websocket: WebSocket,
        callback: (data: unknown) => Promise<unknown>,
        message: string | Uint8Array,
    ) {
        const reply = await callback(message)
        if (!reply || websocket.isClosed) return
        if (typeof reply === 'string' || reply instanceof Uint8Array) {
            await websocket.send(reply)
        } else {
            throw new TypeError('WebSocketChannel can only send string or Uint8Array replies')
        }
    }

    /** Reports a failure by dispatching an `'error'` event on this channel. */
    private error = (error: unknown) => {
        this.dispatchEvent(new ErrorEvent('error', { error }))
    }

    /**
     * Starts serving connections.
     *
     * @param callback Receives every incoming message; a truthy resolved value is sent back to
     * the sender.
     * @returns A teardown function. Calling it aborts the channel: open sockets are closed and
     * the accept loop stops when it next wakes up. The underlying server is left open.
     */
    setup(callback: (data: unknown) => Promise<unknown>): () => void {
        const signal = new AbortController()
        this.acceptRequest(callback, signal).catch(this.error)
        return () => signal.abort()
    }
}