Navigation Menu

Skip to content

Commit

Permalink
Add basic Lambda support
Browse files Browse the repository at this point in the history
- also: rename definition file to greengo.
  • Loading branch information
Dmitri committed Mar 28, 2018
1 parent 47811f1 commit f6a9c6c
Show file tree
Hide file tree
Showing 35 changed files with 1,117 additions and 31 deletions.
62 changes: 55 additions & 7 deletions greengo.py
Expand Up @@ -3,6 +3,7 @@
import fire
import json
import yaml
import shutil
from time import sleep
import logging
from boto3.session import Session
Expand All @@ -15,7 +16,9 @@
log.setLevel(logging.DEBUG)


STATE_FILE = '.group_state.json'
DEFINITION_FILE = 'greengo.yaml'
MAGIC_DIR = '.gg'
STATE_FILE = os.path.join(MAGIC_DIR, '.gg_state.json')


class GroupCommands(object):
Expand All @@ -25,10 +28,11 @@ def __init__(self):
session = Session()
self._gg = session.client("greengrass")
self._iot = session.client("iot")
self._lambda = session.client("lambda")
self._region = session.region_name
self._iot_endpoint = self._iot.describe_endpoint()['endpointAddress']

with open('group.yaml', 'r') as f:
with open(DEFINITION_FILE, 'r') as f:
self.group = self.group = yaml.safe_load(f)

self.state = _load_state()
Expand All @@ -38,11 +42,11 @@ def create(self):
log.error("Previously created group exists. Remove before creating!")
return False

log.info("[BEGIN] creating group {0}".format(self.group['name']))
log.info("[BEGIN] creating group {0}".format(self.group['Group']['name']))

# 1. Create group
# TODO: create group at the end, with "initial version"?
group = rinse(self._gg.create_group(Name=self.group['name']))
group = rinse(self._gg.create_group(Name=self.group['Group']['name']))
self.state['Group'] = group
_update_state(self.state)
# Must update state on every step, else how can I clean?
Expand All @@ -67,7 +71,7 @@ def create(self):
self.state['Group']['Version'] = group_ver
_update_state(self.state)

log.info("[END] creating group {0}".format(self.group['name']))
log.info("[END] creating group {0}".format(self.group['Group']['name']))

def deploy(self):
if not self.state:
Expand Down Expand Up @@ -130,6 +134,45 @@ def remove(self):

log.info("[END] removing group {0}".format(self.group['name']))

def create_lambdas(self):
    """Create one AWS Lambda function per entry under 'Lambdas' in the definition.

    Each entry's ``package`` directory is zipped into MAGIC_DIR and uploaded
    with ``create_function``. State is persisted after every creation so a
    partially-completed run can still be cleaned up by ``remove_lambdas``.
    """
    self.state['Lambdas'] = []
    # .get(..., []) so a definition file without a 'Lambdas' section is a no-op
    # instead of a KeyError.
    for lambda_def in self.group.get('Lambdas', []):
        log.info("Creating Lambda function '{0}'".format(lambda_def['name']))

        role_arn = lambda_def['role']

        # Zip the function package for upload; make_archive returns the zip path.
        zf = shutil.make_archive(
            os.path.join(MAGIC_DIR, lambda_def['name']), 'zip', lambda_def['package'])
        log.debug("Lambda deployment Zipped to '{0}'".format(zf))

        with open(zf, 'rb') as f:
            lr = self._lambda.create_function(
                FunctionName=lambda_def['name'],
                # Per-lambda 'runtime' override; defaults to the original python2.7.
                Runtime=lambda_def.get('runtime', 'python2.7'),
                Role=role_arn,
                Handler=lambda_def['handler'],
                Code=dict(ZipFile=f.read()),
                Environment=dict(Variables=lambda_def.get('environment', {}))
            )

        # Remember the local archive path so remove_lambdas can delete it later.
        lr['ZipPath'] = zf
        self.state['Lambdas'].append(rinse(lr))
        _update_state(self.state)
        log.info("Lambda function '{0}' created".format(lr['FunctionName']))

