Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add comms (mqtt) component to forge platform #706

Merged
merged 17 commits into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 147 additions & 0 deletions forge/comms/aclManager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/**
* This module provides functions to verify whether a broker ACL request
* is valid or not.
*
* It includes the core (CE) ACLs for basic launcher/device command/status messages.
*
* Other components (ie EE-specific features) can register their own additional ACLs
*/
module.exports = function (app) {
// Standard set of verify functions to ensure the request meets particular criteria
const verifyFunctions = {
checkTeamAndObjectIds: async function (requestParts, ids) {
// requestParts = [ _ , <teamid>, <projectid> ]
// ids = [ 'project', <teamid>, <projectid> ]
return requestParts[1] === ids[1] && requestParts[2] === ids[2]
},
checkTeamId: async function (requestParts, ids) {
// requestParts = [ _ , <teamid> ]
// ids = [ 'project', <teamid>, <projectid> ]
return requestParts[1] === ids[1]
},
checkDeviceAssignedToProject: async function (requestParts, ids) {
// requestParts = [ _ , <teamid>, <projectid> ]
// ids = [ 'device', <teamid>, <deviceid> ]

// Do the simple team id check
if (requestParts[1] !== ids[1]) {
return false
}
// Get the project this device is assigned to and check it matches
const assignedProject = await app.db.models.Device.getDeviceProjectId(ids[2])
return assignedProject && assignedProject === requestParts[2]
},
checkDeviceCanAccessProject: async function (requestParts, ids) {
// requestParts = [ _ , <teamid>, <projectid> ]
// ids = [ 'device', <teamid>, <deviceid> ]

// Do the simple team id check
if (requestParts[1] !== ids[1]) {
return false
}
// Get the project this device is assigned to
const assignedProject = await app.db.models.Device.getDeviceProjectId(ids[2])
if (!assignedProject) {
return false
}
if (assignedProject === requestParts[2]) {
// Access the project we're assigned to - all good
return true
}

// Need to check if this project is in the same team.
const projectTeamId = await app.db.models.Project.getProjectTeamId(requestParts[2])
return projectTeamId && app.db.models.Team.encodeHashid(projectTeamId) === requestParts[1]
}
}

const ACLS = {
forge_platform: {
sub: [
// Receive status events from project launchers
// - ff/v1/+/l/+/status
{ topic: /^ff\/v1\/[^/]+\/l\/[^/]+\/status$/ },
// Receive status events from devices
// - ff/v1/+/d/+/status
{ topic: /^ff\/v1\/[^/]+\/d\/[^/]+\/status$/ }
],
pub: [
// Send commands to project launchers
// - ff/v1/+/l/+/command
{ topic: /^ff\/v1\/[^/]+\/l\/[^/]+\/command$/ },
// Send commands to devices
// - ff/v1/+/d/+/command
{ topic: /^ff\/v1\/[^/]+\/d\/[^/]+\/command$/ },
// Send commands to all project-assigned devices
// - ff/v1/+/p/+/command
{ topic: /^ff\/v1\/[^/]+\/p\/[^/]+\/command$/ }
]
},
project: {
sub: [
// Receive commands from the platform
// - ff/v1/<team>/l/<project>/command
{ topic: /^ff\/v1\/([^/]+)\/l\/([^/]+)\/command$/, verify: 'checkTeamAndObjectIds' }
],
pub: [
// Send status to the platform
// - ff/v1/<team>/l/<project>/status
{ topic: /^ff\/v1\/([^/]+)\/l\/([^/]+)\/status$/, verify: 'checkTeamAndObjectIds' }
]
},
device: {
sub: [
// Receive commands from the platform
// - ff/v1/<team>/d/<device>/command
{ topic: /^ff\/v1\/([^/]+)\/d\/([^/]+)\/command$/, verify: 'checkTeamAndObjectIds' },
// Receive commands from the platform - broadcast
// - ff/v1/<team>/p/<project>/command
{ topic: /^ff\/v1\/([^/]+)\/p\/([^/]+)\/command$/, verify: 'checkDeviceAssignedToProject' }
],
pub: [
// Send status to the platform
// - ff/v1/<team>/d/<device>/status
{ topic: /^ff\/v1\/([^/]+)\/d\/([^/]+)\/status$/, verify: 'checkTeamAndObjectIds' }
]
}
}

return {
verify: async function (username, topic, accessLevel) {
// Three types of client
// - forge_platform
// - project:<teamid>:<projectid>
// - device:<teamid>:<deviceid>

let allowed = false
let aclList = []
// accessLevel 1=SUB 2=PUB 3=WRITE
// We do not distinguish between SUB & WRITE
const aclType = accessLevel === 2 ? 'pub' : 'sub'
// Pick the appropriate ACL list based on username/accessLevel
if (username === 'forge_platform') {
aclList = ACLS[username][aclType]
} else if (/^project:/.test(username)) {
aclList = ACLS.project[aclType]
} else if (/^device:/.test(username)) {
aclList = ACLS.device[aclType]
}
const l = aclList.length
for (let i = 0; i < l; i++) {
const m = aclList[i].topic.exec(topic)
if (m) {
if (aclList[i].verify && verifyFunctions[aclList[i].verify]) {
allowed = await verifyFunctions[aclList[i].verify](m, username.split(':'))
} else {
allowed = true
}
break
}
}
return allowed
},
addACL: function (scope, action, acl) {
ACLS[scope][action].push(acl)
}
}
}
63 changes: 63 additions & 0 deletions forge/comms/authRoutes.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/**
* Broker authentication backend
*
* - /api/comms/auth/client - verify username/password
* - /api/comms/auth/acl - verify username is permitted to pub/sub to particular topic
*
*/

