From bf833baf712f3fa94e4147ce2dd39ec8c15c045b Mon Sep 17 00:00:00 2001 From: Shaun Guo Date: Thu, 6 Jun 2024 21:49:00 +1000 Subject: [PATCH 1/9] Create new pattern --- sqs-fifo-delayed-queue-dynamodb/.gitignore | 13 +++ sqs-fifo-delayed-queue-dynamodb/README.md | 58 ++++++++++ sqs-fifo-delayed-queue-dynamodb/app.py | 26 +++++ sqs-fifo-delayed-queue-dynamodb/cdk.json | 51 +++++++++ .../delay_fifo_queue_test/__init__.py | 0 .../delay_fifo_queue_test_stack.py | 108 ++++++++++++++++++ .../example-pattern.json | 44 +++++++ .../lambda/process_message.py | 70 ++++++++++++ .../requirements-dev.txt | 1 + .../requirements.txt | 3 + .../send-messages-batch-1.json | 26 +++++ .../send-messages-batch-2.json | 14 +++ .../send_messages.sh | 8 ++ sqs-fifo-delayed-queue-dynamodb/source.bat | 13 +++ sqs-fifo-delayed-queue-dynamodb/src/app.js | 10 ++ .../tests/__init__.py | 0 .../tests/unit/__init__.py | 0 .../unit/test_delay_fifo_queue_test_stack.py | 15 +++ 18 files changed, 460 insertions(+) create mode 100644 sqs-fifo-delayed-queue-dynamodb/.gitignore create mode 100644 sqs-fifo-delayed-queue-dynamodb/README.md create mode 100644 sqs-fifo-delayed-queue-dynamodb/app.py create mode 100644 sqs-fifo-delayed-queue-dynamodb/cdk.json create mode 100644 sqs-fifo-delayed-queue-dynamodb/delay_fifo_queue_test/__init__.py create mode 100644 sqs-fifo-delayed-queue-dynamodb/delay_fifo_queue_test/delay_fifo_queue_test_stack.py create mode 100644 sqs-fifo-delayed-queue-dynamodb/example-pattern.json create mode 100644 sqs-fifo-delayed-queue-dynamodb/lambda/process_message.py create mode 100644 sqs-fifo-delayed-queue-dynamodb/requirements-dev.txt create mode 100644 sqs-fifo-delayed-queue-dynamodb/requirements.txt create mode 100644 sqs-fifo-delayed-queue-dynamodb/send-messages-batch-1.json create mode 100644 sqs-fifo-delayed-queue-dynamodb/send-messages-batch-2.json create mode 100644 sqs-fifo-delayed-queue-dynamodb/send_messages.sh create mode 100644 sqs-fifo-delayed-queue-dynamodb/source.bat create mode 100644 sqs-fifo-delayed-queue-dynamodb/src/app.js create mode 100644 sqs-fifo-delayed-queue-dynamodb/tests/__init__.py create mode 100644 sqs-fifo-delayed-queue-dynamodb/tests/unit/__init__.py create mode 100644 sqs-fifo-delayed-queue-dynamodb/tests/unit/test_delay_fifo_queue_test_stack.py diff --git a/sqs-fifo-delayed-queue-dynamodb/.gitignore b/sqs-fifo-delayed-queue-dynamodb/.gitignore new file mode 100644 index 000000000..b831b312b --- /dev/null +++ b/sqs-fifo-delayed-queue-dynamodb/.gitignore @@ -0,0 +1,13 @@ +*.swp +package-lock.json +__pycache__ +.pytest_cache +.venv +*.egg-info +.env + +# CDK asset staging directory +.cdk.staging +cdk.out + +delay_fifo_queue_test/test_stack.py diff --git a/sqs-fifo-delayed-queue-dynamodb/README.md b/sqs-fifo-delayed-queue-dynamodb/README.md new file mode 100644 index 000000000..3d67ea8b3 --- /dev/null +++ b/sqs-fifo-delayed-queue-dynamodb/README.md @@ -0,0 +1,58 @@ +# Amazon SQS FIFO Queue with delay using AWS Lambda and AWS DynamoDB + +This pattern allows introduction of a delay between processing of messages while maintaining order from individual client. Then sending it sequentially to the downstream for processing, to minimize the consequences of unordered events. + +Learn more about this pattern at Serverless Land Patterns: https://serverlessland.com/patterns/sqs-fifo-delayed-queue-dynamodb + +Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. + +## Requirements + +* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. +* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured +* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) +* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed + +## Deployment Instructions + +1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: + ``` + git clone https://github.com/aws-samples/serverless-patterns + ``` +1. Change directory to the pattern directory: + ``` + cd sqs-fifo-delayed-queue-dynamodb + ``` +1. From the command line, use AWS CDK to deploy the AWS resources for the pattern as specified in the `delay_fifo_queue_test/delay_fifo_queue_test_stack.py` file. + ``` + python3 -m pip install -r requirements.txt + cdk synth + cdk deploy + ``` + +## How it works + +This pattern deploys an Amazon SQS FIFO queue called `primary_queue`, a AWS Lambda function `process_queue_function`, a DynamoDB table `customer_table` and a second SQS FIFO queue `downstream_queue`. + +When a messages from `primary_queue` is processed by the `process_queue_function`, it is checked against `customer_table` to see if another message from the same message sender has been processed with in a specified time frame. +If true, the message is not processed and with be retried after the visibility timeout on the `primary_queue`. +If false, the message is sent to the `downstream_queue` for processing. An entry is made to `customer_table` with a TTL. + + +## Testing + +Edit lines 3 and 7 of `send_messages.sh` with the URL of your `primary_queue`. Run this script to send test messages to the queue. +You shold observe messages with `test1`, `test2-first-message`, `test3` and `test4` in the `downstream_queue`. +After around 60 second, there should be another messages in the `downstream_queue` with `test2-delayed-message-1` as MessageBody. +After another 60 seconds, there should be another messages in the `downstream_queue` with `test2-delayed-message-2` as MessageBody. + +## Cleanup + +1. Delete the stack + ``` + cdk destroy + ``` +---- +Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: MIT-0 \ No newline at end of file diff --git a/sqs-fifo-delayed-queue-dynamodb/app.py b/sqs-fifo-delayed-queue-dynamodb/app.py new file mode 100644 index 000000000..ff4fb957a --- /dev/null +++ b/sqs-fifo-delayed-queue-dynamodb/app.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python3 +import aws_cdk as cdk + +from delay_fifo_queue_test.delay_fifo_queue_test_stack import DelayFifoQueueTestStack + + +app = cdk.App() +DelayFifoQueueTestStack(app, "DelayFifoQueueTestStack", + # If you don't specify 'env', this stack will be environment-agnostic. + # Account/Region-dependent features and context lookups will not work, + # but a single synthesized template can be deployed anywhere. + + # Uncomment the next line to specialize this stack for the AWS Account + # and Region that are implied by the current CLI configuration. + + #env=cdk.Environment(account=os.getenv('CDK_DEFAULT_ACCOUNT'), region=os.getenv('CDK_DEFAULT_REGION')), + + # Uncomment the next line if you know exactly what Account and Region you + # want to deploy the stack to. */ + + #env=cdk.Environment(account='123456789012', region='us-east-1'), + + # For more information, see https://docs.aws.amazon.com/cdk/latest/guide/environments.html + ) + +app.synth() diff --git a/sqs-fifo-delayed-queue-dynamodb/cdk.json b/sqs-fifo-delayed-queue-dynamodb/cdk.json new file mode 100644 index 000000000..fd7abdbf9 --- /dev/null +++ b/sqs-fifo-delayed-queue-dynamodb/cdk.json @@ -0,0 +1,51 @@ +{ + "app": "python3 app.py", + "watch": { + "include": [ + "**" + ], + "exclude": [ + "README.md", + "cdk*.json", + "requirements*.txt", + "source.bat", + "**/__init__.py", + "python/__pycache__", + "tests" + ] + }, + "context": { + "@aws-cdk/aws-lambda:recognizeLayerVersion": true, + "@aws-cdk/core:checkSecretUsage": true, + "@aws-cdk/core:target-partitions": [ + "aws", + "aws-cn" + ], + "@aws-cdk-containers/ecs-service-extensions:enableDefaultLogDriver": true, + "@aws-cdk/aws-ec2:uniqueImdsv2TemplateName": true, + "@aws-cdk/aws-ecs:arnFormatIncludesClusterName": true, + "@aws-cdk/aws-iam:minimizePolicies": true, + "@aws-cdk/core:validateSnapshotRemovalPolicy": true, + "@aws-cdk/aws-codepipeline:crossAccountKeyAliasStackSafeResourceName": true, + "@aws-cdk/aws-s3:createDefaultLoggingPolicy": true, + "@aws-cdk/aws-sns-subscriptions:restrictSqsDescryption": true, + "@aws-cdk/aws-apigateway:disableCloudWatchRole": true, + "@aws-cdk/core:enablePartitionLiterals": true, + "@aws-cdk/aws-events:eventsTargetQueueSameAccount": true, + "@aws-cdk/aws-iam:standardizedServicePrincipals": true, + "@aws-cdk/aws-ecs:disableExplicitDeploymentControllerForCircuitBreaker": true, + "@aws-cdk/aws-iam:importedRoleStackSafeDefaultPolicyName": true, + "@aws-cdk/aws-s3:serverAccessLogsUseBucketPolicy": true, + "@aws-cdk/aws-route53-patters:useCertificate": true, + "@aws-cdk/customresources:installLatestAwsSdkDefault": false, + "@aws-cdk/aws-rds:databaseProxyUniqueResourceName": true, + "@aws-cdk/aws-codedeploy:removeAlarmsFromDeploymentGroup": true, + "@aws-cdk/aws-apigateway:authorizerChangeDeploymentLogicalId": true, + "@aws-cdk/aws-ec2:launchTemplateDefaultUserData": true, + "@aws-cdk/aws-secretsmanager:useAttachedSecretResourcePolicyForSecretTargetAttachments": true, + "@aws-cdk/aws-redshift:columnId": true, + "@aws-cdk/aws-stepfunctions-tasks:enableEmrServicePolicyV2": true, + "@aws-cdk/aws-ec2:restrictDefaultSecurityGroup": true, + "@aws-cdk/aws-apigateway:requestValidatorUniqueId": true + } +} diff --git a/sqs-fifo-delayed-queue-dynamodb/delay_fifo_queue_test/__init__.py b/sqs-fifo-delayed-queue-dynamodb/delay_fifo_queue_test/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/sqs-fifo-delayed-queue-dynamodb/delay_fifo_queue_test/delay_fifo_queue_test_stack.py b/sqs-fifo-delayed-queue-dynamodb/delay_fifo_queue_test/delay_fifo_queue_test_stack.py new file mode 100644 index 000000000..fed698d3c --- /dev/null +++ b/sqs-fifo-delayed-queue-dynamodb/delay_fifo_queue_test/delay_fifo_queue_test_stack.py @@ -0,0 +1,108 @@ +from aws_cdk import ( + Duration, + Stack, + aws_sqs as sqs, + aws_lambda as lambda_, + aws_iam as iam, + aws_cloudwatch as cloudwatch, + aws_cloudwatch_actions as cloudwatch_actions, + aws_dynamodb as dynamodb, + aws_sns as sns, + aws_sns_subscriptions as subscriptions +) +from constructs import Construct + +class DelayFifoQueueTestStack(Stack): + + def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: + super().__init__(scope, construct_id, **kwargs) + + #create a dead letter queue called primary_queue_dlq + primary_queue_dlq = sqs.Queue(self, "DelayFifoQueueDlq", + visibility_timeout=Duration.seconds(60), + fifo=True, + content_based_deduplication=True + ) + + # create an initial primary SQS FIFO queue with a visibility timeout of 60 seconds + primary_queue = sqs.Queue(self, "DelayFifoQueue", + visibility_timeout=Duration.seconds(60), + fifo=True, + content_based_deduplication=True, + dead_letter_queue=sqs.DeadLetterQueue( + max_receive_count=5, + queue=primary_queue_dlq + ) + ) + + # create a downstream SQS FIFO queue with a visibility timeout of 60 seconds + downstream_queue = sqs.Queue(self, "DelayFifoQueueDownstream", + visibility_timeout=Duration.seconds(60), + fifo=True, + content_based_deduplication=True + ) + + #create a dynamodb table to store customer id and created timestamp + customer_table = dynamodb.Table(self, "CustomerTable", + table_name="DelayFifoQueueCustomerTable", + partition_key=dynamodb.Attribute(name="customer_id", type=dynamodb.AttributeType.STRING), + time_to_live_attribute="ttl" + ) + + # create a lambda function to process messages from the queue + process_queue_function = lambda_.Function(self, "ProcessMessageLambda", + runtime=lambda_.Runtime.PYTHON_3_9, + code=lambda_.Code.from_asset("lambda"), + handler="process_message.handler", + environment={ + "QUEUE_URL": downstream_queue.queue_url, + "TABLE_NAME": customer_table.table_name + }) + + # #create an SNS topic to send notifications when primary_queue_dlq is not empty + dlq_size_sns_topic = sns.Topic(self, "PrimaryQueueDqlSizeAlertTopic") + dlq_size_sns_topic.add_subscription(subscriptions.EmailSubscription("notification_address@email.com")) + + # #create a CloudWatch alarm if primary_queue_dlq is not empty + dlq_size_alarm = cloudwatch.Alarm(self, "PrimaryQueueDqlSizeAlert", + metric=cloudwatch.Metric(metric_name="ApproximateNumberOfMessagesVisible", + namespace="AWS/SQS", + dimensions_map={ + "QueueName": primary_queue_dlq.queue_name + }, + statistic="Sum", + period=Duration.seconds(60) + ), + evaluation_periods=1, + threshold=0, + comparison_operator=cloudwatch.ComparisonOperator.GREATER_THAN_THRESHOLD, + treat_missing_data=cloudwatch.TreatMissingData.NOT_BREACHING + ) + dlq_size_alarm.add_alarm_action( + cloudwatch_actions.SnsAction( + topic = dlq_size_sns_topic + ) + ) + + + #create lambda execution role that has access to receive messages from primary_queue queue + process_queue_function.add_to_role_policy(iam.PolicyStatement( + actions=["sqs:ReceiveMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes", "sqs:GetQueueUrl"], + resources=[primary_queue.queue_arn] + )) + + #add to lambda execution role policy to send messages to the downstream_queue queue + process_queue_function.add_to_role_policy(iam.PolicyStatement( + actions=["sqs:SendMessage"], + resources=[downstream_queue.queue_arn] + )) + + lambda_.EventSourceMapping(self, "ProcessMessageLambdaEventSourceMapping", + event_source_arn=primary_queue.queue_arn, + target=process_queue_function, + batch_size=10, + report_batch_item_failures=True + ) + + # give permissions for the lambda function to read and write to the dynamodb table + customer_table.grant_read_write_data(process_queue_function) \ No newline at end of file diff --git a/sqs-fifo-delayed-queue-dynamodb/example-pattern.json b/sqs-fifo-delayed-queue-dynamodb/example-pattern.json new file mode 100644 index 000000000..519b17f2d --- /dev/null +++ b/sqs-fifo-delayed-queue-dynamodb/example-pattern.json @@ -0,0 +1,44 @@ +{ + "title": "Amazon SQS FIFO Queue with controlled delay", + "description": "Amazon SQS FIFO Queue with delay using AWS Lambda and AWS DynamoDB", + "language": "Python", + "level": "200", + "framework": "CDK", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern allows introduction of a delay between processing of messages while maintaining order from individual client. Then sending it sequentially to the downstream for processing, to minimize the consequences of unordered events." + ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/sqs-fifo-delayed-queue-dynamodb", + "templateURL": "serverless-patterns/sqs-fifo-delayed-queue-dynamodb", + "projectFolder": "sqs-fifo-delayed-queue-dynamodb", + "templateFile": "delay_fifo_queue_test/delay_fifo_queue_test_stack.py" + } + }, + "deploy": { + "text": [ + "cdk deploy" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." + ] + }, + "cleanup": { + "text": [ + "Delete the stack: cdk destroy." + ] + }, + "authors": [ + { + "name": "Shaun Guo", + "image": "https://media.licdn.com/dms/image/C5103AQG3KMyMdEIKpA/profile-displayphoto-shrink_800_800/0/1517283953925?e=1692835200&v=beta&t=AxJ9ST_8K_bw8nqTPDaJB2F5dnQspES9FuJ64DBScC8", + "bio": "Shaun is a Senior Technical Account Manager at Amazon Web Services based in Australia", + "linkedin": "https://www.linkedin.com/in/shaun-guo/" + } + ] +} diff --git a/sqs-fifo-delayed-queue-dynamodb/lambda/process_message.py b/sqs-fifo-delayed-queue-dynamodb/lambda/process_message.py new file mode 100644 index 000000000..da8c8136f --- /dev/null +++ b/sqs-fifo-delayed-queue-dynamodb/lambda/process_message.py @@ -0,0 +1,70 @@ +# create python lambda function +# import json +import boto3 +import os +import time + +dynamodb = boto3.resource('dynamodb') +sqs = boto3.client('sqs') +table = dynamodb.Table(os.environ['TABLE_NAME']) + +# Message delay and TTL for dynamodb item is 60 seconds +message_delay_seconds = 60 + +def handler(event, context): + batch_item_failures = [] + sqs_batch_response = {} + + # iterate sqs messages based on message group id + for record in event['Records']: + # get message group id + message_group_id = record['attributes']['MessageGroupId'] + + # get message body + message_body = record['body'] + + # query records with customer_id as message_group_id + response = table.get_item( + Key={ + 'customer_id': message_group_id + }) + + # if response does not return a record + if ('Item' in response): + # get the item + response_item = response['Item'] + + # get the exprying time of the item from response_item + item_ttl_epoch_seconds = response_item['ttl'] + + # if TTL has expired, send the message body to downstream sqs queue and update the dynamodb table with a new TTL, else place the item back on the delay queue + if (item_ttl_epoch_seconds - int(time.time()) < 0): + process_message(message_body, message_group_id) + else: + message_id = record['messageId'] + # print(f'Message with id "{message_id}" has not expired yet, adding to batch item failures') + batch_item_failures.append({"itemIdentifier": record['messageId']}) + else: + # if no records found, send message to downstream sqs queue and update the dynamodb table with a ttl + process_message(message_body, message_group_id) + + sqs_batch_response["batchItemFailures"] = batch_item_failures + + return sqs_batch_response + +def process_message(message_body, message_group_id): + # send message to downstream sqs queue + expiry_epoch_time = int(time.time()) + message_delay_seconds + sqs.send_message( + QueueUrl=os.environ['QUEUE_URL'], + MessageBody=message_body, + MessageGroupId=message_group_id + ) + + # Update Dynamodb table called CustomerTable with customer_id as partition key and created_timestamp as sort key with the ISO 8601 timestamp + table.put_item( + Item={ + 'customer_id': message_group_id, + 'ttl': expiry_epoch_time + } + ) \ No newline at end of file diff --git a/sqs-fifo-delayed-queue-dynamodb/requirements-dev.txt b/sqs-fifo-delayed-queue-dynamodb/requirements-dev.txt new file mode 100644 index 000000000..927094516 --- /dev/null +++ b/sqs-fifo-delayed-queue-dynamodb/requirements-dev.txt @@ -0,0 +1 @@ +pytest==6.2.5 diff --git a/sqs-fifo-delayed-queue-dynamodb/requirements.txt b/sqs-fifo-delayed-queue-dynamodb/requirements.txt new file mode 100644 index 000000000..5228a464d --- /dev/null +++ b/sqs-fifo-delayed-queue-dynamodb/requirements.txt @@ -0,0 +1,3 @@ +aws-cdk-lib==2.79.1 +constructs>=10.0.0,<11.0.0 +boto3 \ No newline at end of file diff --git a/sqs-fifo-delayed-queue-dynamodb/send-messages-batch-1.json b/sqs-fifo-delayed-queue-dynamodb/send-messages-batch-1.json new file mode 100644 index 000000000..96c5a09ae --- /dev/null +++ b/sqs-fifo-delayed-queue-dynamodb/send-messages-batch-1.json @@ -0,0 +1,26 @@ +[ + { + "Id": "55a2e759-1eff-40f2-9516-c7ce294bf802", + "MessageGroupId": "test1", + "MessageBody": "test1", + "MessageDeduplicationId": "b3c72a6b-67ec-447d-8aa3-22c7a180f330" + }, + { + "Id": "7e49806e-1c6b-45e6-b324-ff83f377eb96", + "MessageGroupId": "test2", + "MessageBody": "test2-first-message", + "MessageDeduplicationId": "cbcdeee9-0a97-40d8-ad4a-89ea7ccc057e" + }, + { + "Id": "bab33279-04d8-4c98-9288-385c7a461c36", + "MessageGroupId": "test2", + "MessageBody": "test2-delayed-message-1", + "MessageDeduplicationId": "e2b3bc2f-471c-4541-8cc4-35df50efb871" + }, + { + "Id": "4e105af8-d149-4591-aa4d-c7252fd60fc7", + "MessageGroupId": "test3", + "MessageBody": "test3", + "MessageDeduplicationId": "208cf23b-ea37-4df3-8ba8-eb8d404bf90d" + } +] \ No newline at end of file diff --git a/sqs-fifo-delayed-queue-dynamodb/send-messages-batch-2.json b/sqs-fifo-delayed-queue-dynamodb/send-messages-batch-2.json new file mode 100644 index 000000000..5f406bb50 --- /dev/null +++ b/sqs-fifo-delayed-queue-dynamodb/send-messages-batch-2.json @@ -0,0 +1,14 @@ +[ + { + "Id": "ad7949b1-1c2a-4bf3-8a9b-7bdfc0d09bc6", + "MessageGroupId": "test2", + "MessageBody": "test2-delayed-message-2", + "MessageDeduplicationId": "c7b94a2c-06dd-4dd9-85b9-fbad5a256640" + }, + { + "Id": "5f536d8f-2d50-4705-bdd2-7620da144671", + "MessageGroupId": "test4", + "MessageBody": "test4", + "MessageDeduplicationId": "60e2f3ba-f3f6-43be-ab54-881cd7248956" + } +] \ No newline at end of file diff --git a/sqs-fifo-delayed-queue-dynamodb/send_messages.sh b/sqs-fifo-delayed-queue-dynamodb/send_messages.sh new file mode 100644 index 000000000..7c5bedc13 --- /dev/null +++ b/sqs-fifo-delayed-queue-dynamodb/send_messages.sh @@ -0,0 +1,8 @@ +#!/bin/bash +aws sqs send-message-batch \ + --queue-url \ + --entries file://send-messages-batch-1.json + +aws sqs send-message-batch \ + --queue-url \ + --entries file://send-messages-batch-2.json \ No newline at end of file diff --git a/sqs-fifo-delayed-queue-dynamodb/source.bat b/sqs-fifo-delayed-queue-dynamodb/source.bat new file mode 100644 index 000000000..9e1a83442 --- /dev/null +++ b/sqs-fifo-delayed-queue-dynamodb/source.bat @@ -0,0 +1,13 @@ +@echo off + +rem The sole purpose of this script is to make the command +rem +rem source .venv/bin/activate +rem +rem (which activates a Python virtualenv on Linux or Mac OS X) work on Windows. +rem On Windows, this command just runs this batch file (the argument is ignored). +rem +rem Now we don't need to document a Windows command for activating a virtualenv. + +echo Executing .venv\Scripts\activate.bat for you +.venv\Scripts\activate.bat diff --git a/sqs-fifo-delayed-queue-dynamodb/src/app.js b/sqs-fifo-delayed-queue-dynamodb/src/app.js new file mode 100644 index 000000000..cb3c4d9c1 --- /dev/null +++ b/sqs-fifo-delayed-queue-dynamodb/src/app.js @@ -0,0 +1,10 @@ +/*! Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: MIT-0 + */ + +'use strict' + +exports.handler = async (event) => { + // Lambda handler code + console.log(JSON.stringify(event, 0, null)) +} \ No newline at end of file diff --git a/sqs-fifo-delayed-queue-dynamodb/tests/__init__.py b/sqs-fifo-delayed-queue-dynamodb/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/sqs-fifo-delayed-queue-dynamodb/tests/unit/__init__.py b/sqs-fifo-delayed-queue-dynamodb/tests/unit/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/sqs-fifo-delayed-queue-dynamodb/tests/unit/test_delay_fifo_queue_test_stack.py b/sqs-fifo-delayed-queue-dynamodb/tests/unit/test_delay_fifo_queue_test_stack.py new file mode 100644 index 000000000..1b4764894 --- /dev/null +++ b/sqs-fifo-delayed-queue-dynamodb/tests/unit/test_delay_fifo_queue_test_stack.py @@ -0,0 +1,15 @@ +import aws_cdk as core +import aws_cdk.assertions as assertions + +from delay_fifo_queue_test.delay_fifo_queue_test_stack import DelayFifoQueueTestStack + +# example tests. To run these tests, uncomment this file along with the example +# resource in delay_fifo_queue_test/delay_fifo_queue_test_stack.py +def test_sqs_queue_created(): + app = core.App() + stack = DelayFifoQueueTestStack(app, "delay-fifo-queue-test") + template = assertions.Template.from_stack(stack) + +# template.has_resource_properties("AWS::SQS::Queue", { +# "VisibilityTimeout": 300 +# }) From 227ec110d2b9f22c12cb85566cae8cbf2a395e5c Mon Sep 17 00:00:00 2001 From: Udit Parikh Date: Tue, 2 Jul 2024 23:08:02 +0530 Subject: [PATCH 2/9] Added sqs-fifo-delayed-queue-dynamodb.json --- .../sqs-fifo-delayed-queue-dynamodb.json | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 sqs-fifo-delayed-queue-dynamodb/sqs-fifo-delayed-queue-dynamodb.json diff --git a/sqs-fifo-delayed-queue-dynamodb/sqs-fifo-delayed-queue-dynamodb.json b/sqs-fifo-delayed-queue-dynamodb/sqs-fifo-delayed-queue-dynamodb.json new file mode 100644 index 000000000..7c1f5603c --- /dev/null +++ b/sqs-fifo-delayed-queue-dynamodb/sqs-fifo-delayed-queue-dynamodb.json @@ -0,0 +1,72 @@ +{ + "title": "Amazon SQS FIFO Queue with controlled delay", + "description": "Amazon SQS FIFO Queue with delay using AWS Lambda and AWS DynamoDB", + "language": "Python", + "level": "200", + "framework": "CDK", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern allows introduction of a delay between processing of messages while maintaining order from individual client. Then sending it sequentially to the downstream for processing, to minimize the consequences of unordered events." + ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/sqs-fifo-delayed-queue-dynamodb", + "templateURL": "serverless-patterns/sqs-fifo-delayed-queue-dynamodb", + "projectFolder": "sqs-fifo-delayed-queue-dynamodb", + "templateFile": "delay_fifo_queue_test/delay_fifo_queue_test_stack.py" + } + }, + "deploy": { + "text": [ + "cdk deploy" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." + ] + }, + "cleanup": { + "text": [ + "Delete the stack: cdk destroy." + ] + }, + "authors": [ + { + "name": "Shaun Guo", + "image": "https://media.licdn.com/dms/image/C5103AQG3KMyMdEIKpA/profile-displayphoto-shrink_800_800/0/1517283953925?e=1692835200&v=beta&t=AxJ9ST_8K_bw8nqTPDaJB2F5dnQspES9FuJ64DBScC8", + "bio": "Shaun is a Senior Technical Account Manager at Amazon Web Services based in Australia", + "linkedin": "shaun-guo" + } + ], + "patternArch": { + "icon1": { + "x": 20, + "y": 50, + "service": "sqs", + "label": "Amazon SQS" + }, + "icon2": { + "x": 50, + "y": 50, + "service": "lambda", + "label": "AWS Lambda" + }, + "icon3": { + "x": 80, + "y": 50, + "service": "sqs", + "label": "Amazon SQS" + }, + "line1": { + "from": "icon1", + "to": "icon2" + }, + "line2": { + "from": "icon2", + "to": "icon3" + } + } +} From 889951da74f1efa32a9d0a5fa9c676b9f16dfbfb Mon Sep 17 00:00:00 2001 From: Shaun Guo Date: Wed, 3 Jul 2024 21:33:45 +1000 Subject: [PATCH 3/9] Fix comments, added exception handling and clarified testing instructions --- sqs-fifo-delayed-queue-dynamodb/README.md | 10 ++-- .../delay_fifo_queue_test_stack.py | 19 ++++--- .../example-pattern.json | 2 +- .../lambda/process_message.py | 49 ++++++++++++------- .../unit/test_delay_fifo_queue_test_stack.py | 4 -- 5 files changed, 48 insertions(+), 36 deletions(-) diff --git a/sqs-fifo-delayed-queue-dynamodb/README.md b/sqs-fifo-delayed-queue-dynamodb/README.md index 3d67ea8b3..7dd4e7329 100644 --- a/sqs-fifo-delayed-queue-dynamodb/README.md +++ b/sqs-fifo-delayed-queue-dynamodb/README.md @@ -41,10 +41,12 @@ If false, the message is sent to the `downstream_queue` for processing. An entry ## Testing -Edit lines 3 and 7 of `send_messages.sh` with the URL of your `primary_queue`. Run this script to send test messages to the queue. -You shold observe messages with `test1`, `test2-first-message`, `test3` and `test4` in the `downstream_queue`. -After around 60 second, there should be another messages in the `downstream_queue` with `test2-delayed-message-1` as MessageBody. -After another 60 seconds, there should be another messages in the `downstream_queue` with `test2-delayed-message-2` as MessageBody. +1. Edit lines 3 and 7 of `send_messages.sh` with the `DelayFifoQueue` URL from the output of the `cdk deploy`. Run this script to send test messages to the queue. +2. Head to AWS console and go to SQS service. Click on Queues, and select the queue containing the text `DelayFifoQueueDownstream`. +3. Click on `Send and receive messages` then `Poll for messages` to see current messages in the queue. +4. You shold observe messages with `test1`, `test2-first-message`, `test3` and `test4` in the `downstream_queue`. +5. After around 60 seconds poll again, there should be another messages in the `downstream_queue` with `test2-delayed-message-1` as MessageBody. +6. After another 60 seconds poll again, there should be another messages in the `downstream_queue` with `test2-delayed-message-2` as MessageBody. ## Cleanup diff --git a/sqs-fifo-delayed-queue-dynamodb/delay_fifo_queue_test/delay_fifo_queue_test_stack.py b/sqs-fifo-delayed-queue-dynamodb/delay_fifo_queue_test/delay_fifo_queue_test_stack.py index fed698d3c..6ebbd2edb 100644 --- a/sqs-fifo-delayed-queue-dynamodb/delay_fifo_queue_test/delay_fifo_queue_test_stack.py +++ b/sqs-fifo-delayed-queue-dynamodb/delay_fifo_queue_test/delay_fifo_queue_test_stack.py @@ -8,7 +8,8 @@ aws_cloudwatch_actions as cloudwatch_actions, aws_dynamodb as dynamodb, aws_sns as sns, - aws_sns_subscriptions as subscriptions + aws_sns_subscriptions as subscriptions, + CfnOutput as cfnoutput ) from constructs import Construct @@ -17,7 +18,7 @@ class DelayFifoQueueTestStack(Stack): def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: super().__init__(scope, construct_id, **kwargs) - #create a dead letter queue called primary_queue_dlq + # create a dead letter queue called primary_queue_dlq primary_queue_dlq = sqs.Queue(self, "DelayFifoQueueDlq", visibility_timeout=Duration.seconds(60), fifo=True, @@ -42,7 +43,7 @@ def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: content_based_deduplication=True ) - #create a dynamodb table to store customer id and created timestamp + # create a dynamodb table to store customer id and created timestamp customer_table = dynamodb.Table(self, "CustomerTable", table_name="DelayFifoQueueCustomerTable", partition_key=dynamodb.Attribute(name="customer_id", type=dynamodb.AttributeType.STRING), @@ -59,11 +60,11 @@ def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: "TABLE_NAME": customer_table.table_name }) - # #create an SNS topic to send notifications when primary_queue_dlq is not empty + # create an SNS topic to send notifications when primary_queue_dlq is not empty dlq_size_sns_topic = sns.Topic(self, "PrimaryQueueDqlSizeAlertTopic") dlq_size_sns_topic.add_subscription(subscriptions.EmailSubscription("notification_address@email.com")) - # #create a CloudWatch alarm if primary_queue_dlq is not empty + # create a CloudWatch alarm if primary_queue_dlq is not empty dlq_size_alarm = cloudwatch.Alarm(self, "PrimaryQueueDqlSizeAlert", metric=cloudwatch.Metric(metric_name="ApproximateNumberOfMessagesVisible", namespace="AWS/SQS", @@ -85,13 +86,13 @@ def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: ) - #create lambda execution role that has access to receive messages from primary_queue queue + # create lambda execution role that has access to receive messages from primary_queue queue process_queue_function.add_to_role_policy(iam.PolicyStatement( actions=["sqs:ReceiveMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes", "sqs:GetQueueUrl"], resources=[primary_queue.queue_arn] )) - #add to lambda execution role policy to send messages to the downstream_queue queue + # add to lambda execution role policy to send messages to the downstream_queue queue process_queue_function.add_to_role_policy(iam.PolicyStatement( actions=["sqs:SendMessage"], resources=[downstream_queue.queue_arn] @@ -105,4 +106,6 @@ def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: ) # give permissions for the lambda function to read and write to the dynamodb table - customer_table.grant_read_write_data(process_queue_function) \ No newline at end of file + customer_table.grant_read_write_data(process_queue_function) + + cfnoutput(self, "DelayFifoQueueURL", value=primary_queue.queue_url) \ No newline at end of file diff --git a/sqs-fifo-delayed-queue-dynamodb/example-pattern.json b/sqs-fifo-delayed-queue-dynamodb/example-pattern.json index 519b17f2d..63d4c6c4a 100644 --- a/sqs-fifo-delayed-queue-dynamodb/example-pattern.json +++ b/sqs-fifo-delayed-queue-dynamodb/example-pattern.json @@ -38,7 +38,7 @@ "name": "Shaun Guo", "image": "https://media.licdn.com/dms/image/C5103AQG3KMyMdEIKpA/profile-displayphoto-shrink_800_800/0/1517283953925?e=1692835200&v=beta&t=AxJ9ST_8K_bw8nqTPDaJB2F5dnQspES9FuJ64DBScC8", "bio": "Shaun is a Senior Technical Account Manager at Amazon Web Services based in Australia", - "linkedin": "https://www.linkedin.com/in/shaun-guo/" + "linkedin": "shaun-guo" } ] } diff --git a/sqs-fifo-delayed-queue-dynamodb/lambda/process_message.py b/sqs-fifo-delayed-queue-dynamodb/lambda/process_message.py index da8c8136f..056418c49 100644 --- a/sqs-fifo-delayed-queue-dynamodb/lambda/process_message.py +++ b/sqs-fifo-delayed-queue-dynamodb/lambda/process_message.py @@ -1,5 +1,4 @@ # create python lambda function -# import json import boto3 import os import time @@ -24,10 +23,15 @@ def handler(event, context): message_body = record['body'] # query records with customer_id as message_group_id - response = table.get_item( - Key={ - 'customer_id': message_group_id - }) + try: + response = table.get_item( + Key={ + 'customer_id': message_group_id + }) + except: + print("An error has occurred while fetching record from CustomerTable table.") + batch_item_failures.append({"itemIdentifier": record['messageId']}) + continue # if response does not return a record if ('Item' in response): @@ -41,12 +45,13 @@ def handler(event, context): if (item_ttl_epoch_seconds - int(time.time()) < 0): process_message(message_body, message_group_id) else: - message_id = record['messageId'] - # print(f'Message with id "{message_id}" has not expired yet, adding to batch item failures') batch_item_failures.append({"itemIdentifier": record['messageId']}) else: # if no records found, send message to downstream sqs queue and update the dynamodb table with a ttl - process_message(message_body, message_group_id) + try: + process_message(message_body, message_group_id) + except: + batch_item_failures.append({"itemIdentifier": record['messageId']}) sqs_batch_response["batchItemFailures"] = batch_item_failures @@ -55,16 +60,22 @@ def handler(event, context): def process_message(message_body, message_group_id): # send message to downstream sqs queue expiry_epoch_time = int(time.time()) + message_delay_seconds - sqs.send_message( - QueueUrl=os.environ['QUEUE_URL'], - MessageBody=message_body, - MessageGroupId=message_group_id - ) + try: + sqs.send_message( + QueueUrl=os.environ['QUEUE_URL'], + MessageBody=message_body, + MessageGroupId=message_group_id + ) + except: + raise Exception("An error has occurred sending message to downstream queue.") # Update Dynamodb table called CustomerTable with customer_id as partition key and created_timestamp as sort key with the ISO 8601 timestamp - table.put_item( - Item={ - 'customer_id': message_group_id, - 'ttl': expiry_epoch_time - } - ) \ No newline at end of file + try: + table.put_item( + Item={ + 'customer_id': message_group_id, + 'ttl': expiry_epoch_time + } + ) + except: + raise Exception("An error has occurred inserting record to CustomerTable table.") \ No newline at end of file diff --git a/sqs-fifo-delayed-queue-dynamodb/tests/unit/test_delay_fifo_queue_test_stack.py b/sqs-fifo-delayed-queue-dynamodb/tests/unit/test_delay_fifo_queue_test_stack.py index 1b4764894..1b095009c 100644 --- a/sqs-fifo-delayed-queue-dynamodb/tests/unit/test_delay_fifo_queue_test_stack.py +++ b/sqs-fifo-delayed-queue-dynamodb/tests/unit/test_delay_fifo_queue_test_stack.py @@ -9,7 +9,3 @@ def test_sqs_queue_created(): app = core.App() stack = DelayFifoQueueTestStack(app, "delay-fifo-queue-test") template = assertions.Template.from_stack(stack) - -# template.has_resource_properties("AWS::SQS::Queue", { -# "VisibilityTimeout": 300 -# }) From e009f1d9003eeaf7397a5583fd7f9119a605b1af Mon Sep 17 00:00:00 2001 From: Udit Parikh Date: Sat, 6 Jul 2024 02:10:54 +0530 Subject: [PATCH 4/9] Updated README.md --- sqs-fifo-delayed-queue-dynamodb/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sqs-fifo-delayed-queue-dynamodb/README.md b/sqs-fifo-delayed-queue-dynamodb/README.md index 7dd4e7329..ab876a31d 100644 --- a/sqs-fifo-delayed-queue-dynamodb/README.md +++ b/sqs-fifo-delayed-queue-dynamodb/README.md @@ -11,7 +11,7 @@ Important: this application uses various AWS services and there are costs associ * [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. * [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured * [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) -* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed +* [AWS Cloud Development Kit](https://docs.aws.amazon.com/cdk/v2/guide/getting_started.html) (AWS CDK) installed ## Deployment Instructions @@ -57,4 +57,4 @@ If false, the message is sent to the `downstream_queue` for processing. An entry ---- Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. -SPDX-License-Identifier: MIT-0 \ No newline at end of file +SPDX-License-Identifier: MIT-0 From 0a4a4d2af3e9e3963b68e710bfdb1f796c703de0 Mon Sep 17 00:00:00 2001 From: Udit Parikh Date: Sat, 6 Jul 2024 02:18:17 +0530 Subject: [PATCH 5/9] Updated sqs-fifo-delayed-queue-dynamodb.json --- .../sqs-fifo-delayed-queue-dynamodb.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sqs-fifo-delayed-queue-dynamodb/sqs-fifo-delayed-queue-dynamodb.json b/sqs-fifo-delayed-queue-dynamodb/sqs-fifo-delayed-queue-dynamodb.json index 7c1f5603c..d535e9775 100644 --- a/sqs-fifo-delayed-queue-dynamodb/sqs-fifo-delayed-queue-dynamodb.json +++ b/sqs-fifo-delayed-queue-dynamodb/sqs-fifo-delayed-queue-dynamodb.json @@ -1,6 +1,6 @@ { "title": "Amazon SQS FIFO Queue with controlled delay", - "description": "Amazon SQS FIFO Queue with delay using AWS Lambda and AWS DynamoDB", + "description": "Amazon SQS FIFO Queue with delay using AWS Lambda and Amazon DynamoDB", "language": "Python", "level": "200", "framework": "CDK", From b2f763df5d0bcfec9102678b136da2212e2f7371 Mon Sep 17 00:00:00 2001 From: Julian Wood Date: Tue, 9 Jul 2024 11:41:02 +0100 Subject: [PATCH 6/9] Update README.md --- sqs-fifo-delayed-queue-dynamodb/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sqs-fifo-delayed-queue-dynamodb/README.md b/sqs-fifo-delayed-queue-dynamodb/README.md index ab876a31d..79206ba7a 100644 --- a/sqs-fifo-delayed-queue-dynamodb/README.md +++ b/sqs-fifo-delayed-queue-dynamodb/README.md @@ -1,6 +1,6 @@ -# Amazon SQS FIFO Queue with delay using AWS Lambda and AWS DynamoDB +# Amazon SQS FIFO queue with delay using AWS Lambda and Amazon DynamoDB -This pattern allows introduction of a delay between processing of messages while maintaining order from individual client. Then sending it sequentially to the downstream for processing, to minimize the consequences of unordered events. +This pattern shows how to introduce a delay between processing messages while maintaining order from an individual client. The message is sent sequentially to the downstream service for processing to minimize the consequences of unordered events. Learn more about this pattern at Serverless Land Patterns: https://serverlessland.com/patterns/sqs-fifo-delayed-queue-dynamodb From 5014e8f1c10fee1e3217756d2fe612e4f8f4e745 Mon Sep 17 00:00:00 2001 From: Julian Wood Date: Tue, 9 Jul 2024 11:45:38 +0100 Subject: [PATCH 7/9] Update delay_fifo_queue_test_stack.py --- .../delay_fifo_queue_test_stack.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sqs-fifo-delayed-queue-dynamodb/delay_fifo_queue_test/delay_fifo_queue_test_stack.py b/sqs-fifo-delayed-queue-dynamodb/delay_fifo_queue_test/delay_fifo_queue_test_stack.py index 6ebbd2edb..333a00206 100644 --- a/sqs-fifo-delayed-queue-dynamodb/delay_fifo_queue_test/delay_fifo_queue_test_stack.py +++ b/sqs-fifo-delayed-queue-dynamodb/delay_fifo_queue_test/delay_fifo_queue_test_stack.py @@ -50,7 +50,7 @@ def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: time_to_live_attribute="ttl" ) - # create a lambda function to process messages from the queue + # create a Lambda function to process messages from the queue process_queue_function = lambda_.Function(self, "ProcessMessageLambda", runtime=lambda_.Runtime.PYTHON_3_9, code=lambda_.Code.from_asset("lambda"), @@ -86,13 +86,13 @@ def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: ) - # create lambda execution role that has access to receive messages from primary_queue queue + # create Lambda execution role that has access to receive messages from primary_queue queue process_queue_function.add_to_role_policy(iam.PolicyStatement( actions=["sqs:ReceiveMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes", "sqs:GetQueueUrl"], resources=[primary_queue.queue_arn] )) - # add to lambda execution role policy to send messages to the downstream_queue queue + # add to Lambda execution role policy to send messages to the downstream_queue queue process_queue_function.add_to_role_policy(iam.PolicyStatement( actions=["sqs:SendMessage"], resources=[downstream_queue.queue_arn] @@ -105,7 +105,7 @@ def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: report_batch_item_failures=True ) - # give permissions for the lambda function to read and write to the dynamodb table + # give permissions for the function to read and write to the dynamodb table customer_table.grant_read_write_data(process_queue_function) - cfnoutput(self, "DelayFifoQueueURL", value=primary_queue.queue_url) \ No newline at end of file + cfnoutput(self, "DelayFifoQueueURL", value=primary_queue.queue_url) From 4581db4aa813d547fb48f02f3f8ded83294ef78d Mon Sep 17 00:00:00 2001 From: Julian Wood Date: Tue, 9 Jul 2024 11:45:46 +0100 Subject: [PATCH 8/9] Update example-pattern.json --- sqs-fifo-delayed-queue-dynamodb/example-pattern.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sqs-fifo-delayed-queue-dynamodb/example-pattern.json b/sqs-fifo-delayed-queue-dynamodb/example-pattern.json index 63d4c6c4a..26d0f61d3 100644 --- a/sqs-fifo-delayed-queue-dynamodb/example-pattern.json +++ b/sqs-fifo-delayed-queue-dynamodb/example-pattern.json @@ -1,13 +1,13 @@ { - "title": "Amazon SQS FIFO Queue with controlled delay", - "description": "Amazon SQS FIFO Queue with delay using AWS Lambda and AWS DynamoDB", + "title": "Amazon SQS FIFO queue with controlled delay", + "description": "Amazon SQS FIFO queue with delay using AWS Lambda and Amazon DynamoDB", "language": "Python", "level": "200", "framework": "CDK", "introBox": { "headline": "How it works", "text": [ - "This pattern allows introduction of a delay between processing of messages while maintaining order from individual client. Then sending it sequentially to the downstream for processing, to minimize the consequences of unordered events." + "This pattern shows how to introduce a delay between processing messages while maintaining order from an individual client. The message is sent sequentially to the downstream service for processing to minimize the consequences of unordered events." ] }, "gitHub": { From 2cfeafbc6d7c38f7895c89f4ea98f386c1608a57 Mon Sep 17 00:00:00 2001 From: Julian Wood Date: Tue, 9 Jul 2024 11:45:49 +0100 Subject: [PATCH 9/9] Update sqs-fifo-delayed-queue-dynamodb.json --- .../sqs-fifo-delayed-queue-dynamodb.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sqs-fifo-delayed-queue-dynamodb/sqs-fifo-delayed-queue-dynamodb.json b/sqs-fifo-delayed-queue-dynamodb/sqs-fifo-delayed-queue-dynamodb.json index d535e9775..cc8c6a605 100644 --- a/sqs-fifo-delayed-queue-dynamodb/sqs-fifo-delayed-queue-dynamodb.json +++ b/sqs-fifo-delayed-queue-dynamodb/sqs-fifo-delayed-queue-dynamodb.json @@ -1,13 +1,13 @@ { - "title": "Amazon SQS FIFO Queue with controlled delay", - "description": "Amazon SQS FIFO Queue with delay using AWS Lambda and Amazon DynamoDB", + "title": "Amazon SQS FIFO queue with controlled delay", + "description": "Amazon SQS FIFO queue with delay using AWS Lambda and Amazon DynamoDB", "language": "Python", "level": "200", "framework": "CDK", "introBox": { "headline": "How it works", "text": [ - "This pattern allows introduction of a delay between processing of messages while maintaining order from individual client. Then sending it sequentially to the downstream for processing, to minimize the consequences of unordered events." + "This pattern shows how to introduce a delay between processing messages while maintaining order from an individual client. The message is sent sequentially to the downstream service for processing to minimize the consequences of unordered events." ] }, "gitHub": {