Skip to content

Commit

Permalink
Support DLQ, RetryPolicy properties for EventBridgeRule,Schedule even…
Browse files Browse the repository at this point in the history
…t sources (#1842)

* Add DeadLetterConfig,RetryPolicy properties for EventBridgeRule,Schedule event sources

* Minor fix,rename function argument

* Update test class name

* Combine dlq extraction/generation into the utility class

* Remove unused import
  • Loading branch information
ejafarli committed Dec 2, 2020
1 parent a79e4d6 commit 16fa852
Show file tree
Hide file tree
Showing 60 changed files with 3,283 additions and 12 deletions.
4 changes: 4 additions & 0 deletions docs/cloudformation_compatibility.rst
Expand Up @@ -169,6 +169,8 @@ CloudWatchEvent (superseded by EventBridgeRule, see below)
Pattern All
Input All
InputPath All
DeadLetterConfig All
RetryPolicy All
======================== ================================== ========================

EventBridgeRule
Expand All @@ -179,6 +181,8 @@ EventBridgeRule
Pattern All
Input All
InputPath All
DeadLetterConfig All
RetryPolicy All
======================== ================================== ========================

IotRule
Expand Down
11 changes: 11 additions & 0 deletions docs/internals/generated_resources.rst
Expand Up @@ -455,6 +455,8 @@ Example:
Type: Schedule
Properties:
Input: rate(5 minutes)
DeadLetterConfig:
Type: SQS
...
Additional generated resources:
Expand All @@ -464,6 +466,8 @@ CloudFormation Resource Type Logical ID
================================== ================================
AWS::Lambda::Permission MyFunction\ **MyTimer**\ Permission
AWS::Events::Rule MyFunction\ **MyTimer**
AWS::SQS::Queue MyFunction\ **MyTimer**\ Queue
AWS::SQS::QueuePolicy MyFunction\ **MyTimer**\ QueuePolicy
================================== ================================

CloudWatchEvent (superseded by EventBridgeRule, see below)
Expand Down Expand Up @@ -523,6 +527,11 @@ Example:
detail:
state:
- terminated
DeadLetterConfig:
Type: SQS
RetryPolicy:
MaximumEventAgeInSeconds: 600
MaximumRetryAttempts:3
...
Additional generated resources:
Expand All @@ -532,6 +541,8 @@ CloudFormation Resource Type Logical ID
================================== ================================
AWS::Lambda::Permission MyFunction\ **OnTerminate**\ Permission
AWS::Events::Rule MyFunction\ **OnTerminate**
AWS::SQS::Queue MyFunction\ **OnTerminate**\ Queue
AWS::SQS::QueuePolicy MyFunction\ **OnTerminate**\ QueuePolicy
================================== ================================

AWS::Serverless::Api
Expand Down
54 changes: 54 additions & 0 deletions samtranslator/model/eventbridge_utils.py
@@ -0,0 +1,54 @@
from samtranslator.model.sqs import SQSQueue, SQSQueuePolicy, SQSQueuePolicies
from samtranslator.model.exceptions import InvalidEventException


class EventBridgeRuleUtils:
@staticmethod
def create_dead_letter_queue_with_policy(rule_logical_id, rule_arn, queue_logical_id=None):
resources = []

queue = SQSQueue(queue_logical_id or rule_logical_id + "Queue")
dlq_queue_arn = queue.get_runtime_attr("arn")
dlq_queue_url = queue.get_runtime_attr("queue_url")

# grant necessary permission to Eventbridge Rule resource for sending messages to dead-letter queue
policy = SQSQueuePolicy(rule_logical_id + "QueuePolicy")
policy.PolicyDocument = SQSQueuePolicies.eventbridge_dlq_send_message_resource_based_policy(
rule_arn, dlq_queue_arn
)
policy.Queues = [dlq_queue_url]

resources.append(queue)
resources.append(policy)

return resources

@staticmethod
def validate_dlq_config(source_logical_id, dead_letter_config):
supported_types = ["SQS"]
is_arn_defined = "Arn" in dead_letter_config
is_type_defined = "Type" in dead_letter_config
if is_arn_defined and is_type_defined:
raise InvalidEventException(
source_logical_id, "You can either define 'Arn' or 'Type' property of DeadLetterConfig"
)
if is_type_defined and dead_letter_config.get("Type") not in supported_types:
raise InvalidEventException(
source_logical_id,
"The only valid value for 'Type' property of DeadLetterConfig is 'SQS'",
)
if not is_arn_defined and not is_type_defined:
raise InvalidEventException(source_logical_id, "No 'Arn' or 'Type' property provided for DeadLetterConfig")

@staticmethod
def get_dlq_queue_arn_and_resources(cw_event_source, source_arn):
"""returns dlq queue arn and dlq_resources, assuming cw_event_source.DeadLetterConfig has been validated"""
dlq_queue_arn = cw_event_source.DeadLetterConfig.get("Arn")
if dlq_queue_arn is not None:
return dlq_queue_arn, []
queue_logical_id = cw_event_source.DeadLetterConfig.get("QueueLogicalId")
dlq_resources = EventBridgeRuleUtils.create_dead_letter_queue_with_policy(
cw_event_source.logical_id, source_arn, queue_logical_id
)
dlq_queue_arn = dlq_resources[0].get_runtime_attr("arn")
return dlq_queue_arn, dlq_resources
43 changes: 37 additions & 6 deletions samtranslator/model/eventsources/push.py
Expand Up @@ -12,6 +12,7 @@
from samtranslator.model.events import EventsRule
from samtranslator.model.eventsources.pull import SQS
from samtranslator.model.sqs import SQSQueue, SQSQueuePolicy, SQSQueuePolicies
from samtranslator.model.eventbridge_utils import EventBridgeRuleUtils
from samtranslator.model.iot import IotTopicRule
from samtranslator.model.cognito import CognitoUserPool
from samtranslator.translator import logical_id_generator
Expand Down Expand Up @@ -94,6 +95,8 @@ class Schedule(PushEventSource):
"Enabled": PropertyType(False, is_type(bool)),
"Name": PropertyType(False, is_str()),
"Description": PropertyType(False, is_str()),
"DeadLetterConfig": PropertyType(False, is_type(dict)),
"RetryPolicy": PropertyType(False, is_type(dict)),
}

