diff --git a/cdk/package.json b/cdk/package.json index a066b62cf..32b443931 100644 --- a/cdk/package.json +++ b/cdk/package.json @@ -28,6 +28,7 @@ "@aws-cdk/aws-lambda-event-sources": "1.147.0", "@aws-cdk/aws-s3": "1.147.0", "@aws-cdk/aws-ssm": "1.147.0", + "@aws-cdk/aws-sqs": "1.147.0", "@aws-cdk/core": "1.147.0" } } diff --git a/cdk/stack.ts b/cdk/stack.ts index b6bbbf567..e678e27a6 100644 --- a/cdk/stack.ts +++ b/cdk/stack.ts @@ -13,6 +13,7 @@ import * as lambda from "@aws-cdk/aws-lambda"; import * as S3 from "@aws-cdk/aws-s3"; import * as iam from "@aws-cdk/aws-iam"; import * as ssm from "@aws-cdk/aws-ssm"; +import * as sqs from "@aws-cdk/aws-sqs"; import * as apigateway from "@aws-cdk/aws-apigateway"; import * as appsync from "@aws-cdk/aws-appsync"; import * as db from "@aws-cdk/aws-dynamodb"; @@ -23,7 +24,7 @@ import * as eventsTargets from "@aws-cdk/aws-events-targets"; import { join } from "path"; import { APP } from "../shared/constants"; import crypto from "crypto"; -import { DynamoEventSource } from "@aws-cdk/aws-lambda-event-sources"; +import { SqsEventSource } from "@aws-cdk/aws-lambda-event-sources"; import { ENVIRONMENT_VARIABLE_KEYS } from "../shared/environmentVariables"; export class PinBoardStack extends Stack { @@ -183,6 +184,11 @@ export class PinBoardStack extends Stack { const pinboardNotificationsLambdaBasename = "pinboard-notifications-lambda"; + const pinboardNotificationsSQS = new sqs.Queue( + thisStack, + `${pinboardNotificationsLambdaBasename}-sqs` + ); + const pinboardNotificationsLambda = new lambda.Function( thisStack, pinboardNotificationsLambdaBasename, @@ -197,16 +203,30 @@ export class PinBoardStack extends Stack { APP, [ENVIRONMENT_VARIABLE_KEYS.usersTableName]: pinboardAppsyncUserTable.tableName, + [ENVIRONMENT_VARIABLE_KEYS.notificationQueueURL]: + pinboardNotificationsSQS.queueUrl, }, functionName: `${pinboardNotificationsLambdaBasename}-${STAGE}`, code: lambda.Code.fromBucket( deployBucket, `${STACK}/${STAGE}/${pinboardNotificationsLambdaBasename}/${pinboardNotificationsLambdaBasename}.zip` ), - initialPolicy: [readPinboardParamStorePolicyStatement], + initialPolicy: [ + readPinboardParamStorePolicyStatement, + new iam.PolicyStatement({ + actions: ["sqs:SendMessage"], + effect: iam.Effect.ALLOW, + resources: [pinboardNotificationsSQS.queueArn], + }), + ], } ); pinboardAppsyncUserTable.grantReadData(pinboardNotificationsLambda); + pinboardNotificationsLambda.addEventSource( + new SqsEventSource(pinboardNotificationsSQS, { + reportBatchItemFailures: true, + }) + ); const pinboardAuthLambdaBasename = "pinboard-auth-lambda"; @@ -286,13 +306,6 @@ export class PinBoardStack extends Stack { } ); - pinboardNotificationsLambda.addEventSource( - new DynamoEventSource(pinboardAppsyncItemTable, { - maxBatchingWindow: Duration.seconds(10), - startingPosition: lambda.StartingPosition.LATEST, - }) - ); - const pinboardLastItemSeenByUserTableBaseName = "pinboard-last-item-seen-by-user-table"; @@ -329,6 +342,14 @@ export class PinBoardStack extends Stack { pinboardGridBridgeLambda ); + const pinboardNotificationsLambdaDataSource = pinboardAppsyncApi.addLambdaDataSource( + `${pinboardNotificationsLambdaBasename + .replace("pinboard-", "") + .split("-") + .join("_")}_ds`, + pinboardNotificationsLambda + ); + const pinboardItemDataSource = pinboardAppsyncApi.addDynamoDbDataSource( `${pinboardItemTableBaseName .replace("pinboard-", "") @@ -374,7 +395,7 @@ export class PinBoardStack extends Stack { } `) ); - const dynamoFilterResponseMappingTemplate = appsync.MappingTemplate.fromString( + const basicResponseMappingTemplate = appsync.MappingTemplate.fromString( "$util.toJson($context.result)" ); @@ -382,30 +403,65 @@ export class PinBoardStack extends Stack { typeName: "Query", fieldName: "listItems", requestMappingTemplate: dynamoFilterRequestMappingTemplate, - responseMappingTemplate: dynamoFilterResponseMappingTemplate, + responseMappingTemplate: basicResponseMappingTemplate, }); - pinboardItemDataSource.createResolver({ + const insertItemPipelineFunction = new appsync.AppsyncFunction( + thisStack, + "pinboard-insert-item-pipeline-function", + { + name: "insert_item", + api: pinboardAppsyncApi, + dataSource: pinboardItemDataSource, + requestMappingTemplate: resolverBugWorkaround( + appsync.MappingTemplate.dynamoDbPutItem( + appsync.PrimaryKey.partition("id").auto(), + appsync.Values.projecting("input") + .attribute("timestamp") + .is("$util.time.nowEpochSeconds()") + .attribute("userEmail") + .is("$ctx.identity.resolverContext.userEmail") + ) + ), + responseMappingTemplate: appsync.MappingTemplate.dynamoDbResultItem(), + } + ); + + const queueNotificationPipelineFunction = new appsync.AppsyncFunction( + thisStack, + "pinboard-queue-notification-pipeline-function", + { + name: "queue_notification", + api: pinboardAppsyncApi, + dataSource: pinboardNotificationsLambdaDataSource, + requestMappingTemplate: resolverBugWorkaround( + appsync.MappingTemplate.lambdaRequest( + "$util.toJson($ctx.prev.result)", + "Invoke" + ) + ), + responseMappingTemplate: appsync.MappingTemplate.lambdaResult(), + } + ); + + pinboardAppsyncApi.createResolver({ typeName: "Mutation", fieldName: "createItem", requestMappingTemplate: resolverBugWorkaround( - appsync.MappingTemplate.dynamoDbPutItem( - appsync.PrimaryKey.partition("id").auto(), - appsync.Values.projecting("input") - .attribute("timestamp") - .is("$util.time.nowEpochSeconds()") - .attribute("userEmail") - .is("$ctx.identity.resolverContext.userEmail") - ) + appsync.MappingTemplate.fromString("{}") ), - responseMappingTemplate: appsync.MappingTemplate.dynamoDbResultItem(), + pipelineConfig: [ + insertItemPipelineFunction, + queueNotificationPipelineFunction, + ], + responseMappingTemplate: basicResponseMappingTemplate, }); pinboardLastItemSeenByUserDataSource.createResolver({ typeName: "Query", fieldName: "listLastItemSeenByUsers", requestMappingTemplate: dynamoFilterRequestMappingTemplate, // TODO consider custom resolver for performance - responseMappingTemplate: dynamoFilterResponseMappingTemplate, + responseMappingTemplate: basicResponseMappingTemplate, }); pinboardLastItemSeenByUserDataSource.createResolver({ @@ -479,7 +535,7 @@ export class PinBoardStack extends Stack { } `) ), - responseMappingTemplate: dynamoFilterResponseMappingTemplate, + responseMappingTemplate: basicResponseMappingTemplate, }); pinboardUserDataSource.createResolver({ diff --git a/cdk/test/__snapshots__/stack.test.ts.snap b/cdk/test/__snapshots__/stack.test.ts.snap index ab0b5cc98..fed157338 100644 --- a/cdk/test/__snapshots__/stack.test.ts.snap +++ b/cdk/test/__snapshots__/stack.test.ts.snap @@ -107,6 +107,42 @@ Object { }, "Type": "AWS::AppSync::GraphQLApi", }, + "pinboardappsyncapiMutationcreateItemResolverFA9560B7": Object { + "DependsOn": Array [ + "pinboardappsyncapiSchema868D9F5B", + ], + "Properties": Object { + "ApiId": Object { + "Fn::GetAtt": Array [ + "pinboardappsyncapi9D519400", + "ApiId", + ], + }, + "FieldName": "createItem", + "Kind": "PIPELINE", + "PipelineConfig": Object { + "Functions": Array [ + Object { + "Fn::GetAtt": Array [ + "pinboardinsertitempipelinefunction3F6F60CF", + "FunctionId", + ], + }, + Object { + "Fn::GetAtt": Array [ + "pinboardqueuenotificationpipelinefunction6293EB5F", + "FunctionId", + ], + }, + ], + }, + "RequestMappingTemplate": "## schema checksum : 069f29169cf579ccf5c31b1d41c78246 +{}", + "ResponseMappingTemplate": "$util.toJson($context.result)", + "TypeName": "Mutation", + }, + "Type": "AWS::AppSync::Resolver", + }, "pinboardappsyncapiSchema868D9F5B": Object { "Properties": Object { "ApiId": Object { @@ -471,39 +507,6 @@ $util.toJson($ctx.result)", }, "Type": "AWS::AppSync::DataSource", }, - "pinboardappsyncapiitemtabledatasourceMutationcreateItemResolver7AEC9201": Object { - "DependsOn": Array [ - "pinboardappsyncapiitemtabledatasourceFD08E0E9", - "pinboardappsyncapiSchema868D9F5B", - ], - "Properties": Object { - "ApiId": Object { - "Fn::GetAtt": Array [ - "pinboardappsyncapi9D519400", - "ApiId", - ], - }, - "DataSourceName": "item_table_datasource", - "FieldName": "createItem", - "Kind": "UNIT", - "RequestMappingTemplate": "## schema checksum : 069f29169cf579ccf5c31b1d41c78246 - - #set($input = $ctx.args.input) - $util.qr($input.put(\\"timestamp\\", $util.time.nowEpochSeconds())) -$util.qr($input.put(\\"userEmail\\", $ctx.identity.resolverContext.userEmail)) - { - \\"version\\": \\"2017-02-28\\", - \\"operation\\": \\"PutItem\\", - \\"key\\" : { - \\"id\\" : $util.dynamodb.toDynamoDBJson($util.autoId()) - }, - \\"attributeValues\\": $util.dynamodb.toMapValuesJson($input) - }", - "ResponseMappingTemplate": "$util.toJson($ctx.result)", - "TypeName": "Mutation", - }, - "Type": "AWS::AppSync::Resolver", - }, "pinboardappsyncapiitemtabledatasourceQuerylistItemsResolver15E3DD35": Object { "DependsOn": Array [ "pinboardappsyncapiitemtabledatasourceFD08E0E9", @@ -783,6 +786,94 @@ $util.qr($input.put(\\"userEmail\\", $ctx.identity.resolverContext.userEmail)) }, "Type": "AWS::IAM::Policy", }, + "pinboardappsyncapinotificationslambdadsFB281B61": Object { + "Properties": Object { + "ApiId": Object { + "Fn::GetAtt": Array [ + "pinboardappsyncapi9D519400", + "ApiId", + ], + }, + "LambdaConfig": Object { + "LambdaFunctionArn": Object { + "Fn::GetAtt": Array [ + "pinboardnotificationslambdaC35CECF7", + "Arn", + ], + }, + }, + "Name": "notifications_lambda_ds", + "ServiceRoleArn": Object { + "Fn::GetAtt": Array [ + "pinboardappsyncapinotificationslambdadsServiceRoleC527BA2E", + "Arn", + ], + }, + "Type": "AWS_LAMBDA", + }, + "Type": "AWS::AppSync::DataSource", + }, + "pinboardappsyncapinotificationslambdadsServiceRoleC527BA2E": Object { + "Properties": Object { + "AssumeRolePolicyDocument": Object { + "Statement": Array [ + Object { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": Object { + "Service": "appsync.amazonaws.com", + }, + }, + ], + "Version": "2012-10-17", + }, + "Tags": Array [ + Object { + "Key": "App", + "Value": "pinboard", + }, + Object { + "Key": "Stack", + "Value": Object { + "Ref": "Stack", + }, + }, + Object { + "Key": "Stage", + "Value": Object { + "Ref": "Stage", + }, + }, + ], + }, + "Type": "AWS::IAM::Role", + }, + "pinboardappsyncapinotificationslambdadsServiceRoleDefaultPolicyC86ED932": Object { + "Properties": Object { + "PolicyDocument": Object { + "Statement": Array [ + Object { + "Action": "lambda:InvokeFunction", + "Effect": "Allow", + "Resource": Object { + "Fn::GetAtt": Array [ + "pinboardnotificationslambdaC35CECF7", + "Arn", + ], + }, + }, + ], + "Version": "2012-10-17", + }, + "PolicyName": "pinboardappsyncapinotificationslambdadsServiceRoleDefaultPolicyC86ED932", + "Roles": Array [ + Object { + "Ref": "pinboardappsyncapinotificationslambdadsServiceRoleC527BA2E", + }, + ], + }, + "Type": "AWS::IAM::Policy", + }, "pinboardappsyncapiusertabledatasource482E1AE5": Object { "Properties": Object { "ApiId": Object { @@ -2254,6 +2345,38 @@ $util.toJson($ctx.result)", }, "Type": "AWS::IAM::Policy", }, + "pinboardinsertitempipelinefunction3F6F60CF": Object { + "DependsOn": Array [ + "pinboardappsyncapiitemtabledatasourceFD08E0E9", + "pinboardappsyncapiSchema868D9F5B", + ], + "Properties": Object { + "ApiId": Object { + "Fn::GetAtt": Array [ + "pinboardappsyncapi9D519400", + "ApiId", + ], + }, + "DataSourceName": "item_table_datasource", + "FunctionVersion": "2018-05-29", + "Name": "insert_item", + "RequestMappingTemplate": "## schema checksum : 069f29169cf579ccf5c31b1d41c78246 + + #set($input = $ctx.args.input) + $util.qr($input.put(\\"timestamp\\", $util.time.nowEpochSeconds())) +$util.qr($input.put(\\"userEmail\\", $ctx.identity.resolverContext.userEmail)) + { + \\"version\\": \\"2017-02-28\\", + \\"operation\\": \\"PutItem\\", + \\"key\\" : { + \\"id\\" : $util.dynamodb.toDynamoDBJson($util.autoId()) + }, + \\"attributeValues\\": $util.dynamodb.toMapValuesJson($input) + }", + "ResponseMappingTemplate": "$util.toJson($ctx.result)", + }, + "Type": "AWS::AppSync::FunctionConfiguration", + }, "pinboarditemtable83382753": Object { "DeletionPolicy": "Retain", "Properties": Object { @@ -2382,6 +2505,9 @@ $util.toJson($ctx.result)", "Environment": Object { "Variables": Object { "APP": "pinboard", + "NOTIFICATION_QUEUE_URL": Object { + "Ref": "pinboardnotificationslambdasqsC013918E", + }, "STACK": Object { "Ref": "Stack", }, @@ -2435,23 +2561,6 @@ $util.toJson($ctx.result)", }, "Type": "AWS::Lambda::Function", }, - "pinboardnotificationslambdaDynamoDBEventSourcePinBoardStackpinboarditemtableB307C42935A18399": Object { - "Properties": Object { - "BatchSize": 100, - "EventSourceArn": Object { - "Fn::GetAtt": Array [ - "pinboarditemtable83382753", - "StreamArn", - ], - }, - "FunctionName": Object { - "Ref": "pinboardnotificationslambdaC35CECF7", - }, - "MaximumBatchingWindowInSeconds": 10, - "StartingPosition": "LATEST", - }, - "Type": "AWS::Lambda::EventSourceMapping", - }, "pinboardnotificationslambdaServiceRole2F6EBFE9": Object { "Properties": Object { "AssumeRolePolicyDocument": Object { @@ -2525,6 +2634,16 @@ $util.toJson($ctx.result)", ], }, }, + Object { + "Action": "sqs:SendMessage", + "Effect": "Allow", + "Resource": Object { + "Fn::GetAtt": Array [ + "pinboardnotificationslambdasqsC013918E", + "Arn", + ], + }, + }, Object { "Action": Array [ "dynamodb:BatchGetItem", @@ -2549,22 +2668,19 @@ $util.toJson($ctx.result)", }, ], }, - Object { - "Action": "dynamodb:ListStreams", - "Effect": "Allow", - "Resource": "*", - }, Object { "Action": Array [ - "dynamodb:DescribeStream", - "dynamodb:GetRecords", - "dynamodb:GetShardIterator", + "sqs:ReceiveMessage", + "sqs:ChangeMessageVisibility", + "sqs:GetQueueUrl", + "sqs:DeleteMessage", + "sqs:GetQueueAttributes", ], "Effect": "Allow", "Resource": Object { "Fn::GetAtt": Array [ - "pinboarditemtable83382753", - "StreamArn", + "pinboardnotificationslambdasqsC013918E", + "Arn", ], }, }, @@ -2580,6 +2696,69 @@ $util.toJson($ctx.result)", }, "Type": "AWS::IAM::Policy", }, + "pinboardnotificationslambdaSqsEventSourcePinBoardStackpinboardnotificationslambdasqs6993908FF9F3EA04": Object { + "Properties": Object { + "EventSourceArn": Object { + "Fn::GetAtt": Array [ + "pinboardnotificationslambdasqsC013918E", + "Arn", + ], + }, + "FunctionName": Object { + "Ref": "pinboardnotificationslambdaC35CECF7", + }, + "FunctionResponseTypes": Array [ + "ReportBatchItemFailures", + ], + }, + "Type": "AWS::Lambda::EventSourceMapping", + }, + "pinboardnotificationslambdasqsC013918E": Object { + "DeletionPolicy": "Delete", + "Properties": Object { + "Tags": Array [ + Object { + "Key": "App", + "Value": "pinboard", + }, + Object { + "Key": "Stack", + "Value": Object { + "Ref": "Stack", + }, + }, + Object { + "Key": "Stage", + "Value": Object { + "Ref": "Stage", + }, + }, + ], + }, + "Type": "AWS::SQS::Queue", + "UpdateReplacePolicy": "Delete", + }, + "pinboardqueuenotificationpipelinefunction6293EB5F": Object { + "DependsOn": Array [ + "pinboardappsyncapinotificationslambdadsFB281B61", + "pinboardappsyncapiSchema868D9F5B", + ], + "Properties": Object { + "ApiId": Object { + "Fn::GetAtt": Array [ + "pinboardappsyncapi9D519400", + "ApiId", + ], + }, + "DataSourceName": "notifications_lambda_ds", + "FunctionVersion": "2018-05-29", + "Name": "queue_notification", + "RequestMappingTemplate": "## schema checksum : 069f29169cf579ccf5c31b1d41c78246 +{\\"version\\": \\"2017-02-28\\", \\"operation\\": \\"Invoke\\", \\"payload\\": $util.toJson($ctx.prev.result)}", + "ResponseMappingTemplate": "$util.toJson($ctx.result)", + }, + "Type": "AWS::AppSync::FunctionConfiguration", + }, "pinboardusersrefresherlambda2D488032": Object { "DependsOn": Array [ "pinboardusersrefresherlambdaServiceRoleDefaultPolicy0F8B88A3", diff --git a/notifications-lambda/run.ts b/notifications-lambda/run.ts index eba05324f..8eda2411e 100644 --- a/notifications-lambda/run.ts +++ b/notifications-lambda/run.ts @@ -1,20 +1,22 @@ import { handler } from "./src"; +import { Item } from "../shared/graphql/graphql"; +import { SQSRecord } from "aws-lambda"; + +const item: Item = { + pinboardId: "63923", + payload: null, + mentions: ["tom.richards@guardian.co.uk"], + userEmail: "tom.richards@guardian.co.uk", + id: "535b86e2-4f01-4f60-a2d0-a5e4f5a7d312", + message: "testing one two three", + type: "message-only", + timestamp: "1630517452", +}; handler({ Records: [ { - dynamodb: { - NewImage: { - pinboardId: { S: "63923" }, - payload: { NULL: true }, - mentions: { L: [{ S: "tom.richards@guardian.co.uk" }] }, - userEmail: { S: "tom.richards@guardian.co.uk" }, - id: { S: "535b86e2-4f01-4f60-a2d0-a5e4f5a7d312" }, - message: { S: "testing one two three" }, - type: { S: "message-only" }, - timestamp: { N: "1630517452" }, - }, - }, - }, + body: JSON.stringify(item), + } as SQSRecord, // casting here to avoid populating loads of fields of SQSRecord which are not used by the handler ], }); diff --git a/notifications-lambda/src/index.ts b/notifications-lambda/src/index.ts index 3b577680c..58299bed9 100644 --- a/notifications-lambda/src/index.ts +++ b/notifications-lambda/src/index.ts @@ -8,8 +8,9 @@ import { Key } from "aws-sdk/clients/dynamodb"; import { Item, MyUser } from "../../shared/graphql/graphql"; import { getEnvironmentVariableOrThrow } from "../../shared/environmentVariables"; import { publicVapidKey } from "../../shared/constants"; -import { DynamoDBStreamEvent } from "aws-lambda"; import { PushSubscription } from "web-push"; +import { SQSEvent } from "aws-lambda/trigger/sqs"; +import { isItem } from "../../shared/graphql/extraTypes"; type UserWithWebPushSubscription = MyUser & { webPushSubscription: PushSubscription; @@ -21,24 +22,36 @@ const isUserMentioned = (item: Item, user: MyUser) => const doesUserManuallyHavePinboardOpen = (item: Item, user: MyUser) => user.manuallyOpenedPinboardIds?.includes(item.pinboardId); -export const handler = async (event: DynamoDBStreamEvent) => { +export const handler = async (event: Item | SQSEvent) => { + console.dir(event); + + if (isItem(event)) { + const item: Item = event; + const sqs = new AWS.SQS(); + await sqs + .sendMessage({ + QueueUrl: getEnvironmentVariableOrThrow("notificationQueueURL"), + DelaySeconds: 0, + MessageBody: JSON.stringify(item), + }) + .promise() + .then((sqsResult) => console.log(sqsResult)) + .catch((sqsError) => + console.error("failed to queue item for notification", sqsError) + ); + return item; // always return success so that pipeline never fails on this step + } + + const items = event.Records.map( + (sqsRecord) => JSON.parse(sqsRecord.body) as Item + ); + const dynamo = new AWS.DynamoDB.DocumentClient(standardAwsConfig); const usersTableName = getEnvironmentVariableOrThrow("usersTableName"); const privateVapidKey = await pinboardSecretPromiseGetter( "notifications/privateVapidKey" ); - const items: Item[] = event.Records.reduce( - (acc, record) => - record.dynamodb?.NewImage - ? [ - ...acc, - AWS.DynamoDB.Converter.unmarshall(record.dynamodb.NewImage) as Item, - ] - : acc, - [] as Item[] - ); - const processPageOfUsers = async (startKey?: Key) => { const userResults = await dynamo .scan({ diff --git a/shared/environmentVariables.ts b/shared/environmentVariables.ts index b10288935..cb291afae 100644 --- a/shared/environmentVariables.ts +++ b/shared/environmentVariables.ts @@ -3,6 +3,7 @@ export const ENVIRONMENT_VARIABLE_KEYS = { workflowDnsName: "WORKFLOW_DATASTORE_LOAD_BALANCER_DNS_NAME", graphqlEndpoint: "GRAPHQL_ENDPOINT", sentryDSN: "SENTRY_DSN", + notificationQueueURL: "NOTIFICATION_QUEUE_URL", }; export const getEnvironmentVariableOrThrow = ( diff --git a/shared/graphql/extraTypes.ts b/shared/graphql/extraTypes.ts index 01d87aef6..e77571d6d 100644 --- a/shared/graphql/extraTypes.ts +++ b/shared/graphql/extraTypes.ts @@ -1,4 +1,4 @@ -import type { WorkflowStub } from "./graphql"; +import type { Item, WorkflowStub } from "./graphql"; export type PinboardData = WorkflowStub; @@ -14,3 +14,6 @@ export const isPinboardData = ( !!maybePinboardData && maybePinboardData !== "loading" && maybePinboardData !== "notTrackedInWorkflow"; + +export const isItem = (maybeItem: unknown): maybeItem is Item => + !!maybeItem && typeof maybeItem === "object" && "pinboardId" in maybeItem;