Skip to content

Commit

Permalink
Merge pull request #1126 from kapilt/cwe-support
Browse files Browse the repository at this point in the history
cloud watch events support
  • Loading branch information
kyleknap committed Aug 22, 2019
2 parents 4ecde2b + 8705c5e commit 0ad1309
Show file tree
Hide file tree
Showing 15 changed files with 408 additions and 59 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ Next Release (TBD)
==================

* Add support for stage independent lambda configuration
(#1162 <https://github.com/aws/chalice/pull/1162>`__)
(`#1162 <https://github.com/aws/chalice/pull/1162>`__)

* Add support for subscribing to CloudWatch Events
(`#1126 <https://github.com/aws/chalice/pull/1126>`__)

1.10.0
======
Expand Down
28 changes: 25 additions & 3 deletions chalice/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,13 @@ def on_sqs_message(self, queue, batch_size=1, name=None):
registration_kwargs={'queue': queue, 'batch_size': batch_size}
)

def on_cw_event(self, event_pattern, name=None):
return self._create_registration_function(
handler_type='on_cw_event',
name=name,
registration_kwargs={'event_pattern': event_pattern}
)

def schedule(self, expression, name=None):
return self._create_registration_function(
handler_type='schedule',
Expand Down Expand Up @@ -653,6 +660,7 @@ def _wrap_handler(self, handler_type, handler_name, user_handler):
'on_s3_event': S3Event,
'on_sns_message': SNSEvent,
'on_sqs_message': SQSEvent,
'on_cw_event': CloudWatchEvent,
'schedule': CloudWatchEvent,
}
if handler_type in event_classes:
Expand Down Expand Up @@ -797,8 +805,16 @@ def _register_on_sqs_message(self, name, handler_string, kwargs, **unused):
)
self.event_sources.append(sqs_config)

def _register_schedule(self, name, handler_string, kwargs, **unused):
def _register_on_cw_event(self, name, handler_string, kwargs, **unused):
event_source = CloudWatchEventConfig(
name=name,
event_pattern=kwargs['event_pattern'],
handler_string=handler_string
)
self.event_sources.append(event_source)