def to_cloudformation(self, **kwargs):
Expand All @@ -118,16 +121,23 @@ def to_cloudformation(self, **kwargs):
events_rule.State = "ENABLED" if self.Enabled else "DISABLED"
events_rule.Name = self.Name
events_rule.Description = self.Description
events_rule.Targets = [self._construct_target(function)]

source_arn = events_rule.get_runtime_attr("arn")
dlq_queue_arn = None
if self.DeadLetterConfig is not None:
EventBridgeRuleUtils.validate_dlq_config(self.logical_id, self.DeadLetterConfig)
dlq_queue_arn, dlq_resources = EventBridgeRuleUtils.get_dlq_queue_arn_and_resources(self, source_arn)
resources.extend(dlq_resources)

events_rule.Targets = [self._construct_target(function, dlq_queue_arn)]

if CONDITION in function.resource_attributes:
events_rule.set_resource_attribute(CONDITION, function.resource_attributes[CONDITION])
resources.append(self._construct_permission(function, source_arn=source_arn))

return resources

def _construct_target(self, function):
def _construct_target(self, function, dead_letter_queue_arn=None):
"""Constructs the Target property for the EventBridge Rule.
:returns: the Target property
Expand All @@ -137,6 +147,12 @@ def _construct_target(self, function):
if self.Input is not None:
target["Input"] = self.Input

if self.DeadLetterConfig is not None:
target["DeadLetterConfig"] = {"Arn": dead_letter_queue_arn}

if self.RetryPolicy is not None:
target["RetryPolicy"] = self.RetryPolicy

return target


Expand All @@ -148,6 +164,8 @@ class CloudWatchEvent(PushEventSource):
property_types = {
"EventBusName": PropertyType(False, is_str()),
"Pattern": PropertyType(False, is_type(dict)),
"DeadLetterConfig": PropertyType(False, is_type(dict)),
"RetryPolicy": PropertyType(False, is_type(dict)),
"Input": PropertyType(False, is_str()),
"InputPath": PropertyType(False, is_str()),
"Target": PropertyType(False, is_type(dict)),
Expand All @@ -171,18 +189,24 @@ def to_cloudformation(self, **kwargs):
events_rule = EventsRule(self.logical_id)
events_rule.EventBusName = self.EventBusName
events_rule.EventPattern = self.Pattern
events_rule.Targets = [self._construct_target(function)]
source_arn = events_rule.get_runtime_attr("arn")

dlq_queue_arn = None
if self.DeadLetterConfig is not None:
EventBridgeRuleUtils.validate_dlq_config(self.logical_id, self.DeadLetterConfig)
dlq_queue_arn, dlq_resources = EventBridgeRuleUtils.get_dlq_queue_arn_and_resources(self, source_arn)
resources.extend(dlq_resources)

events_rule.Targets = [self._construct_target(function, dlq_queue_arn)]
if CONDITION in function.resource_attributes:
events_rule.set_resource_attribute(CONDITION, function.resource_attributes[CONDITION])

resources.append(events_rule)

source_arn = events_rule.get_runtime_attr("arn")
resources.append(self._construct_permission(function, source_arn=source_arn))

return resources

def _construct_target(self, function):
def _construct_target(self, function, dead_letter_queue_arn=None):
"""Constructs the Target property for the CloudWatch Events/EventBridge Rule.
:returns: the Target property
Expand All @@ -195,6 +219,13 @@ def _construct_target(self, function):

