Skip to content

Commit

Permalink
support subscribing to cloud watch events
Browse files Browse the repository at this point in the history
  • Loading branch information
kapilt committed May 27, 2019
1 parent 0ad94ee commit 87080f0
Show file tree
Hide file tree
Showing 12 changed files with 316 additions and 25 deletions.
28 changes: 25 additions & 3 deletions chalice/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,13 @@ def on_sqs_message(self, queue, batch_size=1, name=None):
registration_kwargs={'queue': queue, 'batch_size': batch_size}
)

def on_cw_event(self, event_pattern, name=None):
return self._create_registration_function(
handler_type='on_cw_event',
name=name,
registration_kwargs={'event_pattern': event_pattern}
)

def schedule(self, expression, name=None):
return self._create_registration_function(
handler_type='schedule',
Expand Down Expand Up @@ -535,6 +542,7 @@ def _wrap_handler(self, handler_type, handler_name, user_handler):
'on_s3_event': S3Event,
'on_sns_message': SNSEvent,
'on_sqs_message': SQSEvent,
'on_cw_event': CloudWatchEvent,
'schedule': CloudWatchEvent,
}
if handler_type in event_classes:
Expand Down Expand Up @@ -623,8 +631,16 @@ def _register_on_sqs_message(self, name, handler_string, kwargs, **unused):
)
self.event_sources.append(sqs_config)

def _register_schedule(self, name, handler_string, kwargs, **unused):
def _register_on_cw_event(self, name, handler_string, kwargs, **unused):
event_source = CloudWatchEventConfig(
name=name,
event_pattern=kwargs['event_pattern'],
handler_string=handler_string
)
self.event_sources.append(event_source)

