-
Notifications
You must be signed in to change notification settings - Fork 554
/
websocket-keepalive.ts
152 lines (138 loc) · 4.37 KB
/
websocket-keepalive.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import { SECOND, wait } from '@atproto/common'
import { WebSocket, ClientOptions } from 'ws'
import { streamByteChunks } from './stream'
import { CloseCode, DisconnectError } from './types'
/**
 * Async-iterable WebSocket client that re-opens its connection after
 * reconnectable failures and pings the peer on a timer to detect dead sockets.
 *
 * Iterating an instance yields every chunk produced by `streamByteChunks` for
 * the current socket. Iteration finishes when the stream ends without error or
 * when a `DisconnectError` is caught; any other non-reconnectable error is
 * rethrown to the consumer.
 */
export class WebSocketKeepAlive {
  /** Socket of the most recent connection attempt; `null` before the first attempt. */
  public ws: WebSocket | null = null
  /** Remains `true` until some socket has emitted `open`. */
  public initialSetup = true
  /** `null` until the first `open` or reconnectable failure; reset to 0 on every `open`. */
  public reconnects: number | null = null

  constructor(
    public opts: ClientOptions & {
      // Resolved before every connection attempt to obtain the target URL.
      getUrl: () => Promise<string>
      // Upper bound for the reconnect delay, in seconds (64 when omitted).
      maxReconnectSeconds?: number
      // Forwarded to the per-connection AbortController.
      signal?: AbortSignal
      // Delay between heartbeat pings (10 * SECOND when omitted).
      heartbeatIntervalMs?: number
      // Invoked with the error, the current reconnect counter and the
      // `initialSetup` flag each time a reconnectable error is caught.
      onReconnectError?: (
        error: unknown,
        n: number,
        initialSetup: boolean,
      ) => void
    },
  ) {}

  /**
   * Connects, streams chunks, and loops to reconnect on reconnectable errors.
   *
   * @throws whatever non-reconnectable error the stream raised (other than a
   * `DisconnectError`, which ends iteration silently).
   */
  async *[Symbol.asyncIterator](): AsyncGenerator<Uint8Array> {
    const backoffCapMs = (this.opts.maxReconnectSeconds ?? 64) * 1000

    for (;;) {
      // A non-null counter means this is not the very first attempt, so wait
      // before dialing again.
      if (this.reconnects !== null) {
        let delayMs: number
        if (this.initialSetup) {
          // Never connected yet: fixed delay, counter untouched.
          delayMs = Math.min(1000, backoffCapMs)
        } else {
          delayMs = backoffMs(this.reconnects, backoffCapMs)
          this.reconnects += 1
        }
        await wait(delayMs)
      }

      const targetUrl = await this.opts.getUrl()
      const socket = new WebSocket(targetUrl, this.opts)
      this.ws = socket

      const ac = new AbortController()
      const outerSignal = this.opts.signal
      if (outerSignal) {
        forwardSignal(outerSignal, ac)
      }

      socket.once('open', () => {
        this.initialSetup = false
        this.reconnects = 0
        const current = this.ws
        if (current) {
          this.startHeartbeat(current)
        }
      })

      socket.once('close', (code, reason) => {
        if (code !== CloseCode.Abnormal) return
        // Forward into an error to distinguish from a clean close
        ac.abort(
          new AbnormalCloseError(`Abnormal ws close: ${reason.toString()}`),
        )
      })

      try {
        const chunks = streamByteChunks(socket, { signal: ac.signal })
        for await (const chunk of chunks) {
          yield chunk
        }
      } catch (thrown: unknown) {
        // An abort surfaces as an error whose code is 'ABORT_ERR'; the error we
        // actually care about is then carried on its `cause`.
        const abortLike = thrown as
          | { code?: unknown; cause?: unknown }
          | null
          | undefined
        const failure =
          abortLike?.code === 'ABORT_ERR' ? abortLike.cause : thrown

        if (failure instanceof DisconnectError) {
          // We cleanly end the connection
          this.ws?.close(failure.wsCode)
          return
        }

        this.ws?.close() // No-ops if already closed or closing

        if (!isReconnectable(failure)) {
          throw failure
        }

        this.reconnects ??= 0 // Never reconnect with a null
        this.opts.onReconnectError?.(
          failure,
          this.reconnects,
          this.initialSetup,
        )
        continue
      }

      // Other side cleanly ended stream and disconnected
      return
    }
  }