if self.InputPath is not None:
target["InputPath"] = self.InputPath

if self.DeadLetterConfig is not None:
target["DeadLetterConfig"] = {"Arn": dead_letter_queue_arn}

if self.RetryPolicy is not None:
target["RetryPolicy"] = self.RetryPolicy

return target


Expand Down
20 changes: 18 additions & 2 deletions samtranslator/model/sqs.py
Expand Up @@ -19,8 +19,8 @@ class SQSQueuePolicy(Resource):


class SQSQueuePolicies:
@classmethod
def sns_topic_send_message_role_policy(cls, topic_arn, queue_arn):
@staticmethod
def sns_topic_send_message_role_policy(topic_arn, queue_arn):
document = {
"Version": "2012-10-17",
"Statement": [
Expand All @@ -34,3 +34,19 @@ def sns_topic_send_message_role_policy(cls, topic_arn, queue_arn):
],
}
return document

@staticmethod
def eventbridge_dlq_send_message_resource_based_policy(rule_arn, queue_arn):
document = {
"Version": "2012-10-17",
"Statement": [
{
"Action": "sqs:SendMessage",
"Effect": "Allow",
"Principal": {"Service": "events.amazonaws.com"},
"Resource": queue_arn,
"Condition": {"ArnEquals": {"aws:SourceArn": rule_arn}},
}
],
}
return document
41 changes: 37 additions & 4 deletions samtranslator/model/stepfunctions/events.py
Expand Up @@ -8,6 +8,7 @@
from samtranslator.model.intrinsics import fnSub
from samtranslator.translator import logical_id_generator
from samtranslator.model.exceptions import InvalidEventException, InvalidResourceException
from samtranslator.model.eventbridge_utils import EventBridgeRuleUtils
from samtranslator.translator.arn_generator import ArnGenerator
from samtranslator.swagger.swagger import SwaggerEditor
from samtranslator.open_api.open_api import OpenApiEditor
Expand Down Expand Up @@ -81,6 +82,8 @@ class Schedule(EventSource):
"Enabled": PropertyType(False, is_type(bool)),
"Name": PropertyType(False, is_str()),
"Description": PropertyType(False, is_str()),
"DeadLetterConfig": PropertyType(False, is_type(dict)),
"RetryPolicy": PropertyType(False, is_type(dict)),
}

