Skip to content

Commit

Permalink
fix(memfault): only pull if connections are active
Browse files Browse the repository at this point in the history
  • Loading branch information
coderbyheart committed Mar 14, 2024
1 parent 4a2493a commit 4e25c1d
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 31 deletions.
8 changes: 7 additions & 1 deletion cdk/resources/Memfault.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type { IPrincipal } from 'aws-cdk-lib/aws-iam/index.js'
import { Construct } from 'constructs'
import type { PackedLambda } from '../backend.js'
import { LambdaLogGroup } from './LambdaLogGroup.js'
import type { WebsocketAPI } from './WebsocketAPI.js'

/**
* Pull Memfault data for devices
Expand All @@ -24,12 +25,14 @@ export class Memfault extends Construct {
lambdaSources,
baseLayer,
assetTrackerStackName,
websocketAPI,
}: {
lambdaSources: {
memfault: PackedLambda
}
baseLayer: Lambda.ILayerVersion
assetTrackerStackName: string
websocketAPI: WebsocketAPI
},
) {
super(parent, 'Memfault')
Expand Down Expand Up @@ -61,14 +64,16 @@ export class Memfault extends Construct {
timeout: Duration.seconds(60),
memorySize: 1792,
code: Lambda.Code.fromAsset(lambdaSources.memfault.lambdaZipFile),
description: 'Pull Memfault data for devices and put them in the shadow',
description: 'Pull Memfault data for devices and publish it on S3',
layers: [baseLayer],
environment: {
VERSION: this.node.tryGetContext('version'),

Check warning on line 70 in cdk/resources/Memfault.ts

View workflow job for this annotation

GitHub Actions / tests

Unsafe assignment of an `any` value
STACK_NAME: Stack.of(this).stackName,
ASSET_TRACKER_STACK_NAME: assetTrackerStackName,
NODE_NO_WARNINGS: '1',
BUCKET: this.bucket.bucketName,
WEBSOCKET_CONNECTIONS_TABLE_NAME:
websocketAPI.connectionsTable.tableName,
},
initialPolicy: [
new IAM.PolicyStatement({
Expand All @@ -86,6 +91,7 @@ export class Memfault extends Construct {
})

this.bucket.grantWrite(fn)
websocketAPI.connectionsTable.grantReadData(fn)

const rule = new Events.Rule(this, 'Rule', {
schedule: Events.Schedule.expression('rate(5 minutes)'),
Expand Down
4 changes: 2 additions & 2 deletions cdk/resources/WebsocketAPI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { LambdaLogGroup } from './LambdaLogGroup.js'

export class WebsocketAPI extends Construct {
public readonly websocketURI: string
public readonly connectionsTable: DynamoDB.ITable
public readonly connectionsTable: DynamoDB.Table
public readonly websocketAPIArn: string
public readonly websocketManagementAPIURL: string
public constructor(
Expand Down Expand Up @@ -43,7 +43,7 @@ export class WebsocketAPI extends Construct {
},
timeToLiveAttribute: 'ttl',
removalPolicy: RemovalPolicy.DESTROY,
}) as DynamoDB.ITable
})

// API
const api = new ApiGateway.CfnApi(this, 'api', {
Expand Down
1 change: 1 addition & 0 deletions cdk/stacks/BackendStack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ export class BackendStack extends Stack {
assetTrackerStackName,
baseLayer,
lambdaSources,
websocketAPI: api,
})

// Outputs
Expand Down
17 changes: 16 additions & 1 deletion lambda/memfault.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,24 @@ import { IoTClient, ListThingsInThingGroupCommand } from '@aws-sdk/client-iot'
import { GetParametersByPathCommand, SSMClient } from '@aws-sdk/client-ssm'
import { fromEnv } from '@nordicsemiconductor/from-env'
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3'
import { getActiveConnections } from './notifyClients.js'
import { DynamoDBClient } from '@aws-sdk/client-dynamodb'

const ssm = new SSMClient({})
const iot = new IoTClient({})
const s3 = new S3Client({})
const db = new DynamoDBClient({})

const { stackName, nrfAssetTrackerStackName, bucket } = fromEnv({
const {
stackName,
nrfAssetTrackerStackName,
bucket,
websocketConnectionsTableName,
} = fromEnv({
stackName: 'STACK_NAME',
nrfAssetTrackerStackName: 'ASSET_TRACKER_STACK_NAME',
bucket: 'BUCKET',
websocketConnectionsTableName: 'WEBSOCKET_CONNECTIONS_TABLE_NAME',
})(process.env)

const Prefix = `/${stackName}/memfault/`
Expand All @@ -37,6 +46,8 @@ if (
)
throw new Error(`Memfault settings not configured!`)

const getActive = getActiveConnections(db, websocketConnectionsTableName)

type Reboot = {
type: 'memfault'
mcu_reason_register: null
Expand Down Expand Up @@ -76,6 +87,10 @@ const api = {
* Pull data from Memfault about all devices
*/
export const handler = async (): Promise<void> => {
if ((await getActive()).length === 0) {
console.debug('No active connections.')
return
}
const { things } = await iot.send(
new ListThingsInThingGroupCommand({
thingGroupName: nrfAssetTrackerStackName,
Expand Down
45 changes: 30 additions & 15 deletions lambda/notifyClients.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export const notifyClients = (
dropMessage = false,
): ((event: Event) => Promise<void>) => {
const send = sendEvent(apiGwManagementClient)
const getActive = getActiveConnections(db, connectionsTableName)
return async (event: Event): Promise<void> => {
console.log(
JSON.stringify({
Expand All @@ -87,10 +88,7 @@ export const notifyClients = (
console.debug(`Dropped message`)
return
}
const connectionIds: string[] = await getActiveConnections(
db,
connectionsTableName,
)
const connectionIds: string[] = await getActive()

for (const connectionId of connectionIds) {
try {
Expand Down Expand Up @@ -151,18 +149,35 @@ const getEventContext = (event: Event): URL | null => {
return null
}

export const getActiveConnections = async (
export const getActiveConnections = (
db: DynamoDBClient,
connectionsTableName: string,
): Promise<string[]> => {
const res = await db.send(
new ScanCommand({
TableName: connectionsTableName,
}),
)
): (() => Promise<Array<string>>) => {
let lastResult: {
connectionIds: string[]
ts: number
}
return async (): Promise<string[]> => {
// Cache for 60 seconds
if (lastResult !== undefined && lastResult.ts > Date.now() - 60 * 1000) {
return lastResult.connectionIds
}

const res = await db.send(
new ScanCommand({
TableName: connectionsTableName,
}),
)

const connectionIds: string[] = res?.Items?.map(
({ connectionId }) => connectionId?.S,
).filter((connectionId) => connectionId !== undefined) as string[]

const connectionIds: string[] = res?.Items?.map(
({ connectionId }) => connectionId?.S,
).filter((connectionId) => connectionId !== undefined) as string[]
return connectionIds
lastResult = {
connectionIds,
ts: Date.now(),
}

return connectionIds
}
}
7 changes: 3 additions & 4 deletions lambda/onNewNetworkSurvey.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,12 @@ const notifier = withDeviceAlias(iot)(
}),
)

const getActive = getActiveConnections(db, connectionsTableName)

export const handler = async (event: DynamoDBStreamEvent): Promise<void> => {
console.log(JSON.stringify({ event, networkGeolocationApiUrl }))

const connectionIds: string[] = await getActiveConnections(
db,
connectionsTableName,
)
const connectionIds: string[] = await getActive()
if (connectionIds.length === 0) {
console.log(`No clients to notify.`)
return
Expand Down
7 changes: 3 additions & 4 deletions lambda/publishSummaries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,10 @@ const [historicaldataDatabaseName, historicaldataTableName] =

const timestream = new TimestreamQueryClient({})

const getActive = getActiveConnections(db, connectionsTableName)

export const handler = async (): Promise<void> => {
const connectionIds: string[] = await getActiveConnections(
db,
connectionsTableName,
)
const connectionIds: string[] = await getActive()
if (connectionIds.length === 0) {
console.log(`No clients to notify.`)
return
Expand Down
7 changes: 3 additions & 4 deletions lambda/resolveCellLocation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const notifier = withDeviceAlias(iot)(
}),
)

const getActive = getActiveConnections(db, connectionsTableName)

export const handler = async (event: {
roam: {
v: {
Expand All @@ -42,10 +44,7 @@ export const handler = async (event: {
}): Promise<void> => {
console.log(JSON.stringify({ event, geolocationApiUrl }))

const connectionIds: string[] = await getActiveConnections(
db,
connectionsTableName,
)
const connectionIds: string[] = await getActive()
if (connectionIds.length === 0) {
console.log(`No clients to notify.`)
return
Expand Down

0 comments on commit 4e25c1d

Please sign in to comment.