
feat(event-source): add SQS as Lambda event source #451

Merged (11 commits), Jun 1, 2018
1 change: 1 addition & 0 deletions docs/cloudformation_compatibility.rst
@@ -91,6 +91,7 @@ Kinesis
Property Name Intrinsic(s) Supported Reasons
======================== ================================== ========================
Stream All
Queue All
StartingPosition All
BatchSize All
======================== ================================== ========================
27 changes: 27 additions & 0 deletions docs/internals/generated_resources.rst
@@ -220,6 +220,33 @@ AWS::Lambda::Permissions MyFunction\ **MyTrigger**\ Permission
AWS::Lambda::EventSourceMapping MyFunction\ **MyTrigger**
================================== ================================

SQS
^^^^^^^

Example:

.. code:: yaml

MyFunction:
Type: AWS::Serverless::Function
Properties:
...
Events:
MyTrigger:
Type: SQS
Properties:
Queue: arn:aws:sqs:us-east-1:123456789012:my-queue
Contributor:

Should we call this property QueueArn instead? I wonder whether users might get confused and specify a queue name here.

Contributor Author:

I went with Queue to be consistent with Stream, though I agree QueueArn is more descriptive. In the future we could add QueueArn (and StreamArn) and leave Queue as an alias (and possibly deprecate it?).

Contributor:

Fair enough. Let's stick with Queue to keep the language consistent.

...

Additional generated resources:

================================== ================================
CloudFormation Resource Type Logical ID
================================== ================================
AWS::Lambda::Permissions MyFunction\ **MyTrigger**\ Permission
AWS::Lambda::EventSourceMapping MyFunction\ **MyTrigger**
================================== ================================

DynamoDb
^^^^^^^^

1 change: 1 addition & 0 deletions examples/2016-10-31/sqs/.gitignore
@@ -0,0 +1 @@
transformed-cfn-template.yaml
19 changes: 19 additions & 0 deletions examples/2016-10-31/sqs/README.md
@@ -0,0 +1,19 @@
# SQS Event Source Example

Example SAM template for processing messages on an SQS queue.

## Running the example

```bash
# Replace YOUR_S3_ARTIFACTS_BUCKET
YOUR_S3_ARTIFACTS_BUCKET='YOUR_S3_ARTIFACTS_BUCKET'; \
aws cloudformation package --template-file template.yaml --output-template-file cfn-transformed-template.yaml --s3-bucket $YOUR_S3_ARTIFACTS_BUCKET
aws cloudformation deploy --template-file ./cfn-transformed-template.yaml --stack-name example-logs-processor --capabilities CAPABILITY_IAM
```

After your CloudFormation stack has finished creating, send a message to the SQS queue. To see it in action, modify and run the command below:

```bash
YOUR_SQS_QUEUE_URL=https://sqs.us-east-1.amazonaws.com/123456789012/my-queue; \
aws sqs send-message --queue-url $YOUR_SQS_QUEUE_URL --message-body '{ "myMessage": "Hello SAM!" }'
```
9 changes: 9 additions & 0 deletions examples/2016-10-31/sqs/index.js
@@ -0,0 +1,9 @@
async function handler (event, context) {
  // TODO: Handle messages; SQS records arrive in event.Records
  console.log(event)

  return {}
}

module.exports.handler = handler
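For reference, when Lambda invokes a handler like the one above for an SQS trigger, the messages arrive under `event.Records`, each with a string `body`. A minimal sketch of that payload shape and its parsing (the field values here are illustrative, not taken from the PR):

```python
import json

# Abbreviated shape of an SQS event as Lambda delivers it (illustrative values)
event = {
    "Records": [
        {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:my-queue",
            "body": json.dumps({"myMessage": "Hello SAM!"}),
        }
    ]
}

def parse_bodies(event):
    """Return the JSON-decoded body of every SQS record in the event."""
    return [json.loads(record["body"]) for record in event["Records"]]

print(parse_bodies(event))  # [{'myMessage': 'Hello SAM!'}]
```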
19 changes: 19 additions & 0 deletions examples/2016-10-31/sqs/template.yaml
@@ -0,0 +1,19 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Example of processing messages on an SQS queue with Lambda
Resources:
MySQSQueueFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: ./index.js
Handler: index.handler
Runtime: nodejs8.10
Events:
MySQSEvent:
Type: SQS
Properties:
Queue: !Ref MyQueue

MyQueue:
Type: AWS::SQS::Queue
Properties:
25 changes: 17 additions & 8 deletions samtranslator/model/eventsources/pull.py
@@ -8,16 +8,17 @@
class PullEventSource(ResourceMacro):
"""Base class for pull event sources for SAM Functions.

The pull events are the streams--Kinesis and DynamoDB Streams. Both of these correspond to an EventSourceMapping in
Lambda, and require that the execution role be given to Kinesis or DynamoDB Streams, respectively.
The pull events are Kinesis Streams, DynamoDB Streams, and SQS Queues. All of these correspond to an EventSourceMapping in
Lambda, and require that the execution role be given to Kinesis Streams, DynamoDB Streams, or SQS Streams, respectively.

I suppose we shouldn't be calling SQS "streams"; "SQS" by itself should do. Thoughts?

Contributor Author:

Good catch; that was a miss. I refer to it as SQS Queues in the first sentence.


:cvar str policy_arn: The ARN of the AWS managed role policy corresponding to this pull event source
"""
resource_type = None
property_types = {
'Stream': PropertyType(True, is_str()),
'BatchSize': PropertyType(False, is_type(int)),
'StartingPosition': PropertyType(True, is_str())
'Stream': PropertyType(False, is_str()),
'Queue': PropertyType(False, is_str()),
'BatchSize': PropertyType(False, is_type(int)),
'StartingPosition': PropertyType(False, is_str())

I suppose all properties were changed to not-required (False) to support event source mappings for both streams (DynamoDB and Kinesis) and SQS. But aren't we missing validation? I can create an event source mapping for DynamoDB Streams without StartingPosition specified; it will pass this check but end up as an error from CloudFormation.

Contributor:

Good point. We do need to validate them separately now that these properties are not required.
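The cross-property validation the reviewers are asking for could look roughly like this (a sketch only; the exception type and function signature are hypothetical, not SAM's actual API): exactly one of Stream or Queue must be set, and StartingPosition is required for streams but not accepted for SQS.

```python
class InvalidEventException(Exception):
    """Stand-in for the translator's validation error type (hypothetical)."""

def validate_pull_event(stream=None, queue=None, starting_position=None):
    """Cross-property checks for a pull event source, per the review discussion."""
    # Exactly one of Stream / Queue must be provided
    if bool(stream) == bool(queue):
        raise InvalidEventException("Specify exactly one of 'Stream' or 'Queue'.")
    # Kinesis / DynamoDB streams require a StartingPosition; SQS does not take one
    if stream and not starting_position:
        raise InvalidEventException("'StartingPosition' is required for stream sources.")
    if queue and starting_position:
        raise InvalidEventException("'StartingPosition' is not supported for SQS.")
```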

}

def get_policy_arn(self):
@@ -32,23 +33,23 @@ def to_cloudformation(self, **kwargs):
:rtype: list
"""
function = kwargs.get('function')

if not function:
raise TypeError("Missing required keyword argument: function")

resources = []

lambda_eventsourcemapping = LambdaEventSourceMapping(self.logical_id)
resources.append(lambda_eventsourcemapping)

try:
# Name will not be available for Alias resources
function_name_or_arn = function.get_runtime_attr("name")
except NotImplementedError:
function_name_or_arn = function.get_runtime_attr("arn")

lambda_eventsourcemapping.FunctionName = function_name_or_arn
lambda_eventsourcemapping.EventSourceArn = self.Stream
lambda_eventsourcemapping.EventSourceArn = self.Stream or self.Queue
lambda_eventsourcemapping.StartingPosition = self.StartingPosition
lambda_eventsourcemapping.BatchSize = self.BatchSize

@@ -82,3 +83,11 @@ class DynamoDB(PullEventSource):

def get_policy_arn(self):
return ArnGenerator.generate_aws_managed_policy_arn('service-role/AWSLambdaDynamoDBExecutionRole')


class SQS(PullEventSource):
"""SQS Queue event source."""
resource_type = 'SQS'

def get_policy_arn(self):
return ArnGenerator.generate_aws_managed_policy_arn('service-role/AWSLambdaSQSExecutionRole')
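For context, `generate_aws_managed_policy_arn` expands a policy name into the ARN of an AWS managed policy. A sketch of what such a helper produces, assuming the standard managed-policy ARN format (the real ArnGenerator in samtranslator may differ in signature and partition handling):

```python
def generate_aws_managed_policy_arn(policy_name, partition="aws"):
    """Build the ARN of an AWS managed policy (sketch of the naming scheme)."""
    # AWS managed policies are owned by the literal account id "aws"
    return "arn:{}:iam::aws:policy/{}".format(partition, policy_name)

print(generate_aws_managed_policy_arn("service-role/AWSLambdaSQSExecutionRole"))
# arn:aws:iam::aws:policy/service-role/AWSLambdaSQSExecutionRole
```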
2 changes: 1 addition & 1 deletion samtranslator/model/lambda_.py
@@ -61,7 +61,7 @@ class LambdaEventSourceMapping(Resource):
'Enabled': PropertyType(False, is_type(bool)),
'EventSourceArn': PropertyType(True, is_str()),
'FunctionName': PropertyType(True, is_str()),
'StartingPosition': PropertyType(True, is_str())
'StartingPosition': PropertyType(False, is_str())

Same as the previous comment: aren't we letting go of StartingPosition validation for streams because SQS doesn't require it?

Contributor Author:

JSON Schema validation will pick this up once we turn it on (it currently does nothing with failed schemas, as we're not yet ready). I'd prefer not to add additional validation right now.

Contributor:

What if someone forgets to set the Queue property? The SAM translator will crash instead of giving them a validation error. After turning on JSON Schema we do need a deeper scrub of the code, and this extra validation won't significantly increase the surface area of that scrub, IMO.
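One way to fail with a clear message instead of crashing, as the reviewer suggests, is an explicit guard around the Stream-or-Queue fallback used for EventSourceArn (a hypothetical sketch, not the code this PR adds):

```python
def resolve_event_source_arn(stream=None, queue=None):
    """Pick the event source ARN, failing loudly when neither property is set."""
    arn = stream or queue
    if arn is None:
        raise ValueError("No 'Queue' or 'Stream' property specified for the event source")
    return arn

# With only a Queue set, the queue ARN wins; with neither, we get a readable error
print(resolve_event_source_arn(queue="arn:aws:sqs:us-east-1:123456789012:my-queue"))
# arn:aws:sqs:us-east-1:123456789012:my-queue
```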

}

runtime_attrs = {
18 changes: 18 additions & 0 deletions samtranslator/validator/sam_schema/schema.json
@@ -381,6 +381,9 @@
{
"$ref": "#/definitions/AWS::Serverless::Function.KinesisEvent"
},
{
"$ref": "#/definitions/AWS::Serverless::Function.SQSEvent"
},
{
"$ref": "#/definitions/AWS::Serverless::Function.DynamoDBEvent"
},
@@ -481,6 +484,21 @@
],
"type": "object"
},
"AWS::Serverless::Function.SQSEvent": {
"additionalProperties": false,
"properties": {
"BatchSize": {
"type": "number"
},
"Queue": {
"type": "string"
}
},
"required": [
"Queue"
],
"type": "object"
},
"AWS::Serverless::Function.S3Event": {
"additionalProperties": false,
"properties": {
12 changes: 12 additions & 0 deletions tests/translator/input/streams.yaml
@@ -25,3 +25,15 @@ Resources:
Stream: arn:aws:dynamodb:us-west-2:012345678901:table/TestTable/stream/2015-05-11T21:21:33.291
BatchSize: 200
StartingPosition: LATEST
SQSFunction:
Contributor:

Consider moving the SQS test to a separate file. This file is already getting overloaded.

Yes, we should consider moving this to a separate file. This file is explicitly named streams, so SQS doesn't belong here.

Type: 'AWS::Serverless::Function'
Properties:
CodeUri: s3://sam-demo-bucket/queues.zip
Handler: queue.sqs_handler
Runtime: python2.7
Events:
MySqsQueue:
Type: SQS
Properties:
Queue: arn:aws:sqs:us-west-2:012345678901:my-queue
BatchSize: 200

Batch size for SQS must be between 1 and 10.
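A range check along the lines of the comment above could be sketched as follows (a hypothetical helper, not part of this PR; the cap of 10 reflects the SQS per-batch service limit):

```python
# SQS caps the number of messages Lambda can receive per batch at 10;
# streams accept larger batches.
SQS_MAX_BATCH_SIZE = 10

def validate_sqs_batch_size(batch_size):
    """Reject BatchSize values outside the SQS-supported range."""
    if not 1 <= batch_size <= SQS_MAX_BATCH_SIZE:
        raise ValueError("BatchSize for SQS must be between 1 and 10")
    return batch_size
```

Under such a check, the `BatchSize: 200` in the test template above would be rejected for an SQS event source.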