def to_cloudformation(self, resource, **kwargs):
Expand All @@ -107,11 +110,18 @@ def to_cloudformation(self, resource, **kwargs):

role = self._construct_role(resource, permissions_boundary)
resources.append(role)
events_rule.Targets = [self._construct_target(resource, role)]

source_arn = events_rule.get_runtime_attr("arn")
dlq_queue_arn = None
if self.DeadLetterConfig is not None:
EventBridgeRuleUtils.validate_dlq_config(self.logical_id, self.DeadLetterConfig)
dlq_queue_arn, dlq_resources = EventBridgeRuleUtils.get_dlq_queue_arn_and_resources(self, source_arn)
resources.extend(dlq_resources)
events_rule.Targets = [self._construct_target(resource, role, dlq_queue_arn)]

return resources

def _construct_target(self, resource, role):
def _construct_target(self, resource, role, dead_letter_queue_arn=None):
"""Constructs the Target property for the EventBridge Rule.
:returns: the Target property
Expand All @@ -125,6 +135,12 @@ def _construct_target(self, resource, role):
if self.Input is not None:
target["Input"] = self.Input

if self.DeadLetterConfig is not None:
target["DeadLetterConfig"] = {"Arn": dead_letter_queue_arn}

if self.RetryPolicy is not None:
target["RetryPolicy"] = self.RetryPolicy

return target


Expand All @@ -138,6 +154,8 @@ class CloudWatchEvent(EventSource):
"Pattern": PropertyType(False, is_type(dict)),
"Input": PropertyType(False, is_str()),
"InputPath": PropertyType(False, is_str()),
"DeadLetterConfig": PropertyType(False, is_type(dict)),
"RetryPolicy": PropertyType(False, is_type(dict)),
}

def to_cloudformation(self, resource, **kwargs):
Expand All @@ -162,11 +180,19 @@ def to_cloudformation(self, resource, **kwargs):

role = self._construct_role(resource, permissions_boundary)
resources.append(role)
events_rule.Targets = [self._construct_target(resource, role)]

source_arn = events_rule.get_runtime_attr("arn")
dlq_queue_arn = None
if self.DeadLetterConfig is not None:
EventBridgeRuleUtils.validate_dlq_config(self.logical_id, self.DeadLetterConfig)
dlq_queue_arn, dlq_resources = EventBridgeRuleUtils.get_dlq_queue_arn_and_resources(self, source_arn)
resources.extend(dlq_resources)

events_rule.Targets = [self._construct_target(resource, role, dlq_queue_arn)]

return resources

def _construct_target(self, resource, role):
def _construct_target(self, resource, role, dead_letter_queue_arn=None):
"""Constructs the Target property for the CloudWatch Events/EventBridge Rule.
:returns: the Target property
Expand All @@ -182,6 +208,13 @@ def _construct_target(self, resource, role):

if self.InputPath is not None:
target["InputPath"] = self.InputPath

if self.DeadLetterConfig is not None:
target["DeadLetterConfig"] = {"Arn": dead_letter_queue_arn}

if self.RetryPolicy is not None:
target["RetryPolicy"] = self.RetryPolicy

return target


Expand Down
28 changes: 28 additions & 0 deletions samtranslator/validator/sam_schema/schema.json
Expand Up @@ -368,6 +368,34 @@
},
"Pattern": {
"type": "object"
},
"DeadLetterConfig": {
"additionalProperties": false,
"properties": {
"Arn": {
"type": "string"
},
"Type": {
"type": "string"
},
"QueueLogicalId": {
"type": "string"
}
},
"type": "object"
},
"RetryPolicy": {
"additionalProperties": false,
"minProperties": 1,
"properties": {
"MaximumEventAgeInSeconds": {
"type": "number"
},
"MaximumRetryAttempts": {
"type": "number"
}
},
"type": "object"
}
},
"required": [
Expand Down

0 comments on commit 16fa852

Please sign in to comment.