Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cloud watch events support #1126

Merged
merged 3 commits into from
Aug 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ Next Release (TBD)
==================

* Add support for stage independent lambda configuration
(#1162 <https://github.com/aws/chalice/pull/1162>`__)
(`#1162 <https://github.com/aws/chalice/pull/1162>`__)

* Add support for subscribing to CloudWatch Events
(`#1126 <https://github.com/aws/chalice/pull/1126>`__)

1.10.0
======
Expand Down
28 changes: 25 additions & 3 deletions chalice/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,13 @@ def on_sqs_message(self, queue, batch_size=1, name=None):
registration_kwargs={'queue': queue, 'batch_size': batch_size}
)

def on_cw_event(self, event_pattern, name=None):
kapilt marked this conversation as resolved.
Show resolved Hide resolved
return self._create_registration_function(
handler_type='on_cw_event',
name=name,
registration_kwargs={'event_pattern': event_pattern}
)

def schedule(self, expression, name=None):
return self._create_registration_function(
handler_type='schedule',
Expand Down Expand Up @@ -653,6 +660,7 @@ def _wrap_handler(self, handler_type, handler_name, user_handler):
'on_s3_event': S3Event,
'on_sns_message': SNSEvent,
'on_sqs_message': SQSEvent,
'on_cw_event': CloudWatchEvent,
'schedule': CloudWatchEvent,
}
if handler_type in event_classes:
Expand Down Expand Up @@ -797,8 +805,16 @@ def _register_on_sqs_message(self, name, handler_string, kwargs, **unused):
)
self.event_sources.append(sqs_config)

def _register_schedule(self, name, handler_string, kwargs, **unused):
def _register_on_cw_event(self, name, handler_string, kwargs, **unused):
event_source = CloudWatchEventConfig(
name=name,
event_pattern=kwargs['event_pattern'],
handler_string=handler_string
)
self.event_sources.append(event_source)

