Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions cdk/resources/LastSeen.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import {
aws_dynamodb as DynamoDB,
aws_iam as IAM,
aws_iot as IoT,
RemovalPolicy,
Stack,
} from 'aws-cdk-lib'
import { Construct } from 'constructs'

/**
* Record the timestamp when the device was last seen
*/
export class LastSeen extends Construct {
public readonly table: DynamoDB.ITable
public constructor(parent: Construct) {
super(parent, 'lastSeen')

this.table = new DynamoDB.Table(this, 'table', {
billingMode: DynamoDB.BillingMode.PAY_PER_REQUEST,
partitionKey: {
name: 'deviceId',
type: DynamoDB.AttributeType.STRING,
},
sortKey: {
name: 'source',
type: DynamoDB.AttributeType.STRING,
},
pointInTimeRecovery: true,
removalPolicy: RemovalPolicy.DESTROY,
})

const role = new IAM.Role(this, 'role', {
assumedBy: new IAM.ServicePrincipal(
'iot.amazonaws.com',
) as IAM.IPrincipal,
inlinePolicies: {
rootPermissions: new IAM.PolicyDocument({
statements: [
new IAM.PolicyStatement({
actions: ['iot:Publish'],
resources: [
`arn:aws:iot:${Stack.of(this).region}:${
Stack.of(this).account
}:topic/errors`,
],
}),
],
}),
},
})
this.table.grantWriteData(role)

new IoT.CfnTopicRule(this, 'rule', {
topicRulePayload: {
description: `Record the timestamp when a device last sent in messages`,
ruleDisabled: false,
awsIotSqlVersion: '2016-03-23',
sql: `
select
topic(4) as deviceId,
'deviceMessage' as source,
parse_time("yyyy-MM-dd'T'HH:mm:ss.S'Z'", ts) as lastSeen
from 'data/+/+/+/+'
where messageType = 'DATA'
`,
actions: [
{
dynamoDBv2: {
putItem: {
tableName: this.table.tableName,
},
roleArn: role.roleArn,
},
},
],
errorAction: {
republish: {
roleArn: role.roleArn,
topic: 'errors',
},
},
},
})
}
}
5 changes: 5 additions & 0 deletions cdk/resources/WebsocketAPI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import type { PackedLambda } from '../helpers/lambdas/packLambda'
import { ApiLogging } from './ApiLogging.js'
import type { DeviceStorage } from './DeviceStorage.js'
import { LambdaSource } from './LambdaSource.js'
import type { LastSeen } from './LastSeen'

export const integrationUri = (
parent: Construct,
Expand All @@ -38,6 +39,7 @@ export class WebsocketAPI extends Construct {
deviceStorage,
lambdaSources,
layers,
lastSeen,
}: {
deviceStorage: DeviceStorage
lambdaSources: {
Expand All @@ -48,6 +50,7 @@ export class WebsocketAPI extends Construct {
publishToWebsocketClients: PackedLambda
}
layers: Lambda.ILayerVersion[]
lastSeen: LastSeen
},
) {
super(parent, 'WebsocketAPI')
Expand Down Expand Up @@ -86,12 +89,14 @@ export class WebsocketAPI extends Construct {
LOG_LEVEL: this.node.tryGetContext('logLevel'),
NODE_NO_WARNINGS: '1',
DISABLE_METRICS: this.node.tryGetContext('isTest') === true ? '1' : '0',
LAST_SEEN_TABLE_NAME: lastSeen.table.tableName,
},
layers,
logRetention: Logs.RetentionDays.ONE_WEEK,
})
this.eventBus.grantPutEventsTo(onConnect)
this.connectionsTable.grantWriteData(onConnect)
lastSeen.table.grantReadData(onConnect)

