Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replace DynamoDB Streams for the notifications-lambda with SQS and a AppSync pipeline resolver #150

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion ADRs/database.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# ADR: database

_Initially written 8th August 2012, but there are some updates._

## Current Scenario : DynamoDB

When getting started with AWS AppSync (TODO add ADR for this), DynamoDB is an easy choice for persisting data and has served us very well so far, primarily for the following reasons...
Expand Down Expand Up @@ -36,4 +38,8 @@ With RDS Aurora being the only option, this leaves us with the choice of MySQL v

## Chosen Solution : RDS Aurora (Postgres flavour)

This addresses all of the limitations outlined above. The primary concern was how to replicate the behaviour we get from 'DynamoDB Streams' (to invoke the `notifications-lambda` on inserts into the Item table), fortunately [RDS Aurora supports invoking lambdas directly from within the DB engine](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/PostgreSQL-Lambda.html) and is in fact better, because we can perform all the joins and filters in the DB engine (in the insert trigger logic) to build the lambda payload (so the lambda needn't do queries back to the database - as it does at the moment) and conditionally invoke.
This addresses all the limitations outlined above. The primary concern was how to replicate the behaviour we get from 'DynamoDB Streams' (to invoke the `notifications-lambda` on inserts into the Item table), ~~fortunately [RDS Aurora supports invoking lambdas directly from within the DB engine](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/PostgreSQL-Lambda.html) and is in fact better, because we can perform all the joins and filters in the DB engine (in the insert trigger logic) to build the lambda payload (so the lambda needn't do queries back to the database - as it does at the moment) and conditionally invoke.~~ **UPDATE 10th August 2022...**
upon further investigation/experimentation, due to the fact we must use Aurora ServerlessV1 (because it's the only thing which supports the `data-api`, which AppSync relies upon) this doesn't support attaching IAM roles and so we cannot permission the RDS cluster to invoke the lambda - putting an end to that approach. That leaves us with two choices...

- ~~add a lambda and RDS proxy between AppSync and RDS (as explained in https://aws.amazon.com/blogs/mobile/appsync-graphql-sql-rds-proxy)~~ - this seems like too much infrastructure complexity (at this point)
- convert the `createItem` AppSync resolver to an AppSync 'pipeline' resolver, where the first function does the DB insert as before, then the second function invokes the lambda (and we leave the lambda to look-up what it needs to from the DB, a shame but worth it). To avoid the user/client waiting on all the notifications being sent before they know their message has been inserted, the lambda invoked from the pipeline resolver just queues the item to be dealt with later, so the resolver can return as quickly as possible (unfortunately there's no async invoke of lambdas from AppSync as far as we could find out) - **this work is done in https://github.com/guardian/pinboard/pull/150.**
1 change: 1 addition & 0 deletions cdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"@aws-cdk/aws-lambda-event-sources": "1.147.0",
"@aws-cdk/aws-s3": "1.147.0",
"@aws-cdk/aws-ssm": "1.147.0",
"@aws-cdk/aws-sqs": "1.147.0",
"@aws-cdk/core": "1.147.0"
}
}
102 changes: 79 additions & 23 deletions cdk/stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import * as lambda from "@aws-cdk/aws-lambda";
import * as S3 from "@aws-cdk/aws-s3";
import * as iam from "@aws-cdk/aws-iam";
import * as ssm from "@aws-cdk/aws-ssm";
import * as sqs from "@aws-cdk/aws-sqs";
import * as apigateway from "@aws-cdk/aws-apigateway";
import * as appsync from "@aws-cdk/aws-appsync";
import * as db from "@aws-cdk/aws-dynamodb";
Expand All @@ -23,7 +24,7 @@ import * as eventsTargets from "@aws-cdk/aws-events-targets";
import { join } from "path";
import { APP } from "../shared/constants";
import crypto from "crypto";
import { DynamoEventSource } from "@aws-cdk/aws-lambda-event-sources";
import { SqsEventSource } from "@aws-cdk/aws-lambda-event-sources";
import { ENVIRONMENT_VARIABLE_KEYS } from "../shared/environmentVariables";

export class PinBoardStack extends Stack {
Expand Down Expand Up @@ -183,6 +184,11 @@ export class PinBoardStack extends Stack {

const pinboardNotificationsLambdaBasename = "pinboard-notifications-lambda";

const pinboardNotificationsSQS = new sqs.Queue(
thisStack,
`${pinboardNotificationsLambdaBasename}-sqs`
);

const pinboardNotificationsLambda = new lambda.Function(
thisStack,
pinboardNotificationsLambdaBasename,
Expand All @@ -197,16 +203,30 @@ export class PinBoardStack extends Stack {
APP,
[ENVIRONMENT_VARIABLE_KEYS.usersTableName]:
pinboardAppsyncUserTable.tableName,
[ENVIRONMENT_VARIABLE_KEYS.notificationQueueURL]:
pinboardNotificationsSQS.queueUrl,
},
functionName: `${pinboardNotificationsLambdaBasename}-${STAGE}`,
code: lambda.Code.fromBucket(
deployBucket,
`${STACK}/${STAGE}/${pinboardNotificationsLambdaBasename}/${pinboardNotificationsLambdaBasename}.zip`
),
initialPolicy: [readPinboardParamStorePolicyStatement],
initialPolicy: [
readPinboardParamStorePolicyStatement,
new iam.PolicyStatement({
actions: ["sqs:SendMessage"],
effect: iam.Effect.ALLOW,
resources: [pinboardNotificationsSQS.queueArn],
}),
],
}
);
pinboardAppsyncUserTable.grantReadData(pinboardNotificationsLambda);
pinboardNotificationsLambda.addEventSource(
new SqsEventSource(pinboardNotificationsSQS, {
reportBatchItemFailures: true,
})
);

const pinboardAuthLambdaBasename = "pinboard-auth-lambda";

Expand Down Expand Up @@ -286,13 +306,6 @@ export class PinBoardStack extends Stack {
}
);

pinboardNotificationsLambda.addEventSource(
new DynamoEventSource(pinboardAppsyncItemTable, {
maxBatchingWindow: Duration.seconds(10),
startingPosition: lambda.StartingPosition.LATEST,
})
);

const pinboardLastItemSeenByUserTableBaseName =
"pinboard-last-item-seen-by-user-table";

Expand Down Expand Up @@ -329,6 +342,14 @@ export class PinBoardStack extends Stack {
pinboardGridBridgeLambda
);

const pinboardNotificationsLambdaDataSource = pinboardAppsyncApi.addLambdaDataSource(
`${pinboardNotificationsLambdaBasename
.replace("pinboard-", "")
.split("-")
.join("_")}_ds`,
pinboardNotificationsLambda
);

const pinboardItemDataSource = pinboardAppsyncApi.addDynamoDbDataSource(
`${pinboardItemTableBaseName
.replace("pinboard-", "")
Expand Down Expand Up @@ -374,38 +395,73 @@ export class PinBoardStack extends Stack {
}
`)
);
const dynamoFilterResponseMappingTemplate = appsync.MappingTemplate.fromString(
const basicResponseMappingTemplate = appsync.MappingTemplate.fromString(
"$util.toJson($context.result)"
);

pinboardItemDataSource.createResolver({
typeName: "Query",
fieldName: "listItems",
requestMappingTemplate: dynamoFilterRequestMappingTemplate,
responseMappingTemplate: dynamoFilterResponseMappingTemplate,
responseMappingTemplate: basicResponseMappingTemplate,
});

pinboardItemDataSource.createResolver({
const insertItemPipelineFunction = new appsync.AppsyncFunction(
thisStack,
"pinboard-insert-item-pipeline-function",
{
name: "insert_item",
api: pinboardAppsyncApi,
dataSource: pinboardItemDataSource,
requestMappingTemplate: resolverBugWorkaround(
appsync.MappingTemplate.dynamoDbPutItem(
appsync.PrimaryKey.partition("id").auto(),
appsync.Values.projecting("input")
.attribute("timestamp")
.is("$util.time.nowEpochSeconds()")
.attribute("userEmail")
.is("$ctx.identity.resolverContext.userEmail")
)
),
responseMappingTemplate: appsync.MappingTemplate.dynamoDbResultItem(),
}
);

const queueNotificationPipelineFunction = new appsync.AppsyncFunction(
thisStack,
"pinboard-queue-notification-pipeline-function",
{
name: "queue_notification",
api: pinboardAppsyncApi,
dataSource: pinboardNotificationsLambdaDataSource,
requestMappingTemplate: resolverBugWorkaround(
appsync.MappingTemplate.lambdaRequest(
"$util.toJson($ctx.prev.result)",
"Invoke"
)
),
responseMappingTemplate: appsync.MappingTemplate.lambdaResult(),
}
);

pinboardAppsyncApi.createResolver({
typeName: "Mutation",
fieldName: "createItem",
requestMappingTemplate: resolverBugWorkaround(
appsync.MappingTemplate.dynamoDbPutItem(
appsync.PrimaryKey.partition("id").auto(),
appsync.Values.projecting("input")
.attribute("timestamp")
.is("$util.time.nowEpochSeconds()")
.attribute("userEmail")
.is("$ctx.identity.resolverContext.userEmail")
)
appsync.MappingTemplate.fromString("{}")
),
responseMappingTemplate: appsync.MappingTemplate.dynamoDbResultItem(),
pipelineConfig: [
insertItemPipelineFunction,
queueNotificationPipelineFunction,
],
responseMappingTemplate: basicResponseMappingTemplate,
});

pinboardLastItemSeenByUserDataSource.createResolver({
typeName: "Query",
fieldName: "listLastItemSeenByUsers",
requestMappingTemplate: dynamoFilterRequestMappingTemplate, // TODO consider custom resolver for performance
responseMappingTemplate: dynamoFilterResponseMappingTemplate,
responseMappingTemplate: basicResponseMappingTemplate,
});

pinboardLastItemSeenByUserDataSource.createResolver({
Expand Down Expand Up @@ -479,7 +535,7 @@ export class PinBoardStack extends Stack {
}
`)
),
responseMappingTemplate: dynamoFilterResponseMappingTemplate,
responseMappingTemplate: basicResponseMappingTemplate,
});

pinboardUserDataSource.createResolver({
Expand Down
Loading