diff --git a/cdk/BackendLambdas.d.ts b/cdk/BackendLambdas.d.ts index b969976..a9ddd1a 100644 --- a/cdk/BackendLambdas.d.ts +++ b/cdk/BackendLambdas.d.ts @@ -15,4 +15,5 @@ type BackendLambdas = { updatesToLwM2M: PackedLambda publishLwM2MShadowsToJSON: PackedLambda memfault: PackedLambda + memfaultPollForReboots: PackedLambda } diff --git a/cdk/backend.ts b/cdk/backend.ts index 26080ca..726f0ab 100644 --- a/cdk/backend.ts +++ b/cdk/backend.ts @@ -15,6 +15,7 @@ const packagesInLayer: string[] = [ 'jsonata', 'mqtt', '@protobuf-ts/runtime', + 'p-retry', ] const pack = async ( id: string, @@ -53,6 +54,7 @@ new BackendApp({ nrplusGatewayScan: await pack('nrplusGatewayScan'), updatesToLwM2M: await pack('updatesToLwM2M'), memfault: await pack('memfault'), + memfaultPollForReboots: await pack('memfaultPollForReboots'), // For hello.nrfcloud.com/map publishLwM2MShadowsToJSON: await pack('publishLwM2MShadowsToJSON'), }, diff --git a/cdk/resources/Memfault.ts b/cdk/resources/Memfault.ts index 8c8a062..a861f0a 100644 --- a/cdk/resources/Memfault.ts +++ b/cdk/resources/Memfault.ts @@ -5,6 +5,7 @@ import { aws_s3 as S3, aws_iam as IAM, aws_lambda as Lambda, + aws_iot as IoT, Stack, RemovalPolicy, } from 'aws-cdk-lib' @@ -29,6 +30,7 @@ export class Memfault extends Construct { }: { lambdaSources: { memfault: PackedLambda + memfaultPollForReboots: PackedLambda } baseLayer: Lambda.ILayerVersion assetTrackerStackName: string @@ -72,8 +74,7 @@ export class Memfault extends Construct { ASSET_TRACKER_STACK_NAME: assetTrackerStackName, NODE_NO_WARNINGS: '1', BUCKET: this.bucket.bucketName, - WEBSOCKET_CONNECTIONS_TABLE_NAME: - websocketAPI.connectionsTable.tableName, + CONNECTIONS_TABLE_NAME: websocketAPI.connectionsTable.tableName, }, initialPolicy: [ new IAM.PolicyStatement({ @@ -104,5 +105,98 @@ export class Memfault extends Construct { principal: new IAM.ServicePrincipal('events.amazonaws.com') as IPrincipal, sourceArn: rule.ruleArn, }) + + // When a devices publishes button press 42, poll the Memfault API for an update + const pollForRebootsFn = new Lambda.Function(this, 'pollForRebootsFn', { + handler: lambdaSources.memfaultPollForReboots.handler, + architecture: Lambda.Architecture.ARM_64, + runtime: Lambda.Runtime.NODEJS_20_X, + timeout: Duration.seconds(120), + memorySize: 1792, + code: Lambda.Code.fromAsset( + lambdaSources.memfaultPollForReboots.lambdaZipFile, + ), + description: + 'Poll the Memfault API for an update after a device publishes a button event for button 42', + layers: [baseLayer], + environment: { + VERSION: this.node.tryGetContext('version'), + STACK_NAME: Stack.of(this).stackName, + ASSET_TRACKER_STACK_NAME: assetTrackerStackName, + NODE_NO_WARNINGS: '1', + BUCKET: this.bucket.bucketName, + CONNECTIONS_TABLE_NAME: websocketAPI.connectionsTable.tableName, + WEBSOCKET_MANAGEMENT_API_URL: websocketAPI.websocketManagementAPIURL, + }, + initialPolicy: [ + new IAM.PolicyStatement({ + actions: ['ssm:GetParametersByPath'], + resources: [ + `arn:aws:ssm:${Stack.of(this).region}:${Stack.of(this).account}:parameter/${Stack.of(this).stackName}/memfault/*`, + ], + }), + new IAM.PolicyStatement({ + actions: ['execute-api:ManageConnections'], + resources: [websocketAPI.websocketAPIArn], + }), + new IAM.PolicyStatement({ + actions: ['iot:DescribeThing'], + resources: ['*'], + }), + ], + ...new LambdaLogGroup(this, 'pollForRebootsFnLogs'), + }) + + websocketAPI.connectionsTable.grantFullAccess(pollForRebootsFn) + + const button42RuleRole = new IAM.Role(this, 'button42RuleRole', { + assumedBy: new IAM.ServicePrincipal('iot.amazonaws.com'), + inlinePolicies: { + rootPermissions: new IAM.PolicyDocument({ + statements: [ + new IAM.PolicyStatement({ + actions: ['iot:Publish'], + resources: [ + `arn:aws:iot:${Stack.of(this).region}:${Stack.of(this).account}:topic/errors`, + ], + }), + ], + }), + }, + }) + + const button42Rule = new IoT.CfnTopicRule(this, 'button42Rule', { + topicRulePayload: { + awsIotSqlVersion: '2016-03-23', + description: + 'Trigger a fetch of the Memfault data when a device publishes a button event for button 42', + ruleDisabled: false, + sql: [ + 'SELECT topic(1) as deviceId,', + 'btn.ts as ts,', + `parse_time("yyyy-MM-dd'T'HH:mm:ss.S'Z'", timestamp()) as timestamp`, + "FROM '+/messages'", + 'WHERE btn.v = 42', + ].join(' '), + actions: [ + { + lambda: { + functionArn: pollForRebootsFn.functionArn, + }, + }, + ], + errorAction: { + republish: { + roleArn: button42RuleRole.roleArn, + topic: 'errors', + }, + }, + }, + }) + + pollForRebootsFn.addPermission('storeMessagesRule', { + principal: new IAM.ServicePrincipal('iot.amazonaws.com'), + sourceArn: button42Rule.attrArn, + }) } } diff --git a/lambda/memfault.ts b/lambda/memfault.ts index 2fa7d0a..d475e69 100644 --- a/lambda/memfault.ts +++ b/lambda/memfault.ts @@ -1,88 +1,28 @@ +import { DynamoDBClient } from '@aws-sdk/client-dynamodb' import { IoTClient } from '@aws-sdk/client-iot' -import { GetParametersByPathCommand, SSMClient } from '@aws-sdk/client-ssm' +import { PutObjectCommand, S3Client } from '@aws-sdk/client-s3' +import { SSMClient } from '@aws-sdk/client-ssm' import { fromEnv } from '@nordicsemiconductor/from-env' -import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3' -import { getActiveConnections } from './notifyClients.js' -import { DynamoDBClient } from '@aws-sdk/client-dynamodb' import { listThingsInGroup } from './listThingsInGroup.js' +import { createAPIClient, type Reboot } from './memfault/api.js' +import { getActiveConnections } from './notifyClients.js' -const ssm = new SSMClient({}) +export const ssm = new SSMClient({}) const iot = new IoTClient({}) const s3 = new S3Client({}) const db = new DynamoDBClient({}) -const { - stackName, - nrfAssetTrackerStackName, - bucket, - websocketConnectionsTableName, -} = fromEnv({ - stackName: 'STACK_NAME', - nrfAssetTrackerStackName: 'ASSET_TRACKER_STACK_NAME', - bucket: 'BUCKET', - websocketConnectionsTableName: 'WEBSOCKET_CONNECTIONS_TABLE_NAME', -})(process.env) - -const Prefix = `/${stackName}/memfault/` -const { organizationAuthToken, organizationId, projectId } = ( - ( - await ssm.send( - new GetParametersByPathCommand({ - Path: Prefix, - }), - ) - )?.Parameters ?? [] -).reduce( - (params, p) => ({ - ...params, - [(p.Name ?? '').replace(Prefix, '')]: p.Value ?? '', - }), - {} as Record, -) - -if ( - organizationAuthToken === undefined || - organizationId === undefined || - projectId === undefined -) - throw new Error(`Memfault settings not configured!`) - -const getActive = getActiveConnections(db, websocketConnectionsTableName) +const { stackName, nrfAssetTrackerStackName, bucket, connectionsTableName } = + fromEnv({ + stackName: 'STACK_NAME', + nrfAssetTrackerStackName: 'ASSET_TRACKER_STACK_NAME', + bucket: 'BUCKET', + connectionsTableName: 'CONNECTIONS_TABLE_NAME', + })(process.env) -type Reboot = { - type: 'memfault' - mcu_reason_register: null - time: string // e.g. '2024-03-14T07:26:37.270000+00:00' - reason: number // e.g. 7 - software_version: { - version: string // e.g. '1.11.1+thingy91.low-power.memfault' - id: number // e.g.504765 - software_type: { - id: number //e.g. 32069; - name: string // e.g. 'thingy_world' - } - archived: boolean - } -} +const getActive = getActiveConnections(db, connectionsTableName) -const api = { - getLastReboots: async (deviceId: string): Promise> => { - const res = await fetch( - `https://api.memfault.com/api/v0/organizations/${organizationId}/projects/${projectId}/devices/${deviceId}/reboots?${new URLSearchParams( - { - since: new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString(), - }, - ).toString()}`, - { - headers: new Headers({ - Authorization: `Basic ${Buffer.from(`:${organizationAuthToken}`).toString('base64')}`, - }), - }, - ) - if (!res.ok) return null - return (await res.json()).data - }, -} +const api = await createAPIClient(ssm, stackName) const listThings = listThingsInGroup(iot) diff --git a/lambda/memfault/api.ts b/lambda/memfault/api.ts new file mode 100644 index 0000000..ee67157 --- /dev/null +++ b/lambda/memfault/api.ts @@ -0,0 +1,68 @@ +import { GetParametersByPathCommand, SSMClient } from '@aws-sdk/client-ssm' + +export const createAPIClient = async ( + ssm: SSMClient, + stackName: string, +): Promise<{ + getLastReboots: (deviceId: string) => Promise> +}> => { + const Prefix = `/${stackName}/memfault/` + + const { organizationAuthToken, organizationId, projectId } = ( + ( + await ssm.send( + new GetParametersByPathCommand({ + Path: Prefix, + }), + ) + )?.Parameters ?? [] + ).reduce( + (params, p) => ({ + ...params, + [(p.Name ?? '').replace(Prefix, '')]: p.Value ?? '', + }), + {} as Record, + ) + + if ( + organizationAuthToken === undefined || + organizationId === undefined || + projectId === undefined + ) + throw new Error(`Memfault settings not configured!`) + + return { + getLastReboots: async (deviceId) => { + const res = await fetch( + `https://api.memfault.com/api/v0/organizations/${organizationId}/projects/${projectId}/devices/${deviceId}/reboots?${new URLSearchParams( + { + since: new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString(), + }, + ).toString()}`, + { + headers: new Headers({ + Authorization: `Basic ${Buffer.from(`:${organizationAuthToken}`).toString('base64')}`, + }), + }, + ) + if (!res.ok) return null + return (await res.json()).data + }, + } +} + +export type Reboot = { + type: 'memfault' + mcu_reason_register: null + time: string // e.g. '2024-03-14T07:26:37.270000+00:00' + reason: number // e.g. 7 + software_version: { + version: string // e.g. '1.11.1+thingy91.low-power.memfault' + id: number // e.g.504765 + software_type: { + id: number //e.g. 32069; + name: string // e.g. 'thingy_world' + } + archived: boolean + } +} diff --git a/lambda/memfaultPollForReboots.ts b/lambda/memfaultPollForReboots.ts new file mode 100644 index 0000000..349459d --- /dev/null +++ b/lambda/memfaultPollForReboots.ts @@ -0,0 +1,86 @@ +import { ApiGatewayManagementApi } from '@aws-sdk/client-apigatewaymanagementapi' +import { DynamoDBClient } from '@aws-sdk/client-dynamodb' +import { IoTClient } from '@aws-sdk/client-iot' +import { SSMClient } from '@aws-sdk/client-ssm' +import { fromEnv } from '@nordicsemiconductor/from-env' +import { createAPIClient } from './memfault/api.js' +import { getActiveConnections, notifyClients } from './notifyClients.js' +import { withDeviceAlias } from './withDeviceAlias.js' +import pRetry from 'p-retry' + +const ssm = new SSMClient({}) +const iot = new IoTClient({}) +const db = new DynamoDBClient({}) + +const { connectionsTableName, websocketManagementAPIURL, stackName } = fromEnv({ + connectionsTableName: 'CONNECTIONS_TABLE_NAME', + websocketManagementAPIURL: 'WEBSOCKET_MANAGEMENT_API_URL', + stackName: 'STACK_NAME', +})(process.env) + +export const apiGwManagementClient = new ApiGatewayManagementApi({ + endpoint: websocketManagementAPIURL, +}) + +const notifier = withDeviceAlias(iot)( + notifyClients({ + db, + connectionsTableName, + apiGwManagementClient, + }), +) + +const getActive = getActiveConnections(db, connectionsTableName) + +const api = await createAPIClient(ssm, stackName) + +export const handler = async ({ + deviceId, + timestamp, + ts, +}: { + deviceId: string + timestamp: string + ts: number +}): Promise => { + console.log( + JSON.stringify({ + event: { + deviceId, + ts, + timestamp, + }, + }), + ) + const connectionIds: string[] = await getActive() + if (connectionIds.length === 0) { + console.log(`No clients to notify.`) + return + } + + // Wait 15 seconds before polling + await new Promise((resolve) => setTimeout(resolve, 15 * 1000)) + await pRetry( + async () => { + const reboots = await api.getLastReboots(deviceId) + const reboot = reboots?.[0] + if (reboot === undefined) + throw new Error(`No reboots found for device ${deviceId}.`) + + if (new Date(reboot.time).getTime() < ts) { + console.debug(JSON.stringify(reboots)) + throw new Error(`Latest reboot is not newer.`) + } + console.debug(`new reboot`, JSON.stringify(reboot)) + await notifier({ + '@context': new URL('https://thingy.rocks/memfault-reboot'), + deviceId, + reboot, + }) + }, + { + retries: 7, + minTimeout: 15, + }, + ) +} diff --git a/lambda/notifyClients.ts b/lambda/notifyClients.ts index 2b785f5..e10df62 100644 --- a/lambda/notifyClients.ts +++ b/lambda/notifyClients.ts @@ -62,7 +62,15 @@ export type CellGeoLocationEvent = { } } -export type Event = DeviceEvent | CellGeoLocationEvent +export type MemfaultUpdateReceived = { + deviceId: string + type: 'reboot' +} + +export type Event = + | DeviceEvent + | CellGeoLocationEvent + | (Record & { '@context': URL }) export const notifyClients = ( { @@ -92,7 +100,8 @@ export const notifyClients = ( for (const connectionId of connectionIds) { try { - const context = getEventContext(event) + const context = + '@context' in event ? event['@context'] : getEventContext(event) if (context === null) throw new Error(`Unknown event: ${JSON.stringify(event)}`) await send(connectionId, event, context) diff --git a/package-lock.json b/package-lock.json index 3e4566f..e9df3a8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -19,6 +19,7 @@ "jsonata": "2.0.4", "lodash-es": "4.17.21", "mqtt": "5.5.0", + "p-retry": "6.2.0", "p-throttle": "6.1.0", "protobufjs": "7.2.6" }, @@ -4920,6 +4921,11 @@ "@types/node": "*" } }, + "node_modules/@types/retry": { + "version": "0.12.2", + "resolved": "https://registry.npmjs.org/@types/retry/-/retry-0.12.2.tgz", + "integrity": "sha512-XISRgDJ2Tc5q4TRqvgJtzsRkFYNJzZrhTdtMoGVBttwzzQJkPnS3WWTFc7kuDRoPtPakl+T+OfdEUjYJj7Jbow==" + }, "node_modules/@types/semver": { "version": "7.5.8", "resolved": "https://registry.npmjs.org/@types/semver/-/semver-7.5.8.tgz", @@ -8399,6 +8405,17 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/is-network-error": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/is-network-error/-/is-network-error-1.1.0.tgz", + "integrity": "sha512-tUdRRAnhT+OtCZR/LxZelH/C7QtjtFrTu5tXCA8pl55eTUElUHT+GPYV8MBMBvea/j+NxQqVt3LbWMRir7Gx9g==", + "engines": { + "node": ">=16" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/is-number": { "version": "7.0.0", "resolved": "https://registry.npmjs.org/is-number/-/is-number-7.0.0.tgz", @@ -9224,6 +9241,22 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/p-retry": { + "version": "6.2.0", + "resolved": "https://registry.npmjs.org/p-retry/-/p-retry-6.2.0.tgz", + "integrity": "sha512-JA6nkq6hKyWLLasXQXUrO4z8BUZGUt/LjlJxx8Gb2+2ntodU/SS63YZ8b0LUTbQ8ZB9iwOfhEPhg4ykKnn2KsA==", + "dependencies": { + "@types/retry": "0.12.2", + "is-network-error": "^1.0.0", + "retry": "^0.13.1" + }, + "engines": { + "node": ">=16.17" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/p-throttle": { "version": "6.1.0", "resolved": "https://registry.npmjs.org/p-throttle/-/p-throttle-6.1.0.tgz", @@ -9786,6 +9819,14 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/retry": { + "version": "0.13.1", + "resolved": "https://registry.npmjs.org/retry/-/retry-0.13.1.tgz", + "integrity": "sha512-XQBQ3I8W1Cge0Seh+6gjj03LbmRFWuoszgK9ooCpwYIrhhoO80pfq4cUkU5DkknwfOfFteRwlZ56PYOGYyFWdg==", + "engines": { + "node": ">= 4" + } + }, "node_modules/reusify": { "version": "1.0.4", "resolved": "https://registry.npmjs.org/reusify/-/reusify-1.0.4.tgz", diff --git a/package.json b/package.json index 28d2178..bdf7481 100644 --- a/package.json +++ b/package.json @@ -92,6 +92,7 @@ "jsonata": "2.0.4", "lodash-es": "4.17.21", "mqtt": "5.5.0", + "p-retry": "6.2.0", "p-throttle": "6.1.0", "protobufjs": "7.2.6" }