def _register_schedule(self, name, handler_string, kwargs, **unused):
event_source = ScheduledEventConfig(
name=name,
schedule_expression=kwargs['expression'],
handler_string=handler_string,
Expand Down Expand Up @@ -1257,12 +1273,18 @@ def __init__(self, name, handler_string):
self.handler_string = handler_string


class CloudWatchEventConfig(BaseEventSourceConfig):
class ScheduledEventConfig(BaseEventSourceConfig):
def __init__(self, name, handler_string, schedule_expression):
super(CloudWatchEventConfig, self).__init__(name, handler_string)
super(ScheduledEventConfig, self).__init__(name, handler_string)
self.schedule_expression = schedule_expression


class CloudWatchEventConfig(BaseEventSourceConfig):
def __init__(self, name, handler_string, event_pattern):
super(CloudWatchEventConfig, self).__init__(name, handler_string)
self.event_pattern = event_pattern


class ScheduleExpression(object):
def to_string(self):
raise NotImplementedError("to_string")
Expand Down
6 changes: 5 additions & 1 deletion chalice/app.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,14 @@ class SQSEventConfig(BaseEventSourceConfig):
batch_size = ... # type: int


class CloudWatchEventConfig(BaseEventSourceConfig):
class ScheduledEventConfig(BaseEventSourceConfig):
schedule_expression = ... # type: Union[str, ScheduleExpression]


class CloudWatchEventConfig(BaseEventSourceConfig):
event_pattern = ... # type: Dict


class Blueprint(DecoratorAPI):
current_request = ... # type: Request
lambda_context = ... # type: LambdaContext
19 changes: 13 additions & 6 deletions chalice/awsclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,13 +729,21 @@ def add_permission_for_authorizer(self, rest_api_id, function_arn,
SourceArn=source_arn,
)

def get_or_create_rule_arn(self, rule_name, schedule_expression):
# type: (str, str) -> str
def get_or_create_rule_arn(
self, rule_name, schedule_expression=None, event_pattern=None):
# type: (str, str, str) -> str
events = self._client('events')
# put_rule is idempotent so we can safely call it even if it already
# exists.
rule_arn = events.put_rule(Name=rule_name,
ScheduleExpression=schedule_expression)
params = {'Name': rule_name}
if schedule_expression:
params['ScheduleExpression'] = schedule_expression
elif event_pattern:
params['EventPattern'] = event_pattern
else:
raise ValueError(
"schedule_expression or event_pattern required")
rule_arn = events.put_rule(**params)
return rule_arn['RuleArn']

def delete_rule(self, rule_name):
Expand All @@ -752,8 +760,7 @@ def connect_rule_to_lambda(self, rule_name, function_arn):
events.put_targets(Rule=rule_name,
Targets=[{'Id': '1', 'Arn': function_arn}])

def add_permission_for_scheduled_event(self, rule_arn,
function_arn):
def add_permission_for_cloudwatch_event(self, rule_arn, function_arn):
# type: (str, str) -> None
self._add_lambda_permission_if_needed(
source_arn=rule_arn,
Expand Down
44 changes: 37 additions & 7 deletions chalice/deploy/deployer.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,13 @@ def _create_lambda_event_resources(self, config, deployment, stage_name):
)
elif isinstance(event_source, app.CloudWatchEventConfig):
resources.append(
self._create_event_model(
self._create_cwe_subscription(
config, deployment, event_source, stage_name
)
)
elif isinstance(event_source, app.ScheduledEventConfig):
resources.append(
self._create_scheduled_model(
config, deployment, event_source, stage_name
)
)
Expand Down Expand Up @@ -540,12 +546,36 @@ def _create_websocket_api_model(
api_gateway_stage=config.api_gateway_stage,
)

def _create_event_model(self,
config, # type: Config
deployment, # type: models.DeploymentPackage
event_source, # type: app.CloudWatchEventConfig
stage_name, # type: str
):
def _create_cwe_subscription(
self,
config, # type: Config
deployment, # type: models.DeploymentPackage
event_source, # type: app.CloudWatchEventConfig
stage_name, # type: str
):
# type: (...) -> models.CloudWatchEvent
lambda_function = self._create_lambda_model(
config=config, deployment=deployment, name=event_source.name,
handler_name=event_source.handler_string, stage_name=stage_name
)

resource_name = event_source.name + '-event'
rule_name = '%s-%s-%s' % (config.app_name, config.chalice_stage,
resource_name)
cwe = models.CloudWatchEvent(
resource_name=resource_name,
rule_name=rule_name,
event_pattern=json.dumps(event_source.event_pattern),
lambda_function=lambda_function,
)
return cwe

def _create_scheduled_model(self,
config, # type: Config
deployment, # type: models.DeploymentPackage
event_source, # type: app.ScheduledEventConfig
stage_name, # type: str
):
# type: (...) -> models.ScheduledEvent
lambda_function = self._create_lambda_model(
config=config, deployment=deployment, name=event_source.name,
Expand Down
45 changes: 22 additions & 23 deletions chalice/deploy/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,17 +173,31 @@ def dependencies(self):


@attrs
class ScheduledEvent(ManagedModel):
resource_type = 'scheduled_event'
rule_name = attrib() # type: str
schedule_expression = attrib() # type: str
lambda_function = attrib() # type: LambdaFunction
class FunctionEventSubscriber(ManagedModel):
lambda_function = attrib() # type: LambdaFunction
kyleknap marked this conversation as resolved.
Show resolved Hide resolved

def dependencies(self):
# type: () -> List[Model]
return [self.lambda_function]


@attrs
class CloudWatchEventBase(FunctionEventSubscriber):
rule_name = attrib() # type: str


@attrs
class CloudWatchEvent(CloudWatchEventBase):
resource_type = 'cloudwatch_event'
event_pattern = attrib() # type: str


@attrs
class ScheduledEvent(CloudWatchEventBase):
resource_type = 'scheduled_event'
schedule_expression = attrib() # type: str


@attrs
class RestAPI(ManagedModel):
resource_type = 'rest_api'
Expand Down Expand Up @@ -223,37 +237,22 @@ def dependencies(self):


@attrs
class S3BucketNotification(ManagedModel):
class S3BucketNotification(FunctionEventSubscriber):
resource_type = 's3_event'
bucket = attrib() # type: str
events = attrib() # type: List[str]
prefix = attrib() # type: Optional[str]
suffix = attrib() # type: Optional[str]
lambda_function = attrib() # type: LambdaFunction

def dependencies(self):
# type: () -> List[Model]
return [self.lambda_function]


@attrs
class SNSLambdaSubscription(ManagedModel):
class SNSLambdaSubscription(FunctionEventSubscriber):
resource_type = 'sns_event'
topic = attrib() # type: str
lambda_function = attrib() # type: LambdaFunction

def dependencies(self):
# type: () -> List[Model]
return [self.lambda_function]


@attrs
class SQSEventSource(ManagedModel):
class SQSEventSource(FunctionEventSubscriber):
resource_type = 'sqs_event'
queue = attrib() # type: str
batch_size = attrib() # type: int
lambda_function = attrib() # type: LambdaFunction

def dependencies(self):
# type: () -> List[Model]
return [self.lambda_function]
34 changes: 23 additions & 11 deletions chalice/deploy/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,21 +561,25 @@ def _plan_s3bucketnotification(self, resource):
),
]

def _plan_scheduledevent(self, resource):
# type: (models.ScheduledEvent) -> Sequence[InstructionMsg]
def _create_cloudwatchevent(self, resource):
# type: (models.CloudWatchEventBase) -> Sequence[InstructionMsg]

function_arn = Variable(
'%s_lambda_arn' % resource.lambda_function.resource_name
)
# Because the underlying API calls have PUT semantics,
# we don't have to check if the resource exists and have
# a separate code path for updates. We could however
# check if the resource exists to avoid unnecessary API
# calls, but that's a later optimization.

params = {'rule_name': resource.rule_name}
if isinstance(resource, models.ScheduledEvent):
resource = cast(models.ScheduledEvent, resource)
params['schedule_expression'] = resource.schedule_expression
else:
resource = cast(models.CloudWatchEvent, resource)
params['event_pattern'] = resource.event_pattern

plan = [
models.APICall(
method_name='get_or_create_rule_arn',
params={'rule_name': resource.rule_name,
'schedule_expression': resource.schedule_expression},
params=params,
output_var='rule-arn',
),
models.APICall(
Expand All @@ -584,11 +588,11 @@ def _plan_scheduledevent(self, resource):
'function_arn': function_arn}
),
models.APICall(
method_name='add_permission_for_scheduled_event',
method_name='add_permission_for_cloudwatch_event',
params={'rule_arn': Variable('rule-arn'),
'function_arn': function_arn},
),
# You need to remote targets (which have IDs)
# You need to remove targets (which have IDs)
# before you can delete a rule.
models.RecordResourceValue(
resource_type='cloudwatch_event',
Expand All @@ -599,6 +603,14 @@ def _plan_scheduledevent(self, resource):
]
return plan

def _plan_cloudwatchevent(self, resource):
# type: (models.CloudWatchEvent) -> Sequence[InstructionMsg]
return self._create_cloudwatchevent(resource)

def _plan_scheduledevent(self, resource):
# type: (models.ScheduledEvent) -> Sequence[InstructionMsg]
return self._create_cloudwatchevent(resource)

def _create_websocket_function_configs(self, resource):
# type: (models.WebsocketAPI) -> Dict[str, Dict[str, Any]]
configs = OrderedDict() # type: Dict[str, Dict[str, Any]]
Expand Down
36 changes: 35 additions & 1 deletion chalice/package.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# pylint: disable=too-many-lines

import copy
import json
import os
import copy

from typing import Any, Optional, Dict, List, Set, Union # noqa
from typing import cast
Expand Down Expand Up @@ -161,6 +162,24 @@ def _generate_scheduledevent(self, resource, template):
}
}

