Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cdk/resources/WebsocketAPI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,6 @@ export class WebsocketAPI extends Construct {
},
)
connectionsTable.table.grantReadWriteData(publishToWebsocketClients)
eventBus.eventBus.grantPutEventsTo(publishToWebsocketClients)
new Events.Rule(this, 'publishToWebsocketClientsRule', {
eventPattern: {
source: ['thingy.ws'],
Expand Down
26 changes: 2 additions & 24 deletions lambda/onDisconnect.ts
Original file line number Diff line number Diff line change
@@ -1,54 +1,32 @@
import { DeleteItemCommand, DynamoDBClient } from '@aws-sdk/client-dynamodb'
import { EventBridge } from '@aws-sdk/client-eventbridge'
import { fromEnv } from '@nordicsemiconductor/from-env'
import type { APIGatewayProxyStructuredResultV2 } from 'aws-lambda'
import { logger } from './util/logger.js'
import type { AuthorizedEvent } from './ws/AuthorizedEvent.js'

const { TableName, EventBusName } = fromEnv({
const { TableName } = fromEnv({
TableName: 'WEBSOCKET_CONNECTIONS_TABLE_NAME',
EventBusName: 'EVENTBUS_NAME',
})(process.env)

const log = logger('disconnect')
const db = new DynamoDBClient({})
const eventBus = new EventBridge({})

export const handler = async (
event: AuthorizedEvent,
): Promise<APIGatewayProxyStructuredResultV2> => {
log.info('onDisconnect event', { event })

const { deviceId } = event.requestContext.authorizer

const result = await db.send(
await db.send(
new DeleteItemCommand({
TableName,
Key: {
connectionId: {
S: event.requestContext.connectionId,
},
},
ReturnValues: 'ALL_OLD',
}),
)

if (result.Attributes?.deviceId?.S !== undefined) {
await eventBus.putEvents({
Entries: [
{
EventBusName,
Source: 'thingy.ws',
DetailType: 'disconnect',
Detail: JSON.stringify({
deviceId,
connectionId: event.requestContext.connectionId,
}),
},
],
})
}

return {
statusCode: 200,
}
Expand Down
19 changes: 5 additions & 14 deletions lambda/publishToWebsocketClients.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ export type WebsocketPayload = {
message: Record<string, unknown>
}

const { connectionsTableName, websocketManagementAPIURL, eventBusName } =
fromEnv({
connectionsTableName: 'WEBSOCKET_CONNECTIONS_TABLE_NAME',
websocketManagementAPIURL: 'WEBSOCKET_MANAGEMENT_API_URL',
eventBusName: 'EVENTBUS_NAME',
})(process.env)
const { connectionsTableName, websocketManagementAPIURL } = fromEnv({
connectionsTableName: 'WEBSOCKET_CONNECTIONS_TABLE_NAME',
websocketManagementAPIURL: 'WEBSOCKET_MANAGEMENT_API_URL',
})(process.env)

const log = logger('publishToWebsockets')
const db = new DynamoDBClient({})
Expand All @@ -30,19 +28,12 @@ const notifier = notifyClients({
db,
connectionsTableName,
apiGwManagementClient,
eventBusName,
})

export const handler = async (
event: EventBridgeEvent<
'message' | 'connect' | 'disconnect' | 'error',
WebsocketPayload
>,
event: EventBridgeEvent<'message' | 'connect' | 'error', WebsocketPayload>,
): Promise<void> => {
log.info('publishToWebSocketClients event', { event })

// Do not publish websocket for disconnect type
if (event['detail-type'] === 'disconnect') return

await notifier(event.detail)
}
42 changes: 9 additions & 33 deletions websocket/notifyClients.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,21 @@ import {
DynamoDBClient,
ExecuteStatementCommand,
} from '@aws-sdk/client-dynamodb'
import {
EventBridgeClient,
PutEventsCommand,
} from '@aws-sdk/client-eventbridge'
import { unmarshall } from '@aws-sdk/util-dynamodb'
import type { WebsocketPayload } from '../lambda/publishToWebsocketClients.js'
import { logger } from '../lambda/util/logger.js'

const log = logger('notifyClients')
const eventBus = new EventBridgeClient({})

export const notifyClients =
({
db,
connectionsTableName,
apiGwManagementClient,
eventBusName,
}: {
db: DynamoDBClient
connectionsTableName: string
apiGwManagementClient: ApiGatewayManagementApiClient
eventBusName: string
}) =>
async (event: WebsocketPayload): Promise<void> => {
const { connectionId, deviceId, message } = event
Expand All @@ -51,33 +44,16 @@ export const notifyClients =
const error = err as Error
if (error.name === 'GoneException') {
log.warn(`Client is gone`, connectionId)
await Promise.all([
eventBus.send(
new PutEventsCommand({
Entries: [
{
EventBusName: eventBusName,
Source: 'thingy.ws',
DetailType: 'disconnect',
Detail: JSON.stringify(<WebsocketPayload>{
deviceId,
connectionId,
}),
},
],
}),
),
db.send(
new DeleteItemCommand({
TableName: connectionsTableName,
Key: {
connectionId: {
S: connectionId,
},
await db.send(
new DeleteItemCommand({
TableName: connectionsTableName,
Key: {
connectionId: {
S: connectionId,
},
}),
),
])
},
}),
)
continue
}
log.error(error.message, { error })
Expand Down