def _register_schedule(self, name, handler_string, kwargs, **unused):
event_source = ScheduledEventConfig(
name=name,
schedule_expression=kwargs['expression'],
handler_string=handler_string,
Expand Down Expand Up @@ -1064,12 +1080,18 @@ def __init__(self, name, handler_string):
self.handler_string = handler_string


class CloudWatchEventConfig(BaseEventSourceConfig):
class ScheduledEventConfig(BaseEventSourceConfig):
def __init__(self, name, handler_string, schedule_expression):
super(CloudWatchEventConfig, self).__init__(name, handler_string)
super(ScheduledEventConfig, self).__init__(name, handler_string)
self.schedule_expression = schedule_expression


class CloudWatchEventConfig(BaseEventSourceConfig):
def __init__(self, name, handler_string, event_pattern):
super(CloudWatchEventConfig, self).__init__(name, handler_string)
self.event_pattern = event_pattern


class ScheduleExpression(object):
def to_string(self):
raise NotImplementedError("to_string")
Expand Down
6 changes: 5 additions & 1 deletion chalice/app.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,14 @@ class SQSEventConfig(BaseEventSourceConfig):
batch_size = ... # type: int


class CloudWatchEventConfig(BaseEventSourceConfig):
class ScheduledEventConfig(BaseEventSourceConfig):
schedule_expression = ... # type: Union[str, ScheduleExpression]


class CloudWatchEventConfig(BaseEventSourceConfig):
event_pattern = ... # type: Dict


class Blueprint(DecoratorAPI):
current_request = ... # type: Request
lambda_context = ... # type: LambdaContext
17 changes: 11 additions & 6 deletions chalice/awsclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -706,13 +706,18 @@ def add_permission_for_authorizer(self, rest_api_id, function_arn,
SourceArn=source_arn,
)

def get_or_create_rule_arn(self, rule_name, schedule_expression):
# type: (str, str) -> str
def get_or_create_rule_arn(
self, rule_name, schedule_expression=None, event_pattern=None):
# type: (str, str, str) -> str
events = self._client('events')
# put_rule is idempotent so we can safely call it even if it already
# exists.
rule_arn = events.put_rule(Name=rule_name,
ScheduleExpression=schedule_expression)
params = {'Name': rule_name}
if schedule_expression:
params['ScheduleExpression'] = schedule_expression
elif event_pattern:
params['EventPattern'] = event_pattern
rule_arn = events.put_rule(**params)
return rule_arn['RuleArn']

def delete_rule(self, rule_name):
Expand All @@ -729,8 +734,8 @@ def connect_rule_to_lambda(self, rule_name, function_arn):
events.put_targets(Rule=rule_name,
Targets=[{'Id': '1', 'Arn': function_arn}])

def add_permission_for_scheduled_event(self, rule_arn,
function_arn):
def add_permission_for_cloud_watch_event(self, rule_arn,
function_arn):
# type: (str, str) -> None
self._add_lambda_permission_if_needed(
source_arn=rule_arn,
Expand Down
44 changes: 37 additions & 7 deletions chalice/deploy/deployer.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,13 @@ def _create_lambda_event_resources(self, config, deployment, stage_name):
)
elif isinstance(event_source, app.CloudWatchEventConfig):
resources.append(
self._create_event_model(
self._create_cwe_subscription(
config, deployment, event_source, stage_name
)
)
elif isinstance(event_source, app.ScheduledEventConfig):
resources.append(
self._create_scheduled_model(
config, deployment, event_source, stage_name
)
)
Expand Down Expand Up @@ -447,12 +453,36 @@ def _create_rest_api_model(self,
authorizers=authorizers,
)

def _create_event_model(self,
config, # type: Config
deployment, # type: models.DeploymentPackage
event_source, # type: app.CloudWatchEventConfig
stage_name, # type: str
):
def _create_cwe_subscription(
self,
config, # type: Config
deployment, # type: models.DeploymentPackage
event_source, # type: app.CloudWatchEventConfig
stage_name, # type: str
):
# type: (...) -> models.CloudWatchEvent
lambda_function = self._create_lambda_model(
config=config, deployment=deployment, name=event_source.name,
handler_name=event_source.handler_string, stage_name=stage_name
)

resource_name = event_source.name + '-event'
rule_name = '%s-%s-%s' % (config.app_name, config.chalice_stage,
resource_name)
cwe = models.CloudWatchEvent(
resource_name=resource_name,
rule_name=rule_name,
event_pattern=json.dumps(event_source.event_pattern),
lambda_function=lambda_function,
)
return cwe

def _create_scheduled_model(self,
config, # type: Config
deployment, # type: models.DeploymentPackage
event_source, # type: app.ScheduledEventConfig
stage_name, # type: str
):
# type: (...) -> models.ScheduledEvent
lambda_function = self._create_lambda_model(
config=config, deployment=deployment, name=event_source.name,
Expand Down
12 changes: 12 additions & 0 deletions chalice/deploy/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,18 @@ def dependencies(self):
return [self.role, self.deployment_package]


@attrs
class CloudWatchEvent(ManagedModel):
resource_type = 'cw_event'
rule_name = attrib() # type: str
event_pattern = attrib() # type: str
lambda_function = attrib() # type: LambdaFunction

def dependencies(self):
# type: () -> List[Model]
return [self.lambda_function]


@attrs
class ScheduledEvent(ManagedModel):
resource_type = 'scheduled_event'
Expand Down
39 changes: 37 additions & 2 deletions chalice/deploy/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,41 @@ def _plan_s3bucketnotification(self, resource):
),
]

def _plan_cloudwatchevent(self, resource):
# type: (models.CloudWatchEvent) -> Sequence[InstructionMsg]

function_arn = Variable(
'%s_lambda_arn' % resource.lambda_function.resource_name
)

plan = [
models.APICall(
method_name='get_or_create_rule_arn',
params={'rule_name': resource.rule_name,
'event_pattern': resource.event_pattern},
output_var='rule-arn',
),
models.APICall(
method_name='connect_rule_to_lambda',
params={'rule_name': resource.rule_name,
'function_arn': function_arn}
),
models.APICall(
method_name='add_permission_for_cloud_watch_event',
params={'rule_arn': Variable('rule-arn'),
'function_arn': function_arn},
),
# You need to remove targets (which have IDs)
# before you can delete a rule.
models.RecordResourceValue(
resource_type='cloudwatch_event',
resource_name=resource.resource_name,
name='rule_name',
value=resource.rule_name,
)
]
return plan

def _plan_scheduledevent(self, resource):
# type: (models.ScheduledEvent) -> Sequence[InstructionMsg]
function_arn = Variable(
Expand All @@ -709,11 +744,11 @@ def _plan_scheduledevent(self, resource):
'function_arn': function_arn}
),
models.APICall(
method_name='add_permission_for_scheduled_event',
method_name='add_permission_for_cloud_watch_event',
params={'rule_arn': Variable('rule-arn'),
'function_arn': function_arn},
),
# You need to remote targets (which have IDs)
# You need to remove targets (which have IDs)
# before you can delete a rule.
models.RecordResourceValue(
resource_type='cloudwatch_event',
Expand Down
21 changes: 20 additions & 1 deletion chalice/package.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import copy
import json
import os

from typing import Any, Dict, List, Set, Union # noqa
from typing import cast
Expand Down Expand Up @@ -99,6 +100,24 @@ def _generate_scheduledevent(self, resource, template):
}
}