def remove_lambdas(self):
    """Delete every Lambda function recorded in the state, plus its local zip.

    Bug fix: the original guard ``if not self.state and self.state.get('Lambdas')``
    could never be true (an empty state makes the second operand falsy), so an
    empty state fell through and raised KeyError on ``self.state['Lambdas']``.
    """
    if not (self.state and self.state.get('Lambdas')):
        log.info("There seem to be nothing to remove.")
        return

    for lambda_state in self.state['Lambdas']:
        log.info("Deleting Lambda function '{0}'".format(lambda_state['FunctionName']))
        self._lambda.delete_function(FunctionName=lambda_state['FunctionName'])
        # The archive may have been deleted manually; don't crash on cleanup.
        if os.path.isfile(lambda_state['ZipPath']):
            os.remove(lambda_state['ZipPath'])

    self.state.pop('Lambdas')
    _update_state(self.state)

def _create_cores(self):
self.state['Cores'] = []
cores = []
Expand Down Expand Up @@ -262,7 +305,7 @@ def _create_and_attach_thing_policy(self, thing_name, policy_doc, thing_cert_arn
return policy

def _create_core_policy(self):
# TODO: redo as template and read from group.yaml
# TODO: redo as template and read from definition file
core_policy = {
"Version": "2012-10-17",
"Statement": [
Expand Down Expand Up @@ -334,10 +377,15 @@ def rinse(boto_response):


def _update_state(group_state):
    """Persist ``group_state`` as pretty-printed JSON to STATE_FILE.

    An empty/falsy state means the group was torn down: the state file is
    removed instead of written.
    """
    if not group_state:
        # Guard: os.remove raises OSError if the file was never created
        # (e.g. remove called before any create).
        if os.path.isfile(STATE_FILE):
            os.remove(STATE_FILE)
        log.debug("State is empty, removed state file '{0}'".format(STATE_FILE))
        return

    with open(STATE_FILE, 'w') as f:
        # sort_keys + fixed separators keep the file diff-friendly across runs.
        json.dump(group_state, f, indent=2,
                  separators=(',', ': '), sort_keys=True)
    log.debug("Updated group state in state file '{0}'".format(STATE_FILE))


def _state_exists():
Expand Down
27 changes: 27 additions & 0 deletions greengo.yaml
@@ -0,0 +1,27 @@
# Greengrass Group definition file
Group:
name: ml_take2_group
Cores:
- name: ml_take2_core_1
key_path: ./certs
config_path: ./config
SyncShadow: False

Lambdas:
- name: GreengrassHelloWorld
handler: function.handler
package: lambdas/GreengrassHelloWorld
role: 'arn:aws:iam::012345678901:role/ExistingLambdaRole'
environment:
foo: bar

Subscriptions: # not implemented
- Id: 1
Source: ""
Subject: ""
Target: ""

Devices: # not implemented
- name: ml_take2_thing_1
key_path: ./certs

24 changes: 0 additions & 24 deletions group.yaml

This file was deleted.

Empty file.
38 changes: 38 additions & 0 deletions lambdas/GreengrassHelloWorld/function.py
@@ -0,0 +1,38 @@
import os
import sys
import platform
from threading import Timer

sys.path.append(os.path.dirname(os.path.realpath(__file__)))

import greengrasssdk

INTERVAL = 5

# Creating a greengrass core sdk client
client = greengrasssdk.client('iot-data')

# Retrieving platform information to send from Greengrass Core
my_platform = platform.platform()


def run():
    """Publish a greeting to 'hello/world' and reschedule itself every INTERVAL seconds."""
    # print() call form works identically on Python 2 and 3; the original
    # py2-only `print "..."` statement is a SyntaxError under Python 3.
    print("Executing run...")
    if not my_platform:
        client.publish(topic='hello/world', payload='Hello from Greengrass Core.')
    else:
        client.publish(
            topic='hello/world',
            payload='Hello from Greengrass Core running on platform: {}'.format(my_platform))

    # Asynchronously schedule this function to be run again in INTERVAL seconds
    Timer(INTERVAL, run).start()

# Start executing the function above
run()


# This is a dummy handler and will not be invoked
# Instead the code above will be executed in an infinite loop for our example
def handler(event=None, context=None):
    """No-op Lambda entry point; the module-level run() loop does the real work."""
    return None
6 changes: 6 additions & 0 deletions lambdas/GreengrassHelloWorld/greengrass_common/__init__.py
@@ -0,0 +1,6 @@
#
# Copyright 2010-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#

from .function_arn_fields import FunctionArnFields
from .greengrass_message import GreengrassMessage
Binary file not shown.
@@ -0,0 +1,16 @@
#
# Copyright 2010-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# this log appender is shared among all components of python lambda runtime, including:
# greengrass_common/greengrass_message.py, greengrass_ipc_python_sdk/ipc_client.py,
# greengrass_ipc_python_sdk/utils/exponential_backoff.py, lambda_runtime/lambda_runtime.py.
# so that all log records will be emitted to local Cloudwatch.
import logging.handlers

from greengrass_common.local_cloudwatch_handler import LocalCloudwatchLogHandler

# Record format string; attribute names are the stdlib LogRecord attributes:
# https://docs.python.org/2/library/logging.html#logrecord-attributes
LOCAL_CLOUDWATCH_FORMAT = '[%(levelname)s]-%(filename)s:%(lineno)d,%(message)s'

# Single shared handler instance that the runtime modules attach to their
# loggers, so all of their records are emitted to local CloudWatch.
# NOTE(review): LocalCloudwatchLogHandler's argument semantics (presumably a
# log group/stream pair) are defined elsewhere — confirm in its module.
local_cloudwatch_handler = LocalCloudwatchLogHandler('GreengrassSystem', 'python_runtime')
local_cloudwatch_handler.setFormatter(logging.Formatter(LOCAL_CLOUDWATCH_FORMAT))
Binary file not shown.
10 changes: 10 additions & 0 deletions lambdas/GreengrassHelloWorld/greengrass_common/env_vars.py
@@ -0,0 +1,10 @@
#
# Copyright 2010-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#

import os

# Values read from the process environment — presumably injected by the
# Greengrass runtime container (TODO confirm). Each is None when unset.
AUTH_TOKEN = os.getenv('AWS_CONTAINER_AUTHORIZATION_TOKEN')
MY_FUNCTION_ARN = os.getenv('MY_FUNCTION_ARN')
SHADOW_FUNCTION_ARN = os.getenv('SHADOW_FUNCTION_ARN')
ROUTER_FUNCTION_ARN = os.getenv('ROUTER_FUNCTION_ARN')
Binary file not shown.
@@ -0,0 +1,46 @@
#
# Copyright 2010-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#

import re

# Matches a Lambda function ARN: captures region, account id, function name and
# an optional qualifier (alias/version name or $LATEST). Raw string so the \d
# escapes reach the regex engine untouched (invalid string escapes like '\d'
# are deprecated in Python 3.6+).
ARN_FIELD_REGEX = \
    r'arn:aws:lambda:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):function:([a-zA-Z0-9-_]+)(?::(\$LATEST|[a-zA-Z0-9-_]+))?'


class FunctionArnFields:
    """
    This class takes in a string representing a Lambda function's ARN (the qualifier is optional), parses that string
    into individual fields for region, account_id, name and qualifier. It also has a static method for creating a
    Function ARN string from those subfields.
    """
    @staticmethod
    def build_arn_string(region, account_id, name, qualifier):
        """Assemble an ARN string from fields; qualifier is appended only when truthy."""
        if qualifier:
            return 'arn:aws:lambda:{region}:{account_id}:function:{name}:{qualifier}'.format(
                region=region, account_id=account_id, name=name, qualifier=qualifier
            )
        else:
            return 'arn:aws:lambda:{region}:{account_id}:function:{name}'.format(
                region=region, account_id=account_id, name=name
            )

    def __init__(self, function_arn_string):
        self.parse_function_arn(function_arn_string)

    def parse_function_arn(self, function_arn_string):
        """Populate region, account_id, name and qualifier from an ARN string.

        Raises:
            ValueError: if the string does not match ARN_FIELD_REGEX.
        """
        regex_match = re.match(ARN_FIELD_REGEX, function_arn_string)
        if regex_match:
            # Strip any ':' defensively; an unmatched optional qualifier group
            # is None and is passed through unchanged.
            region, account_id, name, qualifier = map(
                lambda s: s.replace(':', '') if s else s, regex_match.groups()
            )
        else:
            raise ValueError('Cannot parse given string as a function ARN.')

        self.region = region
        self.account_id = account_id
        self.name = name
        self.qualifier = qualifier

    def to_arn_string(self):
        """Rebuild the canonical ARN string from the parsed fields."""
        return FunctionArnFields.build_arn_string(self.region, self.account_id, self.name, self.qualifier)
Binary file not shown.
@@ -0,0 +1,76 @@
#
# Copyright 2010-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#

import base64
import json
import logging
from greengrass_common.common_log_appender import local_cloudwatch_handler

# Log messages here are not part of customer's log because anything that
# goes wrong here has nothing to do with customer's lambda code. Since we configured
# the root logger to log to customer's log, we need to turn off propagation.
runtime_logger = logging.getLogger(__name__)
runtime_logger.addHandler(local_cloudwatch_handler)
runtime_logger.propagate = False
# set to the lowest possible level so all log messages will be sent to local cloudwatch handler
runtime_logger.setLevel(logging.DEBUG)


class GreengrassMessage:
    """
    Container for the payload and extension_map fields of a message exchanged
    over the IPC, with helpers to encode to and decode from wire strings.
    """

    def __init__(self, payload=b'', **extension_map):
        self.payload = payload
        self.extension_map = extension_map

    @classmethod
    def decode(cls, encoded_string):
        """Parse a wire string; a falsy input yields an empty message (payload None)."""
        if not encoded_string:
            return cls(None)

        try:
            data_map = json.loads(encoded_string)
        except (ValueError, TypeError) as e:
            runtime_logger.exception(e)
            raise ValueError('Could not load provided encoded string "{}" as JSON due to exception: {}'.format(
                repr(encoded_string), str(e)
            ))

        try:
            decoded_payload = base64.b64decode(data_map['Payload'])
        except (ValueError, TypeError) as e:
            runtime_logger.exception(e)
            raise ValueError(
                'Could not decode payload of Greengrass Message data'
                '"{}" from base64 due to exception: {}'.format(repr(data_map), str(e))
            )

        return cls(decoded_payload, **data_map['ExtensionMap_'])

    def encode(self):
        """Serialize this message to a JSON wire string with a base64 payload."""
        try:
            # .decode() converts the base64 bytes to a JSON-friendly str.
            encoded_payload = base64.b64encode(self.payload).decode()
        except (ValueError, TypeError) as e:
            runtime_logger.exception(e)
            raise ValueError('Could not encode Greengrass Message payload "{}" as base64 due to exception: {}'.format(
                repr(self.payload), str(e)
            ))

        try:
            return json.dumps({'Payload': encoded_payload, 'ExtensionMap_': self.extension_map})
        except (ValueError, TypeError) as e:
            runtime_logger.exception(e)
            raise ValueError('Could not encode Greengrass Message fields "{}" as JSON due to exception: {}'.format(
                str(self), str(e)
            ))

    def __str__(self):
        return str({'Payload': self.payload, 'ExtensionMap_': self.extension_map})
Binary file not shown.

0 comments on commit f6a9c6c

Please sign in to comment.