Skip to content

Commit

Permalink
fix: provide websocket instead of stream to avoid potential backpress…
Browse files Browse the repository at this point in the history
…ure issues (#289) (#290)

* fix: provide websocket instead of stream to avoid potential backpressure issues (#289)

* chore: update documentation and tests

* chore: update documentation to include an example of using createWebSocketStream

* chore: update failing unit test
  • Loading branch information
mat-sz committed Mar 18, 2024
1 parent 25d37d2 commit bc969b7
Show file tree
Hide file tree
Showing 8 changed files with 299 additions and 362 deletions.
100 changes: 58 additions & 42 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ After registering this plugin, you can choose on which routes the WS server will
const fastify = require('fastify')()
fastify.register(require('@fastify/websocket'))
fastify.register(async function (fastify) {
fastify.get('/', { websocket: true }, (connection /* SocketStream */, req /* FastifyRequest */) => {
connection.socket.on('message', message => {
fastify.get('/', { websocket: true }, (socket /* WebSocket */, req /* FastifyRequest */) => {
socket.on('message', message => {
// message.toString() === 'hi from client'
connection.socket.send('hi from server')
socket.send('hi from server')
})
})
})
Expand All @@ -63,17 +63,17 @@ fastify.register(require('@fastify/websocket'), {
})

fastify.register(async function (fastify) {
fastify.get('/*', { websocket: true }, (connection /* SocketStream */, req /* FastifyRequest */) => {
connection.socket.on('message', message => {
fastify.get('/*', { websocket: true }, (socket /* WebSocket */, req /* FastifyRequest */) => {
socket.on('message', message => {
// message.toString() === 'hi from client'
connection.socket.send('hi from wildcard route')
socket.send('hi from wildcard route')
})
})

fastify.get('/', { websocket: true }, (connection /* SocketStream */, req /* FastifyRequest */) => {
connection.socket.on('message', message => {
fastify.get('/', { websocket: true }, (socket /* WebSocket */, req /* FastifyRequest */) => {
socket.on('message', message => {
// message.toString() === 'hi from client'
connection.socket.send('hi from server')
socket.send('hi from server')
})
})
})
Expand All @@ -93,10 +93,10 @@ It is important that websocket route handlers attach event handlers synchronousl
Here is an example of how to attach message handlers synchronously while still accessing asynchronous resources. We store a promise for the async thing in a local variable, attach the message handler synchronously, and then make the message handler itself asynchronous to grab the async data and do some processing:

```javascript
fastify.get('/*', { websocket: true }, (connection, request) => {
fastify.get('/*', { websocket: true }, (socket, request) => {
const sessionPromise = request.getSession() // example async session getter, called synchronously to return a promise

connection.socket.on('message', async (message) => {
socket.on('message', async (message) => {
const session = await sessionPromise()
// do something with the message and session
})
Expand All @@ -113,9 +113,9 @@ fastify.addHook('preValidation', async (request, reply) => {
await reply.code(401).send("not authenticated");
}
})
fastify.get('/', { websocket: true }, (connection, req) => {
fastify.get('/', { websocket: true }, (socket, req) => {
// the connection will only be opened for authenticated incoming requests
connection.socket.on('message', message => {
socket.on('message', message => {
// ...
})
})
Expand All @@ -134,13 +134,13 @@ import websocket from '@fastify/websocket'
const fastify = Fastify()
await fastify.register(websocket)

fastify.get('/', { websocket: true }, function wsHandler (connection, req) {
fastify.get('/', { websocket: true }, function wsHandler (socket, req) {
// bound to fastify server
this.myDecoration.someFunc()

connection.socket.on('message', message => {
socket.on('message', message => {
// message.toString() === 'hi from client'
connection.socket.send('hi from server')
socket.send('hi from server')
})
})

Expand All @@ -154,8 +154,8 @@ If you need to handle both HTTP requests and incoming socket connections on the

const fastify = require('fastify')()

function handle (conn, req) {
conn.pipe(conn) // creates an echo server
function handle (socket, req) {
socket.on('message', (data) => socket.send(data)) // creates an echo server
}

fastify.register(require('@fastify/websocket'), {
Expand All @@ -171,13 +171,12 @@ fastify.register(async function () {
// this will handle http requests
reply.send({ hello: 'world' })
},
wsHandler: (conn, req) => {
wsHandler: (socket, req) => {
// this will handle websockets connections
conn.setEncoding('utf8')
conn.write('hello client')
socket.send('hello client')

conn.once('data', chunk => {
conn.end()
socket.once('message', chunk => {
socket.close()
})
}
})
Expand All @@ -201,10 +200,10 @@ Neither the `errorHandler` passed to this plugin or fastify's `onError` hook wil
const fastify = require('fastify')()

fastify.register(require('@fastify/websocket'), {
errorHandler: function (error, conn /* SocketStream */, req /* FastifyRequest */, reply /* FastifyReply */) {
errorHandler: function (error, socket /* WebSocket */, req /* FastifyRequest */, reply /* FastifyReply */) {
// Do stuff
// destroy/close connection
conn.destroy(error)
socket.terminate()
},
options: {
maxPayload: 1048576, // we set the maximum allowed messages size to 1 MiB (1024 bytes * 1024 bytes)
Expand All @@ -217,10 +216,10 @@ fastify.register(require('@fastify/websocket'), {
}
})

fastify.get('/', { websocket: true }, (connection /* SocketStream */, req /* FastifyRequest */) => {
connection.socket.on('message', message => {
fastify.get('/', { websocket: true }, (socket /* WebSocket */, req /* FastifyRequest */) => {
socket.on('message', message => {
// message.toString() === 'hi from client'
connection.socket.send('hi from server')
socket.send('hi from server')
})
})

Expand All @@ -247,8 +246,8 @@ fastify.register(require('@fastify/websocket'), {
preClose: (done) => { // Note: can also use async style, without done-callback
const server = this.websocketServer

for (const connection of server.clients) {
connection.close(1001, 'WS server is going offline in custom manner, sending a code + message')
for (const socket of server.clients) {
socket.close(1001, 'WS server is going offline in custom manner, sending a code + message')
}

server.close(done)
Expand All @@ -263,6 +262,32 @@ It allows to test easily a websocket endpoint.

The signature of injectWS is the following: `([path], [upgradeContext])`.


### Creating a stream from the WebSocket

```js
const Fastify = require('fastify')
const FastifyWebSocket = require('@fastify/websocket')
const ws = require('ws')

const fastify = Fastify()
await fastify.register(websocket)

fastify.get('/', { websocket: true }, (socket, req) => {
const stream = ws.createWebSocketStream(socket, { /* options */ })
stream.setEncoding('utf8')
stream.write('hello client')

stream.on('data', function (data) {
// Make sure to set up a data handler or read all the incoming
// data in another way, otherwise stream backpressure will cause
// the underlying WebSocket object to get paused.
})
})

await fastify.listen({ port: 3000 })
```

#### App.js

```js
Expand All @@ -282,9 +307,9 @@ App.register(async function(fastify) {
}
})

fastify.get('/', { websocket: true }, (connection) => {
connection.socket.on('message', message => {
connection.socket.send('hi from server')
fastify.get('/', { websocket: true }, (socket) => {
socket.on('message', message => {
socket.send('hi from server')
})
})
})
Expand Down Expand Up @@ -350,15 +375,6 @@ _**NB** The `path` option from `ws` should not be provided since the routing is

_**NB** The `noServer` option from `ws` should not be provided since the point of @fastify/websocket is to listen on the fastify server. If you want a custom server, you can use the `server` option, and if you want more control, you can use the `ws` library directly_

You can also pass the following as `connectionOptions` for [createWebSocketStream](https://github.com/websockets/ws/blob/master/doc/ws.md#createwebsocketstreamwebsocket-options).

- `allowHalfOpen` <boolean> If set to false, then the stream will automatically end the writable side when the readable side ends. Default: true.
- `readable` <boolean> Sets whether the Duplex should be readable. Default: true.
- `writable` <boolean> Sets whether the Duplex should be writable. Default: true.
- `readableObjectMode` <boolean> Sets objectMode for readable side of the stream. Has no effect if objectMode is true. Default: false.
- `readableHighWaterMark` <number> Sets highWaterMark for the readable side of the stream.
- `writableHighWaterMark` <number> Sets highWaterMark for the writable side of the stream.

[ws](https://github.com/websockets/ws) does not allow you to set `objectMode` or `writableObjectMode` to true
## Acknowledgements

Expand Down
36 changes: 11 additions & 25 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,20 +122,11 @@ function fastifyWebsocket (fastify, opts, next) {
wss.handleUpgrade(rawRequest, rawRequest[kWs], rawRequest[kWsHead], (socket) => {
wss.emit('connection', socket, rawRequest)

const connection = WebSocket.createWebSocketStream(socket, opts.connectionOptions)
connection.socket = socket

connection.on('error', (error) => {
socket.on('error', (error) => {
fastify.log.error(error)
})

connection.socket.on('newListener', event => {
if (event === 'message') {
connection.resume()
}
})

callback(connection)
callback(socket)
})
}

Expand Down Expand Up @@ -187,20 +178,20 @@ function fastifyWebsocket (fastify, opts, next) {
// within the route handler, we check if there has been a connection upgrade by looking at request.raw[kWs]. we need to dispatch the normal HTTP handler if not, and hijack to dispatch the websocket handler if so
if (request.raw[kWs]) {
reply.hijack()
handleUpgrade(request.raw, connection => {
handleUpgrade(request.raw, socket => {
let result
try {
if (isWebsocketRoute) {
result = wsHandler.call(this, connection, request)
result = wsHandler.call(this, socket, request)
} else {
result = noHandle.call(this, connection, request)
result = noHandle.call(this, socket, request)
}
} catch (err) {
return errorHandler.call(this, err, connection, request, reply)
return errorHandler.call(this, err, socket, request, reply)
}

if (result && typeof result.catch === 'function') {
result.catch(err => errorHandler.call(this, err, connection, request, reply))
result.catch(err => errorHandler.call(this, err, socket, request, reply))
}
})
} else {
Expand Down Expand Up @@ -229,19 +220,14 @@ function fastifyWebsocket (fastify, opts, next) {
done()
}

function noHandle (connection, rawRequest) {
function noHandle (socket, rawRequest) {
this.log.info({ path: rawRequest.url }, 'closed incoming websocket connection for path with no websocket handler')
connection.socket.close()
socket.close()
}

function defaultErrorHandler (error, conn, request) {
// Before destroying the connection, we attach an error listener.
// Since we already handled the error, adding this listener prevents the ws
// library from emitting the error and causing an uncaughtException
// Reference: https://github.com/websockets/ws/blob/master/lib/stream.js#L35
conn.on('error', _ => { })
function defaultErrorHandler (error, socket, request) {
request.log.error(error)
conn.destroy(error)
socket.terminate()
}

next()
Expand Down

0 comments on commit bc969b7

Please sign in to comment.