def _register_schedule(self, name, handler_string, kwargs, **unused):
event_source = ScheduledEventConfig(
name=name,
schedule_expression=kwargs['expression'],
handler_string=handler_string,
Expand Down Expand Up @@ -1257,12 +1273,18 @@ def __init__(self, name, handler_string):
self.handler_string = handler_string


class CloudWatchEventConfig(BaseEventSourceConfig):
class ScheduledEventConfig(BaseEventSourceConfig):
def __init__(self, name, handler_string, schedule_expression):
super(CloudWatchEventConfig, self).__init__(name, handler_string)
super(ScheduledEventConfig, self).__init__(name, handler_string)
self.schedule_expression = schedule_expression


class CloudWatchEventConfig(BaseEventSourceConfig):
def __init__(self, name, handler_string, event_pattern):
super(CloudWatchEventConfig, self).__init__(name, handler_string)
self.event_pattern = event_pattern


class ScheduleExpression(object):
def to_string(self):
raise NotImplementedError("to_string")
Expand Down
6 changes: 5 additions & 1 deletion chalice/app.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,14 @@ class SQSEventConfig(BaseEventSourceConfig):
batch_size = ... # type: int


class CloudWatchEventConfig(BaseEventSourceConfig):
class ScheduledEventConfig(BaseEventSourceConfig):
schedule_expression = ... # type: Union[str, ScheduleExpression]


class CloudWatchEventConfig(BaseEventSourceConfig):
event_pattern = ... # type: Dict


class Blueprint(DecoratorAPI):
current_request = ... # type: Request
lambda_context = ... # type: LambdaContext
19 changes: 13 additions & 6 deletions chalice/awsclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,13 +729,21 @@ def add_permission_for_authorizer(self, rest_api_id, function_arn,
SourceArn=source_arn,
)

def get_or_create_rule_arn(self, rule_name, schedule_expression):
# type: (str, str) -> str
def get_or_create_rule_arn(
self, rule_name, schedule_expression=None, event_pattern=None):
# type: (str, str, str) -> str
events = self._client('events')
# put_rule is idempotent so we can safely call it even if it already
# exists.
rule_arn = events.put_rule(Name=rule_name,
ScheduleExpression=schedule_expression)
params = {'Name': rule_name}
if schedule_expression:
params['ScheduleExpression'] = schedule_expression
elif event_pattern:
params['EventPattern'] = event_pattern
else:
raise ValueError(
"schedule_expression or event_pattern required")
rule_arn = events.put_rule(**params)
return rule_arn['RuleArn']

def delete_rule(self, rule_name):
Expand All @@ -752,8 +760,7 @@ def connect_rule_to_lambda(self, rule_name, function_arn):
events.put_targets(Rule=rule_name,
Targets=[{'Id': '1', 'Arn': function_arn}])

def add_permission_for_scheduled_event(self, rule_arn,
function_arn):
def add_permission_for_cloudwatch_event(self, rule_arn, function_arn):
# type: (str, str) -> None
self._add_lambda_permission_if_needed(
source_arn=rule_arn,
Expand Down
44 changes: 37 additions & 7 deletions chalice/deploy/deployer.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,13 @@ def _create_lambda_event_resources(self, config, deployment, stage_name):
)
elif isinstance(event_source, app.CloudWatchEventConfig):
resources.append(
self._create_event_model(
self._create_cwe_subscription(
config, deployment, event_source, stage_name
)
)
elif isinstance(event_source, app.ScheduledEventConfig):
resources.append(
self._create_scheduled_model(
config, deployment, event_source, stage_name
)
)
Expand Down Expand Up @@ -540,12 +546,36 @@ def _create_websocket_api_model(
api_gateway_stage=config.api_gateway_stage,
)

def _create_event_model(self,
config, # type: Config
deployment, # type: models.DeploymentPackage
event_source, # type: app.CloudWatchEventConfig
stage_name, # type: str
):
def _create_cwe_subscription(
self,
config, # type: Config
deployment, # type: models.DeploymentPackage
event_source, # type: app.CloudWatchEventConfig
stage_name, # type: str
):
# type: (...) -> models.CloudWatchEvent
lambda_function = self._create_lambda_model(
config=config, deployment=deployment, name=event_source.name,
handler_name=event_source.handler_string, stage_name=stage_name
)

resource_name = event_source.name + '-event'
rule_name = '%s-%s-%s' % (config.app_name, config.chalice_stage,
resource_name)
cwe = models.CloudWatchEvent(
resource_name=resource_name,
rule_name=rule_name,
event_pattern=json.dumps(event_source.event_pattern),
lambda_function=lambda_function,
)
return cwe

def _create_scheduled_model(self,
config, # type: Config
deployment, # type: models.DeploymentPackage
event_source, # type: app.ScheduledEventConfig
stage_name, # type: str
):
# type: (...) -> models.ScheduledEvent
lambda_function = self._create_lambda_model(
config=config, deployment=deployment, name=event_source.name,
Expand Down
45 changes: 22 additions & 23 deletions chalice/deploy/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,17 +173,31 @@ def dependencies(self):


@attrs
class ScheduledEvent(ManagedModel):
resource_type = 'scheduled_event'
rule_name = attrib() # type: str
schedule_expression = attrib() # type: str
lambda_function = attrib() # type: LambdaFunction
class FunctionEventSubscriber(ManagedModel):
lambda_function = attrib() # type: LambdaFunction

def dependencies(self):
# type: () -> List[Model]
return [self.lambda_function]


@attrs
class CloudWatchEventBase(FunctionEventSubscriber):
rule_name = attrib() # type: str


@attrs
class CloudWatchEvent(CloudWatchEventBase):
resource_type = 'cloudwatch_event'
event_pattern = attrib() # type: str


@attrs
class ScheduledEvent(CloudWatchEventBase):
resource_type = 'scheduled_event'
schedule_expression = attrib() # type: str


@attrs
class RestAPI(ManagedModel):
resource_type = 'rest_api'
Expand Down Expand Up @@ -223,37 +237,22 @@ def dependencies(self):


@attrs
class S3BucketNotification(ManagedModel):
class S3BucketNotification(FunctionEventSubscriber):
resource_type = 's3_event'
bucket = attrib() # type: str
events = attrib() # type: List[str]
prefix = attrib() # type: Optional[str]
suffix = attrib() # type: Optional[str]
lambda_function = attrib() # type: LambdaFunction

def dependencies(self):
# type: () -> List[Model]
return [self.lambda_function]


@attrs
class SNSLambdaSubscription(ManagedModel):
class SNSLambdaSubscription(FunctionEventSubscriber):
resource_type = 'sns_event'
topic = attrib() # type: str
lambda_function = attrib() # type: LambdaFunction

def dependencies(self):
# type: () -> List[Model]
return [self.lambda_function]


@attrs
class SQSEventSource(ManagedModel):
class SQSEventSource(FunctionEventSubscriber):
resource_type = 'sqs_event'
queue = attrib() # type: str
batch_size = attrib() # type: int
lambda_function = attrib() # type: LambdaFunction

def dependencies(self):
# type: () -> List[Model]
return [self.lambda_function]
34 changes: 23 additions & 11 deletions chalice/deploy/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,21 +561,25 @@ def _plan_s3bucketnotification(self, resource):
),
]

def _plan_scheduledevent(self, resource):
# type: (models.ScheduledEvent) -> Sequence[InstructionMsg]
def _create_cloudwatchevent(self, resource):
# type: (models.CloudWatchEventBase) -> Sequence[InstructionMsg]

function_arn = Variable(
'%s_lambda_arn' % resource.lambda_function.resource_name
)
# Because the underlying API calls have PUT semantics,
# we don't have to check if the resource exists and have
# a separate code path for updates. We could however
# check if the resource exists to avoid unnecessary API
# calls, but that's a later optimization.

params = {'rule_name': resource.rule_name}
if isinstance(resource, models.ScheduledEvent):
resource = cast(models.ScheduledEvent, resource)
params['schedule_expression'] = resource.schedule_expression
else:
resource = cast(models.CloudWatchEvent, resource)
params['event_pattern'] = resource.event_pattern

plan = [
models.APICall(
method_name='get_or_create_rule_arn',
params={'rule_name': resource.rule_name,
'schedule_expression': resource.schedule_expression},
params=params,
output_var='rule-arn',
),
models.APICall(
Expand All @@ -584,11 +588,11 @@ def _plan_scheduledevent(self, resource):
'function_arn': function_arn}
),
models.APICall(
method_name='add_permission_for_scheduled_event',
method_name='add_permission_for_cloudwatch_event',
params={'rule_arn': Variable('rule-arn'),
'function_arn': function_arn},
),
# You need to remote targets (which have IDs)
# You need to remove targets (which have IDs)
# before you can delete a rule.
models.RecordResourceValue(
resource_type='cloudwatch_event',
Expand All @@ -599,6 +603,14 @@ def _plan_scheduledevent(self, resource):
]
return plan

def _plan_cloudwatchevent(self, resource):
# type: (models.CloudWatchEvent) -> Sequence[InstructionMsg]
return self._create_cloudwatchevent(resource)

def _plan_scheduledevent(self, resource):
# type: (models.ScheduledEvent) -> Sequence[InstructionMsg]
return self._create_cloudwatchevent(resource)

def _create_websocket_function_configs(self, resource):
# type: (models.WebsocketAPI) -> Dict[str, Dict[str, Any]]
configs = OrderedDict() # type: Dict[str, Dict[str, Any]]
Expand Down
36 changes: 35 additions & 1 deletion chalice/package.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# pylint: disable=too-many-lines

import copy
import json
import os
import copy

from typing import Any, Optional, Dict, List, Set, Union # noqa
from typing import cast
Expand Down Expand Up @@ -161,6 +162,24 @@ def _generate_scheduledevent(self, resource, template):
}
}

