forked from enisdenjo/graphql-ws
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ws.ts
137 lines (125 loc) · 4.34 KB
/
ws.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import type * as http from 'http';
import type * as ws from 'ws';
import { makeServer, ServerOptions } from '../server';
import { GRAPHQL_TRANSPORT_WS_PROTOCOL, Disposable } from '../common';
// for nicer documentation
type WebSocket = typeof ws.prototype;
type WebSocketServer = ws.Server;
/**
* The extra that will be put in the `Context`.
*
* @category Server/ws
*/
export interface Extra {
/**
* The actual socket connection between the server and the client.
*/
readonly socket: WebSocket;
/**
* The initial HTTP upgrade request before the actual
* socket and connection is established.
*/
readonly request: http.IncomingMessage;
}
/**
* Use the server on a [ws](https://github.com/websockets/ws) ws server.
* This is a basic starter, feel free to copy the code over and adjust it to your needs
*
* @category Server/ws
*/
export function useServer<
E extends Record<PropertyKey, unknown> = Record<PropertyKey, never>,
>(
options: ServerOptions<Extra & Partial<E>>,
ws: WebSocketServer,
/**
* The timout between dispatched keep-alive messages. Internally uses the [ws Ping and Pongs]((https://developer.mozilla.org/en-US/docs/Web/API/wss_API/Writing_ws_servers#Pings_and_Pongs_The_Heartbeat_of_wss))
* to check that the link between the clients and the server is operating and to prevent the link
* from being broken due to idling.
*
* @default 12_000 // 12 seconds
*/
keepAlive = 12_000,
): Disposable {
const isProd = process.env.NODE_ENV === 'production';
const server = makeServer(options);
ws.on('error', (err) => {
// catch the first thrown error and re-throw it once all clients have been notified
let firstErr: Error | null = null;
// report server errors by erroring out all clients with the same error
for (const client of ws.clients) {
try {
client.close(1011, isProd ? 'Internal Error' : err.message);
} catch (err) {
firstErr = firstErr ?? err;
}
}
if (firstErr) throw firstErr;
});
ws.on('connection', (socket, request) => {
// keep alive through ping-pong messages
let pongWait: NodeJS.Timeout | null = null;
const pingInterval =
keepAlive > 0 && isFinite(keepAlive)
? setInterval(() => {
// ping pong on open sockets only
if (socket.readyState === socket.OPEN) {
// terminate the connection after pong wait has passed because the client is idle
pongWait = setTimeout(() => {
socket.terminate();
}, keepAlive);
// listen for client's pong and stop socket termination
socket.once('pong', () => {
if (pongWait) {
clearTimeout(pongWait);
pongWait = null;
}
});
socket.ping();
}
}, keepAlive)
: null;
const closed = server.opened(
{
protocol: socket.protocol,
send: (data) =>
new Promise((resolve, reject) => {
socket.send(data, (err) => (err ? reject(err) : resolve()));
}),
close: (code, reason) => socket.close(code, reason),
onMessage: (cb) =>
socket.on('message', async (event) => {
try {
await cb(String(event));
} catch (err) {
socket.close(1011, isProd ? 'Internal Error' : err.message);
}
}),
},
{ socket, request } as Extra & Partial<E>,
);
socket.once('close', (code, reason) => {
if (pongWait) clearTimeout(pongWait);
if (pingInterval) clearInterval(pingInterval);
if (!isProd && code === 4406)
console.warn(
`WebSocket protocol error occured. It was most likely caused due to an ` +
`unsupported subprotocol "${socket.protocol}" requested by the client. ` +
`graphql-ws implements exclusively the "${GRAPHQL_TRANSPORT_WS_PROTOCOL}" subprotocol, ` +
'please make sure that the client implements it too.',
);
closed(code, String(reason));
});
});
return {
dispose: async () => {
for (const client of ws.clients) {
client.close(1001, 'Going away');
}
ws.removeAllListeners();
await new Promise<void>((resolve, reject) => {
ws.close((err) => (err ? reject(err) : resolve()));
});
},
};
}