From a412d2570e484046a058c11f39813c7794ec9147 Mon Sep 17 00:00:00 2001 From: Denis Badurina Date: Mon, 17 Aug 2020 22:16:32 +0200 Subject: [PATCH] feat(server): Implement following the new transport protocol (#1) * feat(server): begin # Conflicts: # package.json * feat(server): be more strict about protocol * test: manually control server * refactor: dispose waits for server to close * test(server): failing for graceful dispose * refactor: improve disposal and event listeners * feat: return resulting server on creation * refactor: explicit server naming as creation result * refactor: a bit of cleanup * feat: report server errors by closing the connections * refactor: consistency * refactor: unnecessary client setup [skip ci] * refactor: comply with the new protocol * refactor: use parseMessage [skip ci] * refactor: documentation comments * feat: more type safety * feat: add more server options * feat: comply with updated protocol * refactor: use promise resolutions and smaller waits * feat: implement onConnect callback and wait timeout * test(onConnect): longer promise resolution * refactor: smaller refinements [skip ci] * chore(deps): add missing ws types after rebase * style: make linter happy * feat(stringifyMessage): implement and use * refactor(message): simplify message parsing errors * style: typescript can infer * feat(Subscribe): terminate socket if connection is not acknowledged * feat(message): query can be of DocumentNode type too * refactor(Error): payload is an array of errors * refactor(message): parameters are readonly * refactor: schema is optional in onSubscribe exec args * feat: implement query and mutation operation * test(Subscribe): simple query operation * test(Subscribe): simple query validation errors * test(Subscribe): socket shouldnt close or error because of GraphQL errors * refactor: typo * test(Subscribe): support operations with `DocumentNode` type query * test(Subscribe): close socket on request if schema is undefined * test(Subscribe): pick up the schema from `onSubscribe` * feat: implement subscription operation * refactor: typo * refactor: use fixture --- .eslintrc.js | 4 + jest.config.js | 2 +- package.json | 9 +- src/message.ts | 147 ++++++++ src/protocol.ts | 7 + src/server/.gitkeep | 0 src/server/index.ts | 1 + src/server/server.ts | 481 +++++++++++++++++++++++++ src/tests/client.ts | 1 - src/tests/fixtures/simple.ts | 83 +++++ src/tests/server.ts | 670 +++++++++++++++++++++++++++++++++++ src/types.d.ts | 15 + src/utils.ts | 57 +++ yarn.lock | 42 ++- 14 files changed, 1515 insertions(+), 4 deletions(-) create mode 100644 src/message.ts create mode 100644 src/protocol.ts delete mode 100644 src/server/.gitkeep create mode 100644 src/server/index.ts create mode 100644 src/server/server.ts delete mode 100644 src/tests/client.ts create mode 100644 src/tests/fixtures/simple.ts create mode 100644 src/tests/server.ts create mode 100644 src/types.d.ts create mode 100644 src/utils.ts diff --git a/.eslintrc.js b/.eslintrc.js index 44541dec..60104477 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -12,4 +12,8 @@ module.exports = { ], parser: '@typescript-eslint/parser', plugins: ['@typescript-eslint', 'prettier'], + rules: { + // unused vars will be handled by the TS compiler + '@typescript-eslint/no-unused-vars': 'off', + }, }; diff --git a/jest.config.js b/jest.config.js index d16dbdae..95a76c22 100644 --- a/jest.config.js +++ b/jest.config.js @@ -2,5 +2,5 @@ module.exports = { testEnvironment: 'node', moduleFileExtensions: ['ts', 'js'], testRegex: '/tests/.+.ts$', - testPathIgnorePatterns: ['/node_modules/'], + testPathIgnorePatterns: ['/node_modules/', '/fixtures/'], }; diff --git a/package.json b/package.json index b1f7c4fb..4ca2342c 100644 --- a/package.json +++ b/package.json @@ -27,8 +27,12 @@ "build": "tsc -b", "release": "semantic-release" }, + "peerDependencies": { + "graphql": ">=15.0.0" + }, "dependencies": { - "websocket-as-promised": "^1.0.1" + "websocket-as-promised": "^1.0.1", + "ws": "^7.3.1" }, "devDependencies": { "@babel/core": "^7.11.1", @@ -41,12 +45,15 @@ "@semantic-release/changelog": "^5.0.1", "@semantic-release/git": "^9.0.0", "@types/jest": "^26.0.9", + "@types/ws": "^7.2.6", "@typescript-eslint/eslint-plugin": "^3.9.0", "@typescript-eslint/parser": "^3.9.0", "babel-jest": "^26.3.0", "eslint": "^7.6.0", "eslint-config-prettier": "^6.11.0", "eslint-plugin-prettier": "^3.1.4", + "graphql": "^15.3.0", + "graphql-subscriptions": "^1.1.0", "jest": "^26.3.0", "prettier": "^2.0.5", "semantic-release": "^17.1.1", diff --git a/src/message.ts b/src/message.ts new file mode 100644 index 00000000..50aeb176 --- /dev/null +++ b/src/message.ts @@ -0,0 +1,147 @@ +/** + * + * message + * + */ + +import { GraphQLError, ExecutionResult, DocumentNode } from 'graphql'; +import { + isObject, + hasOwnProperty, + hasOwnObjectProperty, + hasOwnStringProperty, + hasOwnArrayProperty, +} from './utils'; + +/** Types of messages allowed to be sent by the client/server over the WS protocol. */ +export enum MessageType { + ConnectionInit = 'connection_init', // Client -> Server + ConnectionAck = 'connection_ack', // Server -> Client + + Subscribe = 'subscribe', // Client -> Server + Next = 'next', // Server -> Client + Error = 'error', // Server -> Client + Complete = 'complete', // bidirectional +} + +export interface ConnectionInitMessage { + readonly type: MessageType.ConnectionInit; + readonly payload?: Record; // connectionParams +} + +export interface ConnectionAckMessage { + readonly type: MessageType.ConnectionAck; +} + +export interface SubscribeMessage { + readonly id: string; + readonly type: MessageType.Subscribe; + readonly payload: { + readonly operationName: string; + readonly query: string | DocumentNode; + readonly variables: Record; + }; +} + +export interface NextMessage { + readonly id: string; + readonly type: MessageType.Next; + readonly payload: ExecutionResult; +} + +export interface ErrorMessage { + readonly id: string; + readonly type: MessageType.Error; + readonly payload: readonly GraphQLError[]; +} + +export interface CompleteMessage { + readonly id: string; + readonly type: MessageType.Complete; +} + +export type Message< + T extends MessageType = MessageType +> = T extends MessageType.ConnectionAck + ? ConnectionAckMessage + : T extends MessageType.ConnectionInit + ? ConnectionInitMessage + : T extends MessageType.Subscribe + ? SubscribeMessage + : T extends MessageType.Next + ? NextMessage + : T extends MessageType.Error + ? ErrorMessage + : T extends MessageType.Complete + ? CompleteMessage + : never; + +export function isMessage(val: unknown): val is Message { + if (isObject(val)) { + // all messages must have the `type` prop + if (!hasOwnProperty(val, 'type')) { + return false; + } + // validate other properties depending on the `type` + switch (val.type) { + case MessageType.ConnectionInit: + // the connection init message can have optional object `connectionParams` in the payload + return !hasOwnProperty(val, 'payload') || isObject(val.payload); + case MessageType.ConnectionAck: + return true; + case MessageType.Subscribe: + return ( + hasOwnStringProperty(val, 'id') && + hasOwnObjectProperty(val, 'payload') && + hasOwnStringProperty(val.payload, 'operationName') && + (hasOwnStringProperty(val.payload, 'query') || // string query + hasOwnObjectProperty(val.payload, 'query')) && // document node query + hasOwnObjectProperty(val.payload, 'variables') + ); + case MessageType.Next: + return ( + hasOwnStringProperty(val, 'id') && + hasOwnObjectProperty(val, 'payload') && + // ExecutionResult + (hasOwnObjectProperty(val.payload, 'data') || + hasOwnObjectProperty(val.payload, 'errors')) + ); + case MessageType.Error: + return ( + hasOwnStringProperty(val, 'id') && + // GraphQLError + hasOwnArrayProperty(val, 'payload') && + val.payload.length > 0 // must be at least one error + ); + case MessageType.Complete: + return hasOwnStringProperty(val, 'id'); + default: + return false; + } + } + return false; +} + +export function parseMessage(data: unknown): Message { + if (isMessage(data)) { + return data; + } + if (typeof data === 'string') { + const message = JSON.parse(data); + if (!isMessage(message)) { + throw new Error('Invalid message'); + } + return message; + } + throw new Error('Message not parsable'); +} + +/** Helps stringifying a valid message ready to be sent through the socket. */ +export function stringifyMessage( + msg: Message, +): string { + if (!isMessage(msg)) { + throw new Error('Cannot stringify invalid message'); + } + return JSON.stringify(msg); +} diff --git a/src/protocol.ts b/src/protocol.ts new file mode 100644 index 00000000..aebd0c13 --- /dev/null +++ b/src/protocol.ts @@ -0,0 +1,7 @@ +/** + * + * protocol + * + */ + +export const GRAPHQL_TRANSPORT_WS_PROTOCOL = 'graphql-transport-ws'; diff --git a/src/server/.gitkeep b/src/server/.gitkeep deleted file mode 100644 index e69de29b..00000000 diff --git a/src/server/index.ts b/src/server/index.ts new file mode 100644 index 00000000..0ce5251a --- /dev/null +++ b/src/server/index.ts @@ -0,0 +1 @@ +export * from './server'; diff --git a/src/server/server.ts b/src/server/server.ts new file mode 100644 index 00000000..4f0ad01e --- /dev/null +++ b/src/server/server.ts @@ -0,0 +1,481 @@ +/** + * + * server + * + */ + +import http from 'http'; +import WebSocket from 'ws'; +import { + GraphQLSchema, + ValidationRule, + ExecutionResult, + ExecutionArgs, + parse, + validate, + getOperationAST, + subscribe, + GraphQLError, +} from 'graphql'; +import { Disposable } from '../types'; +import { GRAPHQL_TRANSPORT_WS_PROTOCOL } from '../protocol'; +import { + Message, + MessageType, + parseMessage, + SubscribeMessage, + CompleteMessage, + stringifyMessage, +} from '../message'; +import { + Optional, + isObject, + isAsyncIterable, + hasOwnObjectProperty, + hasOwnStringProperty, + noop, +} from '../utils'; + +export interface ServerOptions { + /** + * The GraphQL schema on which the operations + * will be executed and validated against. If + * the schema is left undefined, one must be + * provided by in the resulting `ExecutionArgs` + * from the `onSubscribe` callback. + */ + schema?: GraphQLSchema; + /** + * Is the `subscribe` function + * from GraphQL which is used to + * execute the subscription operation + * upon. + */ + execute: (args: ExecutionArgs) => Promise | ExecutionResult; + /** + * Is the `subscribe` function + * from GraphQL which is used to + * execute the subscription operation + * upon. + */ + subscribe: ( + args: ExecutionArgs, + ) => Promise | ExecutionResult>; + /** + * Is the connection callback called when the + * client requests the connection initialisation + * through the message `ConnectionInit`. The message + * payload (`connectionParams` on the client) is + * present in the `Context.connectionParams`. + * + * - Returning `true` from the callback will + * allow the client to connect. + * + * - Returning `false` from the callback will + * terminate the socket by dispatching the + * close event `4403: Forbidden`. + * + * - Throwing an error from the callback will + * terminate the socket by dispatching the + * close event `4400: `, where + * the `` is the message of the + * thrown `Error`. + */ + onConnect?: (ctx: Context) => Promise | boolean; + /** + * The amount of time for which the + * server will wait for `ConnectionInit` message. + * Defaults to: **3 seconds**. + * + * Set the value to `Infinity` to skip waiting. + * + * If the wait timeout has passed and the client + * has not sent the `ConnectionInit` message, + * the server will terminate the socket by + * dispatching a close event `4408: Connection initialisation timeout` + */ + connectionInitWaitTimeout?: number; + /** + * Custom validation rules overriding all + * validation rules defined by the GraphQL spec. + */ + validationRules?: readonly ValidationRule[]; + /** + * Format the operation execution results + * if the implementation requires an adjusted + * result. + */ + formatExecutionResult?: ( + ctx: Context, + result: ExecutionResult, + ) => Promise | ExecutionResult; + /** + * The subscribe callback executed before + * the actual operation execution. Useful + * for manipulating the execution arguments + * before the doing the operation. + */ + onSubscribe?: ( + ctx: Context, + message: SubscribeMessage, + args: Optional, + ) => Promise | ExecutionArgs; + /** + * The complete callback is executed after the + * operation has completed or the subscription + * has been closed. + */ + onComplete?: (ctx: Context, message: CompleteMessage) => void; +} + +export interface Context { + readonly socket: WebSocket; + /** + * Indicates that the `ConnectionInit` message + * has been received by the server. If this is + * `true`, the client wont be kicked off after + * the wait timeout has passed. + */ + connectionInitReceived: boolean; + /** + * Indicates that the connection was acknowledged + * by having dispatched the `ConnectionAck` message + * to the related client. + */ + acknowledged: boolean; + connectionParams?: Readonly>; + /** + * Holds the active subscriptions for this context. + * Subscriptions are for `subscription` operations **only**, + * other operations (`query`/`mutation`) are resolved immediately. + */ + subscriptions: Record>; +} + +export interface Server extends Disposable { + webSocketServer: WebSocket.Server; +} + +/** + * Creates a protocol complient WebSocket GraphQL + * subscription server. Read more about the protocol + * in the PROTOCOL.md documentation file. + */ +export function createServer( + { + schema, + execute, + onConnect, + connectionInitWaitTimeout = 3 * 1000, // 3 seconds + validationRules, + formatExecutionResult, + onSubscribe, + onComplete, + }: ServerOptions, + websocketOptionsOrServer: WebSocket.ServerOptions | WebSocket.Server, +): Server { + const webSocketServer = + websocketOptionsOrServer instanceof WebSocket.Server + ? websocketOptionsOrServer + : new WebSocket.Server(websocketOptionsOrServer); + + function handleConnection(socket: WebSocket, _request: http.IncomingMessage) { + if ( + socket.protocol === undefined || + socket.protocol !== GRAPHQL_TRANSPORT_WS_PROTOCOL || + (Array.isArray(socket.protocol) && + socket.protocol.indexOf(GRAPHQL_TRANSPORT_WS_PROTOCOL) === -1) + ) { + // 1002: Protocol Error + socket.close(1002, 'Protocol Error'); + return; + } + + const ctxRef: { current: Context } = { + current: { + socket, + connectionInitReceived: false, + acknowledged: false, + subscriptions: {}, + }, + }; + + // kick the client off (close socket) if the connection has + // not been initialised after the specified wait timeout + const connectionInitWait = + connectionInitWaitTimeout !== Infinity && + setTimeout(() => { + if (!ctxRef.current.connectionInitReceived) { + ctxRef.current.socket.close( + 4408, + 'Connection initialisation timeout', + ); + } + }, connectionInitWaitTimeout); + + function errorOrCloseHandler( + errorOrClose: WebSocket.ErrorEvent | WebSocket.CloseEvent, + ) { + if (connectionInitWait) { + clearTimeout(connectionInitWait); + } + + if (isErrorEvent(errorOrClose)) { + // TODO-db-200805 leaking sensitive information by sending the error message too? + // 1011: Internal Error + ctxRef.current.socket.close(1011, errorOrClose.message); + } + + Object.entries(ctxRef.current.subscriptions).forEach( + ([, subscription]) => { + (subscription.return || noop)(); + }, + ); + } + + socket.onerror = errorOrCloseHandler; + socket.onclose = errorOrCloseHandler; + socket.onmessage = makeOnMessage(ctxRef.current); + } + webSocketServer.on('connection', handleConnection); + webSocketServer.on('error', (err) => { + for (const client of webSocketServer.clients) { + // report server errors by erroring out all clients with the same error + client.emit('error', err); + } + }); + + // Sends through a message only if the socket is open. + function sendMessage( + ctx: Context, + message: Message, + callback?: (err?: Error) => void, + ) { + return new Promise((resolve, reject) => { + if (ctx.socket.readyState === WebSocket.OPEN) { + try { + ctx.socket.send(stringifyMessage(message), (err) => { + if (callback) callback(err); + if (err) { + return reject(err); + } + return resolve(); + }); + } catch (err) { + reject(err); + } + } else { + if (callback) callback(); + resolve(); + } + }); + } + + function makeOnMessage(ctx: Context) { + return async function (event: WebSocket.MessageEvent) { + try { + const message = parseMessage(event.data); + switch (message.type) { + case MessageType.ConnectionInit: { + ctx.connectionInitReceived = true; + + if (isObject(message.payload)) { + ctx.connectionParams = message.payload; + } + + if (onConnect) { + const permitted = await onConnect(ctx); + if (!permitted) { + return ctx.socket.close(4403, 'Forbidden'); + } + } + + await sendMessage(ctx, { + type: MessageType.ConnectionAck, + }); + + ctx.acknowledged = true; + break; + } + case MessageType.Subscribe: { + if (!ctx.acknowledged) { + return ctx.socket.close(4401, 'Unauthorized'); + } + + const operation = message.payload; + + let execArgsMaybeSchema: Optional = { + schema, + operationName: operation.operationName, + document: + typeof operation.query === 'string' + ? parse(operation.query) + : operation.query, + variableValues: operation.variables, + }; + if (onSubscribe) { + execArgsMaybeSchema = await onSubscribe( + ctx, + message, + execArgsMaybeSchema, + ); + } + if (!execArgsMaybeSchema.schema) { + // not providing a schema is a fatal server error + return webSocketServer.emit( + 'error', + new Error('The GraphQL schema is not provided'), + ); + } + + // the execution arguments should be complete now + const execArgs = execArgsMaybeSchema as ExecutionArgs; + + // validate + const validationErrors = validate( + execArgs.schema, + execArgs.document, + validationRules, + ); + if (validationErrors.length > 0) { + return await sendMessage(ctx, { + id: message.id, + type: MessageType.Error, + payload: validationErrors, + }); + } + + // execute + const operationAST = getOperationAST( + execArgs.document, + execArgs.operationName, + ); + if (!operationAST) { + throw new Error('Unable to get operation AST'); + } + if (operationAST.operation === 'subscription') { + const subscriptionOrResult = await subscribe(execArgs); + if (isAsyncIterable(subscriptionOrResult)) { + ctx.subscriptions[message.id] = subscriptionOrResult; + + try { + for await (let result of subscriptionOrResult) { + if (formatExecutionResult) { + result = await formatExecutionResult(ctx, result); + } + await sendMessage(ctx, { + id: message.id, + type: MessageType.Next, + payload: result, + }); + } + + const completeMessage: CompleteMessage = { + id: message.id, + type: MessageType.Complete, + }; + await sendMessage(ctx, completeMessage); + if (onComplete) { + onComplete(ctx, completeMessage); + } + } catch (err) { + await sendMessage(ctx, { + id: message.id, + type: MessageType.Error, + payload: [ + new GraphQLError( + err instanceof Error + ? err.message + : new Error(err).message, + ), + ], + }); + } finally { + delete ctx.subscriptions[message.id]; + } + } else { + let result = subscriptionOrResult; + if (formatExecutionResult) { + result = await formatExecutionResult(ctx, result); + } + await sendMessage(ctx, { + id: message.id, + type: MessageType.Next, + payload: result, + }); + + const completeMessage: CompleteMessage = { + id: message.id, + type: MessageType.Complete, + }; + await sendMessage(ctx, completeMessage); + if (onComplete) { + onComplete(ctx, completeMessage); + } + } + } else { + // operationAST.operation === 'query' || 'mutation' + + let result = await execute(execArgs); + if (formatExecutionResult) { + result = await formatExecutionResult(ctx, result); + } + await sendMessage(ctx, { + id: message.id, + type: MessageType.Next, + payload: result, + }); + + const completeMessage: CompleteMessage = { + id: message.id, + type: MessageType.Complete, + }; + await sendMessage(ctx, completeMessage); + if (onComplete) { + onComplete(ctx, completeMessage); + } + } + break; + } + case MessageType.Complete: { + if (ctx.subscriptions[message.id]) { + await (ctx.subscriptions[message.id].return ?? noop)(); + } + break; + } + default: + throw new Error( + `Unexpected message of type ${message.type} received`, + ); + } + } catch (err) { + ctx.socket.close(4400, err.message); + } + }; + } + + return { + webSocketServer, + dispose: async () => { + for (const client of webSocketServer.clients) { + // 1001: Going away + client.close(1001, 'Going away'); + } + + webSocketServer.removeAllListeners(); + + await new Promise((resolve, reject) => + webSocketServer.close((err) => (err ? reject(err) : resolve())), + ); + }, + }; +} + +function isErrorEvent(obj: unknown): obj is WebSocket.ErrorEvent { + return ( + isObject(obj) && + hasOwnObjectProperty(obj, 'error') && + hasOwnStringProperty(obj, 'message') && + hasOwnStringProperty(obj, 'type') + ); +} diff --git a/src/tests/client.ts b/src/tests/client.ts deleted file mode 100644 index 5bc59b21..00000000 --- a/src/tests/client.ts +++ /dev/null @@ -1 +0,0 @@ -it.todo('should be a client test'); diff --git a/src/tests/fixtures/simple.ts b/src/tests/fixtures/simple.ts new file mode 100644 index 00000000..4a342cf9 --- /dev/null +++ b/src/tests/fixtures/simple.ts @@ -0,0 +1,83 @@ +import { + GraphQLSchema, + GraphQLObjectType, + GraphQLString, + execute, + subscribe, + GraphQLNonNull, +} from 'graphql'; +import http from 'http'; +import { PubSub } from 'graphql-subscriptions'; +import { createServer, ServerOptions, Server } from '../../server'; + +export const pubsub = new PubSub(); + +export const schema = new GraphQLSchema({ + query: new GraphQLObjectType({ + name: 'Query', + fields: { + getValue: { + type: new GraphQLNonNull(GraphQLString), + resolve: () => 'value', + }, + }, + }), + subscription: new GraphQLObjectType({ + name: 'Subscription', + fields: { + becameHappy: { + type: new GraphQLObjectType({ + name: 'Person', + fields: { + id: { type: new GraphQLNonNull(GraphQLString) }, + name: { type: new GraphQLNonNull(GraphQLString) }, + }, + }), + subscribe: () => { + return pubsub.asyncIterator('becameHappy'); + }, + }, + }, + }), +}); + +export const port = 8273, + path = '/graphql-simple', + url = `ws://localhost:${port}${path}`; + +export async function startServer( + options: Partial = {}, +): Promise<[Server, () => Promise]> { + const httpServer = http.createServer((_req, res) => { + res.writeHead(404); + res.end(); + }); + + const server = await createServer( + { + schema, + execute, + subscribe, + ...options, + }, + { + server: httpServer, + path, + }, + ); + + await new Promise((resolve) => httpServer.listen(port, resolve)); + + return [ + server, + () => + new Promise((resolve, reject) => { + server + .dispose() + .catch(reject) + .then(() => { + httpServer.close((err) => (err ? reject(err) : resolve())); + }); + }), + ]; +} diff --git a/src/tests/server.ts b/src/tests/server.ts new file mode 100644 index 00000000..5a5b97c7 --- /dev/null +++ b/src/tests/server.ts @@ -0,0 +1,670 @@ +import WebSocket from 'ws'; +import { parse } from 'graphql'; +import { GRAPHQL_TRANSPORT_WS_PROTOCOL } from '../protocol'; +import { MessageType, parseMessage, stringifyMessage } from '../message'; +import { startServer, url, schema, pubsub } from './fixtures/simple'; + +/** Waits for the specified timeout and then resolves the promise. */ +const wait = (timeout: number) => + new Promise((resolve) => setTimeout(resolve, timeout)); + +let dispose: () => Promise | undefined; +async function makeServer(...args: Parameters) { + let server; + [server, dispose] = await startServer(...args); + return [ + server, + async () => { + await dispose(); + dispose = undefined; + }, + ]; +} +afterEach(async () => { + if (dispose) { + await dispose(); + dispose = undefined; + } +}); + +/** + * Tests + */ + +it('should allow connections with valid protocols only', async () => { + expect.assertions(10); + + await makeServer(); + + let client = new WebSocket(url); + client.onclose = (event) => { + expect(event.code).toBe(1002); + expect(event.reason).toBe('Protocol Error'); + expect(event.wasClean).toBeTruthy(); + }; + + await wait(5); + + client = new WebSocket(url, ['graphql', 'json']); + client.onclose = (event) => { + expect(event.code).toBe(1002); + expect(event.reason).toBe('Protocol Error'); + expect(event.wasClean).toBeTruthy(); + }; + + await wait(5); + + client = new WebSocket(url, GRAPHQL_TRANSPORT_WS_PROTOCOL + 'gibberish'); + client.onclose = (event) => { + expect(event.code).toBe(1002); + expect(event.reason).toBe('Protocol Error'); + expect(event.wasClean).toBeTruthy(); + }; + + await wait(5); + + client = new WebSocket(url, GRAPHQL_TRANSPORT_WS_PROTOCOL); + const closeFn = jest.fn(); + client.onclose = closeFn; + + await wait(10); + + expect(closeFn).not.toBeCalled(); +}); + +it('should gracefully go away when disposing', async () => { + expect.assertions(9); + + const [, dispose] = await makeServer(); + + const errorFn = jest.fn(); + + const client1 = new WebSocket(url, GRAPHQL_TRANSPORT_WS_PROTOCOL); + client1.onerror = errorFn; + client1.onclose = (event) => { + expect(event.code).toBe(1001); + expect(event.reason).toBe('Going away'); + expect(event.wasClean).toBeTruthy(); + }; + + const client2 = new WebSocket(url, GRAPHQL_TRANSPORT_WS_PROTOCOL); + client2.onerror = errorFn; + client2.onclose = (event) => { + expect(event.code).toBe(1001); + expect(event.reason).toBe('Going away'); + expect(event.wasClean).toBeTruthy(); + }; + + await wait(10); + + await dispose(); + + await wait(10); + + expect(errorFn).not.toBeCalled(); + expect(client1.readyState).toBe(WebSocket.CLOSED); + expect(client2.readyState).toBe(WebSocket.CLOSED); +}); + +it('should report server errors to clients by closing the connection', async () => { + expect.assertions(3); + + const [{ webSocketServer }] = await makeServer(); + + const emittedError = new Error("I'm a teapot"); + + const client = new WebSocket(url, GRAPHQL_TRANSPORT_WS_PROTOCOL); + client.onclose = (event) => { + expect(event.code).toBe(1011); // 1011: Internal Error + expect(event.reason).toBe(emittedError.message); + expect(event.wasClean).toBeTruthy(); // because the server reported the error + }; + + await wait(10); + + webSocketServer.emit('error', emittedError); + + await wait(10); +}); + +describe('onConnect', () => { + it('should refuse connection and close socket if returning `false`', async () => { + expect.assertions(3); + + await makeServer({ + onConnect: () => { + return false; + }, + }); + + const client = new WebSocket(url, GRAPHQL_TRANSPORT_WS_PROTOCOL); + client.onclose = (event) => { + expect(event.code).toBe(4403); + expect(event.reason).toBe('Forbidden'); + expect(event.wasClean).toBeTruthy(); + }; + client.onopen = () => { + client.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + }; + + await wait(10); + }); + + it('should close socket with error thrown from the callback', async () => { + expect.assertions(3); + + const error = new Error("I'm a teapot"); + + await makeServer({ + onConnect: () => { + throw error; + }, + }); + + const client = new WebSocket(url, GRAPHQL_TRANSPORT_WS_PROTOCOL); + client.onclose = (event) => { + expect(event.code).toBe(4400); + expect(event.reason).toBe(error.message); + expect(event.wasClean).toBeTruthy(); + }; + client.onopen = () => { + client.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + }; + + await wait(10); + }); + + it('should acknowledge connection if not implemented or returning `true`', async () => { + expect.assertions(2); + + function test() { + const client = new WebSocket(url, GRAPHQL_TRANSPORT_WS_PROTOCOL); + client.onmessage = ({ data }) => { + const message = parseMessage(data); + expect(message.type).toBe(MessageType.ConnectionAck); + }; + client.onopen = () => { + client.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + }; + } + + // no implementation + const [, dispose] = await makeServer(); + test(); + await wait(10); + await dispose(); + + // returns true + await makeServer({ + onConnect: () => { + return true; + }, + }); + test(); + await wait(10); + }); + + it('should pass in the `connectionParams` through the context and have other flags correctly set', async () => { + expect.assertions(3); + + const connectionParams = { + some: 'string', + with: 'a', + number: 10, + }; + + await makeServer({ + onConnect: (ctx) => { + expect(ctx.connectionParams).toEqual(connectionParams); + expect(ctx.connectionInitReceived).toBeTruthy(); // obviously received + expect(ctx.acknowledged).toBeFalsy(); // not yet acknowledged + return true; + }, + }); + + const client = new WebSocket(url, GRAPHQL_TRANSPORT_WS_PROTOCOL); + client.onopen = () => { + client.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + payload: connectionParams, + }), + ); + }; + + await wait(10); + }); + + it('should close the socket after the `connectionInitWaitTimeout` has passed without having received a `ConnectionInit` message', async () => { + expect.assertions(3); + + await makeServer({ connectionInitWaitTimeout: 10 }); + + const client = new WebSocket(url, GRAPHQL_TRANSPORT_WS_PROTOCOL); + client.onclose = (event) => { + expect(event.code).toBe(4408); + expect(event.reason).toBe('Connection initialisation timeout'); + expect(event.wasClean).toBeTruthy(); + }; + + await wait(20); + }); + + it('should not close the socket after the `connectionInitWaitTimeout` has passed but the callback is still resolving', async () => { + expect.assertions(2); + + await makeServer({ + connectionInitWaitTimeout: 10, + onConnect: () => + new Promise((resolve) => setTimeout(() => resolve(true), 20)), + }); + + const closeFn = jest.fn(); + + const client = new WebSocket(url, GRAPHQL_TRANSPORT_WS_PROTOCOL); + client.onclose = closeFn; + client.onmessage = ({ data }) => { + const message = parseMessage(data); + expect(message.type).toBe(MessageType.ConnectionAck); + }; + client.onopen = () => { + client.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + }; + + await wait(30); + + expect(closeFn).not.toBeCalled(); + }); +}); + +describe('Subscribe', () => { + it('should close the socket on request if connection is not acknowledged', async () => { + expect.assertions(3); + + await makeServer(); + + const client = new WebSocket(url, GRAPHQL_TRANSPORT_WS_PROTOCOL); + client.onclose = (event) => { + expect(event.code).toBe(4401); + expect(event.reason).toBe('Unauthorized'); + expect(event.wasClean).toBeTruthy(); + }; + client.onopen = () => { + client.send( + stringifyMessage({ + id: '1', + type: MessageType.Subscribe, + payload: { + operationName: 'NoAck', + query: `subscription NoAck {}`, + variables: {}, + }, + }), + ); + }; + + await wait(20); + }); + + it('should close the socket on request if schema is left undefined', async () => { + expect.assertions(3); + + await makeServer({ + schema: undefined, + }); + + const client = new WebSocket(url, GRAPHQL_TRANSPORT_WS_PROTOCOL); + client.onclose = (event) => { + expect(event.code).toBe(1011); + expect(event.reason).toBe('The GraphQL schema is not provided'); + expect(event.wasClean).toBeTruthy(); + }; + client.onopen = () => { + client.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + }; + client.onmessage = ({ data }) => { + const message = parseMessage(data); + switch (message.type) { + case MessageType.ConnectionAck: + client.send( + stringifyMessage({ + id: '1', + type: MessageType.Subscribe, + payload: { + operationName: 'TestString', + query: `query TestString { + getValue + }`, + variables: {}, + }, + }), + ); + break; + default: + fail(`Not supposed to receive a message of type ${message.type}`); + } + }; + + await wait(10); + }); + + it('should pick up the schema from `onSubscribe`', async () => { + expect.assertions(2); + + await makeServer({ + schema: undefined, + onSubscribe: (_ctx, _message, args) => { + return { + ...args, + schema, + }; + }, + }); + + const client = new WebSocket(url, GRAPHQL_TRANSPORT_WS_PROTOCOL); + const closeOrErrorFn = jest.fn(); + client.onerror = closeOrErrorFn; + client.onclose = closeOrErrorFn; + client.onopen = () => { + client.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + }; + client.onmessage = ({ data }) => { + const message = parseMessage(data); + switch (message.type) { + case MessageType.ConnectionAck: + client.send( + stringifyMessage({ + id: '1', + type: MessageType.Subscribe, + payload: { + operationName: 'TestString', + query: `query TestString { + getValue + }`, + variables: {}, + }, + }), + ); + break; + case MessageType.Next: + expect(message).toEqual({ + id: '1', + type: MessageType.Next, + payload: { data: { getValue: 'value' } }, + }); + break; + } + }; + + await wait(20); + + expect(closeOrErrorFn).not.toBeCalled(); + }); + + it('should execute the query of `string` type, "next" the result and then "complete"', async () => { + expect.assertions(3); + + await makeServer({ + schema, + }); + + const client = new WebSocket(url, GRAPHQL_TRANSPORT_WS_PROTOCOL); + client.onopen = () => { + client.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + }; + + let receivedNext = false; + client.onmessage = ({ data }) => { + const message = parseMessage(data); + switch (message.type) { + case MessageType.ConnectionAck: + client.send( + stringifyMessage({ + id: '1', + type: MessageType.Subscribe, + payload: { + operationName: 'TestString', + query: `query TestString { + getValue + }`, + variables: {}, + }, + }), + ); + break; + case MessageType.Next: + expect(message).toEqual({ + id: '1', + type: MessageType.Next, + payload: { data: { getValue: 'value' } }, + }); + receivedNext = true; + break; + case MessageType.Complete: + expect(receivedNext).toBeTruthy(); + expect(message).toEqual({ + id: '1', + type: MessageType.Complete, + }); + break; + default: + fail(`Not supposed to receive a message of type ${message.type}`); + } + }; + + await wait(20); + }); + + it('should execute the query of `DocumentNode` type, "next" the result and then "complete"', async () => { + expect.assertions(3); + + await makeServer({ + schema, + }); + + const client = new WebSocket(url, GRAPHQL_TRANSPORT_WS_PROTOCOL); + client.onopen = () => { + client.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + }; + + let receivedNext = false; + client.onmessage = ({ data }) => { + const message = parseMessage(data); + switch (message.type) { + case MessageType.ConnectionAck: + client.send( + stringifyMessage({ + id: '1', + type: MessageType.Subscribe, + payload: { + operationName: 'TestString', + query: parse(`query TestString { + getValue + }`), + variables: {}, + }, + }), + ); + break; + case MessageType.Next: + expect(message).toEqual({ + id: '1', + type: MessageType.Next, + payload: { data: { getValue: 'value' } }, + }); + receivedNext = true; + break; + case MessageType.Complete: + expect(receivedNext).toBeTruthy(); + expect(message).toEqual({ + id: '1', + type: MessageType.Complete, + }); + break; + default: + fail(`Not supposed to receive a message of type ${message.type}`); + } + }; + + await wait(20); + }); + + it('should execute the query and "error" out because of validation errors', async () => { + expect.assertions(8); + + await makeServer({ + schema, + }); + + const client = new WebSocket(url, GRAPHQL_TRANSPORT_WS_PROTOCOL); + const closeOrErrorFn = jest.fn(); + client.onerror = closeOrErrorFn; + client.onclose = closeOrErrorFn; + client.onopen = () => { + client.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + }; + client.onmessage = ({ data }) => { + const message = parseMessage(data); + switch (message.type) { + case MessageType.ConnectionAck: + client.send( + stringifyMessage({ + id: '1', + type: MessageType.Subscribe, + payload: { + operationName: 'TestNoField', + query: `query TestNoField { + testNumber + testBoolean + }`, + variables: {}, + }, + }), + ); + break; + case MessageType.Error: + expect(message.id).toBe('1'); + expect(message.payload).toBeInstanceOf(Array); + expect(message.payload.length).toBe(2); + + // testNumber + expect(message.payload[0].message).toBe( + 'Cannot query field "testNumber" on type "Query".', + ); + expect(message.payload[0].locations).toBeInstanceOf(Array); + + // testBoolean + expect(message.payload[1].message).toBe( + 'Cannot query field "testBoolean" on type "Query".', + ); + expect(message.payload[1].locations).toBeInstanceOf(Array); + break; + default: + fail(`Not supposed to receive a message of type ${message.type}`); + } + }; + + await wait(20); + + // socket shouldnt close or error because of GraphQL errors + expect(closeOrErrorFn).not.toBeCalled(); + }); + + it('should execute the subscription and "next" the published payload', async () => { + expect.assertions(1); + + await makeServer({ + schema, + }); + + const client = new WebSocket(url, GRAPHQL_TRANSPORT_WS_PROTOCOL); + client.onopen = () => { + client.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + }; + + client.onmessage = ({ data }) => { + const message = parseMessage(data); + switch (message.type) { + case MessageType.ConnectionAck: { + client.send( + stringifyMessage({ + id: '1', + type: MessageType.Subscribe, + payload: { + operationName: 'BecomingHappy', + query: `subscription BecomingHappy { + becameHappy { + name + } + }`, + variables: {}, + }, + }), + () => + setTimeout( + () => + pubsub.publish('becameHappy', { + becameHappy: { + name: 'john', + }, + }), + 0, + ), + ); + break; + } + case MessageType.Next: + expect(message).toEqual({ + id: '1', + type: MessageType.Next, + payload: { data: { becameHappy: { name: 'john' } } }, + }); + break; + default: + fail(`Not supposed to receive a message of type ${message.type}`); + } + }; + + await wait(20); + }); +}); diff --git a/src/types.d.ts b/src/types.d.ts new file mode 100644 index 00000000..2ea60044 --- /dev/null +++ b/src/types.d.ts @@ -0,0 +1,15 @@ +/** + * + * types + * + */ + +export interface Disposable { + dispose: () => Promise; +} + +export interface Sink { + next(value: T): void; + error(error: Error): void; + complete(): void; +} diff --git a/src/utils.ts b/src/utils.ts new file mode 100644 index 00000000..cc8c0d47 --- /dev/null +++ b/src/utils.ts @@ -0,0 +1,57 @@ +/** + * + * utils + * + */ + +export type Optional = Pick> & + Partial>; + +export function isObject(val: unknown): val is Record { + return typeof val === 'object' && val !== null; +} + +export function isArray(val: unknown): val is unknown[] { + return typeof val === 'object' && val !== null && Array.isArray(val); +} + +export function isAsyncIterable( + val: unknown, +): val is AsyncIterableIterator { + return typeof Object(val)[Symbol.asyncIterator] === 'function'; +} + +export function hasOwnProperty< + O extends Record, + P extends PropertyKey +>(obj: O, prop: P): obj is O & Record { + return Object.prototype.hasOwnProperty.call(obj, prop); +} + +export function hasOwnObjectProperty< + O extends Record, + P extends PropertyKey +>(obj: O, prop: P): obj is O & Record> { + return Object.prototype.hasOwnProperty.call(obj, prop) && isObject(obj[prop]); +} + +export function hasOwnArrayProperty< + O extends Record, + P extends PropertyKey +>(obj: O, prop: P): obj is O & Record { + return Object.prototype.hasOwnProperty.call(obj, prop) && isArray(obj[prop]); +} + +export function hasOwnStringProperty< + O extends Record, + P extends PropertyKey +>(obj: O, prop: P): obj is O & Record { + return ( + Object.prototype.hasOwnProperty.call(obj, prop) && + typeof obj[prop] === 'string' + ); +} + +export function noop(): void { + /**/ +} diff --git a/yarn.lock b/yarn.lock index 8f7c8982..889b6e9b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1236,17 +1236,23 @@ __metadata: "@semantic-release/changelog": ^5.0.1 "@semantic-release/git": ^9.0.0 "@types/jest": ^26.0.9 + "@types/ws": ^7.2.6 "@typescript-eslint/eslint-plugin": ^3.9.0 "@typescript-eslint/parser": ^3.9.0 babel-jest: ^26.3.0 eslint: ^7.6.0 eslint-config-prettier: ^6.11.0 eslint-plugin-prettier: ^3.1.4 + graphql: ^15.3.0 + graphql-subscriptions: ^1.1.0 jest: ^26.3.0 prettier: ^2.0.5 semantic-release: ^17.1.1 typescript: ^3.9.7 websocket-as-promised: ^1.0.1 + ws: ^7.3.1 + peerDependencies: + graphql: ">=15.0.0" languageName: unknown linkType: soft @@ -1976,6 +1982,15 @@ __metadata: languageName: node linkType: hard +"@types/ws@npm:^7.2.6": + version: 7.2.6 + resolution: "@types/ws@npm:7.2.6" + dependencies: + "@types/node": "*" + checksum: 2f7c358c4b2798f1885c3edecf4b09ecfa35933eff0c37bf8b99eab722032db5c1ab9e26899d13f035a8cf1d92a681578016c8837c5e6725892c24ee10f2f63f + languageName: node + linkType: hard + "@types/yargs-parser@npm:*": version: 15.0.0 resolution: "@types/yargs-parser@npm:15.0.0" @@ -4804,6 +4819,24 @@ fsevents@^2.1.2: languageName: node linkType: hard +"graphql-subscriptions@npm:^1.1.0": + version: 1.1.0 + resolution: "graphql-subscriptions@npm:1.1.0" + dependencies: + iterall: ^1.2.1 + peerDependencies: + graphql: ^0.10.5 || ^0.11.3 || ^0.12.0 || ^0.13.0 || ^14.0.0 + checksum: 3e10eebf446ebc3f4a2ec9c5798495234cbb446ad4b37e292c0419249366795168c3dc46fa981dd46f11da7c470cab4dc4e3b7ee98831ccd57e71d3d0d9567b8 + languageName: node + linkType: hard + +"graphql@npm:^15.3.0": + version: 15.3.0 + resolution: "graphql@npm:15.3.0" + checksum: f01c05eb9102a88f33b4276b38399f066067d5be03c66c3bfec3e71e1793a91750e408f7abb001831bc2daf09746ab70ac0af16894d0ef1f1c1246fb0c2f4b0e + languageName: node + linkType: hard + "growly@npm:^1.3.0": version: 1.3.0 resolution: "growly@npm:1.3.0" @@ -5697,6 +5730,13 @@ fsevents@^2.1.2: languageName: node linkType: hard +"iterall@npm:^1.2.1": + version: 1.3.0 + resolution: "iterall@npm:1.3.0" + checksum: 25ae2d07cf97fc35d43fa7af814839689416b83d3ade0fec97a62c58b7b9fad5ff89dd0ede99f2d67cae2697ffa6987f0ab10876f40ae6466e802609a05b1006 + languageName: node + linkType: hard + "java-properties@npm:^1.0.0": version: 1.0.2 resolution: "java-properties@npm:1.0.2" @@ -10826,7 +10866,7 @@ typescript@^3.9.7: languageName: node linkType: hard -"ws@npm:^7.2.3": +"ws@npm:^7.2.3, ws@npm:^7.3.1": version: 7.3.1 resolution: "ws@npm:7.3.1" peerDependencies: