In [16]:
import os
import sys
import uuid

pipeline_dir = os.path.join(os.getcwd(), '..')
if pipeline_dir not in sys.path:
    sys.path.append(pipeline_dir)

In [17]:
from pipeline import Pipeline, events, resources

# Pipeline
## Creating a Pipeline
Create a new project with the `pipeline-create <new-directory>` command.  You can create a pipeline by inheriting from the `pipeline.Pipeline` base class (see `<new-directory>/handler.py`).  A pipeline contains functions and resources, and must contain at least one function.

In [4]:
class SimplestPipeline(Pipeline):
    
    def __init__(self):
        super().__init__()
    
    @events.invoke
    def lambda_func(self, event, context):
        print(event)

More complex pipelines contain both functions and resources.  The following pipeline sends a message to an SQS queue when a file is uploaded to an S3 bucket.  The messages in the queue are then processed by a Lambda function.

In [5]:
class MyBucket(resources.S3Bucket):
    
    def __init__(self):
        super().__init__()

class MyQueue(resources.SQSQueue):
    
    def __init__(self):
        super().__init__()

bucket = MyBucket()
queue = MyQueue()

class NotSoSimplePipeline(Pipeline):
    
    def __init__(self):
        super().__init__(resources=[bucket, queue])
    
    @events.bucket_notification(bucket=bucket, event_type="s3:ObjectCreated:Put", destination=queue)
    def bucket_kickoff(self, event, context):
        print(f"A file was uploaded to s3://{event['bucket']}/{event['key']}")

pipeline = NotSoSimplePipeline()

## Deploying a Pipeline
When your pipeline is ready to deploy, add a deployment function to your script.

In [6]:
def deploy():
    pipeline.deploy()

Run the `pipeline-deploy <pipeline-directory>` command to deploy the pipeline.  This will generate a `serverless.yml` used by Serverless Framework to deploy your pipeline and an `outputs.yml` containing the outputs of the deployed CloudFormation stack.  You can use the `--dry-run` flag to only generate `serverless.yml` without deploying.

## Pipeline Functionality
The pipeline object provides an object-oriented interface for interacting with your resources and functions.
#### Functions
Access a function using the dictionary interface of the `functions` attribute.

In [9]:
my_func = pipeline.functions['bucket_kickoff']
print(type(my_func))

<class 'pipeline.functions.Function_BUCKET_NOTIFICATION'>


Functions have a trigger and a CloudFormation template describing the trigger.  In this case our trigger is a `bucket_notification` which sends a message to `MyQueue`.  Because the notification is routed through the queue, the CloudFormation template lists `MyQueue` as the event source for our Lambda function.

In [11]:
print("Trigger: {}".format(my_func.trigger))
print("CloudFormation Template: {}".format(my_func.template()))

Trigger: bucket_notification
CloudFormation Template: {'events': [{'sqs': {'arn': 'arn:aws:sqs:us-east-1:725820063953:MyQueue'}}]}


Once your pipeline is deployed to AWS, you can invoke your function client-side using the `invoke` method.  The expected input differs from one event type to another.

In [29]:
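# This will raise an error because the pipeline is not deployed
try:
    my_func.invoke('testing')
except Exception:
    # Catch Exception rather than using a bare except, so that
    # KeyboardInterrupt and SystemExit are not swallowed
    print("Failed")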

Failed


#### Resources
Access a resource using the dictionary interface of the `resources` attribute.

In [21]:
my_resource = pipeline.resources['MyBucket']
print(type(my_resource))

<class '__main__.MyBucket'>


Similar to functions, resources contain a CloudFormation template describing the resource:

In [22]:
print(my_resource)

{'Type': 'AWS::S3::Bucket', 'Properties': {'BucketName': 'mybucket'}}


Resources provide helper methods for interacting with the resource.  In this case they are `read_file`, `download_image`, `upload_file`, and `upload_image`.

In [30]:
print(dir(my_resource))

['__class__', '__contains__', '__delattr__', '__delitem__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getitem__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__len__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__setitem__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', 'arn', 'build_resource', 'clear', 'copy', 'download_image', 'fromkeys', 'get', 'items', 'keys', 'name', 'pop', 'popitem', 'read_file', 'resource', 'setdefault', 'update', 'upload_file', 'upload_image', 'values']
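The listing above mixes the resource's own helpers with inherited dunder and dictionary methods (`keys`, `popitem`, and so on), which suggests the resource behaves like a `dict`.  A minimal sketch for filtering the listing down to the resource-specific names is shown here.  `BucketSketch` is a hypothetical stand-in with placeholder method signatures, not the library's class; in the notebook you would pass `my_resource` instead.

```python
class BucketSketch(dict):
    """Hypothetical stand-in for a dict-like resource (not the library's class)."""

    def read_file(self, key):
        # Placeholder signature, for illustration only.
        raise NotImplementedError

    def upload_file(self, key, path):
        # Placeholder signature, for illustration only.
        raise NotImplementedError


def resource_specific_names(obj):
    """Return public attribute names that are not inherited from dict."""
    inherited = set(dir(dict))
    return [
        name
        for name in dir(obj)
        if not name.startswith("_") and name not in inherited
    ]


print(resource_specific_names(BucketSketch()))
```

Assuming the real resource does subclass `dict`, the same helper applied to `my_resource` would keep only names such as `arn`, `name`, `read_file`, and `upload_file`.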


### Gotchas
Our pipeline exposes two ways of invoking functions.  The first is invoking the function through its function object.  The second is invoking the function through its associated resource object.  In the pipeline above, for example, we could call `pipeline.functions['bucket_kickoff'].invoke`.  We could also call `pipeline.resources['MyBucket'].upload_file`.  Why does the library expose multiple ways of invoking functions?

The short answer is that different AWS events are invoked differently.  HTTP events, for example, require sending a message to an endpoint that is generated post-deployment.  This means that we don't have access to the endpoint at runtime and can't trigger our function.  This is not a problem with SNS events, for example, as the SNS topic's ARN can be built dynamically from other information available at runtime.  It is therefore necessary to have two different ways of invoking functions.
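The difference can be sketched with the standard AWS naming formats.  The helper names, account ID, topic name, and API ID below are placeholders for illustration and are not part of the library; how the library itself builds these values is not shown in this document.

```python
def sns_topic_arn(region: str, account_id: str, topic_name: str) -> str:
    """Build an SNS topic ARN from values that are known before deployment."""
    return f"arn:aws:sns:{region}:{account_id}:{topic_name}"


def http_endpoint(api_id: str, region: str, stage: str) -> str:
    """Build an API Gateway invoke URL.

    Unlike the ARN above, api_id is assigned by AWS during deployment,
    so this URL cannot be computed ahead of time.
    """
    return f"https://{api_id}.execute-api.{region}.amazonaws.com/{stage}"


# Placeholder values, for illustration only.
print(sns_topic_arn("us-east-1", "123456789012", "MyTopic"))
print(http_endpoint("abc123", "us-east-1", "prod"))
```

Every argument to `sns_topic_arn` is something the pipeline already knows, whereas `api_id` has to be read back from the deployed stack.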

Invoking through the function object requires an `output.yml` file which is automatically generated post-deployment.  Because it is generated post-deployment, it is not included in the Lambda deployment package.  This means that calling `function.invoke` inside a Lambda function is prone to fail for certain event types.  Invoking through the resource object does not depend on this file, so it will never fail inside a Lambda function for this reason.  As such, `function.invoke` is recommended for client-side use while `resource.invoke` is recommended for server-side use.

## Pipeline Execution Environment
You can change the region, runtime, and stage of your pipeline using the appropriate setters of the `execution` attribute.
#### AWS Environment

In [25]:
class MyPipeline(Pipeline):
    
    def __init__(self):
        super().__init__()
        self.execution.runtime = "python2.7"
        self.execution.region = "us-west-2"
        self.execution.stage = "prod"

#### IAM Role
As of now, the library automatically generates the pipeline's IAM role based on the declared resources.  By default, the role is assigned `*` permission to each resource.  Future versions will restrict permissions to only the actions in use by the pipeline.
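As a rough illustration of what a wildcard grant per resource might look like, the sketch below builds an IAM policy document as a plain dictionary.  It assumes that "`*` permission" means every action of the resource's service (for example `s3:*`); the helper names and the placeholder account ID are hypothetical, and the role the library actually generates may differ.

```python
import json


def wildcard_statement(service: str, resource_arn: str) -> dict:
    """Allow every action of one service on one resource (assumed reading of `*`)."""
    return {"Effect": "Allow", "Action": f"{service}:*", "Resource": resource_arn}


def build_policy(resources) -> dict:
    """Build a policy document from (service, arn) pairs."""
    return {
        "Version": "2012-10-17",
        "Statement": [wildcard_statement(service, arn) for service, arn in resources],
    }


# Placeholder ARNs modeled on the bucket and queue declared earlier.
policy = build_policy(
    [
        ("s3", "arn:aws:s3:::mybucket"),
        ("sqs", "arn:aws:sqs:us-east-1:123456789012:MyQueue"),
    ]
)
print(json.dumps(policy, indent=2))
```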

#### Dependencies
Deployment uses the Serverless Framework plugin `serverless-python-requirements` to install your pipeline's dependencies from `requirements.txt`.  Alternatively, you can specify a Lambda deployment zip file in your pipeline's deployment function.

In [28]:
def deploy():
    pipeline.deploy(package='path/to/deployment.zip')

## Testing
Cognition-pipeline integrates with unit tests through its object-oriented framework of functions and resources.  In general, there are two types of functions to test.  The first type has a direct return.  An example is a Lambda function which takes an input, does something, and returns an output directly to the client.  This type is the easiest to test, as we can invoke the function inside a test case and make assertions on the returned output.

The second type does not have a direct return.  An example is a Lambda function which listens to an SNS topic, does something to the message, and relays it to a DynamoDB table (or other resource).  In this situation there is no direct output returned to the client upon invocation.  This leaves us two options for testing.  The first is monitoring the affected resources to confirm that our function behaves correctly.  The second is to create a new SQS queue which we can use to redirect messages client-side.  Check out the test cases in `cognition-pipeline/tests` for examples of both of these techniques.
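The two cases can be sketched with the standard `unittest` module.  Everything below is a self-contained illustration: `FakeFunction`, `FakeTable`, and `relay` are hypothetical stand-ins, not library classes, and a real test would call the deployed pipeline's functions and resources instead.

```python
import unittest


class FakeFunction:
    """Hypothetical stand-in for a function with a direct return."""

    def invoke(self, payload):
        return {"echo": payload}


class FakeTable:
    """Hypothetical stand-in for a resource that a function writes to."""

    def __init__(self):
        self.items = []

    def put(self, item):
        self.items.append(item)


def relay(message, table):
    """Stand-in for a function with no direct return: it only writes to a resource."""
    table.put({"body": message.upper()})


class DirectReturnTest(unittest.TestCase):
    def test_invoke_returns_output(self):
        # First type: assert on the value returned by the invocation.
        self.assertEqual(FakeFunction().invoke("testing"), {"echo": "testing"})


class SideEffectTest(unittest.TestCase):
    def test_resource_receives_message(self):
        # Second type: nothing is returned, so inspect the affected resource.
        table = FakeTable()
        self.assertIsNone(relay("hello", table))
        self.assertEqual(table.items, [{"body": "HELLO"}])
```

The test cases can be run with `python -m unittest` from the directory containing the file.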