module.exports = async function (app) {
app.post('/client', {
schema: {
body: {
type: 'object',
required: ['clientid', 'password', 'username'],
properties: {
clientid: { type: 'string' },
password: { type: 'string' },
username: { type: 'string' }
}
}
}
}, async (request, response) => {
const isValid = await app.db.controllers.BrokerClient.authenticateCredentials(
request.body.username,
request.body.password
)
if (isValid) {
response.status(200).send()
} else {
response.status(401).send()
}
})

app.post('/acl', {
schema: {
body: {
type: 'object',
required: ['acc', 'clientid', 'topic', 'username'],
properties: {
clientid: { type: 'string' },
username: { type: 'string' },
topic: { type: 'string' },
acc: { type: 'number' }
}
}
}
}, async (request, response) => {
const allowed = await app.comms.aclManager.verify(request.body.username, request.body.topic, request.body.acc)
if (allowed) {
response.status(200).send()
} else {
response.status(401).send()
}
})

// app.get('/test', async (request, response) => {
// const project = await app.db.models.Project.byId('ac8e4995-cb47-42ec-a9a4-71c42366c7f3')
// const creds = await app.db.controllers.BrokerClient.createClientForProject(project)
// // const device = await app.db.models.Device.byId(1)
// // const creds = await app.db.controllers.BrokerClient.createClientForDevice(device)
// response.send(creds)
// })
}
74 changes: 74 additions & 0 deletions forge/comms/commsClient.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
const mqtt = require('mqtt')
const EventEmitter = require('events')

/**
* MQTT Client wrapper. This connects to the platform broker and subscribes
* to the appropriate status topics.
*/
class CommsClient extends EventEmitter {
constructor (app) {
super()
this.app = app
}

async init () {
// To aid testing, we use a url of `:test:` to allow us to configure
// the platform with comms enabled, but no active MQTT connection
if (this.app.config.broker.url !== ':test:') {
const brokerConfig = {
clientId: 'forge_platform',
username: 'forge_platform',
password: await this.app.settings.get('commsToken'),
reconnectPeriod: 5000
}
this.client = mqtt.connect(this.app.config.broker.url, brokerConfig)
this.client.on('connect', () => {
this.app.log.info('Connected to comms broker')
})
this.client.on('reconnect', () => {
this.app.log.info('Reconnecting to comms broker')
})
this.client.on('error', (err) => {
this.app.log.info(`Connection error to comms broker: ${err.toString()}`)
})
this.client.on('message', (topic, message) => {
const topicParts = topic.split('/')
const ownerType = topicParts[3]
const ownerId = topicParts[4]
if (ownerType === 'p') {
this.emit('status/project', {
id: ownerId,
status: message.toString()
})
} else if (ownerType === 'd') {
this.emit('status/device', {
id: ownerId,
status: message.toString()
})
}
})
this.client.subscribe([
// Launcher status
'ff/v1/+/l/+/status',
// Device status
'ff/v1/+/d/+/status'
])
}
}

publish (topic, payload) {
if (this.client) {
this.client.publish(topic, payload)
}
}

async disconnect () {
if (this.client) {
this.client.end()
}
}
}

module.exports = {
CommsClient
}
93 changes: 93 additions & 0 deletions forge/comms/devices.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/**
* This module provides the handler for device status events - as well as APIs
* for sending commands to devices.
*/

class DeviceCommsHandler {
constructor (app, client) {
this.app = app
this.client = client

// Listen for any incoming device status events
client.on('status/device', (status) => { this.handleStatus(status) })
}

async handleStatus (status) {
// Check it looks like a valid status message
if (status.id && status.status) {
const deviceId = status.id
const device = await this.app.db.models.Device.byId(deviceId)
if (!device) {
// TODO: log invalid device
return
}
try {
const payload = JSON.parse(status.status)
await this.app.db.controllers.Device.updateState(device, payload)

// If the status state===unknown, the device is waiting for confirmation
// it has the right details. Always response with an 'update' command in
// this scenario
let sendUpdateCommand = payload.state === 'unknown'

if (Object.hasOwn(payload, 'project') && payload.project !== (device.Project?.id || null)) {
// The Project is incorrect
sendUpdateCommand = true
}
if (Object.hasOwn(payload, 'snapshot') && payload.snapshot !== (device.targetSnapshot?.hashid || null)) {
// The Snapshot is incorrect
sendUpdateCommand = true
}
if (Object.hasOwn(payload, 'settings') && payload.settings !== (device.settingsHash || null)) {
// The Settings are incorrect
sendUpdateCommand = true
}

if (sendUpdateCommand) {
this.sendCommand(device.Team.hashid, deviceId, 'update', {
project: device.Project?.id || null,
snapshot: device.targetSnapshot?.hashid || null,
settings: device.settingsHash || null
})
}
} catch (err) {
// Not a JSON payload - ignore
}
}
}

/**
* Send a command to all devices assigned to a project using the broadcast
* topic.
* @param {String} teamId
* @param {String} projectId
* @param {String} command
* @param {Object} payload
*/
sendCommandToProjectDevices (teamId, projectId, command, payload) {
const topic = `ff/v1/${teamId}/p/${projectId}/command`
this.client.publish(topic, JSON.stringify({
command: command,
...payload
}))
}

/**
* Send a command to a specific device using its command topic.
* @param {String} teamId
* @param {String} deviceId
* @param {String} command
* @param {Object} payload
*/
sendCommand (teamId, deviceId, command, payload) {
const topic = `ff/v1/${teamId}/d/${deviceId}/command`
this.client.publish(topic, JSON.stringify({
command: command,
...payload
}))
}
}

module.exports = {
DeviceCommsHandler: (app, client) => new DeviceCommsHandler(app, client)
}
Loading