  /**
   * Pings `ws` immediately and then once per heartbeat interval; if no `pong`
   * arrived since the previous ping, the socket is terminated instead. The
   * timer is cleared when `ws` emits `close`.
   */
  startHeartbeat(ws: WebSocket) {
    let pongSeen = true

    const probe = () => {
      if (pongSeen) {
        // Assume the peer is gone until a "pong" arrives within the interval.
        pongSeen = false
        ws.ping()
        return
      }
      return ws.terminate()
    }

    probe()
    const timer = setInterval(
      probe,
      this.opts.heartbeatIntervalMs ?? 10 * SECOND,
    )

    ws.on('pong', () => {
      pongSeen = true
    })
    ws.once('close', () => {
      clearInterval(timer)
    })
  }
}

export default WebSocketKeepAlive
/**
 * Error used to signal that a WebSocket closed with the abnormal close code.
 *
 * Its `code` is one of the values treated as a network error by the reconnect
 * logic, and `name` is set so the error is identifiable in logs and stack
 * traces instead of showing up as a generic `Error`.
 */
class AbnormalCloseError extends Error {
  readonly code = 'EWSABNORMALCLOSE'

  constructor(...args: ConstructorParameters<typeof Error>) {
    super(...args)
    this.name = 'AbnormalCloseError'
  }
}
/**
 * Decides whether a caught error should trigger a reconnect attempt.
 *
 * @param err - any thrown value
 * @returns `true` only when `err` carries a string `code` that appears in
 * `networkErrorCodes`; `false` for everything else, including nullish values.
 */
function isReconnectable(err: unknown): boolean {
  // Network errors are reconnectable.
  // AuthenticationRequired and InvalidRequest XRPCErrors are not reconnectable.
  // @TODO method-specific XRPCErrors may be reconnectable, need to consider. Receiving
  // an invalid message is not currently reconnectable, but the user can decide to skip them.
  if (!err) return false
  // Read `code` through an explicit shape instead of indexing an `unknown`.
  const code = (err as { code?: unknown }).code
  return typeof code === 'string' && networkErrorCodes.includes(code)
}

/** Error codes treated as transient network failures. */
const networkErrorCodes: readonly string[] = [
  'EWSABNORMALCLOSE',
  'ECONNRESET',
  'ECONNREFUSED',
  'ECONNABORTED',
  'EPIPE',
  'ETIMEDOUT',
  'ECANCELED',
]
/**
 * Computes the delay before reconnect attempt number `n`.
 *
 * The delay is `2^n` seconds plus a random offset in [-0.5, 0.5) seconds,
 * converted to milliseconds and capped at `maxMs`.
 */
function backoffMs(n: number, maxMs: number) {
  const jitterSec = Math.random() - 0.5 // somewhere in [-0.5, 0.5)
  const exponentialSec = 2 ** n // 1, 2, 4, ...
  const uncappedMs = (exponentialSec + jitterSec) * 1000
  return Math.min(uncappedMs, maxMs)
}
/**
 * Makes `ac` abort, with the same reason, whenever `signal` aborts.
 *
 * If `signal` has already aborted, `ac` is aborted right away. Otherwise an
 * `abort` listener is attached to `signal`; the listener registration is tied
 * to `ac.signal` through the `signal` listener option.
 */
function forwardSignal(signal: AbortSignal, ac: AbortController) {
  if (!signal.aborted) {
    const relay = () => ac.abort(signal.reason)
    signal.addEventListener('abort', relay, {
      // @ts-ignore https://github.com/DefinitelyTyped/DefinitelyTyped/pull/68625
      signal: ac.signal,
    })
    return
  }
  return ac.abort(signal.reason)
}