def _generate_cloudwatchevent(self, resource, template):
# type: (models.CloudWatchEvent, Dict[str, Any]) -> None
function_cfn_name = to_cfn_resource_name(
resource.lambda_function.resource_name)
function_cfn = template['Resources'][function_cfn_name]
event_cfn_name = self._register_cfn_resource_name(
resource.resource_name)
function_cfn['Properties']['Events'] = {
event_cfn_name: {
'Type': 'CloudWatchEvent',
'Properties': {
# For api calls we need serialized string form, for
# SAM Templates we need datastructures.
'Pattern': json.loads(resource.event_pattern)
}
}
}

def _generate_lambdafunction(self, resource, template):
# type: (models.LambdaFunction, Dict[str, Any]) -> None
resources = template['Resources']
Expand Down Expand Up @@ -717,6 +736,17 @@ def _generate_snslambdasubscription(self, resource, template):
'source_arn': topic_arn
}

def _generate_cloudwatchevent(self, resource, template):
# type: (models.CloudWatchEvent, Dict[str, Any]) -> None

template['resource'].setdefault(
'aws_cloudwatch_event_rule', {})[
resource.resource_name] = {
'name': resource.resource_name,
'event_pattern': resource.event_pattern
}
self._cwe_helper(resource, template)

def _generate_scheduledevent(self, resource, template):
# type: (models.ScheduledEvent, Dict[str, Any]) -> None

Expand All @@ -726,6 +756,10 @@ def _generate_scheduledevent(self, resource, template):
'name': resource.resource_name,
'schedule_expression': resource.schedule_expression
}
self._cwe_helper(resource, template)

def _cwe_helper(self, resource, template):
# type: (models.CloudWatchEventBase, Dict[str, Any]) -> None
template['resource'].setdefault(
'aws_cloudwatch_event_target', {})[
resource.resource_name] = {
Expand Down