def _generate_cloudwatchevent(self, resource, template):
# type: (models.CloudWatchEvent, Dict[str, Any]) -> None
function_cfn_name = to_cfn_resource_name(
resource.lambda_function.resource_name)
function_cfn = template['Resources'][function_cfn_name]
event_cfn_name = self._register_cfn_resource_name(
resource.resource_name)
function_cfn['Properties']['Events'] = {
event_cfn_name: {
'Type': 'CloudWatchEvent',
'Properties': {
# For api calls we need serialized string form, for
# SAM Templates we need datastructures.
'Pattern': json.loads(resource.event_pattern)
}
}
}

def _generate_lambdafunction(self, resource, template):
# type: (models.LambdaFunction, Dict[str, Any]) -> None
resources = template['Resources']
Expand Down
21 changes: 19 additions & 2 deletions tests/functional/test_awsclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -1350,6 +1350,23 @@ def test_update_api_from_swagger(stubbed_session):
stubbed_session.verify_stubs()


def test_can_get_or_create_rule_arn_with_pattern(stubbed_session):
events = stubbed_session.stub('events')
events.put_rule(
Name='rule-name',
EventPattern='{"source": ["aws.ec2"]}').returns({
'RuleArn': 'rule-arn',
})

stubbed_session.activate_stubs()
awsclient = TypedAWSClient(stubbed_session)
result = awsclient.get_or_create_rule_arn(
rule_name='rule-name',
event_pattern='{"source": ["aws.ec2"]}')
stubbed_session.verify_stubs()
assert result == 'rule-arn'


def test_can_get_or_create_rule_arn(stubbed_session):
events = stubbed_session.stub('events')
events.put_rule(
Expand Down Expand Up @@ -1392,7 +1409,7 @@ def test_add_permission_for_scheduled_event(stubbed_session):
stubbed_session.activate_stubs()

awsclient = TypedAWSClient(stubbed_session)
awsclient.add_permission_for_scheduled_event(
awsclient.add_permission_for_cloud_watch_event(
'rule-arn', 'function-arn')

stubbed_session.verify_stubs()
Expand Down Expand Up @@ -1421,7 +1438,7 @@ def test_skip_if_permission_already_granted(stubbed_session):

stubbed_session.activate_stubs()
awsclient = TypedAWSClient(stubbed_session)
awsclient.add_permission_for_scheduled_event(
awsclient.add_permission_for_cloud_watch_event(
'rule-arn', 'function-arn')
stubbed_session.verify_stubs()

Expand Down
25 changes: 25 additions & 0 deletions tests/unit/deploy/test_deployer.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,17 @@ def foo(event):
return app


@fixture
def cloud_watch_event_app():
app = Chalice('cloud-watch-event')

@app.on_cw_event({'source': {'source': ['aws.ec2']}})
def foo(event):
return event

return app


@fixture
def rest_api_app():
app = Chalice('rest-api')
Expand Down Expand Up @@ -617,6 +628,20 @@ def test_autogen_policy_for_function(self, lambda_app):
policy=models.AutoGenIAMPolicy(models.Placeholder.BUILD_STAGE),
)

def test_cloud_watch_event_models(self, cloud_watch_event_app):
config = self.create_config(cloud_watch_event_app,
app_name='cloud-watch-event',
autogen_policy=True)
builder = ApplicationGraphBuilder()
application = builder.build(config, stage_name='dev')
assert len(application.resources) == 1
event = application.resources[0]
assert isinstance(event, models.CloudWatchEvent)
assert event.resource_name == 'foo-event'
assert event.rule_name == 'cloud-watch-event-dev-foo-event'
assert isinstance(event.lambda_function, models.LambdaFunction)
assert event.lambda_function.resource_name == 'foo'

def test_scheduled_event_models(self, scheduled_event_app):
config = self.create_config(scheduled_event_app,
app_name='scheduled-event',
Expand Down

0 comments on commit 87080f0

Please sign in to comment.