Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CloudWatch events - Fixes #52 #69

Merged
merged 5 commits into from
Jun 27, 2016
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion kappa/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import kappa.event_source.kinesis
import kappa.event_source.s3
import kappa.event_source.sns
import kappa.event_source.cloudwatch
import kappa.policy
import kappa.role
import kappa.awsclient
Expand Down Expand Up @@ -181,6 +182,7 @@ def _create_event_sources(self):
'kinesis': kappa.event_source.kinesis.KinesisEventSource,
's3': kappa.event_source.s3.S3EventSource,
'sns': kappa.event_source.sns.SNSEventSource,
'events': kappa.event_source.cloudwatch.CloudWatchEventSource
}
for event_source_cfg in event_sources:
_, _, svc, _ = event_source_cfg['arn'].split(':', 3)
Expand Down Expand Up @@ -226,7 +228,7 @@ def create(self):
# There is a consistency problem here.
# If you don't wait for a bit, the function.create call
# will fail because the policy has not been attached to the role.
LOG.debug('Waiting for policy/role propogation')
LOG.debug('Waiting for policy/role propagation')
time.sleep(5)
self.function.create()
self.add_event_sources()
Expand All @@ -239,6 +241,7 @@ def deploy(self):
self.function.deploy()
if self.restapi:
self.restapi.deploy()
self.add_event_sources()

def invoke(self, data):
return self.function.invoke(data)
Expand Down
113 changes: 113 additions & 0 deletions kappa/event_source/cloudwatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2014, 2015 Mitch Garnaat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import kappa.event_source.base
import logging
import uuid

LOG = logging.getLogger(__name__)


class CloudWatchEventSource(kappa.event_source.base.EventSource):

def __init__(self, context, config):
super(CloudWatchEventSource, self).__init__(context, config)
self._events = kappa.awsclient.create_client('events', context.session)
self._lambda = kappa.awsclient.create_client('lambda', context.session)
self._name = config['arn'].split('/')[-1]
self._context = context
self._config = config

def get_rule(self):
response = self._events.call('list_rules', NamePrefix=self._name)
LOG.debug(response)
if 'Rules' in response:
for r in response['Rules']:
if r['Name'] == self._name:
return r
return None

def add(self, function):
kwargs = {
'Name': self._name,
'State': 'ENABLED' if self.enabled else 'DISABLED'
}
if 'schedule' in self._config:
kwargs['ScheduleExpression'] = self._config['schedule']
if 'pattern' in self._config:
kwargs['EventPattern'] = self._config['pattern']
if 'description' in self._config:
kwargs['Description'] = self._config['description']
if 'role_arn' in self._config:
kwargs['RoleArn'] = self._config['role_arn']
try:
response = self._events.call('put_rule', **kwargs)
LOG.debug(response)
self._config['arn'] = response['RuleArn']
response = self._lambda.call('add_permission',
FunctionName=function.name,
StatementId=str(uuid.uuid4()),
Action='lambda:InvokeFunction',
Principal='events.amazonaws.com',
SourceArn=response['RuleArn'])
LOG.debug(response)
response = self._events.call('put_targets',
Rule=self._name,
Targets=[{
'Id': function.name,
'Arn': function.arn
}])
LOG.debug(response)
except Exception:
LOG.exception('Unable to put CloudWatch event source')

def update(self, function):
self.add(function)

def remove(self, function):
LOG.debug('removing CloudWatch event source')
try:
rule = self.get_rule()
if rule:
response = self._events.call('remove_targets',
Rule=self._name,
Ids=[function.name])
LOG.debug(response)
response = self._events.call('delete_rule',
Name=self._name)
LOG.debug(response)
except Exception:
LOG.exception('Unable to remove CloudWatch event source %s', self._name)

def status(self, function):
LOG.debug('status for CloudWatch event for %s', function.name)
return self._to_status(self.get_rule())

def enable(self, function):
if self.get_rule():
self._events.call('enable_rule', Name=self._name)

def disable(self, function):
if self.get_rule():
self._events.call('disable_rule', Name=self._name)

def _to_status(self, rule):
if rule:
return {
'EventSourceArn': rule['Arn'],
'State': rule['State']
}
else:
return None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can drop the else clause here and just return None.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, that's done.

4 changes: 4 additions & 0 deletions samples/cron/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.kappa/
kappa.yml
*.zip

10 changes: 10 additions & 0 deletions samples/cron/_src/simple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import logging
import time

LOG = logging.getLogger()
LOG.setLevel(logging.DEBUG)


def handler(event, context):
LOG.debug(event)
return {'status': 'success', 'time': time.time()}
4 changes: 4 additions & 0 deletions samples/cron/_tests/test_one.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"foo": "bar",
"fie": "baz"
}
23 changes: 23 additions & 0 deletions samples/cron/kappa.yml.sample
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
---
name: kappa-cron
environments:
dev:
profile: <your profile here>
region: <your region here>
policy:
resources:
- arn: arn:aws:logs:*:*:*
actions:
- "*"
event_sources:
- arn: arn:aws:events:<your region here>:<your account id>:rule/kappa-cron-dev
schedule: rate(1 minute)
description: cron to run this lambda function every minute
enabled: true
lambda:
description: Kappa sample lambda that runs every minute
handler: simple.handler
runtime: python2.7
memory_size: 128
timeout: 3