Skip to content

Commit

Permalink
WIP Commit. Updating to use new GA version of the Lambda API. Also mo…
Browse files Browse the repository at this point in the history
…ving from botocore to boto3. Also adding SNS example. No longer using CloudFormation for policies since we only need one and CloudFormation does not yet support managed policies. Haven't updated any tests yet so they will all be failing for now. Also need to update README.
  • Loading branch information
garnaat committed Apr 23, 2015
1 parent 3670fb0 commit 2bbf5fa
Show file tree
Hide file tree
Showing 18 changed files with 540 additions and 243 deletions.
48 changes: 26 additions & 22 deletions bin/kappa
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# language governing permissions and limitations under the License.
from datetime import datetime
import logging
import base64

import click

Expand All @@ -38,18 +39,20 @@ def cli(ctx, config=None, debug=False):

@cli.command()
@click.pass_context
def deploy(ctx):
def create(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
click.echo('deploying...')
context.deploy()
context.create()
click.echo('...done')

@cli.command()
@click.pass_context
def test(ctx):
def invoke(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
click.echo('testing...')
context.test()
click.echo('invoking...')
response = context.invoke()
log_data = base64.b64decode(response['LogResult'])
click.echo(log_data)
click.echo('...done')

@cli.command()
Expand All @@ -67,31 +70,32 @@ def tail(ctx):
def status(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
status = context.status()
click.echo(click.style('Stack', bold=True))
if status['stack']:
for stack in status['stack']['Stacks']:
line = ' {}: {}'.format(stack['StackId'], stack['StackStatus'])
click.echo(click.style(line, fg='green'))
else:
click.echo(click.style(' None', fg='green'))
click.echo(click.style('Policy', bold=True))
if status['policy']:
line = ' {} ({})'.format(
status['policy']['PolicyName'],
status['policy']['Arn'])
click.echo(click.style(line, fg='green'))
click.echo(click.style('Role', bold=True))
if status['role']:
line = ' {} ({})'.format(
status['role']['Role']['RoleName'],
status['role']['Role']['Arn'])
click.echo(click.style(line, fg='green'))
click.echo(click.style('Function', bold=True))
if status['function']:
line = ' {}'.format(
status['function']['Configuration']['FunctionName'])
line = ' {} ({})'.format(
status['function']['Configuration']['FunctionName'],
status['function']['Configuration']['FunctionArn'])
click.echo(click.style(line, fg='green'))
else:
click.echo(click.style(' None', fg='green'))
click.echo(click.style('Event Sources', bold=True))
if status['event_sources']:
for event_source in status['event_sources']:
if 'EventSource' in event_source:
line = ' {}: {}'.format(
event_source['EventSource'], event_source['IsActive'])
click.echo(click.style(line, fg='green'))
else:
line = ' {}'.format(
event_source['CloudFunctionConfiguration']['Id'])
click.echo(click.style(line, fg='green'))
line = ' {}: {}'.format(
event_source['EventSourceArn'], event_source['State'])
click.echo(click.style(line, fg='green'))
else:
click.echo(click.style(' None', fg='green'))

Expand Down
15 changes: 7 additions & 8 deletions kappa/aws.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
# Copyright (c) 2014,2015 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
Expand All @@ -11,21 +11,20 @@
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

import botocore.session
import boto3


class __AWS(object):

def __init__(self, profile=None, region=None):
def __init__(self, profile_name=None, region_name=None):
self._client_cache = {}
self._session = botocore.session.get_session()
self._session.profile = profile
self._region = region
self._session = boto3.session.Session(
region_name=region_name, profile_name=profile_name)

def create_client(self, client_name):
if client_name not in self._client_cache:
self._client_cache[client_name] = self._session.create_client(
client_name, self._region)
self._client_cache[client_name] = self._session.client(
client_name)
return self._client_cache[client_name]


Expand Down
81 changes: 53 additions & 28 deletions kappa/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@

import kappa.function
import kappa.event_source
import kappa.stack
import kappa.policy
import kappa.role

LOG = logging.getLogger(__name__)

Expand All @@ -32,8 +33,16 @@ def __init__(self, config_file, debug=False):
else:
self.set_logger('kappa', logging.INFO)
self.config = yaml.load(config_file)
self._stack = kappa.stack.Stack(
self, self.config['cloudformation'])
if 'policy' in self.config.get('iam', ''):
self.policy = kappa.policy.Policy(
self, self.config['iam']['policy'])
else:
self.policy = None
if 'role' in self.config.get('iam', ''):
self.role = kappa.role.Role(
self, self.config['iam']['role'])
else:
self.role = None
self.function = kappa.function.Function(
self, self.config['lambda'])
self.event_sources = []
Expand All @@ -57,11 +66,7 @@ def lambda_config(self):

@property
def exec_role_arn(self):
return self._stack.exec_role_arn

@property
def invoke_role_arn(self):
return self._stack.invoke_role_arn
return self.role.arn

def debug(self):
self.set_logger('kappa', logging.DEBUG)
Expand Down Expand Up @@ -90,44 +95,64 @@ def set_logger(self, logger_name, level=logging.INFO):
log.addHandler(ch)

def _create_event_sources(self):
for event_source_cfg in self.config['lambda']['event_sources']:
_, _, svc, _ = event_source_cfg['arn'].split(':', 3)
if svc == 'kinesis':
self.event_sources.append(
kappa.event_source.KinesisEventSource(
if 'event_sources' in self.config['lambda']:
for event_source_cfg in self.config['lambda']['event_sources']:
_, _, svc, _ = event_source_cfg['arn'].split(':', 3)
if svc == 'kinesis':
self.event_sources.append(
kappa.event_source.KinesisEventSource(
self, event_source_cfg))
elif svc == 's3':
self.event_sources.append(kappa.event_source.S3EventSource(
self, event_source_cfg))
elif svc == 's3':
self.event_sources.append(kappa.event_source.S3EventSource(
self, event_source_cfg))
else:
msg = 'Unsupported event source: %s' % event_source_cfg['arn']
raise ValueError(msg)
elif svc == 'sns':
self.event_sources.append(
kappa.event_source.SNSEventSource(self,
event_source_cfg))
else:
msg = 'Unknown event source: %s' % event_source_cfg['arn']
raise ValueError(msg)

def add_event_sources(self):
for event_source in self.event_sources:
event_source.add(self.function)

def deploy(self):
self._stack.update()
self.function.upload()
def create(self):
if self.policy:
self.policy.create()
if self.role:
self.role.create()
self.function.create()

def test(self):
self.function.test()
def invoke(self):
return self.function.invoke()

def tail(self):
return self.function.tail()

def delete(self):
self._stack.delete()
if self.policy:
self.policy.delete()
if self.role:
self.role.delete()
self.function.delete()
for event_source in self.event_sources:
event_source.remove(self.function)

def status(self):
status = {}
status['stack'] = self._stack.status()
if self.policy:
status['policy'] = self.policy.status()
else:
status['policy'] = None
if self.role:
status['role'] = self.role.status()
else:
status['role'] = None
status['function'] = self.function.status()
status['event_sources'] = []
for event_source in self.event_sources:
status['event_sources'].append(event_source.status(self.function))
if self.event_sources:
for event_source in self.event_sources:
status['event_sources'].append(
event_source.status(self.function))
return status
69 changes: 60 additions & 9 deletions kappa/event_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ def __init__(self, context, config):
def arn(self):
return self._config['arn']

@property
def starting_position(self):
return self._config.get('starting_position', 'TRIM_HORIZON')

@property
def batch_size(self):
return self._config.get('batch_size', 100)
Expand All @@ -44,21 +48,21 @@ def __init__(self, context, config):

def _get_uuid(self, function):
uuid = None
response = self._lambda.list_event_sources(
response = self._lambda.list_event_source_mappings(
FunctionName=function.name,
EventSourceArn=self.arn)
LOG.debug(response)
if len(response['EventSources']) > 0:
uuid = response['EventSources'][0]['UUID']
if len(response['EventSourceMappings']) > 0:
uuid = response['EventSourceMappings'][0]['UUID']
return uuid

def add(self, function):
try:
response = self._lambda.add_event_source(
response = self._lambda.create_event_source_mapping(
FunctionName=function.name,
Role=self._context.invoke_role_arn,
EventSource=self.arn,
BatchSize=self.batch_size)
EventSourceArn=self.arn,
BatchSize=self.batch_size,
StartingPosition=self.starting_position)
LOG.debug(response)
except Exception:
LOG.exception('Unable to add Kinesis event source')
Expand All @@ -67,15 +71,15 @@ def remove(self, function):
response = None
uuid = self._get_uuid(function)
if uuid:
response = self._lambda.remove_event_source(
response = self._lambda.delete_event_source_mapping(
UUID=uuid)
LOG.debug(response)
return response

def status(self, function):
LOG.debug('getting status for event source %s', self.arn)
try:
response = self._lambda.get_event_source(
response = self._lambda.get_event_source_mapping(
UUID=self._get_uuid(function))
LOG.debug(response)
except ClientError:
Expand Down Expand Up @@ -134,3 +138,50 @@ def status(self, function):
if 'CloudFunctionConfiguration' not in response:
response = None
return response


class SNSEventSource(EventSource):

def __init__(self, context, config):
super(SNSEventSource, self).__init__(context, config)
aws = kappa.aws.get_aws(context)
self._sns = aws.create_client('sns')

def _make_notification_id(self, function_name):
return 'Kappa-%s-notification' % function_name

def exists(self, function):
try:
response = self._sns.list_subscriptions_by_topic(
TopicArn=self.arn)
LOG.debug(response)
for subscription in response['Subscriptions']:
if subscription['Endpoint'] == function.arn:
return subscription
return None
except Exception:
LOG.exception('Unable to find event source %s', self.arn)

def add(self, function):
try:
response = self._sns.subscribe(
TopicArn=self.arn, Protocol='lambda',
Endpoint=function.arn)
LOG.debug(response)
except Exception:
LOG.exception('Unable to add SNS event source')

def remove(self, function):
LOG.debug('removing SNS event source')
try:
subscription = self.exists(function)
if subscription:
response = self._sns.unsubscribe(
SubscriptionArn=subscription['SubscriptionArn'])
LOG.debug(response)
except Exception:
LOG.exception('Unable to remove event source %s', self.arn)

def status(self, function):
LOG.debug('status for SNS notification for %s', function.name)
return self.exist(function)
Loading

0 comments on commit 2bbf5fa

Please sign in to comment.