From 38bde87122f4c39b0357c636fd98bfee886dd6e5 Mon Sep 17 00:00:00 2001 From: Denis Badurina Date: Fri, 20 Nov 2020 03:39:10 +0100 Subject: [PATCH] feat(server): Make and use with your own flavour (#64) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit BREAKING CHANGE: You now "make" a ready-to-use server that can be used with _any_ WebSocket implementation! Summary of breaking changes: - No more `keepAlive`. The user should provide its own keep-alive implementation. _(I highly recommend [WebSocket Ping and Pongs](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers#Pings_and_Pongs_The_Heartbeat_of_WebSockets))_ - No more HTTP `request` in the server context. - No more WebSocket in the server context (you're the one that creates it). - You use your own WebSocket server - Server exports only `makeServer` _(no more `createServer`)_ ### Benefits - You're responsible for the server (_any_ optimisation or adjustment can be applied) - Any WebSocket server can be used (or even mocked if necessary) - You control the disposal of the server (close or transfer clients however you wish) - New `extra` field in the `Context` for storing custom values useful for callbacks - Full control of authentication flow - Full control over error handling - True zero-dependency ### Migrating from v1 **Only the server has to be migrated.** Since this release allows you to use your favourite WebSocket library (or your own implementation), using [ws](https://github.com/websockets/ws) is just one way of using `graphql-ws`. This is how to use the implementation shipped with the lib: ```ts /** * ❌ instead of the lib creating a WebSocket server internally with the provided arguments */ import https from 'https'; import { createServer } from 'graphql-ws'; const server = https.createServer(...); createServer( { onConnect(ctx) { // were previously directly on the context ctx.request as IncomingRequest ctx.socket as WebSocket }, ...rest, }, { server, path: '/graphql', }, ); /** * ✅ you have to supply the server yourself */ import https from 'https'; import ws from 'ws'; // yarn add ws import { useServer } from 'graphql-ws/lib/use/ws'; // notice the import path const server = https.createServer(...); const wsServer = new ws.Server({ server, path: '/graphql', }); useServer( { onConnect(ctx) { // are now in the `extra` field ctx.extra.request as IncomingRequest ctx.extra.socket as WebSocket }, ...rest, }, wsServer, // optional keepAlive with ping pongs (defaults to 12 seconds) ); ``` Closes: #61, closes: #73, closes: #75 --- README.md | 196 ++++++---- docs/README.md | 1 + docs/interfaces/_server_.context.md | 35 +- docs/interfaces/_server_.server.md | 44 ++- docs/interfaces/_server_.serveroptions.md | 37 +- docs/interfaces/_server_.websocket.md | 101 +++++ docs/interfaces/_types_.disposable.md | 2 - docs/interfaces/_use_ws_.extra.md | 35 ++ docs/modules/_server_.md | 26 +- docs/modules/_use_ws_.md | 34 ++ jest.config.js | 2 +- package.json | 6 +- src/server.ts | 402 ++++++++------------ src/tests/client.ts | 4 +- src/tests/fixtures/simple.ts | 34 +- src/tests/server.ts | 429 +--------------------- src/tests/use/ws.ts | 332 +++++++++++++++++ src/tests/utils/index.ts | 1 + src/tests/utils/tclient.ts | 75 ++++ src/use/ws.ts | 126 +++++++ 20 files changed, 1099 insertions(+), 823 deletions(-) create mode 100644 docs/interfaces/_server_.websocket.md create mode 100644 docs/interfaces/_use_ws_.extra.md create mode 100644 docs/modules/_use_ws_.md create mode 100644 src/tests/use/ws.ts create mode 100644 src/tests/utils/index.ts create mode 100644 src/tests/utils/tclient.ts create mode 100644 src/use/ws.ts diff --git a/README.md b/README.md index 630c7cb0..599f81f9 100644 --- a/README.md +++ b/README.md @@ -52,25 +52,28 @@ const roots = { ```ts import https from 'https'; +import ws from 'ws'; // yarn add ws +import { useServer } from 'graphql-ws/lib/use/ws'; import { execute, subscribe } from 'graphql'; -import { createServer } from 'graphql-ws'; const server = https.createServer(function weServeSocketsOnly(_, res) { res.writeHead(404); res.end(); }); -createServer( +const wsServer = new ws.Server({ + server, + path: '/graphql', +}); + +useServer( { schema, // from the previous step roots, // from the previous step execute, subscribe, }, - { - server, - path: '/graphql', - }, + wsServer, ); server.listen(443); @@ -79,7 +82,7 @@ server.listen(443); #### Use the client ```ts -import { createClient } from 'graphql-ws'; +import { createClient } from 'graphql-ws/lib/use/ws'; const client = createClient({ url: 'wss://welcomer.com/graphql', @@ -133,7 +136,7 @@ const client = createClient({ 🔗 Client usage with Promise ```ts -import { createClient, SubscribePayload } from 'graphql-ws'; +import { createClient, SubscribePayload } from 'graphql-ws/lib/use/ws'; const client = createClient({ url: 'wss://hey.there/graphql', @@ -170,7 +173,7 @@ async function execute(payload: SubscribePayload) { 🔗 Client usage with AsyncIterator ```ts -import { createClient, SubscribePayload } from 'graphql-ws'; +import { createClient, SubscribePayload } from 'graphql-ws/lib/use/ws'; const client = createClient({ url: 'wss://iterators.ftw/graphql', @@ -281,7 +284,7 @@ import { RequestParameters, Variables, } from 'relay-runtime'; -import { createClient } from 'graphql-ws'; +import { createClient } from 'graphql-ws/lib/use/ws'; const subscriptionsClient = createClient({ url: 'wss://i.love/graphql', @@ -346,7 +349,7 @@ export const network = Network.create(fetchOrSubscribe, fetchOrSubscribe); ```ts import { createClient, defaultExchanges, subscriptionExchange } from 'urql'; -import { createClient as createWSClient } from 'graphql-ws'; +import { createClient as createWSClient } from 'graphql-ws/lib/use/ws'; const wsClient = createWSClient({ url: 'wss://its.urql/graphql', @@ -380,7 +383,7 @@ const client = createClient({ ```typescript import { ApolloLink, Operation, FetchResult, Observable } from '@apollo/client'; import { print, GraphQLError } from 'graphql'; -import { createClient, ClientOptions, Client } from 'graphql-ws'; +import { createClient, ClientOptions, Client } from 'graphql-ws/lib/use/ws'; class WebSocketLink extends ApolloLink { private client: Client; @@ -472,13 +475,13 @@ const link = new WebSocketLink({ 🔗 Client usage in Node ```ts -const WebSocket = require('ws'); // yarn add ws +const ws = require('ws'); // yarn add ws const Crypto = require('crypto'); const { createClient } = require('graphql-ws'); const client = createClient({ url: 'wss://no.browser/graphql', - webSocketImpl: WebSocket, + webSocketImpl: ws, /** * Generates a v4 UUID to be used as the ID. * Reference: https://stackoverflow.com/a/2117523/709884 @@ -494,14 +497,72 @@ const client = createClient({ +
+🔗 Server usage with ws + +```ts +// minimal version of `import { useServer } from 'graphql-ws/lib/use/ws';` + +import http from 'http'; +import ws from 'ws'; // yarn add ws +import { makeServer, ServerOptions } from 'graphql-ws'; +import { execute, subscribe } from 'graphql'; +import { schema } from 'my-graphql-schema'; + +// make +const server = makeServer({ + schema, + execute, + subscribe, +}); + +// create websocket server +const wsServer = new ws.Server({ + server, + path: '/graphql', +}); + +// implement +wsServer.on('connection', (socket, request) => { + const closed = server.opened( + { + protocol: socket.protocol, + send: (data) => + new Promise((resolve, reject) => { + socket.send(data, (err) => (err ? reject(err) : resolve())); + }), + close: (code, reason) => socket.close(code, reason), + onMessage: (cb) => + socket.on('message', async (event) => { + try { + await cb(event.toString()); + } catch (err) { + socket.close(1011, err.message); + } + }), + }, + { socket, request }, + ); + + socket.once('close', () => { + if (pongWait) clearTimeout(pongWait); + if (pingInterval) clearInterval(pingInterval); + closed(); + }); +}); +``` + +
+
-🔗 Server usage with Express GraphQL +🔗 ws server usage with Express GraphQL ```typescript import https from 'https'; +import ws from 'ws'; // yarn add ws import express from 'express'; import { graphqlHTTP } from 'express-graphql'; -import { createServer } from 'graphql-ws'; +import { useServer } from 'graphql-ws/lib/use/ws'; import { execute, subscribe } from 'graphql'; import { schema } from 'my-graphql-schema'; @@ -512,17 +573,20 @@ app.use('/graphql', graphqlHTTP({ schema })); // create a http server using express const server = https.createServer(app); +// create websocket server +const wsServer = new ws.Server({ + server, + path: '/graphql', +}); + server.listen(443, () => { - createServer( + useServer( { schema, execute, subscribe, }, - { - server, - path: '/graphql', // you can use the same path too, just use the `ws` schema - }, + wsServer, ); }); ``` @@ -530,13 +594,14 @@ server.listen(443, () => {
-🔗 Server usage with Apollo Server Express +🔗 ws server usage with Apollo Server Express ```typescript import https from 'https'; import express from 'express'; import { ApolloServer } from 'apollo-server-express'; -import { createServer } from 'graphql-ws'; +import ws from 'ws'; // yarn add ws +import { useServer } from 'graphql-ws/lib/use/ws'; import { execute, subscribe } from 'graphql'; import { schema } from 'my-graphql-schema'; @@ -552,17 +617,20 @@ apolloServer.applyMiddleware({ app }); // create a http server using express const server = https.createServer(app); +// create websocket server +const wsServer = new ws.Server({ + server, + path: '/graphql', +}); + server.listen(443, () => { - createServer( + useServer( { schema, execute, subscribe, }, - { - server, - path: '/graphql', // you can use the same path too, just use the `ws` schema - }, + wsServer, ); }); ``` @@ -570,12 +638,13 @@ server.listen(443, () => {
-🔗 Server usage with console logging +🔗 ws server usage with console logging ```typescript import https from 'https'; import { execute, subscribe } from 'graphql'; -import { createServer } from 'graphql-ws'; +import ws from 'ws'; // yarn add ws +import { useServer } from 'graphql-ws/lib/use/ws'; import { schema } from 'my-graphql-schema'; const server = https.createServer(function weServeSocketsOnly(_, res) { @@ -583,7 +652,12 @@ const server = https.createServer(function weServeSocketsOnly(_, res) { res.end(); }); -createServer( +const wsServer = new ws.Server({ + server, + path: '/graphql', +}); + +useServer( { schema, onConnect: (ctx) => { @@ -602,10 +676,7 @@ createServer( console.log('Complete', { ctx, msg }); }, }, - { - server, - path: '/graphql', - }, + wsServer, ); server.listen(443); @@ -614,14 +685,14 @@ server.listen(443);
-🔗 Server usage on a multi WebSocket server +🔗 ws server usage on a multi WebSocket server ```typescript import https from 'https'; -import WebSocket from 'ws'; +import ws from 'ws'; // yarn add ws import url from 'url'; import { execute, subscribe } from 'graphql'; -import { createServer, createClient } from 'graphql-ws'; +import { useServer, createClient } from 'graphql-ws/lib/use/ws'; import { schema } from 'my-graphql-schema'; const server = https.createServer(function weServeSocketsOnly(_, res) { @@ -634,8 +705,8 @@ const server = https.createServer(function weServeSocketsOnly(_, res) { * - `/wave` sends out waves * - `/graphql` serves graphql */ -const waveWS = new WebSocket.Server({ noServer: true }); -const graphqlWS = new WebSocket.Server({ noServer: true }); +const waveWS = new ws.Server({ noServer: true }); +const graphqlWS = new ws.Server({ noServer: true }); // delegate upgrade requests to relevant destinations server.on('upgrade', (request, socket, head) => { @@ -660,7 +731,7 @@ waveWS.on('connection', (socket) => { }); // serve graphql -createServer( +useServer( { schema, execute, @@ -675,14 +746,15 @@ server.listen(443);
-🔗 Server usage with custom context value +🔗 ws server usage with custom context value ```typescript import { validate, execute, subscribe } from 'graphql'; -import { createServer } from 'graphql-ws'; +import ws from 'ws'; // yarn add ws +import { useServer } from 'graphql-ws/lib/use/ws'; import { schema, roots, getDynamicContext } from 'my-graphql'; -createServer( +useServer( { context: (ctx, msg, args) => { return getDynamicContext(ctx, msg, args); @@ -692,24 +764,22 @@ createServer( execute, subscribe, }, - { - server, - path: '/graphql', - }, + wsServer, ); ```
-🔗 Server usage with custom execution arguments and validation +🔗 ws server usage with custom execution arguments and validation ```typescript import { parse, validate, execute, subscribe } from 'graphql'; -import { createServer } from 'graphql-ws'; +import ws from 'ws'; // yarn add ws +import { useServer } from 'graphql-ws/lib/use/ws'; import { schema, myValidationRules } from 'my-graphql'; -createServer( +useServer( { execute, subscribe, @@ -730,23 +800,21 @@ createServer( return args; }, }, - { - server, - path: '/graphql', - }, + wsServer, ); ```
-🔗 Server and client usage with persisted queries +🔗 ws server and client usage with persisted queries ```typescript // 🛸 server import { parse, execute, subscribe } from 'graphql'; -import { createServer } from 'graphql-ws'; +import ws from 'ws'; // yarn add ws +import { useServer } from 'graphql-ws/lib/use/ws'; import { schema } from 'my-graphql-schema'; // a unique GraphQL execution ID used for representing @@ -761,7 +829,12 @@ const queriesStore: Record = { }, }; -createServer( +const wsServer = new ws.Server({ + server, + path: '/graphql', +}); + +useServer( { execute, subscribe, @@ -777,17 +850,14 @@ createServer( }; }, }, - { - server, - path: '/graphql', - }, + wsServer, ); ``` ```typescript // 📺 client -import { createClient } from 'graphql-ws'; +import { createClient } from 'graphql-ws/lib/use/ws'; const client = createClient({ url: 'wss://persisted.graphql/queries', diff --git a/docs/README.md b/docs/README.md index beecc159..975314ab 100644 --- a/docs/README.md +++ b/docs/README.md @@ -13,3 +13,4 @@ * ["protocol"](modules/_protocol_.md) * ["server"](modules/_server_.md) * ["types"](modules/_types_.md) +* ["use/ws"](modules/_use_ws_.md) diff --git a/docs/interfaces/_server_.context.md b/docs/interfaces/_server_.context.md index 7c42b5c3..b3ac4db2 100644 --- a/docs/interfaces/_server_.context.md +++ b/docs/interfaces/_server_.context.md @@ -2,7 +2,13 @@ > [Globals](../README.md) / ["server"](../modules/_server_.md) / Context -# Interface: Context +# Interface: Context\ + +## Type parameters + +Name | Default | +------ | ------ | +`E` | unknown | ## Hierarchy @@ -15,15 +21,14 @@ * [acknowledged](_server_.context.md#acknowledged) * [connectionInitReceived](_server_.context.md#connectioninitreceived) * [connectionParams](_server_.context.md#connectionparams) -* [request](_server_.context.md#request) -* [socket](_server_.context.md#socket) +* [extra](_server_.context.md#extra) * [subscriptions](_server_.context.md#subscriptions) ## Properties ### acknowledged -• **acknowledged**: boolean +• `Readonly` **acknowledged**: boolean Indicates that the connection was acknowledged by having dispatched the `ConnectionAck` message @@ -33,7 +38,7 @@ ___ ### connectionInitReceived -• **connectionInitReceived**: boolean +• `Readonly` **connectionInitReceived**: boolean Indicates that the `ConnectionInit` message has been received by the server. If this is @@ -44,32 +49,24 @@ ___ ### connectionParams -• `Optional` **connectionParams**: Readonly\> +• `Optional` `Readonly` **connectionParams**: Readonly\> The parameters passed during the connection initialisation. ___ -### request - -• `Readonly` **request**: IncomingMessage - -The initial HTTP request before the actual -socket and connection is established. - -___ - -### socket +### extra -• `Readonly` **socket**: WebSocket +• **extra**: E -The actual WebSocket connection between the server and the client. +An extra field where you can store your own context values +to pass between callbacks. ___ ### subscriptions -• **subscriptions**: Record\<[ID](../modules/_types_.md#id), AsyncIterator\> +• `Readonly` **subscriptions**: Record\<[ID](../modules/_types_.md#id), AsyncIterator\> Holds the active subscriptions for this context. Subscriptions are for **streaming operations only**, diff --git a/docs/interfaces/_server_.server.md b/docs/interfaces/_server_.server.md index 9ba1039c..5adefb02 100644 --- a/docs/interfaces/_server_.server.md +++ b/docs/interfaces/_server_.server.md @@ -2,33 +2,47 @@ > [Globals](../README.md) / ["server"](../modules/_server_.md) / Server -# Interface: Server +# Interface: Server\ -## Hierarchy +## Type parameters + +Name | Default | +------ | ------ | +`E` | undefined | -* [Disposable](_types_.disposable.md) +## Hierarchy - ↳ **Server** +* **Server** ## Index -### Properties +### Methods + +* [opened](_server_.server.md#opened) -* [dispose](_server_.server.md#dispose) -* [webSocketServer](_server_.server.md#websocketserver) +## Methods -## Properties +### opened -### dispose +▸ **opened**(`socket`: [WebSocket](_server_.websocket.md), `ctxExtra`: E): function -• **dispose**: () => void \| Promise\ +New socket has beeen established. The lib will validate +the protocol and use the socket accordingly. Returned promise +will resolve after the socket closes. -*Inherited from [Disposable](_types_.disposable.md).[dispose](_types_.disposable.md#dispose)* +The second argument will be passed in the `extra` field +of the `Context`. You may pass the initial request or the +original WebSocket, if you need it down the road. -Dispose of the instance and clear up resources. +Returns a function that should be called when the same socket +has been closed, for whatever reason. The returned promise will +resolve once the internal cleanup is complete. -___ +#### Parameters: -### webSocketServer +Name | Type | +------ | ------ | +`socket` | [WebSocket](_server_.websocket.md) | +`ctxExtra` | E | -• **webSocketServer**: Server +**Returns:** function diff --git a/docs/interfaces/_server_.serveroptions.md b/docs/interfaces/_server_.serveroptions.md index ef31447b..df6756eb 100644 --- a/docs/interfaces/_server_.serveroptions.md +++ b/docs/interfaces/_server_.serveroptions.md @@ -2,7 +2,13 @@ > [Globals](../README.md) / ["server"](../modules/_server_.md) / ServerOptions -# Interface: ServerOptions +# Interface: ServerOptions\ + +## Type parameters + +Name | Default | +------ | ------ | +`E` | unknown | ## Hierarchy @@ -15,7 +21,6 @@ * [connectionInitWaitTimeout](_server_.serveroptions.md#connectioninitwaittimeout) * [context](_server_.serveroptions.md#context) * [execute](_server_.serveroptions.md#execute) -* [keepAlive](_server_.serveroptions.md#keepalive) * [onComplete](_server_.serveroptions.md#oncomplete) * [onConnect](_server_.serveroptions.md#onconnect) * [onError](_server_.serveroptions.md#onerror) @@ -48,7 +53,7 @@ ___ ### context -• `Optional` **context**: [GraphQLExecutionContextValue](../modules/_server_.md#graphqlexecutioncontextvalue) \| (ctx: [Context](_server_.context.md), message: [SubscribeMessage](_message_.subscribemessage.md), args: ExecutionArgs) => [GraphQLExecutionContextValue](../modules/_server_.md#graphqlexecutioncontextvalue) +• `Optional` **context**: [GraphQLExecutionContextValue](../modules/_server_.md#graphqlexecutioncontextvalue) \| (ctx: [Context](_server_.context.md)\, message: [SubscribeMessage](_message_.subscribemessage.md), args: ExecutionArgs) => Promise\<[GraphQLExecutionContextValue](../modules/_server_.md#graphqlexecutioncontextvalue)> \| [GraphQLExecutionContextValue](../modules/_server_.md#graphqlexecutioncontextvalue) A value which is provided to every resolver and holds important contextual information like the currently @@ -78,23 +83,9 @@ in the close event reason. ___ -### keepAlive - -• `Optional` **keepAlive**: undefined \| number - -The timout between dispatched keep-alive messages. Internally the lib -uses the [WebSocket Ping and Pongs]((https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers#Pings_and_Pongs_The_Heartbeat_of_WebSockets)) to check that the link between -the clients and the server is operating and to prevent the link from being broken due to idling. - -Set to nullish value to disable. - -**`default`** 12 * 1000 (12 seconds) - -___ - ### onComplete -• `Optional` **onComplete**: undefined \| (ctx: [Context](_server_.context.md), message: [CompleteMessage](_message_.completemessage.md)) => Promise\ \| void +• `Optional` **onComplete**: undefined \| (ctx: [Context](_server_.context.md)\, message: [CompleteMessage](_message_.completemessage.md)) => Promise\ \| void The complete callback is executed after the operation has completed right before sending @@ -112,7 +103,7 @@ ___ ### onConnect -• `Optional` **onConnect**: undefined \| (ctx: [Context](_server_.context.md)) => Promise\ \| boolean \| void> \| Record\ \| boolean \| void +• `Optional` **onConnect**: undefined \| (ctx: [Context](_server_.context.md)\) => Promise\ \| boolean \| void> \| Record\ \| boolean \| void Is the connection callback called when the client requests the connection initialisation @@ -141,7 +132,7 @@ ___ ### onError -• `Optional` **onError**: undefined \| (ctx: [Context](_server_.context.md), message: [ErrorMessage](_message_.errormessage.md), errors: readonly GraphQLError[]) => Promise\ \| readonly GraphQLError[] \| void +• `Optional` **onError**: undefined \| (ctx: [Context](_server_.context.md)\, message: [ErrorMessage](_message_.errormessage.md), errors: readonly GraphQLError[]) => Promise\ \| readonly GraphQLError[] \| void Executed after an error occured right before it has been dispatched to the client. @@ -159,7 +150,7 @@ ___ ### onNext -• `Optional` **onNext**: undefined \| (ctx: [Context](_server_.context.md), message: [NextMessage](_message_.nextmessage.md), args: ExecutionArgs, result: ExecutionResult) => Promise\ \| ExecutionResult \| void +• `Optional` **onNext**: undefined \| (ctx: [Context](_server_.context.md)\, message: [NextMessage](_message_.nextmessage.md), args: ExecutionArgs, result: ExecutionResult) => Promise\ \| ExecutionResult \| void Executed after an operation has emitted a result right before that result has been sent to the client. Results from both @@ -178,7 +169,7 @@ ___ ### onOperation -• `Optional` **onOperation**: undefined \| (ctx: [Context](_server_.context.md), message: [SubscribeMessage](_message_.subscribemessage.md), args: ExecutionArgs, result: [OperationResult](../modules/_server_.md#operationresult)) => Promise\<[OperationResult](../modules/_server_.md#operationresult) \| void> \| [OperationResult](../modules/_server_.md#operationresult) \| void +• `Optional` **onOperation**: undefined \| (ctx: [Context](_server_.context.md)\, message: [SubscribeMessage](_message_.subscribemessage.md), args: ExecutionArgs, result: [OperationResult](../modules/_server_.md#operationresult)) => Promise\<[OperationResult](../modules/_server_.md#operationresult) \| void> \| [OperationResult](../modules/_server_.md#operationresult) \| void Executed after the operation call resolves. For streaming operations, triggering this callback does not necessarely @@ -203,7 +194,7 @@ ___ ### onSubscribe -• `Optional` **onSubscribe**: undefined \| (ctx: [Context](_server_.context.md), message: [SubscribeMessage](_message_.subscribemessage.md)) => Promise\ \| ExecutionArgs \| readonly GraphQLError[] \| void +• `Optional` **onSubscribe**: undefined \| (ctx: [Context](_server_.context.md)\, message: [SubscribeMessage](_message_.subscribemessage.md)) => Promise\ \| ExecutionArgs \| readonly GraphQLError[] \| void The subscribe callback executed right after acknowledging the request before any payload diff --git a/docs/interfaces/_server_.websocket.md b/docs/interfaces/_server_.websocket.md new file mode 100644 index 00000000..b333d583 --- /dev/null +++ b/docs/interfaces/_server_.websocket.md @@ -0,0 +1,101 @@ +**[graphql-ws](../README.md)** + +> [Globals](../README.md) / ["server"](../modules/_server_.md) / WebSocket + +# Interface: WebSocket + +## Hierarchy + +* **WebSocket** + +## Index + +### Properties + +* [protocol](_server_.websocket.md#protocol) + +### Methods + +* [close](_server_.websocket.md#close) +* [onMessage](_server_.websocket.md#onmessage) +* [send](_server_.websocket.md#send) + +## Properties + +### protocol + +• `Readonly` **protocol**: string + +The subprotocol of the WebSocket. Will be used +to validate agains the supported ones. + +## Methods + +### close + +▸ **close**(`code`: number, `reason`: string): Promise\ \| void + +Closes the socket gracefully. Will always provide +the appropriate code and close reason. + +The returned promise is used to control the graceful +closure. + +#### Parameters: + +Name | Type | +------ | ------ | +`code` | number | +`reason` | string | + +**Returns:** Promise\ \| void + +___ + +### onMessage + +▸ **onMessage**(`cb`: (data: string) => Promise\): void + +Called when message is received. The library requires the data +to be a `string`. + +All operations requested from the client will block the promise until +completed, this means that the callback will not resolve until all +subscription events have been emittet (or until the client has completed +the stream), or until the query/mutation resolves. + +Exceptions raised during any phase of operation processing will +reject the callback's promise, catch them and communicate them +to your clients however you wish. + +#### Parameters: + +Name | Type | +------ | ------ | +`cb` | (data: string) => Promise\ | + +**Returns:** void + +___ + +### send + +▸ **send**(`data`: string): Promise\ \| void + +Sends a message through the socket. Will always +provide a `string` message. + +Please take care that the send is ready. Meaning, +only provide a truly OPEN socket through the `opened` +method of the `Server`. + +The returned promise is used to control the flow of data +(like handling backpressure). + +#### Parameters: + +Name | Type | +------ | ------ | +`data` | string | + +**Returns:** Promise\ \| void diff --git a/docs/interfaces/_types_.disposable.md b/docs/interfaces/_types_.disposable.md index 4ccba627..f9b20e4a 100644 --- a/docs/interfaces/_types_.disposable.md +++ b/docs/interfaces/_types_.disposable.md @@ -10,8 +10,6 @@ ↳ [Client](_client_.client.md) - ↳ [Server](_server_.server.md) - ## Index ### Properties diff --git a/docs/interfaces/_use_ws_.extra.md b/docs/interfaces/_use_ws_.extra.md new file mode 100644 index 00000000..ea5e206e --- /dev/null +++ b/docs/interfaces/_use_ws_.extra.md @@ -0,0 +1,35 @@ +**[graphql-ws](../README.md)** + +> [Globals](../README.md) / ["use/ws"](../modules/_use_ws_.md) / Extra + +# Interface: Extra + +The extra that will be put in the `Context`. + +## Hierarchy + +* **Extra** + +## Index + +### Properties + +* [request](_use_ws_.extra.md#request) +* [socket](_use_ws_.extra.md#socket) + +## Properties + +### request + +• `Readonly` **request**: IncomingMessage + +The initial HTTP request before the actual +socket and connection is established. + +___ + +### socket + +• `Readonly` **socket**: WebSocket + +The actual socket connection between the server and the client. diff --git a/docs/modules/_server_.md b/docs/modules/_server_.md index 07c73d72..467a5770 100644 --- a/docs/modules/_server_.md +++ b/docs/modules/_server_.md @@ -11,6 +11,7 @@ * [Context](../interfaces/_server_.context.md) * [Server](../interfaces/_server_.server.md) * [ServerOptions](../interfaces/_server_.serveroptions.md) +* [WebSocket](../interfaces/_server_.websocket.md) ### Type aliases @@ -19,7 +20,7 @@ ### Functions -* [createServer](_server_.md#createserver) +* [makeServer](_server_.md#makeserver) ## Type aliases @@ -42,19 +43,26 @@ ___ ## Functions -### createServer +### makeServer -▸ **createServer**(`options`: [ServerOptions](../interfaces/_server_.serveroptions.md), `websocketOptionsOrServer`: WebSocketServerOptions \| WebSocketServer): [Server](../interfaces/_server_.server.md) +▸ **makeServer**\(`options`: [ServerOptions](../interfaces/_server_.serveroptions.md)\): [Server](../interfaces/_server_.server.md)\ -Creates a protocol complient WebSocket GraphQL -subscription server. Read more about the protocol -in the PROTOCOL.md documentation file. +Makes a Protocol complient WebSocket GraphQL server. The server +is actually an API which is to be used with your favourite WebSocket +server library! + +Read more about the Protocol in the PROTOCOL.md documentation file. + +#### Type parameters: + +Name | Default | +------ | ------ | +`E` | unknown | #### Parameters: Name | Type | ------ | ------ | -`options` | [ServerOptions](../interfaces/_server_.serveroptions.md) | -`websocketOptionsOrServer` | WebSocketServerOptions \| WebSocketServer | +`options` | [ServerOptions](../interfaces/_server_.serveroptions.md)\ | -**Returns:** [Server](../interfaces/_server_.server.md) +**Returns:** [Server](../interfaces/_server_.server.md)\ diff --git a/docs/modules/_use_ws_.md b/docs/modules/_use_ws_.md new file mode 100644 index 00000000..88819f70 --- /dev/null +++ b/docs/modules/_use_ws_.md @@ -0,0 +1,34 @@ +**[graphql-ws](../README.md)** + +> [Globals](../README.md) / "use/ws" + +# Module: "use/ws" + +## Index + +### Interfaces + +* [Extra](../interfaces/_use_ws_.extra.md) + +### Functions + +* [useServer](_use_ws_.md#useserver) + +## Functions + +### useServer + +▸ **useServer**(`options`: [ServerOptions](../interfaces/_server_.serveroptions.md)\<[Extra](../interfaces/_use_ws_.extra.md)>, `ws`: WebSocketServer, `keepAlive?`: number): [Disposable](../interfaces/_types_.disposable.md) + +Use the server on a [ws](https://github.com/websockets/ws) ws server. +This is a basic starter, feel free to copy the code over and adjust it to your needs + +#### Parameters: + +Name | Type | Default value | +------ | ------ | ------ | +`options` | [ServerOptions](../interfaces/_server_.serveroptions.md)\<[Extra](../interfaces/_use_ws_.extra.md)> | - | +`ws` | WebSocketServer | - | +`keepAlive` | number | 12 * 1000 | + +**Returns:** [Disposable](../interfaces/_types_.disposable.md) diff --git a/jest.config.js b/jest.config.js index 95a76c22..864bf74f 100644 --- a/jest.config.js +++ b/jest.config.js @@ -2,5 +2,5 @@ module.exports = { testEnvironment: 'node', moduleFileExtensions: ['ts', 'js'], testRegex: '/tests/.+.ts$', - testPathIgnorePatterns: ['/node_modules/', '/fixtures/'], + testPathIgnorePatterns: ['/node_modules/', '/fixtures/', '/utils/'], }; diff --git a/package.json b/package.json index 393e2b62..610f23a0 100644 --- a/package.json +++ b/package.json @@ -50,9 +50,6 @@ "peerDependencies": { "graphql": ">=0.11 <=15" }, - "dependencies": { - "ws": "^7.4.0" - }, "devDependencies": { "@babel/core": "^7.12.3", "@babel/plugin-proposal-class-properties": "^7.12.1", @@ -80,6 +77,7 @@ "semantic-release": "^17.2.2", "typedoc": "^0.19.2", "typedoc-plugin-markdown": "^3.0.11", - "typescript": "^4.0.5" + "typescript": "^4.0.5", + "ws": "^7.4.0" } } diff --git a/src/server.ts b/src/server.ts index 2dca782b..6e1cf96d 100644 --- a/src/server.ts +++ b/src/server.ts @@ -4,8 +4,6 @@ * */ -import * as http from 'http'; -import * as WebSocket from 'ws'; import { OperationTypeNode, GraphQLSchema, @@ -17,7 +15,6 @@ import { SubscriptionArgs, ExecutionResult, } from 'graphql'; -import { Disposable } from './types'; import { GRAPHQL_TRANSPORT_WS_PROTOCOL } from './protocol'; import { Message, @@ -29,13 +26,7 @@ import { ErrorMessage, CompleteMessage, } from './message'; -import { - isObject, - isAsyncIterable, - hasOwnObjectProperty, - hasOwnStringProperty, - areGraphQLErrors, -} from './utils'; +import { isObject, isAsyncIterable, areGraphQLErrors } from './utils'; import { ID } from './types'; export type OperationResult = @@ -61,7 +52,7 @@ export type GraphQLExecutionContextValue = | undefined | null; -export interface ServerOptions { +export interface ServerOptions { /** * The GraphQL schema on which the operations * will be executed and validated against. @@ -88,7 +79,7 @@ export interface ServerOptions { context?: | GraphQLExecutionContextValue | (( - ctx: Context, + ctx: Context, message: SubscribeMessage, args: ExecutionArgs, ) => @@ -141,16 +132,6 @@ export interface ServerOptions { * @default 3 * 1000 (3 seconds) */ connectionInitWaitTimeout?: number; - /** - * The timout between dispatched keep-alive messages. Internally the lib - * uses the [WebSocket Ping and Pongs]((https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers#Pings_and_Pongs_The_Heartbeat_of_WebSockets)) to check that the link between - * the clients and the server is operating and to prevent the link from being broken due to idling. - * - * Set to nullish value to disable. - * - * @default 12 * 1000 (12 seconds) - */ - keepAlive?: number; /** * Is the connection callback called when the * client requests the connection initialisation @@ -176,7 +157,7 @@ export interface ServerOptions { * in the close event reason. */ onConnect?: ( - ctx: Context, + ctx: Context, ) => | Promise | boolean | void> | Record @@ -212,7 +193,7 @@ export interface ServerOptions { * in the close event reason. */ onSubscribe?: ( - ctx: Context, + ctx: Context, message: SubscribeMessage, ) => | Promise @@ -240,7 +221,7 @@ export interface ServerOptions { * in the close event reason. */ onOperation?: ( - ctx: Context, + ctx: Context, message: SubscribeMessage, args: ExecutionArgs, result: OperationResult, @@ -259,7 +240,7 @@ export interface ServerOptions { * in the close event reason. */ onError?: ( - ctx: Context, + ctx: Context, message: ErrorMessage, errors: readonly GraphQLError[], ) => Promise | readonly GraphQLError[] | void; @@ -278,7 +259,7 @@ export interface ServerOptions { * in the close event reason. */ onNext?: ( - ctx: Context, + ctx: Context, message: NextMessage, args: ExecutionArgs, result: ExecutionResult, @@ -296,61 +277,108 @@ export interface ServerOptions { * operations even after an abrupt closure, this callback * will still be called. */ - onComplete?: (ctx: Context, message: CompleteMessage) => Promise | void; + onComplete?: ( + ctx: Context, + message: CompleteMessage, + ) => Promise | void; } -export interface Context { +export interface Server { /** - * The actual WebSocket connection between the server and the client. + * New socket has beeen established. The lib will validate + * the protocol and use the socket accordingly. Returned promise + * will resolve after the socket closes. + * + * The second argument will be passed in the `extra` field + * of the `Context`. You may pass the initial request or the + * original WebSocket, if you need it down the road. + * + * Returns a function that should be called when the same socket + * has been closed, for whatever reason. The returned promise will + * resolve once the internal cleanup is complete. */ - readonly socket: WebSocket; + opened(socket: WebSocket, ctxExtra: E): () => Promise; // closed +} + +export interface WebSocket { /** - * The initial HTTP request before the actual - * socket and connection is established. + * The subprotocol of the WebSocket. Will be used + * to validate agains the supported ones. */ - readonly request: http.IncomingMessage; + readonly protocol: string; + /** + * Sends a message through the socket. Will always + * provide a `string` message. + * + * Please take care that the send is ready. Meaning, + * only provide a truly OPEN socket through the `opened` + * method of the `Server`. + * + * The returned promise is used to control the flow of data + * (like handling backpressure). + */ + send(data: string): Promise | void; + /** + * Closes the socket gracefully. Will always provide + * the appropriate code and close reason. + * + * The returned promise is used to control the graceful + * closure. + */ + close(code: number, reason: string): Promise | void; + /** + * Called when message is received. The library requires the data + * to be a `string`. + * + * All operations requested from the client will block the promise until + * completed, this means that the callback will not resolve until all + * subscription events have been emittet (or until the client has completed + * the stream), or until the query/mutation resolves. + * + * Exceptions raised during any phase of operation processing will + * reject the callback's promise, catch them and communicate them + * to your clients however you wish. + */ + onMessage(cb: (data: string) => Promise): void; +} + +export interface Context { /** * Indicates that the `ConnectionInit` message * has been received by the server. If this is * `true`, the client wont be kicked off after * the wait timeout has passed. */ - connectionInitReceived: boolean; + readonly connectionInitReceived: boolean; /** * Indicates that the connection was acknowledged * by having dispatched the `ConnectionAck` message * to the related client. */ - acknowledged: boolean; + readonly acknowledged: boolean; /** The parameters passed during the connection initialisation. */ - connectionParams?: Readonly>; + readonly connectionParams?: Readonly>; /** * Holds the active subscriptions for this context. * Subscriptions are for **streaming operations only**, * those that resolve once wont be added here. */ - subscriptions: Record>; -} - -export interface Server extends Disposable { - webSocketServer: WebSocket.Server; + readonly subscriptions: Record>; + /** + * An extra field where you can store your own context values + * to pass between callbacks. + */ + extra: E; } -// for documentation gen only -type WebSocketServerOptions = WebSocket.ServerOptions; -type WebSocketServer = WebSocket.Server; - /** - * Creates a protocol complient WebSocket GraphQL - * subscription server. Read more about the protocol - * in the PROTOCOL.md documentation file. + * Makes a Protocol complient WebSocket GraphQL server. The server + * is actually an API which is to be used with your favourite WebSocket + * server library! + * + * Read more about the Protocol in the PROTOCOL.md documentation file. */ -export function createServer( - options: ServerOptions, - websocketOptionsOrServer: WebSocketServerOptions | WebSocketServer, -): Server { - const isProd = process.env.NODE_ENV === 'production'; - +export function makeServer(options: ServerOptions): Server { const { schema, context, @@ -358,7 +386,6 @@ export function createServer( execute, subscribe, connectionInitWaitTimeout = 3 * 1000, // 3 seconds - keepAlive = 12 * 1000, // 12 seconds onConnect, onSubscribe, onOperation, @@ -366,177 +393,87 @@ export function createServer( onError, onComplete, } = options; - const webSocketServer = isWebSocketServer(websocketOptionsOrServer) - ? websocketOptionsOrServer - : new WebSocket.Server(websocketOptionsOrServer); - webSocketServer.on('connection', handleConnection); - function handleConnection(socket: WebSocket, request: http.IncomingMessage) { - if ( - Array.isArray(socket.protocol) - ? socket.protocol.indexOf(GRAPHQL_TRANSPORT_WS_PROTOCOL) === -1 - : socket.protocol !== GRAPHQL_TRANSPORT_WS_PROTOCOL - ) { - return socket.close(1002, 'Protocol Error'); - } + return { + opened(socket, extra) { + if (socket.protocol !== GRAPHQL_TRANSPORT_WS_PROTOCOL) { + socket.close(1002, 'Protocol Error'); + return async () => { + /* nothing was set up */ + }; + } - const ctxRef: { current: Context } = { - current: { - socket, - request, + const ctx: Context = { connectionInitReceived: false, acknowledged: false, subscriptions: {}, - }, - }; - - // kick the client off (close socket) if the connection has - // not been initialised after the specified wait timeout - const connectionInitWait = - connectionInitWaitTimeout > 0 && isFinite(connectionInitWaitTimeout) - ? setTimeout(() => { - if (!ctxRef.current.connectionInitReceived) { - ctxRef.current.socket.close( - 4408, - 'Connection initialisation timeout', - ); - } - }, connectionInitWaitTimeout) - : null; - - // keep alive through ping-pong messages - // read more about the websocket heartbeat here: https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers#Pings_and_Pongs_The_Heartbeat_of_WebSockets - let pongWait: NodeJS.Timeout | null = null; - const pingInterval = - keepAlive > 0 && isFinite(keepAlive) - ? setInterval(() => { - // ping pong on open sockets only - if (socket.readyState === WebSocket.OPEN) { - // terminate the connection after pong wait has passed because the client is idle - pongWait = setTimeout(() => { - socket.terminate(); - }, keepAlive); - - // listen for client's pong and stop socket termination - socket.once('pong', () => { - if (pongWait) { - clearTimeout(pongWait); - pongWait = null; - } - }); - - socket.ping(); - } - }, keepAlive) - : null; - - function errorOrCloseHandler( - errorOrClose: WebSocket.ErrorEvent | WebSocket.CloseEvent, - ) { - if (connectionInitWait) { - clearTimeout(connectionInitWait); - } - if (pongWait) { - clearTimeout(pongWait); - } - if (pingInterval) { - clearInterval(pingInterval); - } - - if (isErrorEvent(errorOrClose)) { - ctxRef.current.socket.close( - 1011, - isProd ? 'Internal Error' : errorOrClose.message, - ); - } - - Object.values(ctxRef.current.subscriptions).forEach((subscription) => { - subscription.return?.(); - }); - } - - socket.onerror = errorOrCloseHandler; - socket.onclose = errorOrCloseHandler; - socket.onmessage = makeOnMessage(ctxRef.current); - } - - webSocketServer.on('error', (err) => { - // catch the first thrown error and re-throw it once all clients have been notified - let firstErr: Error | null = null; - - // report server errors by erroring out all clients with the same error - for (const client of webSocketServer.clients) { - try { - client.emit('error', err); - } catch (err) { - firstErr = firstErr ?? err; - } - } - - if (firstErr) { - throw firstErr; - } - }); - - // Sends through a message only if the socket is open. - async function sendMessage( - ctx: Context, - message: Message, - ) { - if (ctx.socket.readyState === WebSocket.OPEN) { - return new Promise((resolve, reject) => { - ctx.socket.send(stringifyMessage(message), (err) => - err ? reject(err) : resolve(), - ); - }); - } - } - - function makeOnMessage(ctx: Context) { - return async function onMessage(event: WebSocket.MessageEvent) { - try { - const message = parseMessage(event.data); + extra, + }; + + // kick the client off (close socket) if the connection has + // not been initialised after the specified wait timeout + const connectionInitWait = + connectionInitWaitTimeout > 0 && isFinite(connectionInitWaitTimeout) + ? setTimeout(() => { + if (!ctx.connectionInitReceived) { + socket.close(4408, 'Connection initialisation timeout'); + } + }, connectionInitWaitTimeout) + : null; + + socket.onMessage(async function onMessage(data) { + let message: Message; + try { + message = parseMessage(data); + } catch (err) { + return socket.close(4400, 'Invalid message received'); + } switch (message.type) { case MessageType.ConnectionInit: { if (ctx.connectionInitReceived) { - return ctx.socket.close(4429, 'Too many initialisation requests'); + return socket.close(4429, 'Too many initialisation requests'); } + // @ts-expect-error: I can write ctx.connectionInitReceived = true; if (isObject(message.payload)) { + // @ts-expect-error: I can write ctx.connectionParams = message.payload; } const permittedOrPayload = await onConnect?.(ctx); if (permittedOrPayload === false) { - return ctx.socket.close(4403, 'Forbidden'); + return socket.close(4403, 'Forbidden'); } - await sendMessage( - ctx, - isObject(permittedOrPayload) - ? { - type: MessageType.ConnectionAck, - payload: permittedOrPayload, - } - : { - type: MessageType.ConnectionAck, - // payload is completely absent if not provided - }, + await socket.send( + stringifyMessage( + isObject(permittedOrPayload) + ? { + type: MessageType.ConnectionAck, + payload: permittedOrPayload, + } + : { + type: MessageType.ConnectionAck, + // payload is completely absent if not provided + }, + ), ); + // @ts-expect-error: I can write ctx.acknowledged = true; break; } case MessageType.Subscribe: { if (!ctx.acknowledged) { - return ctx.socket.close(4401, 'Unauthorized'); + return socket.close(4401, 'Unauthorized'); } + const id = message.id; const emit = { next: async (result: ExecutionResult, args: ExecutionArgs) => { let nextMessage: NextMessage = { - id: message.id, + id, type: MessageType.Next, payload: result, }; @@ -554,11 +491,13 @@ export function createServer( }; } } - await sendMessage(ctx, nextMessage); + await socket.send( + stringifyMessage(nextMessage), + ); }, error: async (errors: readonly GraphQLError[]) => { let errorMessage: ErrorMessage = { - id: message.id, + id, type: MessageType.Error, payload: errors, }; @@ -571,16 +510,20 @@ export function createServer( }; } } - await sendMessage(ctx, errorMessage); + await socket.send( + stringifyMessage(errorMessage), + ); }, complete: async (notifyClient: boolean) => { const completeMessage: CompleteMessage = { - id: message.id, + id, type: MessageType.Complete, }; await onComplete?.(ctx, completeMessage); if (notifyClient) { - await sendMessage(ctx, completeMessage); + await socket.send( + stringifyMessage(completeMessage), + ); } }, }; @@ -601,10 +544,7 @@ export function createServer( if (!schema) { // you either provide a schema dynamically through // `onSubscribe` or you set one up during the server setup - return webSocketServer.emit( - 'error', - new Error('The GraphQL schema is not provided'), - ); + throw new Error('The GraphQL schema is not provided'); } const { operationName, query, variables } = message.payload; @@ -614,7 +554,6 @@ export function createServer( document: parse(query), variableValues: variables, }; - const validationErrors = validate( execArgs.schema, execArgs.document, @@ -673,13 +612,13 @@ export function createServer( /** multiple emitted results */ // iterable subscriptions are distinct on ID - if (ctx.subscriptions[message.id]) { - return ctx.socket.close( + if (ctx.subscriptions[id]) { + return socket.close( 4409, - `Subscriber for ${message.id} already exists`, + `Subscriber for ${id} already exists`, ); } - ctx.subscriptions[message.id] = operationResult; + ctx.subscriptions[id] = operationResult; for await (const result of operationResult) { await emit.next(result, execArgs); @@ -687,8 +626,8 @@ export function createServer( // lack of subscription at this point indicates that the client // completed the stream, he doesnt need to be reminded - await emit.complete(Boolean(ctx.subscriptions[message.id])); - delete ctx.subscriptions[message.id]; + await emit.complete(Boolean(ctx.subscriptions[id])); + delete ctx.subscriptions[id]; } else { /** single emitted result */ @@ -707,38 +646,15 @@ export function createServer( `Unexpected message of type ${message.type} received`, ); } - } catch (err) { - // TODO-db-201031 we perceive this as a client bad request error, but is it always? - ctx.socket.close(4400, isProd ? 'Bad Request' : err.message); - } - }; - } - - return { - webSocketServer, - dispose: async () => { - for (const client of webSocketServer.clients) { - client.close(1001, 'Going away'); - } - - webSocketServer.removeAllListeners(); + }); - await new Promise((resolve, reject) => - webSocketServer.close((err) => (err ? reject(err) : resolve())), - ); + // wait for close and cleanup + return async () => { + if (connectionInitWait) clearTimeout(connectionInitWait); + for (const sub of Object.values(ctx.subscriptions)) { + await sub.return?.(); + } + }; }, }; } - -function isErrorEvent(obj: unknown): obj is WebSocket.ErrorEvent { - return ( - isObject(obj) && - hasOwnObjectProperty(obj, 'error') && - hasOwnStringProperty(obj, 'message') && - hasOwnStringProperty(obj, 'type') - ); -} - -function isWebSocketServer(obj: unknown): obj is WebSocketServer { - return isObject(obj) && typeof obj.on === 'function'; -} diff --git a/src/tests/client.ts b/src/tests/client.ts index 4658bdeb..fc4ef8c9 100644 --- a/src/tests/client.ts +++ b/src/tests/client.ts @@ -451,7 +451,7 @@ describe('subscription operation', () => { it('should stop dispatching messages after completing a subscription', async () => { const { url, - server, + clients, waitForOperation, waitForComplete, } = await startTServer(); @@ -461,7 +461,7 @@ describe('subscription operation', () => { }); await waitForOperation(); - for (const client of server.webSocketServer.clients) { + for (const client of clients) { client.once('message', () => { // no more messages from the client fail("Shouldn't have dispatched a message"); diff --git a/src/tests/fixtures/simple.ts b/src/tests/fixtures/simple.ts index 17e2649f..3bc8b8f1 100644 --- a/src/tests/fixtures/simple.ts +++ b/src/tests/fixtures/simple.ts @@ -10,7 +10,8 @@ import { EventEmitter } from 'events'; import WebSocket from 'ws'; import net from 'net'; import http from 'http'; -import { createServer, ServerOptions, Server, Context } from '../../server'; +import { ServerOptions, Context } from '../../server'; +import { useServer, Extra } from '../../use/ws'; // distinct server for each test; if you forget to dispose, the fixture wont const leftovers: Dispose[] = []; @@ -25,7 +26,7 @@ afterEach(async () => { export interface TServer { url: string; - server: Server; + ws: WebSocket.Server; clients: Set; pong: (key?: string) => void; waitForClient: ( @@ -33,7 +34,7 @@ export interface TServer { expire?: number, ) => Promise; waitForConnect: ( - test?: (ctx: Context) => void, + test?: (ctx: Context) => void, expire?: number, ) => Promise; waitForOperation: (test?: () => void, expire?: number) => Promise; @@ -119,7 +120,8 @@ export const schema = new GraphQLSchema({ }); export async function startTServer( - options: Partial = {}, + options: Partial> = {}, + keepAlive?: number, // for ws tests sake ): Promise { const path = '/simple'; const emitter = new EventEmitter(); @@ -138,10 +140,14 @@ export async function startTServer( }); // create server and hook up for tracking operations - const pendingConnections: Context[] = []; + const pendingConnections: Context[] = []; let pendingOperations = 0, pendingCompletes = 0; - const server = await createServer( + const ws = new WebSocket.Server({ + server: httpServer, + path, + }); + const server = await useServer( { schema, execute, @@ -170,10 +176,8 @@ export async function startTServer( emitter.emit('compl'); }, }, - { - server: httpServer, - path, - }, + ws, + keepAlive, ); // search for open port from the starting port @@ -208,7 +212,7 @@ export async function startTServer( // pending websocket clients let pendingCloses = 0; const pendingClients: WebSocket[] = []; - server.webSocketServer.on('connection', (client) => { + ws.on('connection', (client) => { pendingClients.push(client); client.once('close', () => { pendingCloses++; @@ -243,9 +247,9 @@ export async function startTServer( return { url: `ws://localhost:${addr.port}${path}`, - server, + ws, get clients() { - return server.webSocketServer.clients; + return ws.clients; }, pong, waitForClient(test, expire) { @@ -260,10 +264,10 @@ export async function startTServer( if (pendingClients.length > 0) { return done(); } - server.webSocketServer.once('connection', done); + ws.once('connection', done); if (expire) { setTimeout(() => { - server.webSocketServer.off('connection', done); // expired + ws.off('connection', done); // expired resolve(); }, expire); } diff --git a/src/tests/server.ts b/src/tests/server.ts index 4706e0df..a0120b48 100644 --- a/src/tests/server.ts +++ b/src/tests/server.ts @@ -1,4 +1,3 @@ -import WebSocket from 'ws'; import { parse, buildSchema, @@ -8,85 +7,9 @@ import { ExecutionArgs, } from 'graphql'; import { GRAPHQL_TRANSPORT_WS_PROTOCOL } from '../protocol'; -import { - MessageType, - parseMessage, - stringifyMessage, - SubscribePayload, -} from '../message'; +import { MessageType, parseMessage, stringifyMessage } from '../message'; import { schema, startTServer } from './fixtures/simple'; - -function createTClient( - url: string, - protocols: string | string[] = GRAPHQL_TRANSPORT_WS_PROTOCOL, -) { - let closeEvent: WebSocket.CloseEvent; - const queue: WebSocket.MessageEvent[] = []; - return new Promise<{ - ws: WebSocket; - waitForMessage: ( - test?: (data: WebSocket.MessageEvent) => void, - expire?: number, - ) => Promise; - waitForClose: ( - test?: (event: WebSocket.CloseEvent) => void, - expire?: number, - ) => Promise; - }>((resolve) => { - const ws = new WebSocket(url, protocols); - ws.onclose = (event) => (closeEvent = event); // just so that none are missed - ws.onmessage = (message) => queue.push(message); // guarantee message delivery with a queue - ws.once('open', () => - resolve({ - ws, - async waitForMessage(test, expire) { - return new Promise((resolve) => { - const done = () => { - // the onmessage listener above will be called before our listener, populating the queue - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const next = queue.shift()!; - test?.(next); - resolve(); - }; - if (queue.length > 0) { - return done(); - } - ws.once('message', done); - if (expire) { - setTimeout(() => { - ws.removeListener('message', done); // expired - resolve(); - }, expire); - } - }); - }, - async waitForClose( - test?: (event: WebSocket.CloseEvent) => void, - expire?: number, - ) { - return new Promise((resolve) => { - if (closeEvent) { - test?.(closeEvent); - return resolve(); - } - ws.onclose = (event) => { - closeEvent = event; - test?.(event); - resolve(); - }; - if (expire) { - setTimeout(() => { - // @ts-expect-error: its ok - ws.onclose = null; // expired - resolve(); - }, expire); - } - }); - }, - }), - ); - }); -} +import { createTClient } from './utils'; /** * Tests @@ -126,44 +49,6 @@ it('should allow connections with valid protocols only', async () => { ); }); -it('should gracefully go away when disposing', async () => { - const server = await startTServer(); - - const client1 = await createTClient(server.url); - const client2 = await createTClient(server.url); - - await server.dispose(true); - - await client1.waitForClose((event) => { - expect(event.code).toBe(1001); - expect(event.reason).toBe('Going away'); - expect(event.wasClean).toBeTruthy(); - }); - await client2.waitForClose((event) => { - expect(event.code).toBe(1001); - expect(event.reason).toBe('Going away'); - expect(event.wasClean).toBeTruthy(); - }); -}); - -it('should report server errors to clients by closing the connection', async () => { - const { - url, - server: { webSocketServer }, - } = await startTServer(); - - const client = await createTClient(url); - - const emittedError = new Error("I'm a teapot"); - webSocketServer.emit('error', emittedError); - - await client.waitForClose((event) => { - expect(event.code).toBe(1011); // 1011: Internal Error - expect(event.reason).toBe(emittedError.message); - expect(event.wasClean).toBeTruthy(); // because the server reported the error - }); -}); - it('should use the provided roots as resolvers', async () => { const schema = buildSchema(` type Query { @@ -452,41 +337,6 @@ it('should prefer the `onSubscribe` context value even if `context` option is se ); }); -it('should handle errors thrown from client error listeners', async () => { - const { server, url } = await startTServer(); - - const client = await createTClient(url); - client.ws.send( - stringifyMessage({ - type: MessageType.ConnectionInit, - }), - ); - await client.waitForMessage(({ data }) => { - expect(parseMessage(data).type).toBe(MessageType.ConnectionAck); - }); - - const surpriseErr1 = new Error('Well hello there!'); - const surpriseErr2 = new Error('I wont be thrown!'); // first to throw stops emission - for (const client of server.webSocketServer.clients) { - client.on('error', () => { - throw surpriseErr1; - }); - client.on('error', () => { - throw surpriseErr2; - }); - } - - expect(() => { - server.webSocketServer.emit('error', new Error('I am a nice error')); - }).toThrowError(surpriseErr1); - - await client.waitForClose((event) => { - expect(event.code).toBe(1011); - expect(event.reason).toBe('I am a nice error'); - expect(event.wasClean).toBeTruthy(); - }); -}); - describe('Connect', () => { it('should refuse connection and close socket if returning `false`', async () => { const { url } = await startTServer({ @@ -510,29 +360,6 @@ describe('Connect', () => { }); }); - it('should close socket with error thrown from the callback', async () => { - const error = new Error("I'm a teapot"); - - const { url } = await startTServer({ - onConnect: () => { - throw error; - }, - }); - - const client = await createTClient(url); - client.ws.send( - stringifyMessage({ - type: MessageType.ConnectionInit, - }), - ); - - await client.waitForClose((event) => { - expect(event.code).toBe(4400); - expect(event.reason).toBe(error.message); - expect(event.wasClean).toBeTruthy(); - }); - }); - it('should acknowledge connection if not implemented, returning `true` or nothing', async () => { async function test(url: string) { const client = await createTClient(url); @@ -736,159 +563,6 @@ describe('Subscribe', () => { }); }); - it('should close the socket on request if schema is left undefined', async () => { - const { url } = await startTServer({ - schema: undefined, - }); - - const client = await createTClient(url); - - client.ws.send( - stringifyMessage({ - type: MessageType.ConnectionInit, - }), - ); - - await client.waitForMessage(({ data }) => { - expect(parseMessage(data).type).toBe(MessageType.ConnectionAck); - client.ws.send( - stringifyMessage({ - id: '1', - type: MessageType.Subscribe, - payload: { - operationName: 'TestString', - query: `query TestString { - getValue - }`, - variables: {}, - }, - }), - ); - }); - - await client.waitForClose((event) => { - expect(event.code).toBe(1011); - expect(event.reason).toBe('The GraphQL schema is not provided'); - expect(event.wasClean).toBeTruthy(); - }); - }); - - it('should close the socket with errors thrown from any callback', async () => { - const error = new Error('Stop'); - - // onConnect - let server = await startTServer({ - onConnect: () => { - throw error; - }, - }); - const client = await createTClient(server.url); - client.ws.send( - stringifyMessage({ - type: MessageType.ConnectionInit, - }), - ); - await client.waitForClose((event) => { - expect(event.code).toBe(4400); - expect(event.reason).toBe(error.message); - expect(event.wasClean).toBeTruthy(); - }); - await server.dispose(); - - async function test( - url: string, - payload: SubscribePayload = { - query: `query { getValue }`, - }, - ) { - const client = await createTClient(url); - client.ws.send( - stringifyMessage({ - type: MessageType.ConnectionInit, - }), - ); - - await client.waitForMessage(({ data }) => { - expect(parseMessage(data).type).toBe(MessageType.ConnectionAck); - client.ws.send( - stringifyMessage({ - id: '1', - type: MessageType.Subscribe, - payload, - }), - ); - }); - - await client.waitForClose((event) => { - expect(event.code).toBe(4400); - expect(event.reason).toBe(error.message); - expect(event.wasClean).toBeTruthy(); - }); - } - - // onSubscribe - server = await startTServer({ - onSubscribe: () => { - throw error; - }, - }); - await test(server.url); - await server.dispose(); - - server = await startTServer({ - onOperation: () => { - throw error; - }, - }); - await test(server.url); - await server.dispose(); - - // execute - server = await startTServer({ - execute: () => { - throw error; - }, - }); - await test(server.url); - await server.dispose(); - - // subscribe - server = await startTServer({ - subscribe: () => { - throw error; - }, - }); - await test(server.url, { query: 'subscription { greetings }' }); - await server.dispose(); - - // onNext - server = await startTServer({ - onNext: () => { - throw error; - }, - }); - await test(server.url); - await server.dispose(); - - // onError - server = await startTServer({ - onError: () => { - throw error; - }, - }); - await test(server.url, { query: 'query { noExisto }' }); - await server.dispose(); - - // onComplete - server = await startTServer({ - onComplete: () => { - throw error; - }, - }); - await test(server.url); - await server.dispose(); - }); - it('should directly use the execution arguments returned from `onSubscribe`', async () => { const nopeArgs = { schema, @@ -1000,44 +674,6 @@ describe('Subscribe', () => { }, 30); }); - it('should close the socket on empty arrays returned from `onSubscribe`', async () => { - const { url } = await startTServer({ - onSubscribe: () => { - return []; - }, - }); - - const client = await createTClient(url); - - client.ws.send( - stringifyMessage({ - type: MessageType.ConnectionInit, - }), - ); - - await client.waitForMessage(({ data }) => { - expect(parseMessage(data).type).toBe(MessageType.ConnectionAck); - }); - - client.ws.send( - stringifyMessage({ - id: '1', - type: MessageType.Subscribe, - payload: { - query: 'subscription { ping }', - }, - }), - ); - - await client.waitForClose((event) => { - expect(event.code).toBe(4400); - expect(event.reason).toBe( - 'Invalid return value from onSubscribe hook, expected an array of GraphQLError objects', - ); - expect(event.wasClean).toBeTruthy(); - }); - }); - it('should use the execution result returned from `onNext`', async () => { const { url } = await startTServer({ onNext: (_ctx, _message) => { @@ -1638,64 +1274,3 @@ describe('Subscribe', () => { client.ws.terminate(); }); }); - -describe('Keep-Alive', () => { - it('should dispatch pings after the timeout has passed', async (done) => { - const { url } = await startTServer({ - keepAlive: 50, - }); - - const client = await createTClient(url); - client.ws.send( - stringifyMessage({ - type: MessageType.ConnectionInit, - }), - ); - - client.ws.once('ping', () => done()); - }); - - it('should not dispatch pings if disabled with nullish timeout', async (done) => { - const { url } = await startTServer({ - keepAlive: 0, - }); - - const client = await createTClient(url); - client.ws.send( - stringifyMessage({ - type: MessageType.ConnectionInit, - }), - ); - - client.ws.once('ping', () => fail('Shouldnt have pinged')); - - setTimeout(done, 50); - }); - - it('should terminate the socket if no pong is sent in response to a ping', async () => { - const { url } = await startTServer({ - keepAlive: 50, - }); - - const client = await createTClient(url); - client.ws.send( - stringifyMessage({ - type: MessageType.ConnectionInit, - }), - ); - - // disable pong - client.ws.pong = () => { - /**/ - }; - - // ping is received - await new Promise((resolve) => client.ws.once('ping', resolve)); - - // termination is not graceful or clean - await client.waitForClose((event) => { - expect(event.code).toBe(1006); - expect(event.wasClean).toBeFalsy(); - }); - }); -}); diff --git a/src/tests/use/ws.ts b/src/tests/use/ws.ts new file mode 100644 index 00000000..e7b45a9c --- /dev/null +++ b/src/tests/use/ws.ts @@ -0,0 +1,332 @@ +import WebSocket from 'ws'; +import http from 'http'; +import { + MessageType, + stringifyMessage, + parseMessage, + SubscribePayload, +} from '../../message'; +import { startTServer } from '../fixtures/simple'; +import { createTClient } from '../utils'; + +it('should gracefully go away when disposing', async () => { + const server = await startTServer(); + + const client1 = await createTClient(server.url); + const client2 = await createTClient(server.url); + + await server.dispose(true); + + await client1.waitForClose((event) => { + expect(event.code).toBe(1001); + expect(event.reason).toBe('Going away'); + expect(event.wasClean).toBeTruthy(); + }); + await client2.waitForClose((event) => { + expect(event.code).toBe(1001); + expect(event.reason).toBe('Going away'); + expect(event.wasClean).toBeTruthy(); + }); +}); + +it('should add the initial request and websocket in the context extra', async (done) => { + const server = await startTServer({ + onConnect: (ctx) => { + expect(ctx.extra.socket).toBeInstanceOf(WebSocket); + expect(ctx.extra.request).toBeInstanceOf(http.IncomingMessage); + done(); + return false; // reject client for sake of test + }, + }); + + const client = await createTClient(server.url); + client.ws.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); +}); + +it('should close the socket with errors thrown from any callback', async () => { + const error = new Error('Stop'); + + // onConnect + let server = await startTServer({ + onConnect: () => { + throw error; + }, + }); + const client = await createTClient(server.url); + client.ws.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + await client.waitForClose((event) => { + expect(event.code).toBe(1011); + expect(event.reason).toBe(error.message); + expect(event.wasClean).toBeTruthy(); + }); + await server.dispose(); + + async function test( + url: string, + payload: SubscribePayload = { + query: `query { getValue }`, + }, + ) { + const client = await createTClient(url); + client.ws.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + + await client.waitForMessage(({ data }) => { + expect(parseMessage(data).type).toBe(MessageType.ConnectionAck); + client.ws.send( + stringifyMessage({ + id: '1', + type: MessageType.Subscribe, + payload, + }), + ); + }); + + await client.waitForClose((event) => { + expect(event.code).toBe(1011); + expect(event.reason).toBe(error.message); + expect(event.wasClean).toBeTruthy(); + }); + } + + // onSubscribe + server = await startTServer({ + onSubscribe: () => { + throw error; + }, + }); + await test(server.url); + await server.dispose(); + + server = await startTServer({ + onOperation: () => { + throw error; + }, + }); + await test(server.url); + await server.dispose(); + + // execute + server = await startTServer({ + execute: () => { + throw error; + }, + }); + await test(server.url); + await server.dispose(); + + // subscribe + server = await startTServer({ + subscribe: () => { + throw error; + }, + }); + await test(server.url, { query: 'subscription { greetings }' }); + await server.dispose(); + + // onNext + server = await startTServer({ + onNext: () => { + throw error; + }, + }); + await test(server.url); + await server.dispose(); + + // onError + server = await startTServer({ + onError: () => { + throw error; + }, + }); + await test(server.url, { query: 'query { noExisto }' }); + await server.dispose(); + + // onComplete + server = await startTServer({ + onComplete: () => { + throw error; + }, + }); + await test(server.url); + await server.dispose(); +}); + +it('should close the socket on request if schema is left undefined', async () => { + const { url } = await startTServer({ + schema: undefined, + }); + + const client = await createTClient(url); + + client.ws.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + + await client.waitForMessage(({ data }) => { + expect(parseMessage(data).type).toBe(MessageType.ConnectionAck); + client.ws.send( + stringifyMessage({ + id: '1', + type: MessageType.Subscribe, + payload: { + operationName: 'TestString', + query: `query TestString { + getValue + }`, + variables: {}, + }, + }), + ); + }); + + await client.waitForClose((event) => { + expect(event.code).toBe(1011); + expect(event.reason).toBe('The GraphQL schema is not provided'); + expect(event.wasClean).toBeTruthy(); + }); +}); + +it('should close the socket on empty arrays returned from `onSubscribe`', async () => { + const { url } = await startTServer({ + onSubscribe: () => { + return []; + }, + }); + + const client = await createTClient(url); + + client.ws.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + + await client.waitForMessage(({ data }) => { + expect(parseMessage(data).type).toBe(MessageType.ConnectionAck); + }); + + client.ws.send( + stringifyMessage({ + id: '1', + type: MessageType.Subscribe, + payload: { + query: 'subscription { ping }', + }, + }), + ); + + await client.waitForClose((event) => { + expect(event.code).toBe(1011); + expect(event.reason).toBe( + 'Invalid return value from onSubscribe hook, expected an array of GraphQLError objects', + ); + expect(event.wasClean).toBeTruthy(); + }); +}); + +it('should close socket with error thrown from the callback', async () => { + const error = new Error("I'm a teapot"); + + const { url } = await startTServer({ + onConnect: () => { + throw error; + }, + }); + + const client = await createTClient(url); + client.ws.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + + await client.waitForClose((event) => { + expect(event.code).toBe(1011); + expect(event.reason).toBe(error.message); + expect(event.wasClean).toBeTruthy(); + }); +}); + +it('should report server errors to clients by closing the connection', async () => { + const { url, ws } = await startTServer(); + + const client = await createTClient(url); + + const emittedError = new Error("I'm a teapot"); + ws.emit('error', emittedError); + + await client.waitForClose((event) => { + expect(event.code).toBe(1011); // 1011: Internal Error + expect(event.reason).toBe(emittedError.message); + expect(event.wasClean).toBeTruthy(); // because the server reported the error + }); +}); + +describe('Keep-Alive', () => { + it('should dispatch pings after the timeout has passed', async (done) => { + const { url } = await startTServer(undefined, 50); + + const client = await createTClient(url); + client.ws.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + + client.ws.once('ping', () => done()); + }); + + it('should not dispatch pings if disabled with nullish timeout', async (done) => { + const { url } = await startTServer(undefined, 0); + + const client = await createTClient(url); + client.ws.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + + client.ws.once('ping', () => fail('Shouldnt have pinged')); + + setTimeout(done, 50); + }); + + it('should terminate the socket if no pong is sent in response to a ping', async () => { + const { url } = await startTServer(undefined, 50); + + const client = await createTClient(url); + client.ws.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + + // disable pong + client.ws.pong = () => { + /**/ + }; + + // ping is received + await new Promise((resolve) => client.ws.once('ping', resolve)); + + // termination is not graceful or clean + await client.waitForClose((event) => { + expect(event.code).toBe(1006); + expect(event.wasClean).toBeFalsy(); + }); + }); +}); diff --git a/src/tests/utils/index.ts b/src/tests/utils/index.ts new file mode 100644 index 00000000..75f07a56 --- /dev/null +++ b/src/tests/utils/index.ts @@ -0,0 +1 @@ +export * from './tclient'; diff --git a/src/tests/utils/tclient.ts b/src/tests/utils/tclient.ts new file mode 100644 index 00000000..0949e376 --- /dev/null +++ b/src/tests/utils/tclient.ts @@ -0,0 +1,75 @@ +import WebSocket from 'ws'; +import { GRAPHQL_TRANSPORT_WS_PROTOCOL } from '../../protocol'; + +// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types +export function createTClient( + url: string, + protocols: string | string[] = GRAPHQL_TRANSPORT_WS_PROTOCOL, +) { + let closeEvent: WebSocket.CloseEvent; + const queue: WebSocket.MessageEvent[] = []; + return new Promise<{ + ws: WebSocket; + waitForMessage: ( + test?: (data: WebSocket.MessageEvent) => void, + expire?: number, + ) => Promise; + waitForClose: ( + test?: (event: WebSocket.CloseEvent) => void, + expire?: number, + ) => Promise; + }>((resolve) => { + const ws = new WebSocket(url, protocols); + ws.onclose = (event) => (closeEvent = event); // just so that none are missed + ws.onmessage = (message) => queue.push(message); // guarantee message delivery with a queue + ws.once('open', () => + resolve({ + ws, + async waitForMessage(test, expire) { + return new Promise((resolve) => { + const done = () => { + // the onmessage listener above will be called before our listener, populating the queue + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const next = queue.shift()!; + test?.(next); + resolve(); + }; + if (queue.length > 0) { + return done(); + } + ws.once('message', done); + if (expire) { + setTimeout(() => { + ws.removeListener('message', done); // expired + resolve(); + }, expire); + } + }); + }, + async waitForClose( + test?: (event: WebSocket.CloseEvent) => void, + expire?: number, + ) { + return new Promise((resolve) => { + if (closeEvent) { + test?.(closeEvent); + return resolve(); + } + ws.onclose = (event) => { + closeEvent = event; + test?.(event); + resolve(); + }; + if (expire) { + setTimeout(() => { + // @ts-expect-error: its ok + ws.onclose = null; // expired + resolve(); + }, expire); + } + }); + }, + }), + ); + }); +} diff --git a/src/use/ws.ts b/src/use/ws.ts new file mode 100644 index 00000000..00312487 --- /dev/null +++ b/src/use/ws.ts @@ -0,0 +1,126 @@ +import type * as http from 'http'; +import type * as ws from 'ws'; +import { makeServer, ServerOptions } from '../server'; +import { Disposable } from '../types'; + +// for nicer documentation +type WebSocket = typeof ws.prototype; +type WebSocketServer = ws.Server; + +/** + * The extra that will be put in the `Context`. + */ +export interface Extra { + /** + * The actual socket connection between the server and the client. + */ + readonly socket: WebSocket; + /** + * The initial HTTP request before the actual + * socket and connection is established. + */ + readonly request: http.IncomingMessage; +} + +/** + * Use the server on a [ws](https://github.com/websockets/ws) ws server. + * This is a basic starter, feel free to copy the code over and adjust it to your needs + */ +export function useServer( + options: ServerOptions, + ws: WebSocketServer, + /** + * The timout between dispatched keep-alive messages. Internally uses the [ws Ping and Pongs]((https://developer.mozilla.org/en-US/docs/Web/API/wss_API/Writing_ws_servers#Pings_and_Pongs_The_Heartbeat_of_wss)) + * to check that the link between the clients and the server is operating and to prevent the link + * from being broken due to idling. + * + * @default 12 * 1000 // 12 seconds + */ + keepAlive = 12 * 1000, +): Disposable { + const isProd = process.env.NODE_ENV === 'production'; + const server = makeServer(options); + + ws.on('error', (err) => { + // catch the first thrown error and re-throw it once all clients have been notified + let firstErr: Error | null = null; + + // report server errors by erroring out all clients with the same error + for (const client of ws.clients) { + try { + client.close(1011, isProd ? 'Internal Error' : err.message); + } catch (err) { + firstErr = firstErr ?? err; + } + } + + if (firstErr) { + throw firstErr; + } + }); + + ws.on('connection', (socket, request) => { + // keep alive through ping-pong messages + let pongWait: NodeJS.Timeout | null = null; + const pingInterval = + keepAlive > 0 && isFinite(keepAlive) + ? setInterval(() => { + // ping pong on open sockets only + if (socket.readyState === socket.OPEN) { + // terminate the connection after pong wait has passed because the client is idle + pongWait = setTimeout(() => { + socket.terminate(); + }, keepAlive); + + // listen for client's pong and stop socket termination + socket.once('pong', () => { + if (pongWait) { + clearTimeout(pongWait); + pongWait = null; + } + }); + + socket.ping(); + } + }, keepAlive) + : null; + + const closed = server.opened( + { + protocol: socket.protocol, + send: (data) => + new Promise((resolve, reject) => { + socket.send(data, (err) => (err ? reject(err) : resolve())); + }), + close: (code, reason) => socket.close(code, reason), + onMessage: (cb) => + socket.on('message', async (event) => { + try { + await cb(event.toString()); + } catch (err) { + socket.close(1011, isProd ? 'Internal Error' : err.message); + } + }), + }, + { socket, request }, + ); + + socket.once('close', () => { + if (pongWait) clearTimeout(pongWait); + if (pingInterval) clearInterval(pingInterval); + closed(); + }); + }); + + return { + dispose: async () => { + for (const client of ws.clients) { + client.close(1001, 'Going away'); + } + ws.removeAllListeners(); + await new Promise((resolve, reject) => { + ws.close((err) => (err ? reject(err) : resolve())); + }); + }, + }; +}