Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve error handling around Device Agent tunnels #2488

Merged
merged 3 commits into from Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 29 additions & 0 deletions docs/contribute/workflows/device-editor.md
@@ -0,0 +1,29 @@
---
navTitle: Device Editor
---

# Enabling the device editor

```mermaid
sequenceDiagram
User->>FrontEnd: Clicks 'open editor' against device
FrontEnd->>+Forge: PUT /api/v1/devices/:id/editor { tunnel: 'enable' }
Forge->Forge: Generates <token>
Forge--)Device: Publishes command to establish connection with <token>
Device--)Forge: WS Connect /api/v1/devices/:id/editor/comms/:token

Forge->>-FrontEnd: Returns session identifier
FrontEnd->>FrontEnd: Opens /device/<id>/editor/
FrontEnd-->+Forge: Sends requests to /device/<id>/editor/**
Forge--)+Device: Request proxied over WebSocket
Device-->>Editor: Performs request on local Node-RED
Editor-->>Device: Returns response
Device-->>-Forge: Streams response back
Forge-->>-FrontEnd: Streams response back
User->>FrontEnd: User navigates away
FrontEnd-->Forge: Node-RED WebSocket closes
Note over Forge: if no active WebSockets for this device
Forge--)Device: Close WebSocket
```


109 changes: 73 additions & 36 deletions forge/ee/lib/deviceEditor/DeviceTunnelManager.js
Expand Up @@ -10,8 +10,9 @@
/**
* A DeviceTunnel object keeps track of connections to a device.
* @typedef {Object} DeviceTunnel
* @property {numner} id - A unique identifier for this tunnel instance
* @property {string} deviceId - Device ID
* @property {number} counter - Number of active connections
* @property {number} nextRequestId - Next available request id
* @property {SocketStream} socket - Socket connection to device
* @property {Array<FastifyRequest>} requests - List of pending requests
* @property {Object} forwardedWS - List of forwarded websocket connections
Expand Down Expand Up @@ -46,6 +47,12 @@ class DeviceTunnelManager {
/** @type {ForgeApplication} Forge application (Fastify app) */
this.app = app
this.#tunnels = new Map()

this.app.addHook('onClose', async (_) => {
Object.keys(this.#tunnels).forEach(deviceId => {
this.closeTunnel(deviceId)
})
})
}

