Skip to content
This repository has been archived by the owner on Feb 3, 2021. It is now read-only.

Update to aws-iot-device-sdk-v2 #183

Draft
wants to merge 4 commits into
base: saga
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
269 changes: 165 additions & 104 deletions cli/device/connect.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { promises as fs } from 'fs'
import { thingShadow } from 'aws-iot-device-sdk'
import { mqtt, io, iot } from 'aws-crt'
import { deviceFileLocations } from '../jitp/deviceFileLocations'
import { iotshadow } from 'aws-iot-device-sdk-v2'
import * as chalk from 'chalk'
import { uiServer, WebSocketConnection } from '@bifravst/device-ui-server'
import { isNotNullOrUndefined } from '../../util/isNullOrUndefined'
import { WebSocketConnection, uiServer } from '@bifravst/device-ui-server'

const defaultConfig = {
act: false, // Whether to enable the active mode
Expand All @@ -14,6 +16,15 @@ const defaultConfig = {
acct: 1, // Accelerometer threshold: minimal absolute value for and accelerometer reading to be considered movement.
} as const

const labeledTimer = (label: string): (() => void) => {
console.log(chalk.yellow.dim(`${label}...`))
const doneLabel = chalk.green(`${label} ✔ `)
console.time(doneLabel)
return () => {
console.timeEnd(doneLabel)
}
}

/**
* Connect to the AWS IoT broker using a generated device certificate
*/
Expand All @@ -32,41 +43,16 @@ export const connect = async ({
caCert: string
version: string
}): Promise<void> => {
io.enable_logging(io.LogLevel.ERROR)
// Certificate check
const deviceFiles = deviceFileLocations({ certsDir, deviceId })
let cfg = defaultConfig
const devRoam = {
dev: {
v: {
band: 666,
nw: 'LAN',
modV: 'device-simulator',
brdV: 'device-simulator',
appV: version,
iccid: '12345678901234567890',
},
ts: Date.now(),
},
roam: {
v: {
rsrp: 70,
area: 30401,
mccmnc: 24201,
cell: 16964098,
ip: '0.0.0.0',
},
ts: Date.now(),
},
}

console.log(chalk.blue('Device ID: '), chalk.yellow(deviceId))
console.log(chalk.blue('endpoint: '), chalk.yellow(endpoint))
console.log(chalk.blue('deviceUiUrl: '), chalk.yellow(deviceUiUrl))
console.log(chalk.blue('CA cert: '), chalk.yellow(caCert))
console.log(chalk.blue('Private key: '), chalk.yellow(deviceFiles.key))
console.log(chalk.blue('Certificate: '), chalk.yellow(deviceFiles.certWithCA))

const certFiles = [deviceFiles.certWithCA, deviceFiles.key, caCert]

try {
await Promise.all(
certFiles.map(async (f) => {
Expand All @@ -86,102 +72,177 @@ export const connect = async ({
process.exit(1)
}

console.time(chalk.green(chalk.inverse(' connected ')))
const config_builder = iot.AwsIotMqttConnectionConfigBuilder.new_mtls_builder_from_path(
deviceFiles.certWithCA,
deviceFiles.key,
)
config_builder.with_certificate_authority_from_path(undefined, caCert)
config_builder.with_clean_session(true)
config_builder.with_client_id(deviceId)
config_builder.with_endpoint(endpoint)

const config = config_builder.build()
const client = new mqtt.MqttClient(new io.ClientBootstrap())
const connection = client.new_connection(config)
const shadow = new iotshadow.IotShadowClient(connection)
connection.on('disconnect', () => {
console.error(chalk.red(' disconnected! '))
})
connection.on('resume', () => {
console.log(chalk.magenta('reconnecting...'))
})

const connectingTimer = labeledTimer('connecting')
setInterval(() => {
// NOTE! This is needed so the underlying AWS MQTT library does not crash
}, 60000)

const note = chalk.magenta(
`Still connecting ... First connect takes around 30 seconds`,
)
console.time(note)
const connectingNote = setInterval(() => {
console.timeLog(note)
}, 5000)

const connection = new thingShadow({
privateKey: deviceFiles.key,
clientCert: deviceFiles.certWithCA,
caCert,
clientId: deviceId,
host: endpoint,
region: endpoint.split('.')[2],
})
let connected = false
while (!connected) {
try {
await connection.connect()
connected = true
} catch {
await new Promise((resolve) => setTimeout(resolve, 5000))
console.timeLog(note)
}
}
connectingTimer()

// UI Server
let wsConnection: WebSocketConnection

connection.on('connect', async () => {
console.timeEnd(chalk.green(chalk.inverse(' connected ')))
clearInterval(connectingNote)

connection.register(deviceId, {}, async () => {
await uiServer({
deviceUiUrl,
deviceId: deviceId,
onUpdate: (update) => {
console.log(chalk.magenta('<'), chalk.cyan(JSON.stringify(update)))
connection.update(deviceId, { state: { reported: update } })
},
onMessage: (message) => {
console.log(chalk.magenta('<'), chalk.cyan(JSON.stringify(message)))
connection.publish(`${deviceId}/messages`, JSON.stringify(message))
const startingUiServerTimer = labeledTimer('Starting UI server')
await uiServer({
deviceUiUrl,
deviceId: deviceId,
onUpdate: async (update) => {
console.log(chalk.magenta('<'), chalk.cyan(JSON.stringify(update)))
await shadow.publishUpdateShadow(
{
thingName: deviceId,
state: { reported: update },
},
onWsConnection: (c) => {
console.log(chalk.magenta('[ws]'), chalk.cyan('connected'))
wsConnection = c
connection.get(deviceId)
},
})
console.log(
chalk.magenta('>'),
chalk.cyan(
JSON.stringify({ state: { reported: { cfg, ...devRoam } } }),
),
mqtt.QoS.AtLeastOnce,
)
connection.update(deviceId, { state: { reported: { cfg, ...devRoam } } })
})

connection.on('close', () => {
console.error(chalk.red(chalk.inverse(' disconnected! ')))
})
},
onMessage: async (message) => {
console.log(chalk.magenta('<'), chalk.cyan(JSON.stringify(message)))
await connection.publish(
`${deviceId}/messages`,
message,
mqtt.QoS.AtLeastOnce,
)
},
onWsConnection: async (c) => {
console.log(chalk.magenta('[ws]'), chalk.cyan('connected'))
wsConnection = c
// Fetch current config
await shadow.publishGetShadow(
{ thingName: deviceId },
mqtt.QoS.AtLeastOnce,
)
},
})
startingUiServerTimer()

connection.on('reconnect', () => {
console.log(chalk.magenta('reconnecting...'))
})
// Configuration management
let cfg = defaultConfig
const devRoam = {
dev: {
v: {
band: 666,
nw: 'LAN',
modV: 'device-simulator',
brdV: 'device-simulator',
appV: version,
iccid: '12345678901234567890',
},
ts: Date.now(),
},
roam: {
v: {
rsrp: 70,
area: 30401,
mccmnc: 24201,
cell: 16964098,
ip: '0.0.0.0',
},
ts: Date.now(),
},
}

connection.on('status', (_, stat, __, stateObject) => {
console.log(chalk.magenta('>'), chalk.cyan(stat))
console.log(chalk.magenta('>'), chalk.cyan(JSON.stringify(stateObject)))
if (stat === 'accepted') {
if (wsConnection !== undefined) {
cfg = {
...cfg,
...stateObject.desired.cfg,
}
console.log(chalk.magenta('[ws>'), JSON.stringify(cfg))
wsConnection.send(JSON.stringify(cfg))
const acceptedTimer = labeledTimer('Subscribing to shadow get accepted')
await shadow.subscribeToGetShadowAccepted(
{ thingName: deviceId },
mqtt.QoS.AtLeastOnce,
async (error, shadow) => {
if (isNotNullOrUndefined(error)) {
console.error(
chalk.red(`Failed to receive shadow get accepted for ${deviceId}`),
chalk.redBright(error?.message),
)
return
}
console.log(chalk.magenta('>'), chalk.cyan(JSON.stringify(shadow)))
if (wsConnection !== undefined) {
cfg = {
...cfg,
...(shadow?.state?.desired as Record<string, any> | undefined)?.cfg,
}
console.log(chalk.magenta('[ws>'), JSON.stringify(cfg))
wsConnection.send(JSON.stringify(cfg))
}
})
},
)
acceptedTimer()

connection.on('delta', (_, stateObject) => {
const deltaTimer = labeledTimer('Subscribing to shadow delta')
await shadow.subscribeToShadowDeltaUpdatedEvents(
{
thingName: deviceId,
},
mqtt.QoS.AtLeastOnce,
(error, stateObject) => {
if (isNotNullOrUndefined(error)) {
console.error(
chalk.red(`Failed to receive shadow delta for ${deviceId}`),
chalk.redBright(error?.message),
)
return
}
console.log(chalk.magenta('<'), chalk.cyan(JSON.stringify(stateObject)))
cfg = {
...cfg,
...stateObject.state.cfg,
...(stateObject?.state as Record<string, any> | undefined)?.cfg,
}
if (wsConnection !== undefined) {
console.log(chalk.magenta('[ws>'), JSON.stringify(cfg))
wsConnection.send(JSON.stringify(cfg))
}
console.log(
chalk.magenta('>'),
chalk.cyan(JSON.stringify({ state: { reported: { cfg } } })),
)
connection.update(deviceId, { state: { reported: { cfg } } })
})
},
)
deltaTimer()

connection.on('timeout', (thingName, clientToken) => {
console.log(
'received timeout on ' + thingName + ' with token: ' + clientToken,
)
})
})
const currentConfigTimer = labeledTimer('Sending current config')
console.log(
chalk.magenta('>'),
chalk.cyan(JSON.stringify({ state: { reported: { cfg, ...devRoam } } })),
)
await shadow.publishUpdateShadow(
{ thingName: deviceId, state: { reported: { cfg, ...devRoam } } },
mqtt.QoS.AtLeastOnce,
)
currentConfigTimer()
const getShadowTimer = labeledTimer('Getting shadow')
await shadow.publishGetShadow(
{
thingName: deviceId,
},
mqtt.QoS.AtLeastOnce,
)
getShadowTimer()
}