diff --git a/README.md b/README.md index 4860ba6..d4a17b0 100644 --- a/README.md +++ b/README.md @@ -33,10 +33,10 @@ After registering this plugin, you can choose on which routes the WS server will const fastify = require('fastify')() fastify.register(require('@fastify/websocket')) fastify.register(async function (fastify) { - fastify.get('/', { websocket: true }, (connection /* SocketStream */, req /* FastifyRequest */) => { - connection.socket.on('message', message => { + fastify.get('/', { websocket: true }, (socket /* WebSocket */, req /* FastifyRequest */) => { + socket.on('message', message => { // message.toString() === 'hi from client' - connection.socket.send('hi from server') + socket.send('hi from server') }) }) }) @@ -63,17 +63,17 @@ fastify.register(require('@fastify/websocket'), { }) fastify.register(async function (fastify) { - fastify.get('/*', { websocket: true }, (connection /* SocketStream */, req /* FastifyRequest */) => { - connection.socket.on('message', message => { + fastify.get('/*', { websocket: true }, (socket /* WebSocket */, req /* FastifyRequest */) => { + socket.on('message', message => { // message.toString() === 'hi from client' - connection.socket.send('hi from wildcard route') + socket.send('hi from wildcard route') }) }) - fastify.get('/', { websocket: true }, (connection /* SocketStream */, req /* FastifyRequest */) => { - connection.socket.on('message', message => { + fastify.get('/', { websocket: true }, (socket /* WebSocket */, req /* FastifyRequest */) => { + socket.on('message', message => { // message.toString() === 'hi from client' - connection.socket.send('hi from server') + socket.send('hi from server') }) }) }) @@ -93,10 +93,10 @@ It is important that websocket route handlers attach event handlers synchronousl Here is an example of how to attach message handlers synchronously while still accessing asynchronous resources. We store a promise for the async thing in a local variable, attach the message handler synchronously, and then make the message handler itself asynchronous to grab the async data and do some processing: ```javascript -fastify.get('/*', { websocket: true }, (connection, request) => { +fastify.get('/*', { websocket: true }, (socket, request) => { const sessionPromise = request.getSession() // example async session getter, called synchronously to return a promise - connection.socket.on('message', async (message) => { + socket.on('message', async (message) => { const session = await sessionPromise() // do something with the message and session }) @@ -113,9 +113,9 @@ fastify.addHook('preValidation', async (request, reply) => { await reply.code(401).send("not authenticated"); } }) -fastify.get('/', { websocket: true }, (connection, req) => { +fastify.get('/', { websocket: true }, (socket, req) => { // the connection will only be opened for authenticated incoming requests - connection.socket.on('message', message => { + socket.on('message', message => { // ... }) }) @@ -134,13 +134,13 @@ import websocket from '@fastify/websocket' const fastify = Fastify() await fastify.register(websocket) -fastify.get('/', { websocket: true }, function wsHandler (connection, req) { +fastify.get('/', { websocket: true }, function wsHandler (socket, req) { // bound to fastify server this.myDecoration.someFunc() - connection.socket.on('message', message => { + socket.on('message', message => { // message.toString() === 'hi from client' - connection.socket.send('hi from server') + socket.send('hi from server') }) }) @@ -154,8 +154,8 @@ If you need to handle both HTTP requests and incoming socket connections on the const fastify = require('fastify')() -function handle (conn, req) { - conn.pipe(conn) // creates an echo server +function handle (socket, req) { + socket.on('message', (data) => socket.send(data)) // creates an echo server } fastify.register(require('@fastify/websocket'), { @@ -171,13 +171,12 @@ fastify.register(async function () { // this will handle http requests reply.send({ hello: 'world' }) }, - wsHandler: (conn, req) => { + wsHandler: (socket, req) => { // this will handle websockets connections - conn.setEncoding('utf8') - conn.write('hello client') + socket.send('hello client') - conn.once('data', chunk => { - conn.end() + socket.once('message', chunk => { + socket.close() }) } }) @@ -201,10 +200,10 @@ Neither the `errorHandler` passed to this plugin or fastify's `onError` hook wil const fastify = require('fastify')() fastify.register(require('@fastify/websocket'), { - errorHandler: function (error, conn /* SocketStream */, req /* FastifyRequest */, reply /* FastifyReply */) { + errorHandler: function (error, socket /* WebSocket */, req /* FastifyRequest */, reply /* FastifyReply */) { // Do stuff // destroy/close connection - conn.destroy(error) + socket.terminate() }, options: { maxPayload: 1048576, // we set the maximum allowed messages size to 1 MiB (1024 bytes * 1024 bytes) @@ -217,10 +216,10 @@ fastify.register(require('@fastify/websocket'), { } }) -fastify.get('/', { websocket: true }, (connection /* SocketStream */, req /* FastifyRequest */) => { - connection.socket.on('message', message => { +fastify.get('/', { websocket: true }, (socket /* WebSocket */, req /* FastifyRequest */) => { + socket.on('message', message => { // message.toString() === 'hi from client' - connection.socket.send('hi from server') + socket.send('hi from server') }) }) @@ -247,8 +246,8 @@ fastify.register(require('@fastify/websocket'), { preClose: (done) => { // Note: can also use async style, without done-callback const server = this.websocketServer - for (const connection of server.clients) { - connection.close(1001, 'WS server is going offline in custom manner, sending a code + message') + for (const socket of server.clients) { + socket.close(1001, 'WS server is going offline in custom manner, sending a code + message') } server.close(done) @@ -263,6 +262,32 @@ It allows to test easily a websocket endpoint. The signature of injectWS is the following: `([path], [upgradeContext])`. + +### Creating a stream from the WebSocket + +```js +const Fastify = require('fastify') +const FastifyWebSocket = require('@fastify/websocket') +const ws = require('ws') + +const fastify = Fastify() +await fastify.register(websocket) + +fastify.get('/', { websocket: true }, (socket, req) => { + const stream = ws.createWebSocketStream(socket, { /* options */ }) + stream.setEncoding('utf8') + stream.write('hello client') + + stream.on('data', function (data) { + // Make sure to set up a data handler or read all the incoming + // data in another way, otherwise stream backpressure will cause + // the underlying WebSocket object to get paused. + }) +}) + +await fastify.listen({ port: 3000 }) +``` + #### App.js ```js @@ -282,9 +307,9 @@ App.register(async function(fastify) { } }) - fastify.get('/', { websocket: true }, (connection) => { - connection.socket.on('message', message => { - connection.socket.send('hi from server') + fastify.get('/', { websocket: true }, (socket) => { + socket.on('message', message => { + socket.send('hi from server') }) }) }) @@ -350,15 +375,6 @@ _**NB** The `path` option from `ws` should not be provided since the routing is _**NB** The `noServer` option from `ws` should not be provided since the point of @fastify/websocket is to listen on the fastify server. If you want a custom server, you can use the `server` option, and if you want more control, you can use the `ws` library directly_ -You can also pass the following as `connectionOptions` for [createWebSocketStream](https://github.com/websockets/ws/blob/master/doc/ws.md#createwebsocketstreamwebsocket-options). - -- `allowHalfOpen` If set to false, then the stream will automatically end the writable side when the readable side ends. Default: true. -- `readable` Sets whether the Duplex should be readable. Default: true. -- `writable` Sets whether the Duplex should be writable. Default: true. -- `readableObjectMode` Sets objectMode for readable side of the stream. Has no effect if objectMode is true. Default: false. -- `readableHighWaterMark` Sets highWaterMark for the readable side of the stream. -- `writableHighWaterMark` Sets highWaterMark for the writable side of the stream. - [ws](https://github.com/websockets/ws) does not allow you to set `objectMode` or `writableObjectMode` to true ## Acknowledgements diff --git a/index.js b/index.js index ef639ef..186961a 100644 --- a/index.js +++ b/index.js @@ -122,20 +122,11 @@ function fastifyWebsocket (fastify, opts, next) { wss.handleUpgrade(rawRequest, rawRequest[kWs], rawRequest[kWsHead], (socket) => { wss.emit('connection', socket, rawRequest) - const connection = WebSocket.createWebSocketStream(socket, opts.connectionOptions) - connection.socket = socket - - connection.on('error', (error) => { + socket.on('error', (error) => { fastify.log.error(error) }) - connection.socket.on('newListener', event => { - if (event === 'message') { - connection.resume() - } - }) - - callback(connection) + callback(socket) }) } @@ -187,20 +178,20 @@ function fastifyWebsocket (fastify, opts, next) { // within the route handler, we check if there has been a connection upgrade by looking at request.raw[kWs]. we need to dispatch the normal HTTP handler if not, and hijack to dispatch the websocket handler if so if (request.raw[kWs]) { reply.hijack() - handleUpgrade(request.raw, connection => { + handleUpgrade(request.raw, socket => { let result try { if (isWebsocketRoute) { - result = wsHandler.call(this, connection, request) + result = wsHandler.call(this, socket, request) } else { - result = noHandle.call(this, connection, request) + result = noHandle.call(this, socket, request) } } catch (err) { - return errorHandler.call(this, err, connection, request, reply) + return errorHandler.call(this, err, socket, request, reply) } if (result && typeof result.catch === 'function') { - result.catch(err => errorHandler.call(this, err, connection, request, reply)) + result.catch(err => errorHandler.call(this, err, socket, request, reply)) } }) } else { @@ -229,19 +220,14 @@ function fastifyWebsocket (fastify, opts, next) { done() } - function noHandle (connection, rawRequest) { + function noHandle (socket, rawRequest) { this.log.info({ path: rawRequest.url }, 'closed incoming websocket connection for path with no websocket handler') - connection.socket.close() + socket.close() } - function defaultErrorHandler (error, conn, request) { - // Before destroying the connection, we attach an error listener. - // Since we already handled the error, adding this listener prevents the ws - // library from emitting the error and causing an uncaughtException - // Reference: https://github.com/websockets/ws/blob/master/lib/stream.js#L35 - conn.on('error', _ => { }) + function defaultErrorHandler (error, socket, request) { request.log.error(error) - conn.destroy(error) + socket.terminate() } next() diff --git a/test/base.test.js b/test/base.test.js index e03a14a..be43118 100644 --- a/test/base.test.js +++ b/test/base.test.js @@ -1,7 +1,6 @@ 'use strict' const http = require('node:http') -const util = require('node:util') const split = require('split2') const test = require('tap').test const Fastify = require('fastify') @@ -22,29 +21,27 @@ test('Should expose a websocket', async (t) => { await fastify.register(fastifyWebsocket) - fastify.get('/', { websocket: true }, (connection) => { - connection.setEncoding('utf8') - t.teardown(() => connection.destroy()) + fastify.get('/', { websocket: true }, (socket) => { + t.teardown(() => socket.terminate()) - connection.once('data', (chunk) => { - t.equal(chunk, 'hello server') - connection.write('hello client') - connection.end() + socket.once('message', (chunk) => { + t.equal(chunk.toString(), 'hello server') + socket.send('hello client') }) }) await fastify.listen({ port: 0 }) const ws = new WebSocket('ws://localhost:' + fastify.server.address().port) - const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8' }) - t.teardown(() => client.destroy()) + t.teardown(() => ws.close()) - client.setEncoding('utf8') - client.write('hello server') + const chunkPromise = once(ws, 'message') + await once(ws, 'open') + ws.send('hello server') - const [chunk] = await once(client, 'data') - t.equal(chunk, 'hello client') - client.end() + const [chunk] = await chunkPromise + t.equal(chunk.toString(), 'hello client') + ws.close() }) test('Should fail if custom errorHandler is not a function', async (t) => { @@ -59,8 +56,8 @@ test('Should fail if custom errorHandler is not a function', async (t) => { t.equal(err.message, 'invalid errorHandler function') } - fastify.get('/', { websocket: true }, (connection) => { - t.teardown(() => connection.destroy()) + fastify.get('/', { websocket: true }, (socket) => { + t.teardown(() => socket.terminate()) }) try { @@ -88,17 +85,17 @@ test('Should run custom errorHandler on wildcard route handler error', async (t) } }) - fastify.get('/*', { websocket: true }, (conn) => { - conn.pipe(conn) - t.teardown(() => conn.destroy()) + fastify.get('/*', { websocket: true }, (socket) => { + socket.on('message', (data) => socket.send(data)) + t.teardown(() => socket.terminate()) return Promise.reject(new Error('Fail')) }) await fastify.listen({ port: 0 }) const ws = new WebSocket('ws://localhost:' + fastify.server.address().port) - const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8' }) - t.teardown(() => client.destroy()) + t.teardown(() => ws.close()) + await p }) @@ -120,16 +117,15 @@ test('Should run custom errorHandler on error inside websocket handler', async ( await fastify.register(fastifyWebsocket, options) - fastify.get('/', { websocket: true }, function wsHandler (conn) { - conn.pipe(conn) - t.teardown(() => conn.destroy()) + fastify.get('/', { websocket: true }, function wsHandler (socket) { + socket.on('message', (data) => socket.send(data)) + t.teardown(() => socket.terminate()) throw new Error('Fail') }) await fastify.listen({ port: 0 }) const ws = new WebSocket('ws://localhost:' + fastify.server.address().port) - const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8' }) - t.teardown(() => client.destroy()) + t.teardown(() => ws.close()) await p }) @@ -152,20 +148,20 @@ test('Should run custom errorHandler on error inside async websocket handler', a await fastify.register(fastifyWebsocket, options) - fastify.get('/', { websocket: true }, async function wsHandler (conn) { - conn.pipe(conn) - t.teardown(() => conn.destroy()) + fastify.get('/', { websocket: true }, async function wsHandler (socket) { + socket.on('message', (data) => socket.send(data)) + t.teardown(() => socket.terminate()) throw new Error('Fail') }) await fastify.listen({ port: 0 }) const ws = new WebSocket('ws://localhost:' + fastify.server.address().port) - const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8' }) - t.teardown(() => client.destroy()) + t.teardown(() => ws.close()) + await p }) -test('Should be able to pass custom options to websocket-stream', async (t) => { +test('Should be able to pass custom options to ws', async (t) => { t.plan(2) const fastify = Fastify() @@ -181,27 +177,27 @@ test('Should be able to pass custom options to websocket-stream', async (t) => { await fastify.register(fastifyWebsocket, { options }) - fastify.get('/*', { websocket: true }, (connection) => { - connection.pipe(connection) - t.teardown(() => connection.destroy()) + fastify.get('/*', { websocket: true }, (socket) => { + socket.on('message', (data) => socket.send(data)) + t.teardown(() => socket.terminate()) }) await fastify.listen({ port: 0 }) const clientOptions = { headers: { 'x-custom-header': 'fastify is awesome !' } } const ws = new WebSocket('ws://localhost:' + fastify.server.address().port, clientOptions) - const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8' }) - t.teardown(() => client.destroy()) + const chunkPromise = once(ws, 'message') + await once(ws, 'open') + t.teardown(() => ws.close()) - client.setEncoding('utf8') - client.write('hello') + ws.send('hello') - const [chunk] = await once(client, 'data') - t.equal(chunk, 'hello') - client.end() + const [chunk] = await chunkPromise + t.equal(chunk.toString(), 'hello') + ws.close() }) -test('Should warn if path option is provided to websocket-stream', async (t) => { +test('Should warn if path option is provided to ws', async (t) => { t.plan(3) const logStream = split(JSON.parse) const fastify = Fastify({ @@ -221,27 +217,27 @@ test('Should warn if path option is provided to websocket-stream', async (t) => const options = { path: '/' } await fastify.register(fastifyWebsocket, { options }) - fastify.get('/*', { websocket: true }, (connection) => { - connection.pipe(connection) - t.teardown(() => connection.destroy()) + fastify.get('/*', { websocket: true }, (socket) => { + socket.on('message', (data) => socket.send(data)) + t.teardown(() => socket.terminate()) }) await fastify.listen({ port: 0 }) const clientOptions = { headers: { 'x-custom-header': 'fastify is awesome !' } } const ws = new WebSocket('ws://localhost:' + fastify.server.address().port, clientOptions) - const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8' }) - t.teardown(() => client.destroy()) + const chunkPromise = once(ws, 'message') + await once(ws, 'open') + t.teardown(() => ws.close()) - client.setEncoding('utf8') - client.write('hello') + ws.send('hello') - const [chunk] = await once(client, 'data') - t.equal(chunk, 'hello') - client.end() + const [chunk] = await chunkPromise + t.equal(chunk.toString(), 'hello') + ws.close() }) -test('Should be able to pass a custom server option to websocket-stream', async (t) => { +test('Should be able to pass a custom server option to ws', async (t) => { // We create an external server const externalServerPort = 3000 const externalServer = http @@ -263,26 +259,26 @@ test('Should be able to pass a custom server option to websocket-stream', async await fastify.register(fastifyWebsocket, { options }) - fastify.get('/', { websocket: true }, (connection) => { - connection.pipe(connection) - t.teardown(() => connection.destroy()) + fastify.get('/', { websocket: true }, (socket) => { + socket.on('message', (data) => socket.send(data)) + t.teardown(() => socket.terminate()) }) await fastify.ready() const ws = new WebSocket('ws://localhost:' + externalServerPort) - const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8' }) - t.teardown(() => client.destroy()) + const chunkPromise = once(ws, 'message') + await once(ws, 'open') + t.teardown(() => ws.close()) - client.setEncoding('utf8') - client.write('hello') + ws.send('hello') - const [chunk] = await once(client, 'data') - t.equal(chunk, 'hello') - client.end() + const [chunk] = await chunkPromise + t.equal(chunk.toString(), 'hello') + ws.close() }) -test('Should be able to pass clientTracking option in false to websocket-stream', (t) => { +test('Should be able to pass clientTracking option in false to ws', (t) => { t.plan(2) const fastify = Fastify() @@ -293,8 +289,8 @@ test('Should be able to pass clientTracking option in false to websocket-stream' fastify.register(fastifyWebsocket, { options }) - fastify.get('/*', { websocket: true }, (connection) => { - connection.destroy() + fastify.get('/*', { websocket: true }, (socket) => { + socket.close() }) fastify.listen({ port: 0 }, (err) => { @@ -306,46 +302,6 @@ test('Should be able to pass clientTracking option in false to websocket-stream' }) }) -test('Should be able to pass custom connectionOptions to createWebSocketStream', async (t) => { - t.plan(2) - - const fastify = Fastify() - t.teardown(() => fastify.close()) - - const connectionOptions = { - readableObjectMode: true - } - - await fastify.register(fastifyWebsocket, { connectionOptions }) - - let _resolve - const p = new Promise((resolve) => { - _resolve = resolve - }) - - fastify.get('/', { websocket: true }, (connection) => { - t.equal(connection.readableObjectMode, true) - connection.socket.binaryType = 'arraybuffer' - - connection.once('data', (chunk) => { - const message = new util.TextDecoder().decode(chunk) - t.equal(message, 'Hello') - _resolve() - }) - t.teardown(() => connection.destroy()) - }) - - await fastify.listen({ port: 0 }) - - const ws = new WebSocket('ws://localhost:' + fastify.server.address().port) - const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8' }) - t.teardown(() => client.destroy()) - - client.setEncoding('utf8') - client.write('Hello') - await p -}) - test('Should be able to pass preClose option to override default', async (t) => { t.plan(3) @@ -362,29 +318,27 @@ test('Should be able to pass preClose option to override default', async (t) => await fastify.register(fastifyWebsocket, { preClose }) - fastify.get('/', { websocket: true }, (connection) => { - connection.setEncoding('utf8') - t.teardown(() => connection.destroy()) + fastify.get('/', { websocket: true }, (socket) => { + t.teardown(() => socket.terminate()) - connection.once('data', (chunk) => { - t.equal(chunk, 'hello server') - connection.write('hello client') - connection.end() + socket.once('message', (chunk) => { + t.equal(chunk.toString(), 'hello server') + socket.send('hello client') }) }) await fastify.listen({ port: 0 }) const ws = new WebSocket('ws://localhost:' + fastify.server.address().port) - const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8' }) - t.teardown(() => client.destroy()) + t.teardown(() => ws.close()) - client.setEncoding('utf8') - client.write('hello server') + const chunkPromise = once(ws, 'message') + await once(ws, 'open') + ws.send('hello server') - const [chunk] = await once(client, 'data') - t.equal(chunk, 'hello client') - client.end() + const [chunk] = await chunkPromise + t.equal(chunk.toString(), 'hello client') + ws.close() await fastify.close() }) @@ -403,8 +357,8 @@ test('Should fail if custom preClose is not a function', async (t) => { t.equal(err.message, 'invalid preClose function') } - fastify.get('/', { websocket: true }, (connection) => { - t.teardown(() => connection.destroy()) + fastify.get('/', { websocket: true }, (socket) => { + t.teardown(() => socket.terminate()) }) try { @@ -422,30 +376,27 @@ test('Should gracefully close with a connected client', async (t) => { await fastify.register(fastifyWebsocket) let serverConnEnded - fastify.get('/', { websocket: true }, (connection) => { - connection.setEncoding('utf8') - connection.write('hello client') + fastify.get('/', { websocket: true }, (socket) => { + socket.send('hello client') - connection.once('data', (chunk) => { - t.equal(chunk, 'hello server') + socket.once('message', (chunk) => { + t.equal(chunk.toString(), 'hello server') }) - serverConnEnded = once(connection, 'end') + serverConnEnded = once(socket, 'close') // this connection stays alive untile we close the server }) await fastify.listen({ port: 0 }) const ws = new WebSocket('ws://localhost:' + fastify.server.address().port) - const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8' }) - - client.setEncoding('utf8') - client.write('hello server') - - const ended = once(client, 'end') + const chunkPromise = once(ws, 'message') + await once(ws, 'open') + ws.send('hello server') - const [chunk] = await once(client, 'data') - t.equal(chunk, 'hello client') + const ended = once(ws, 'close') + const [chunk] = await chunkPromise + t.equal(chunk.toString(), 'hello client') await fastify.close() await ended await serverConnEnded @@ -469,9 +420,9 @@ test('Should gracefully close when clients attempt to connect after calling clos await fastify.register(fastifyWebsocket) - fastify.get('/', { websocket: true }, (connection) => { + fastify.get('/', { websocket: true }, (socket) => { t.pass('received client connection') - connection.destroy() + socket.close() // this connection stays alive until we close the server }) @@ -505,7 +456,7 @@ test('Should keep accepting connection', { skip: !timersPromises }, async t => { await fastify.register(fastifyWebsocket) - fastify.get('/', { websocket: true }, ({ socket }) => { + fastify.get('/', { websocket: true }, (socket) => { socket.on('message', () => { unhandled-- }) @@ -561,7 +512,7 @@ test('Should keep processing message when many medium sized messages are sent', await fastify.register(fastifyWebsocket) - fastify.get('/', { websocket: true }, ({ socket }) => { + fastify.get('/', { websocket: true }, (socket) => { socket.on('message', () => { socket.send('handled') }) @@ -631,7 +582,7 @@ test('Should Handle WebSocket errors to avoid Node.js crashes', async t => { const fastify = Fastify() await fastify.register(fastifyWebsocket) - fastify.get('/', { websocket: true }, ({ socket }) => { + fastify.get('/', { websocket: true }, (socket) => { socket.on('error', err => { t.equal(err.code, 'WS_ERR_UNEXPECTED_RSV_2_3') }) diff --git a/test/hooks.test.js b/test/hooks.test.js index 87d580e..35e4de1 100644 --- a/test/hooks.test.js +++ b/test/hooks.test.js @@ -21,14 +21,12 @@ test('Should run onRequest, preValidation, preHandler hooks', t => { fastify.addHook('preValidation', async (request, reply) => t.ok('called', 'preValidation')) fastify.addHook('preHandler', async (request, reply) => t.ok('called', 'preHandler')) - fastify.get('/echo', { websocket: true }, (conn, request) => { - conn.setEncoding('utf8') - conn.write('hello client') - t.teardown(conn.destroy.bind(conn)) + fastify.get('/echo', { websocket: true }, (socket, request) => { + socket.send('hello client') + t.teardown(() => socket.terminate()) - conn.once('data', chunk => { - t.equal(chunk, 'hello server') - conn.end() + socket.once('message', (chunk) => { + t.equal(chunk.toString(), 'hello server') }) }) }) @@ -60,11 +58,10 @@ test('Should not run onTimeout hook', t => { fastify.register(async function () { fastify.addHook('onTimeout', async (request, reply) => t.fail('called', 'onTimeout')) - fastify.get('/echo', { websocket: true }, (conn, request) => { - conn.setEncoding('utf8') - conn.write('hello client') + fastify.get('/echo', { websocket: true }, (socket, request) => { + socket.send('hello client') request.raw.setTimeout(50) - t.teardown(conn.destroy.bind(conn)) + t.teardown(() => socket.terminate()) }) }) @@ -206,8 +203,8 @@ test('Should not run onError hook if reply was already hijacked (error thrown in fastify.register(async function (fastify) { fastify.addHook('onError', async (request, reply) => t.fail('called', 'onError')) - fastify.get('/echo', { websocket: true }, async (conn, request) => { - t.teardown(conn.destroy.bind(conn)) + fastify.get('/echo', { websocket: true }, async (socket, request) => { + t.teardown(() => socket.terminate()) throw new Error('Fail') }) }) @@ -233,11 +230,9 @@ test('Should not run preSerialization/onSend hooks', t => { fastify.addHook('onSend', async (request, reply) => t.fail('called', 'onSend')) fastify.addHook('preSerialization', async (request, reply) => t.fail('called', 'preSerialization')) - fastify.get('/echo', { websocket: true }, async (conn, request) => { - conn.setEncoding('utf8') - conn.write('hello client') - t.teardown(conn.destroy.bind(conn)) - conn.end() + fastify.get('/echo', { websocket: true }, async (socket, request) => { + socket.send('hello client') + t.teardown(() => socket.terminate()) }) }) @@ -299,14 +294,12 @@ test('Should run async hooks and still deliver quickly sent messages', (t) => { async () => await new Promise((resolve) => setTimeout(resolve, 25)) ) - fastify.get('/echo', { websocket: true }, (conn, request) => { - conn.setEncoding('utf8') - conn.write('hello client') - t.teardown(conn.destroy.bind(conn)) + fastify.get('/echo', { websocket: true }, (socket, request) => { + socket.send('hello client') + t.teardown(() => socket.terminate()) - conn.socket.on('message', (message) => { + socket.on('message', (message) => { t.equal(message.toString('utf-8'), 'hello server') - conn.end() }) }) }) @@ -362,7 +355,7 @@ test('Should not hijack reply for an normal request to a websocket route that is }) test('Should not hijack reply for an WS request to a WS route that gets sent a normal HTTP response in a hook', t => { - t.plan(6) + t.plan(2) const stream = split(JSON.parse) const fastify = Fastify({ logger: { stream } }) @@ -377,7 +370,9 @@ test('Should not hijack reply for an WS request to a WS route that gets sent a n }) stream.on('data', (chunk) => { - t.ok(chunk.level < 50) + if (chunk.level >= 50) { + t.fail() + } }) fastify.listen({ port: 0 }, err => { diff --git a/test/inject.test.js b/test/inject.test.js index bb012c0..8082f2b 100644 --- a/test/inject.test.js +++ b/test/inject.test.js @@ -20,8 +20,8 @@ test('routes correctly the message', async (t) => { fastify.register( async function (instance) { - instance.get('/ws', { websocket: true }, function (conn) { - conn.once('data', chunk => { + instance.get('/ws', { websocket: true }, function (socket) { + socket.once('message', chunk => { _resolve(chunk.toString()) }) }) @@ -43,8 +43,8 @@ test('redirect on / if no path specified', async (t) => { fastify.register( async function (instance) { - instance.get('/', { websocket: true }, function (conn) { - conn.once('data', chunk => { + instance.get('/', { websocket: true }, function (socket) { + socket.once('message', chunk => { _resolve(chunk.toString()) }) }) @@ -67,14 +67,14 @@ test('routes correctly the message between two routes', async (t) => { fastify.register( async function (instance) { - instance.get('/ws', { websocket: true }, function (conn) { - conn.once('data', () => { + instance.get('/ws', { websocket: true }, function (socket) { + socket.once('message', chunk => { _reject('wrong-route') }) }) - instance.get('/ws-2', { websocket: true }, function (conn) { - conn.once('data', chunk => { + instance.get('/ws-2', { websocket: true }, function (socket) { + socket.once('message', chunk => { _resolve(chunk.toString()) }) }) @@ -102,8 +102,8 @@ test('use the upgrade context to upgrade if there is some hook', async (t) => { } }) - instance.get('/', { websocket: true }, function (conn) { - conn.once('data', chunk => { + instance.get('/', { websocket: true }, function (socket) { + socket.once('message', chunk => { _resolve(chunk.toString()) }) }) diff --git a/test/router.test.js b/test/router.test.js index 7a46400..b0b44b0 100644 --- a/test/router.test.js +++ b/test/router.test.js @@ -16,15 +16,13 @@ test('Should expose a websocket on prefixed route', t => { fastify.register(fastifyWebsocket) fastify.register( function (instance, opts, next) { - instance.get('/echo', { websocket: true }, function (conn, request) { + instance.get('/echo', { websocket: true }, function (socket, request) { t.equal(this.prefix, '/baz') - conn.setEncoding('utf8') - conn.write('hello client') - t.teardown(conn.destroy.bind(conn)) + socket.send('hello client') + t.teardown(() => socket.terminate()) - conn.once('data', chunk => { - t.equal(chunk, 'hello server') - conn.end() + socket.once('message', (chunk) => { + t.equal(chunk.toString(), 'hello server') }) }) next() @@ -57,14 +55,12 @@ test('Should expose a websocket on prefixed route with /', t => { fastify.register(fastifyWebsocket) fastify.register( function (instance, opts, next) { - instance.get('/', { websocket: true }, (conn, req) => { - conn.setEncoding('utf8') - conn.write('hello client') - t.teardown(conn.destroy.bind(conn)) - - conn.once('data', chunk => { - t.equal(chunk, 'hello server') - conn.end() + instance.get('/', { websocket: true }, (socket, req) => { + socket.send('hello client') + t.teardown(() => socket.terminate()) + + socket.once('message', (chunk) => { + t.equal(chunk.toString(), 'hello server') }) }) next() @@ -103,14 +99,12 @@ test('Should expose websocket and http route', t => { handler: (request, reply) => { reply.send({ hello: 'world' }) }, - wsHandler: (conn, req) => { - conn.setEncoding('utf8') - conn.write('hello client') - t.teardown(conn.destroy.bind(conn)) - - conn.once('data', chunk => { - t.equal(chunk, 'hello server') - conn.end() + wsHandler: (socket, req) => { + socket.send('hello client') + t.teardown(() => socket.terminate()) + + socket.once('message', (chunk) => { + t.equal(chunk.toString(), 'hello server') }) } }) @@ -162,16 +156,16 @@ test('Should close on unregistered path (with no wildcard route websocket handle reply.send('hello world') }) - fastify.get('/echo', { websocket: true }, (connection, request) => { - connection.socket.on('message', message => { + fastify.get('/echo', { websocket: true }, (socket, request) => { + socket.on('message', message => { try { - connection.socket.send(message) + socket.send(message) } catch (err) { - connection.socket.send(err.message) + socket.send(err.message) } }) - t.teardown(connection.destroy.bind(connection)) + t.teardown(() => socket.terminate()) }) }) @@ -201,12 +195,12 @@ test('Should use wildcard websocket route when (with a normal http wildcard rout handler: (_, reply) => { reply.send({ hello: 'world' }) }, - wsHandler: (conn) => { - conn.setEncoding('utf8') - conn.write('hello client') + wsHandler: (socket) => { + socket.send('hello client') + t.teardown(() => socket.terminate()) - conn.once('data', () => { - conn.end() + socket.once('message', (chunk) => { + socket.close() }) } }) @@ -234,28 +228,28 @@ test('Should call wildcard route handler on unregistered path', t => { fastify .register(fastifyWebsocket) .register(async function (fastify) { - fastify.get('/*', { websocket: true }, (conn) => { - conn.socket.on('message', () => { + fastify.get('/*', { websocket: true }, (socket) => { + socket.on('message', () => { try { - conn.socket.send('hi from wildcard route handler') + socket.send('hi from wildcard route handler') } catch (err) { - conn.socket.send(err.message) + socket.send(err.message) } }) - t.teardown(conn.destroy.bind(conn)) + t.teardown(() => socket.terminate()) }) }) - fastify.get('/echo', { websocket: true }, (conn) => { - conn.socket.on('message', () => { + fastify.get('/echo', { websocket: true }, (socket) => { + socket.on('message', () => { try { - conn.socket.send('hi from /echo handler') + socket.send('hi from /echo handler') } catch (err) { - conn.socket.send(err.message) + socket.send(err.message) } }) - t.teardown(conn.destroy.bind(conn)) + t.teardown(() => socket.terminate()) }) fastify.listen({ port: 0 }, err => { @@ -293,9 +287,9 @@ test('Should invoke the correct handler depending on the headers', t => { handler: (request, reply) => { reply.send('hi from handler') }, - wsHandler: (conn, request) => { - conn.write('hi from wsHandler') - t.teardown(conn.destroy.bind(conn)) + wsHandler: (socket, request) => { + socket.send('hi from wsHandler') + t.teardown(() => socket.terminate()) } }) }) @@ -330,10 +324,10 @@ test('Should call the wildcard handler if a no other non-websocket route with pa fastify.register(fastifyWebsocket) fastify.register(async function (fastify) { - fastify.get('/*', { websocket: true }, (conn, request) => { + fastify.get('/*', { websocket: true }, (socket, request) => { t.ok('called', 'wildcard handler') - conn.end() - t.teardown(conn.destroy.bind(conn)) + socket.close() + t.teardown(() => socket.terminate()) }) fastify.get('/http', (request, reply) => { @@ -360,9 +354,9 @@ test('Should close the connection if a non-websocket route with path exists', t fastify.register(fastifyWebsocket) fastify.register(async function (fastify) { - fastify.get('/*', { websocket: true }, (conn, request) => { + fastify.get('/*', { websocket: true }, (socket, request) => { t.fail('called', 'wildcard handler') - t.teardown(conn.destroy.bind(conn)) + t.teardown(() => socket.terminate()) }) fastify.get('/http', (request, reply) => { @@ -389,15 +383,15 @@ test('Should throw on wrong HTTP method', t => { fastify.register(fastifyWebsocket) fastify.register(async function (fastify) { - fastify.post('/echo', { websocket: true }, (connection, request) => { - connection.socket.on('message', message => { + fastify.post('/echo', { websocket: true }, (socket, request) => { + socket.on('message', message => { try { - connection.socket.send(message) + socket.send(message) } catch (err) { - connection.socket.send(err.message) + socket.send(err.message) } }) - t.teardown(connection.destroy.bind(connection)) + t.teardown(() => socket.terminate()) }) fastify.get('/http', (request, reply) => { @@ -442,16 +436,16 @@ test('Should open on registered path', t => { fastify.register(fastifyWebsocket) fastify.register(async function (fastify) { - fastify.get('/echo', { websocket: true }, (connection, request) => { - connection.socket.on('message', message => { + fastify.get('/echo', { websocket: true }, (socket, request) => { + socket.on('message', message => { try { - connection.socket.send(message) + socket.send(message) } catch (err) { - connection.socket.send(err.message) + socket.send(err.message) } }) - t.teardown(connection.destroy.bind(connection)) + t.teardown(() => socket.terminate()) }) }) @@ -477,17 +471,17 @@ test('Should send message and close', t => { fastify.register(fastifyWebsocket) fastify.register(async function (fastify) { - fastify.get('/', { websocket: true }, (connection, request) => { - connection.socket.on('message', message => { + fastify.get('/', { websocket: true }, (socket, request) => { + socket.on('message', message => { t.equal(message.toString(), 'hi from client') - connection.socket.send('hi from server') + socket.send('hi from server') }) - connection.socket.on('close', () => { + socket.on('close', () => { t.pass() }) - t.teardown(connection.destroy.bind(connection)) + t.teardown(() => socket.terminate()) }) }) @@ -519,17 +513,17 @@ test('Should return 404 on http request', t => { fastify.register(fastifyWebsocket) fastify.register(async function (fastify) { - fastify.get('/', { websocket: true }, (connection, request) => { - connection.socket.on('message', message => { + fastify.get('/', { websocket: true }, (socket, request) => { + socket.on('message', message => { t.equal(message.toString(), 'hi from client') - connection.socket.send('hi from server') + socket.send('hi from server') }) - connection.socket.on('close', () => { + socket.on('close', () => { t.pass() }) - t.teardown(connection.destroy.bind(connection)) + t.teardown(() => socket.terminate()) }) }) @@ -550,17 +544,17 @@ test('Should pass route params to per-route handlers', t => { fastify.register(fastifyWebsocket) fastify.register(async function (fastify) { - fastify.get('/ws', { websocket: true }, (conn, request) => { + fastify.get('/ws', { websocket: true }, (socket, request) => { const params = request.params t.equal(Object.keys(params).length, 0, 'params are empty') - conn.write('empty') - conn.end() + socket.send('empty') + socket.close() }) - fastify.get('/ws/:id', { websocket: true }, (conn, request) => { + fastify.get('/ws/:id', { websocket: true }, (socket, request) => { const params = request.params t.equal(params.id, 'foo', 'params are correct') - conn.write(params.id) - conn.end() + socket.send(params.id) + socket.close() }) }) @@ -603,10 +597,10 @@ test('Should not throw error when register empty get with prefix', t => { fastify.register( function (instance, opts, next) { - instance.get('/', { websocket: true }, (connection, request) => { - connection.socket.on('message', message => { + instance.get('/', { websocket: true }, (socket, request) => { + socket.on('message', message => { t.equal(message.toString(), 'hi from client') - connection.socket.send('hi from server') + socket.send('hi from server') }) }) next() @@ -636,10 +630,10 @@ test('Should expose fastify instance to websocket per-route handler', t => { fastify.register(fastifyWebsocket) fastify.register(async function (fastify) { - fastify.get('/ws', { websocket: true }, function wsHandler (conn) { + fastify.get('/ws', { websocket: true }, function wsHandler (socket) { t.equal(this, fastify, 'this is bound to fastify server') - conn.write('empty') - conn.end() + socket.send('empty') + socket.close() }) }) @@ -669,10 +663,10 @@ test('Should have access to decorators in per-route handler', t => { fastify.decorateRequest('str', 'it works!') fastify.register(fastifyWebsocket) fastify.register(async function (fastify) { - fastify.get('/ws', { websocket: true }, function wsHandler (conn, request) { + fastify.get('/ws', { websocket: true }, function wsHandler (socket, request) { t.equal(request.str, 'it works!', 'decorator is accessible') - conn.write('empty') - conn.end() + socket.send('empty') + socket.close() }) }) @@ -698,9 +692,9 @@ test('should call `destroy` when exception is thrown inside async handler', t => fastify.register(fastifyWebsocket) fastify.register(async function (fastify) { - fastify.get('/ws', { websocket: true }, async function wsHandler (conn, request) { - conn.on('error', err => { - t.equal(err.message, 'something wrong') + fastify.get('/ws', { websocket: true }, async function wsHandler (socket, request) { + socket.on('close', code => { + t.equal(code, 1006) t.end() }) throw new Error('something wrong') diff --git a/types/index.d.ts b/types/index.d.ts index 44420ea..d75546e 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -3,7 +3,6 @@ import { IncomingMessage, ServerResponse, Server } from 'http'; import { FastifyRequest, FastifyPluginCallback, RawServerBase, RawServerDefault, RawRequestDefaultExpression, RawReplyDefaultExpression, RequestGenericInterface, ContextConfigDefault, FastifyInstance, FastifySchema, FastifyTypeProvider, FastifyTypeProviderDefault, FastifyBaseLogger } from 'fastify'; import * as fastify from 'fastify'; import * as WebSocket from 'ws'; -import { Duplex, DuplexOptions } from 'stream'; import { FastifyReply } from 'fastify/types/reply'; import { preCloseHookHandler, preCloseAsyncHookHandler } from 'fastify/types/hooks'; import { RouteGenericInterface } from 'fastify/types/route'; @@ -80,16 +79,12 @@ declare namespace fastifyWebsocket { Logger extends FastifyBaseLogger = FastifyBaseLogger > = ( this: FastifyInstance, - connection: SocketStream, + socket: WebSocket.WebSocket, request: FastifyRequest ) => void | Promise; - export interface SocketStream extends Duplex { - socket: WebSocket; - } export interface WebsocketPluginOptions { - errorHandler?: (this: FastifyInstance, error: Error, connection: SocketStream, request: FastifyRequest, reply: FastifyReply) => void; + errorHandler?: (this: FastifyInstance, error: Error, socket: WebSocket.WebSocket, request: FastifyRequest, reply: FastifyReply) => void; options?: WebSocketServerOptions; - connectionOptions?: DuplexOptions; preClose?: preCloseHookHandler | preCloseAsyncHookHandler; } export interface RouteOptions< diff --git a/types/index.test-d.ts b/types/index.test-d.ts index 8722875..f7a242e 100644 --- a/types/index.test-d.ts +++ b/types/index.test-d.ts @@ -1,8 +1,8 @@ -import fastifyWebsocket, { WebsocketHandler, SocketStream, fastifyWebsocket as namedFastifyWebsocket, default as defaultFastifyWebsocket } from '..'; +import fastifyWebsocket, { WebsocketHandler, fastifyWebsocket as namedFastifyWebsocket, default as defaultFastifyWebsocket } from '..'; import type { IncomingMessage } from "http"; import fastify, { RouteOptions, FastifyRequest, FastifyInstance, FastifyReply, RequestGenericInterface, FastifyBaseLogger, RawServerDefault, FastifySchema, RawRequestDefaultExpression } from 'fastify'; import { expectType } from 'tsd'; -import { Server } from 'ws'; +import { Server, WebSocket } from 'ws'; import { RouteGenericInterface } from 'fastify/types/route'; import { TypeBoxTypeProvider } from '@fastify/type-provider-typebox'; import { Type } from '@sinclair/typebox' @@ -12,10 +12,10 @@ app.register(fastifyWebsocket); app.register(fastifyWebsocket, {}); app.register(fastifyWebsocket, { options: { maxPayload: 123 } }); app.register(fastifyWebsocket, { - errorHandler: function errorHandler(error: Error, connection: SocketStream, request: FastifyRequest, reply: FastifyReply): void { + errorHandler: function errorHandler(error: Error, socket: WebSocket, request: FastifyRequest, reply: FastifyReply): void { expectType(this); expectType(error) - expectType(connection) + expectType(socket) expectType(request) expectType(reply) } @@ -24,17 +24,17 @@ app.register(fastifyWebsocket, { options: { perMessageDeflate: true } }); app.register(fastifyWebsocket, { preClose: function syncPreclose() {} }); app.register(fastifyWebsocket, { preClose: async function asyncPreclose(){} }); -app.get('/websockets-via-inferrence', { websocket: true }, async function (connection, request) { +app.get('/websockets-via-inferrence', { websocket: true }, async function (socket, request) { expectType(this); - expectType(connection); + expectType(socket); expectType(app.websocketServer); expectType>(request) expectType(request.ws); expectType(request.log); }); -const handler: WebsocketHandler = async (connection, request) => { - expectType(connection); +const handler: WebsocketHandler = async (socket, request) => { + expectType(socket); expectType(app.websocketServer); expectType>(request) } @@ -60,8 +60,8 @@ app.route({ expectType(reply); expectType(request.ws); }, - wsHandler: (connection, request) => { - expectType(connection); + wsHandler: (socket, request) => { + expectType(socket); expectType>(request); expectType(request.ws); }, @@ -74,8 +74,8 @@ const augmentedRouteOptions: RouteOptions = { expectType(request); expectType(reply); }, - wsHandler: (connection, request) => { - expectType(connection); + wsHandler: (socket, request) => { + expectType(socket); expectType>(request) }, }; @@ -84,8 +84,8 @@ app.route(augmentedRouteOptions); app.get<{ Params: { foo: string }, Body: { bar: string }, Querystring: { search: string }, Headers: { auth: string } }>('/shorthand-explicit-types', { websocket: true -}, async (connection, request) => { - expectType(connection); +}, async (socket, request) => { + expectType(socket); expectType<{ foo: string }>(request.params); expectType<{ bar: string }>(request.body); expectType<{ search: string }>(request.query); @@ -102,8 +102,8 @@ app.route<{ Params: { foo: string }, Body: { bar: string }, Querystring: { searc expectType<{ search: string }>(request.query); expectType(request.headers); }, - wsHandler: (connection, request) => { - expectType(connection); + wsHandler: (socket, request) => { + expectType(socket); expectType<{ foo: string }>(request.params); expectType<{ bar: string }>(request.body); expectType<{ search: string }>(request.query); @@ -139,8 +139,8 @@ server.route({ expectType<{ search: string }>(request.query); expectType(request.headers); }, - wsHandler: (connection, request) => { - expectType(connection); + wsHandler: (socket, request) => { + expectType(socket); expectType<{ foo: string }>(request.params); expectType<{ bar: string }>(request.body); expectType<{ search: string }>(request.query); @@ -150,9 +150,9 @@ server.route({ server.get('/websockets-no-type-inference', { websocket: true }, - async function (connection, request) { + async function (socket, request) { expectType(this); - expectType(connection); + expectType(socket); expectType(app.websocketServer); expectType>(request); expectType(request.ws);