/**
Expand All @@ -58,7 +65,6 @@ class DeviceTunnelManager {
* @param {String} token Token to use for tunnel
* @see DeviceTunnelManager#initTunnel
* @see DeviceTunnelManager#closeTunnel
* @see DeviceTunnelManager#removeTunnel
*/
newTunnel (deviceId, token) {
const manager = this
Expand All @@ -67,7 +73,7 @@ class DeviceTunnelManager {
manager.closeTunnel(deviceId)

// create a new tunnel object & add to list
manager.#tunnels.set(deviceId, newTunnel(deviceId, token))
manager.#tunnels.set(deviceId, createNewTunnel(deviceId, token))

return !!manager.#getTunnel(deviceId)
}
Expand All @@ -93,36 +99,33 @@ class DeviceTunnelManager {
getTunnelStatus (deviceId) {
const exists = this.#tunnels.has(deviceId)
if (!exists) {
return null
return { enabled: false }
}
const url = this.getTunnelUrl(deviceId, true)
const url = this.getTunnelUrl(deviceId)
const enabled = this.isEnabled(deviceId)
const connected = this.isConnected(deviceId)
return { url, enabled, connected }
}

closeTunnel (deviceId) {
const tunnel = this.#getTunnel(deviceId)
if (tunnel?.socket) {
tunnel.socket.close()
tunnel.socket.removeAllListeners()
if (tunnel) {
tunnel.socket?.close()
// Close all of the editor websockets that were using this tunnel
Object.keys(tunnel?.forwardedWS).forEach(reqId => {
const wsClient = tunnel.forwardedWS[reqId]
wsClient.close()
})
}
this.removeTunnel(deviceId)
}

removeTunnel (deviceId) {
if (this.#tunnels.has(deviceId)) {
return this.#tunnels.delete(deviceId)
}
}

getTunnelUrl (deviceId, includeToken = false) {
getTunnelUrl (deviceId) {
const tunnel = this.#getTunnel(deviceId)
if (tunnel) {
if (includeToken) {
return `/api/v1/devices/${deviceId}/editor/proxy/?access_token=${tunnel.token}`
}
return `/api/v1/devices/${deviceId}/editor/proxy/`
return `/api/v1/devices/${deviceId}/editor/proxy/?access_token=${tunnel.token}`
}
return ''
}
Expand Down Expand Up @@ -152,15 +155,13 @@ class DeviceTunnelManager {
return false
}

// ensure tunnel is not already open
// Close any existing tunnel
if (tunnel.socket) {
tunnel.socket.close()
tunnel.socket.removeAllListeners()
}

tunnel.socket = inboundDeviceConnection.socket

// handle messages from device
// Handle messages sent from the device
tunnel.socket.on('message', msg => {
const response = JSON.parse(msg.toString())
if (response.id === undefined) {
Expand All @@ -178,22 +179,47 @@ class DeviceTunnelManager {
reply.send()
}
} else if (response.ws) {
// Send message to device editor websocket
const wsSocket = tunnel.forwardedWS[response.id]
wsSocket.send(response.body)
if (wsSocket) {
if (response.closed) {
// The runtime has closed this session's websocket on the device
// Pass that back to the editor so it knows something is up
if (wsSocket) {
wsSocket.close()
}
delete tunnel.forwardedWS[response.id]
} else {
// Send message to device editor websocket
wsSocket.send(response.body)
}
} else {
// This is a message for a editor we don't know about.
// This can happen with Device Agent <= 1.9.4 if multiple
// editors were opened in a single session and then one
// of them is closed. Older Agents don't know to disconnect
// their local comms link for the closed editor, so continue
// sending messages to everyone who was ever connected
}
} else {
// TODO: remove/change temp debug
console.warn('device editor websocket message has no reply')
}
})

tunnel.socket.on('close', () => {
manager.removeTunnel(deviceId)
// The ws connection from the device has closed.
delete tunnel.socket

// Close all of the editor websockets
for (const [id, wsSocket] of Object.entries(tunnel.forwardedWS)) {
wsSocket.close()
delete tunnel.forwardedWS[id]
}
})

/** @type {httpHandler} */
tunnel._handleHTTPGet = (request, reply) => {
const id = tunnel.counter++
const id = tunnel.nextRequestId++
tunnel.requests[id] = reply
tunnel.socket.send(JSON.stringify({
id,
Expand All @@ -208,7 +234,7 @@ class DeviceTunnelManager {
tunnel._handleHTTPGet(request, reply)
return
}
const requestId = tunnel.counter++
const requestId = tunnel.nextRequestId++
tunnel.requests[requestId] = reply
tunnel.socket.send(JSON.stringify({
id: requestId,
Expand All @@ -220,7 +246,8 @@ class DeviceTunnelManager {
}

tunnel._handleWS = (connection, request) => {
const requestId = tunnel.counter++
// A new editor websocket is connecting
const requestId = tunnel.nextRequestId++
tunnel.socket.send(JSON.stringify({
id: requestId,
ws: true,
Expand All @@ -230,20 +257,28 @@ class DeviceTunnelManager {
const wsToDevice = connection.socket
tunnel.forwardedWS[requestId] = wsToDevice

// forward messages from device to client
wsToDevice.on('message', msg => {
// Forward messages sent by the editor down to the device
// console.log(`[${tunnel.id}] [${requestId}] E>R`, msg.toString())
tunnel.socket.send(JSON.stringify({
id: requestId,
ws: true,
body: msg.toString()
}))
})

connection.on('close', () => {
if (tunnel.forwardedWS[requestId]) {
tunnel.forwardedWS[requestId].close()
wsToDevice.on('close', msg => {
// The editor has closed its websocket. Send notification to the
// device so it can close its corresponing connection
// console.log(`[${tunnel.id}] [${requestId}] E>R closed`)
if (tunnel.forwardedWS[requestId] && tunnel.socket) {
// console.log(`[${tunnel.id}] [${requestId}] E>R closed - notifying the device`)
tunnel.socket.send(JSON.stringify({
id: requestId,
ws: true,
closed: true
}))
delete tunnel.forwardedWS[requestId]
}
delete tunnel.forwardedWS[requestId]
})
}
return true
Expand Down Expand Up @@ -304,19 +339,21 @@ class DeviceTunnelManager {
}
}

let tunnelCounter = 0
/**
* Create new tunnel
* @param {String} deviceId Device ID
* @param {String} token Editor access token
* @returns {DeviceTunnel}
* @memberof DeviceTunnelManager
* @static
* @method newTunnel
* @method createNewTunnel
*/
function newTunnel (deviceId, token) {
function createNewTunnel (deviceId, token) {
const tunnel = {
id: ++tunnelCounter,
deviceId,
counter: 0,
nextRequestId: 1,
socket: null,
requests: {},
forwardedWS: {},
Expand Down