Skip to content

Commit

Permalink
replace DynamoDB Streams for the notifications-lambda with SQS and …
Browse files Browse the repository at this point in the history
…a AppSync pipeline resolver

Co-Authored-By: Andrew Nowak <andrew.nowak@guardian.co.uk>
  • Loading branch information
twrichards and andrew-nowak committed Aug 10, 2022
1 parent ce8309a commit c2fc114
Show file tree
Hide file tree
Showing 7 changed files with 365 additions and 110 deletions.
1 change: 1 addition & 0 deletions cdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"@aws-cdk/aws-lambda-event-sources": "1.147.0",
"@aws-cdk/aws-s3": "1.147.0",
"@aws-cdk/aws-ssm": "1.147.0",
"@aws-cdk/aws-sqs": "1.147.0",
"@aws-cdk/core": "1.147.0"
}
}
102 changes: 79 additions & 23 deletions cdk/stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import * as lambda from "@aws-cdk/aws-lambda";
import * as S3 from "@aws-cdk/aws-s3";
import * as iam from "@aws-cdk/aws-iam";
import * as ssm from "@aws-cdk/aws-ssm";
import * as sqs from "@aws-cdk/aws-sqs";
import * as apigateway from "@aws-cdk/aws-apigateway";
import * as appsync from "@aws-cdk/aws-appsync";
import * as db from "@aws-cdk/aws-dynamodb";
Expand All @@ -23,7 +24,7 @@ import * as eventsTargets from "@aws-cdk/aws-events-targets";
import { join } from "path";
import { APP } from "../shared/constants";
import crypto from "crypto";
import { DynamoEventSource } from "@aws-cdk/aws-lambda-event-sources";
import { SqsEventSource } from "@aws-cdk/aws-lambda-event-sources";
import { ENVIRONMENT_VARIABLE_KEYS } from "../shared/environmentVariables";

export class PinBoardStack extends Stack {
Expand Down Expand Up @@ -183,6 +184,11 @@ export class PinBoardStack extends Stack {

const pinboardNotificationsLambdaBasename = "pinboard-notifications-lambda";

const pinboardNotificationsSQS = new sqs.Queue(
thisStack,
`${pinboardNotificationsLambdaBasename}-sqs`
);

const pinboardNotificationsLambda = new lambda.Function(
thisStack,
pinboardNotificationsLambdaBasename,
Expand All @@ -197,16 +203,30 @@ export class PinBoardStack extends Stack {
APP,
[ENVIRONMENT_VARIABLE_KEYS.usersTableName]:
pinboardAppsyncUserTable.tableName,
[ENVIRONMENT_VARIABLE_KEYS.notificationQueueURL]:
pinboardNotificationsSQS.queueUrl,
},
functionName: `${pinboardNotificationsLambdaBasename}-${STAGE}`,
code: lambda.Code.fromBucket(
deployBucket,
`${STACK}/${STAGE}/${pinboardNotificationsLambdaBasename}/${pinboardNotificationsLambdaBasename}.zip`
),
initialPolicy: [readPinboardParamStorePolicyStatement],
initialPolicy: [
readPinboardParamStorePolicyStatement,
new iam.PolicyStatement({
actions: ["sqs:SendMessage"],
effect: iam.Effect.ALLOW,
resources: [pinboardNotificationsSQS.queueArn],
}),
],
}
);
pinboardAppsyncUserTable.grantReadData(pinboardNotificationsLambda);
pinboardNotificationsLambda.addEventSource(
new SqsEventSource(pinboardNotificationsSQS, {
reportBatchItemFailures: true,
})
);

const pinboardAuthLambdaBasename = "pinboard-auth-lambda";

Expand Down Expand Up @@ -286,13 +306,6 @@ export class PinBoardStack extends Stack {
}
);

pinboardNotificationsLambda.addEventSource(
new DynamoEventSource(pinboardAppsyncItemTable, {
maxBatchingWindow: Duration.seconds(10),
startingPosition: lambda.StartingPosition.LATEST,
})
);

const pinboardLastItemSeenByUserTableBaseName =
"pinboard-last-item-seen-by-user-table";