// onMessage
const onMessage = new Lambda.Function(this, 'onMessage', {
Expand Down
4 changes: 4 additions & 0 deletions cdk/stacks/BackendStack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
} from '../resources/Integration.js'
import { parameterStoreLayerARN } from '../resources/LambdaExtensionLayers.js'
import { LambdaSource } from '../resources/LambdaSource.js'
import { LastSeen } from '../resources/LastSeen.js'
import { WebsocketAPI } from '../resources/WebsocketAPI.js'
import { STACK_NAME } from './stackConfig.js'

Expand Down Expand Up @@ -98,10 +99,13 @@ export class BackendStack extends Stack {

const deviceStorage = new DeviceStorage(this)

const lastSeen = new LastSeen(this)

const websocketAPI = new WebsocketAPI(this, {
lambdaSources,
deviceStorage,
layers: lambdaLayers,
lastSeen,
})

new DeviceShadow(this, {
Expand Down
43 changes: 43 additions & 0 deletions features/LastSeen.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Last seen

> I should receive a timestamp when the device last sent in data to the cloud so
> I can determine if the device is active.

## Background

Given I have the fingerprint for a `PCA20035+solar` device in `fingerprint`

<!-- The device sends in data to the cloud -->

And I store `$floor($millis()/1000)*1000` into `ts`

And the device `${fingerprint:deviceId}` publishes this message to the topic
`m/d/${fingerprint:deviceId}/d2c`

```json
{
"appId": "SOLAR",
"messageType": "DATA",
"ts": ${ts},
"data": "3.123456"
}
```

## Retrieve last seen timestamp on connect

Given I store `$fromMillis(${ts})` into `tsISO`

When I connect to the websocket using fingerprint `${fingerprint}`

<!-- @retry:tries=5,initialDelay=1000,delayFactor=2 -->

Soon I should receive a message on the websocket that matches

```json
{
"@context": "https://github.com/hello-nrfcloud/proto/deviceIdentity",
"id": "${fingerprint:deviceId}",
"model": "PCA20035+solar",
"lastSeen": "${tsISO}"
}
```
6 changes: 5 additions & 1 deletion lambda/onConnect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,24 @@ import { Context, DeviceIdentity } from '@hello.nrfcloud.com/proto/hello'
import { fromEnv } from '@nordicsemiconductor/from-env'
import type { Static } from '@sinclair/typebox'
import type { APIGatewayProxyStructuredResultV2 } from 'aws-lambda'
import { lastSeenRepo } from '../lastSeen/lastSeenRepo.js'
import { connectionsRepository } from '../websocket/connectionsRepository.js'
import type { WebsocketPayload } from './publishToWebsocketClients.js'
import { logger } from './util/logger.js'
import type { AuthorizedEvent } from './ws/AuthorizedEvent.js'

const { EventBusName, TableName } = fromEnv({
const { EventBusName, TableName, LastSeenTableName } = fromEnv({
EventBusName: 'EVENTBUS_NAME',
TableName: 'WEBSOCKET_CONNECTIONS_TABLE_NAME',
LastSeenTableName: 'LAST_SEEN_TABLE_NAME',
})(process.env)

const log = logger('connect')
const eventBus = new EventBridge({})
const db = new DynamoDBClient({})

const repo = connectionsRepository(db, TableName)
const { getLastSeenOrNull } = lastSeenRepo(db, LastSeenTableName)

export const handler = async (
event: AuthorizedEvent,
Expand All @@ -38,6 +41,7 @@ export const handler = async (
'@context': Context.deviceIdentity.toString(),
model,
id: deviceId,
lastSeen: (await getLastSeenOrNull(deviceId))?.toISOString() ?? undefined,
}

log.debug('websocket message', { message })
Expand Down
49 changes: 49 additions & 0 deletions lastSeen/lastSeenRepo.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { GetItemCommand, type DynamoDBClient } from '@aws-sdk/client-dynamodb'

export const lastSeenRepo = (
db: DynamoDBClient,
TableName: string,
): {
getLastSeen: (
deviceId: string,
) => Promise<{ error: Error } | { lastSeen: Date | null }>
getLastSeenOrNull: (deviceId: string) => Promise<Date | null>
} => {
const getLastSeen = async (
deviceId: string,
): Promise<{ error: Error } | { lastSeen: Date | null }> => {
try {
const { Item } = await db.send(
new GetItemCommand({
TableName,
Key: {
deviceId: {
S: deviceId,
},
source: {
S: 'deviceMessage',
},
},
ProjectionExpression: '#lastSeen',
ExpressionAttributeNames: {
'#lastSeen': 'lastSeen',
},
}),
)
const lastSeen = Item?.lastSeen?.S
return {
lastSeen: lastSeen === undefined ? null : new Date(lastSeen),
}
} catch (err) {
return { error: err as Error }
}
}

return {
getLastSeen,
getLastSeenOrNull: async (deviceId: string): Promise<Date | null> => {
const res = await getLastSeen(deviceId)
return 'lastSeen' in res ? res.lastSeen : null
},
}
}