Skip to content

Commit

Permalink
Merge pull request #69 from coopernurse/cloudwatch-events
Browse files Browse the repository at this point in the history
CloudWatch events - Fixes #52
  • Loading branch information
josegonzalez committed Jun 27, 2016
2 parents ff1035e + a285ae4 commit eab3559
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 1 deletion.
5 changes: 4 additions & 1 deletion kappa/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import kappa.event_source.kinesis
import kappa.event_source.s3
import kappa.event_source.sns
import kappa.event_source.cloudwatch
import kappa.policy
import kappa.role
import kappa.awsclient
Expand Down Expand Up @@ -181,6 +182,7 @@ def _create_event_sources(self):
'kinesis': kappa.event_source.kinesis.KinesisEventSource,
's3': kappa.event_source.s3.S3EventSource,
'sns': kappa.event_source.sns.SNSEventSource,
'events': kappa.event_source.cloudwatch.CloudWatchEventSource
}
for event_source_cfg in event_sources:
_, _, svc, _ = event_source_cfg['arn'].split(':', 3)
Expand Down Expand Up @@ -226,7 +228,7 @@ def create(self):
# There is a consistency problem here.
# If you don't wait for a bit, the function.create call
# will fail because the policy has not been attached to the role.
LOG.debug('Waiting for policy/role propogation')
LOG.debug('Waiting for policy/role propagation')
time.sleep(5)
self.function.create()
self.add_event_sources()
Expand All @@ -239,6 +241,7 @@ def deploy(self):
self.function.deploy()
if self.restapi:
self.restapi.deploy()
self.add_event_sources()

def invoke(self, data):
return self.function.invoke(data)
Expand Down
112 changes: 112 additions & 0 deletions kappa/event_source/cloudwatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2014, 2015 Mitch Garnaat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import kappa.event_source.base
import logging
import uuid

LOG = logging.getLogger(__name__)


class CloudWatchEventSource(kappa.event_source.base.EventSource):

def __init__(self, context, config):
super(CloudWatchEventSource, self).__init__(context, config)
self._events = kappa.awsclient.create_client('events', context.session)
self._lambda = kappa.awsclient.create_client('lambda', context.session)
self._name = config['arn'].split('/')[-1]
self._context = context
self._config = config

def get_rule(self):
response = self._events.call('list_rules', NamePrefix=self._name)
LOG.debug(response)
if 'Rules' in response:
for r in response['Rules']:
if r['Name'] == self._name:
return r
return None

def add(self, function):
kwargs = {
'Name': self._name,
'State': 'ENABLED' if self.enabled else 'DISABLED'
}
if 'schedule' in self._config:
kwargs['ScheduleExpression'] = self._config['schedule']
if 'pattern' in self._config:
kwargs['EventPattern'] = self._config['pattern']
if 'description' in self._config:
kwargs['Description'] = self._config['description']
if 'role_arn' in self._config:
kwargs['RoleArn'] = self._config['role_arn']
try:
response = self._events.call('put_rule', **kwargs)
LOG.debug(response)
self._config['arn'] = response['RuleArn']
response = self._lambda.call('add_permission',
FunctionName=function.name,
StatementId=str(uuid.uuid4()),
Action='lambda:InvokeFunction',
Principal='events.amazonaws.com',
SourceArn=response['RuleArn'])
LOG.debug(response)
response = self._events.call('put_targets',
Rule=self._name,
Targets=[{
'Id': function.name,
'Arn': function.arn
}])
LOG.debug(response)
except Exception:
LOG.exception('Unable to put CloudWatch event source')

def update(self, function):
self.add(function)

def remove(self, function):
LOG.debug('removing CloudWatch event source')
try:
rule = self.get_rule()
if rule:
response = self._events.call('remove_targets',
Rule=self._name,
Ids=[function.name])
LOG.debug(response)
response = self._events.call('delete_rule',
Name=self._name)
LOG.debug(response)
except Exception:
LOG.exception('Unable to remove CloudWatch event source %s', self._name)

def status(self, function):
LOG.debug('status for CloudWatch event for %s', function.name)
return self._to_status(self.get_rule())

def enable(self, function):
if self.get_rule():
self._events.call('enable_rule', Name=self._name)

def disable(self, function):
if self.get_rule():
self._events.call('disable_rule', Name=self._name)

def _to_status(self, rule):
if rule:
return {
'EventSourceArn': rule['Arn'],
'State': rule['State']
}
return None
4 changes: 4 additions & 0 deletions samples/cron/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.kappa/
kappa.yml
*.zip

10 changes: 10 additions & 0 deletions samples/cron/_src/simple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import logging
import time

LOG = logging.getLogger()
LOG.setLevel(logging.DEBUG)


def handler(event, context):
LOG.debug(event)
return {'status': 'success', 'time': time.time()}
4 changes: 4 additions & 0 deletions samples/cron/_tests/test_one.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"foo": "bar",
"fie": "baz"
}
23 changes: 23 additions & 0 deletions samples/cron/kappa.yml.sample
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
---
name: kappa-cron
environments:
dev:
profile: <your profile here>
region: <your region here>
policy:
resources:
- arn: arn:aws:logs:*:*:*
actions:
- "*"
event_sources:
- arn: arn:aws:events:<your region here>:<your account id>:rule/kappa-cron-dev
schedule: rate(1 minute)
description: cron to run this lambda function every minute
enabled: true
lambda:
description: Kappa sample lambda that runs every minute
handler: simple.handler
runtime: python2.7
memory_size: 128
timeout: 3

0 comments on commit eab3559

Please sign in to comment.