Expand Down Expand Up @@ -329,6 +342,14 @@ export class PinBoardStack extends Stack {
pinboardGridBridgeLambda
);

const pinboardNotificationsLambdaDataSource = pinboardAppsyncApi.addLambdaDataSource(
`${pinboardNotificationsLambdaBasename
.replace("pinboard-", "")
.split("-")
.join("_")}_ds`,
pinboardNotificationsLambda
);

const pinboardItemDataSource = pinboardAppsyncApi.addDynamoDbDataSource(
`${pinboardItemTableBaseName
.replace("pinboard-", "")
Expand Down Expand Up @@ -374,38 +395,73 @@ export class PinBoardStack extends Stack {
}
`)
);
const dynamoFilterResponseMappingTemplate = appsync.MappingTemplate.fromString(
const basicResponseMappingTemplate = appsync.MappingTemplate.fromString(
"$util.toJson($context.result)"
);

pinboardItemDataSource.createResolver({
typeName: "Query",
fieldName: "listItems",
requestMappingTemplate: dynamoFilterRequestMappingTemplate,
responseMappingTemplate: dynamoFilterResponseMappingTemplate,
responseMappingTemplate: basicResponseMappingTemplate,
});

pinboardItemDataSource.createResolver({
const insertItemPipelineFunction = new appsync.AppsyncFunction(
thisStack,
"pinboard-insert-item-pipeline-function",
{
name: "insert_item",
api: pinboardAppsyncApi,
dataSource: pinboardItemDataSource,
requestMappingTemplate: resolverBugWorkaround(
appsync.MappingTemplate.dynamoDbPutItem(
appsync.PrimaryKey.partition("id").auto(),
appsync.Values.projecting("input")
.attribute("timestamp")
.is("$util.time.nowEpochSeconds()")
.attribute("userEmail")
.is("$ctx.identity.resolverContext.userEmail")
)
),
responseMappingTemplate: appsync.MappingTemplate.dynamoDbResultItem(),
}
);

const queueNotificationPipelineFunction = new appsync.AppsyncFunction(
thisStack,
"pinboard-queue-notification-pipeline-function",
{
name: "queue_notification",
api: pinboardAppsyncApi,
dataSource: pinboardNotificationsLambdaDataSource,
requestMappingTemplate: resolverBugWorkaround(
appsync.MappingTemplate.lambdaRequest(
"$util.toJson($ctx.prev.result)",
"Invoke"
)
),
responseMappingTemplate: appsync.MappingTemplate.lambdaResult(),
}
);

pinboardAppsyncApi.createResolver({
typeName: "Mutation",
fieldName: "createItem",
requestMappingTemplate: resolverBugWorkaround(
appsync.MappingTemplate.dynamoDbPutItem(
appsync.PrimaryKey.partition("id").auto(),
appsync.Values.projecting("input")
.attribute("timestamp")
.is("$util.time.nowEpochSeconds()")
.attribute("userEmail")
.is("$ctx.identity.resolverContext.userEmail")
)
appsync.MappingTemplate.fromString("{}")
),
responseMappingTemplate: appsync.MappingTemplate.dynamoDbResultItem(),
pipelineConfig: [
insertItemPipelineFunction,
queueNotificationPipelineFunction,
],
responseMappingTemplate: basicResponseMappingTemplate,
});

pinboardLastItemSeenByUserDataSource.createResolver({
typeName: "Query",
fieldName: "listLastItemSeenByUsers",
requestMappingTemplate: dynamoFilterRequestMappingTemplate, // TODO consider custom resolver for performance
responseMappingTemplate: dynamoFilterResponseMappingTemplate,
responseMappingTemplate: basicResponseMappingTemplate,
});

pinboardLastItemSeenByUserDataSource.createResolver({
Expand Down Expand Up @@ -479,7 +535,7 @@ export class PinBoardStack extends Stack {
}
`)
),
responseMappingTemplate: dynamoFilterResponseMappingTemplate,
responseMappingTemplate: basicResponseMappingTemplate,
});

pinboardUserDataSource.createResolver({
Expand Down

0 comments on commit c2fc114

Please sign in to comment.