def _generate_cloudwatchevent(self, resource, template):
# type: (models.CloudWatchEvent, Dict[str, Any]) -> None
function_cfn_name = to_cfn_resource_name(
resource.lambda_function.resource_name)
function_cfn = template['Resources'][function_cfn_name]
event_cfn_name = self._register_cfn_resource_name(
resource.resource_name)
function_cfn['Properties']['Events'] = {
event_cfn_name: {
'Type': 'CloudWatchEvent',
'Properties': {
# For api calls we need serialized string form, for
# SAM Templates we need datastructures.
'Pattern': json.loads(resource.event_pattern)
}
}
}

def _generate_lambdafunction(self, resource, template):
# type: (models.LambdaFunction, Dict[str, Any]) -> None
resources = template['Resources']
Expand Down Expand Up @@ -717,6 +736,17 @@ def _generate_snslambdasubscription(self, resource, template):
'source_arn': topic_arn
}

def _generate_cloudwatchevent(self, resource, template):
# type: (models.CloudWatchEvent, Dict[str, Any]) -> None

template['resource'].setdefault(
'aws_cloudwatch_event_rule', {})[
resource.resource_name] = {
'name': resource.resource_name,
'event_pattern': resource.event_pattern
}
self._cwe_helper(resource, template)

def _generate_scheduledevent(self, resource, template):
# type: (models.ScheduledEvent, Dict[str, Any]) -> None

Expand All @@ -726,6 +756,10 @@ def _generate_scheduledevent(self, resource, template):
'name': resource.resource_name,
'schedule_expression': resource.schedule_expression
}
self._cwe_helper(resource, template)

def _cwe_helper(self, resource, template):
# type: (models.CloudWatchEventBase, Dict[str, Any]) -> None
template['resource'].setdefault(
'aws_cloudwatch_event_target', {})[
resource.resource_name] = {
Expand Down

0 comments on commit 0ad1309

Please sign in to comment.