# Process Status Tracking Service

## Introduction

This notebook explores the design of a status tracking service that allows a model of
a multi-step process to be created and its state progression maintained as events
are emitted from various systems contributing to the overall progression of the process.

In the most general case, an abstraction of the steps a business performs to satisfy
a customer service request is created with the purpose of helping the customer
understand the end-to-end process and the current state of things. Assume several line-of-business
systems are involved, perhaps coordinated by a BPM system, with some tasks automated
and others performed manually.

If the events that signal activity related to the customer service request can be captured, then
they can be correlated with the completion and progression of the milestones captured in the
status model.

## General Model

In the general model, a sequence of events results in a sequence of state transitions in the status model.

<pre>
e0 -> [create model]
e1 -> t1
e2 -> t2
...
en -> tn
</pre>

The above can be modeled as applying a function to the event and the current model state to get the new state of the model:

<pre>
f(e,m) -> m'
</pre>
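Because each event produces a new state from the previous one, processing a transaction's event sequence is a left fold of the transition function over the events. The sketch below assumes a hypothetical milestone model (`MILESTONES`, `create_model`, `transition` are illustrative names, not part of this notebook). Note that `functools.reduce` passes `(accumulator, item)`, so the arguments appear as `transition(m, e)` rather than `f(e, m)`.

```python
from functools import reduce

# Hypothetical milestone model: each event type maps to the milestone it completes.
MILESTONES = {
    "order_received": "received",
    "order_shipped": "shipped",
    "order_delivered": "delivered",
}

def create_model(event):
    """Handle e0: build the initial model state from the creating event."""
    return {"txn_id": event["txn_id"], "completed": []}

def transition(model, event):
    """f(e, m) -> m': return a new state without mutating the old one."""
    milestone = MILESTONES.get(event["type"])
    if milestone is None:
        return model  # unrecognized events leave the state unchanged
    return {**model, "completed": model["completed"] + [milestone]}

events = [
    {"txn_id": "t-1", "type": "create"},          # e0 -> [create model]
    {"txn_id": "t-1", "type": "order_received"},  # e1 -> t1
    {"txn_id": "t-1", "type": "order_shipped"},   # e2 -> t2
]

# Fold e1..en over the model created from e0.
final = reduce(transition, events[1:], create_model(events[0]))
print(final)  # {'txn_id': 't-1', 'completed': ['received', 'shipped']}
```

Keeping the transition function pure makes it easy to replay a transaction's events to rebuild its state.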

To build a general system, we need some context to be present on all events:

* A transaction id used to correlate all the events associated with a specific service request
* A model id indicating which model the events are applied to, so that the corresponding model state can be maintained
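One way to enforce this context is to validate every incoming event against a small envelope before processing. This is a minimal sketch; the field names `model_id`, `txn_id`, `event_type`, and `payload` are assumptions, not a format defined by this notebook.

```python
from dataclasses import dataclass, field
from typing import Any, Dict

@dataclass(frozen=True)
class Event:
    model_id: str    # selects the state transition function
    txn_id: str      # correlates events for one service request
    event_type: str  # what happened in the contributing system
    payload: Dict[str, Any] = field(default_factory=dict)

REQUIRED = ("model_id", "txn_id", "event_type")

def parse_event(raw: Dict[str, Any]) -> Event:
    """Reject events that lack the context needed for routing."""
    missing = [k for k in REQUIRED if k not in raw]
    if missing:
        raise ValueError("event is missing required context: " + ", ".join(missing))
    return Event(raw["model_id"], raw["txn_id"], raw["event_type"], raw.get("payload", {}))
```

Rejecting events early keeps uncorrelatable events from silently disappearing later in the pipeline.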

For a service that can maintain models on behalf of multiple service request types, there needs to
be a way to partition the event processing space by model. When an event is received by the system, the
model state transition function is selected using the model id on the event. Then, the transaction id is
used to select the model instance that is passed, along with the event, to the state transition function.
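The two-step selection can be sketched with in-memory dictionaries standing in for the persistent stores. The registry contents, `order_transition`, and the `instances` store are hypothetical; in the AWS implementation both lookups would be backed by a database.

```python
from typing import Callable, Dict

Model = dict
Transition = Callable[[dict, Model], Model]

def order_transition(event: dict, model: Model) -> Model:
    """Hypothetical transition: record the last event and count events."""
    return {**model, "last_event": event["event_type"], "count": model.get("count", 0) + 1}

# Hypothetical registry: model id -> state transition function
TRANSITIONS: Dict[str, Transition] = {"order-status": order_transition}

# Hypothetical instance store: transaction id -> model state
instances: Dict[str, Model] = {}

def handle(event: dict) -> Model:
    fn = TRANSITIONS[event["model_id"]]         # 1. select f by model id (KeyError if unknown)
    current = instances.get(event["txn_id"], {})  # 2. select instance by transaction id
    new_state = fn(event, current)              # 3. f(e, m) -> m'
    instances[event["txn_id"]] = new_state      # persist m'
    return new_state
```

For example, two events for transaction `t-1` leave `instances["t-1"]` with `count` equal to 2, while a first event for `t-2` starts a fresh instance.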

## AWS Implementation

For an AWS implementation, Kinesis can be used to feed events into the system. A dispatcher Lambda function receives the events from the stream, looks up the Lambda function registered to process events for the event's model, and invokes that function with the event. The model-specific function then retrieves the model instance state and applies the state transition function to the model instance and the event.
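The dispatcher's core logic can be sketched independently of AWS by injecting the lookup and invoke steps. This assumes each Kinesis record's `data` is base64-encoded JSON containing `model_id` and `txn_id`; `dispatch_records` is a hypothetical name, not the function in `kreader.zip`.

```python
import base64
import json

def dispatch_records(kinesis_event, lookup_function, invoke):
    """Decode each Kinesis record and forward it to its model's function.

    lookup_function(model_id) -> function name (in AWS, a DynamoDB lookup)
    invoke(function_name, payload_bytes) -> None (in AWS, a Lambda invoke)
    """
    dispatched = []
    for record in kinesis_event["Records"]:
        body = json.loads(base64.b64decode(record["kinesis"]["data"]))
        target = lookup_function(body["model_id"])
        invoke(target, json.dumps(body).encode("utf-8"))
        dispatched.append((target, body["txn_id"]))
    return dispatched

# Wiring inside a real handler might look like this
# (the table and attribute names are hypothetical):
#
# import boto3
# ddb = boto3.client("dynamodb")
# lam = boto3.client("lambda")
#
# def handler(event, context):
#     def lookup(model_id):
#         item = ddb.get_item(TableName="ModelRegistry",
#                             Key={"model_id": {"S": model_id}})["Item"]
#         return item["function_name"]["S"]
#     def invoke(name, payload):
#         lam.invoke(FunctionName=name, InvocationType="Event", Payload=payload)
#     return dispatch_records(event, lookup, invoke)
```

Injecting the two dependencies keeps the routing logic testable without AWS credentials. With `InvocationType="Event"` the dispatcher does not wait for the model function, so its errors must be handled on the model function's side.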

Note that DynamoDB is used both to store the Lambda function reference for each model and to store the model instance state.
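The two kinds of records map naturally onto two tables. The helpers below build keyword arguments for the low-level boto3 DynamoDB client's `get_item` and `put_item` calls; the table names, attribute names, and the choice to store state as a JSON string are assumptions, since the actual schema lives in `ddb.yml`.

```python
import json

# Hypothetical table names; the real ones are defined in ddb.yml.
REGISTRY_TABLE = "ModelRegistry"
INSTANCE_TABLE = "ModelInstances"

def registry_lookup_request(model_id):
    """kwargs for get_item: model id -> processing function reference."""
    return {"TableName": REGISTRY_TABLE, "Key": {"model_id": {"S": model_id}}}

def instance_get_request(txn_id):
    """kwargs for get_item: transaction id -> current model state."""
    return {
        "TableName": INSTANCE_TABLE,
        "Key": {"txn_id": {"S": txn_id}},
        "ConsistentRead": True,  # read the latest committed state
    }

def instance_put_request(txn_id, model_id, state):
    """kwargs for put_item: store the new model state as a JSON string."""
    return {
        "TableName": INSTANCE_TABLE,
        "Item": {
            "txn_id": {"S": txn_id},
            "model_id": {"S": model_id},
            "state": {"S": json.dumps(state)},
        },
    }
```

Usage would look like `boto3.client('dynamodb').get_item(**instance_get_request('t-1'))`.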

### Why Not Use Step Functions?

On the surface, Step Functions might seem like the appropriate AWS service for defining a model and maintaining its state. However, it is not a good fit here: receipt of an event indicates that the work needed to transition the model state has already occurred, whereas the Step Functions engine schedules activities to be performed by worker processes.

## DynamoDB Tables

The following cells upload the CloudFormation template `ddb.yml` to S3 and create a stack from it, which creates the DynamoDB tables used in this implementation.
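The contents of `ddb.yml` are not shown in this notebook. As a hypothetical equivalent, the template below declares two `AWS::DynamoDB::Table` resources (a model registry keyed by model id and an instance table keyed by transaction id) as a Python dict, serialized to JSON, which CloudFormation also accepts. The resource, table, and key names are assumptions.

```python
import json

def dynamodb_table(name, hash_key):
    """A single-key, on-demand DynamoDB table resource."""
    return {
        "Type": "AWS::DynamoDB::Table",
        "Properties": {
            "TableName": name,
            "AttributeDefinitions": [{"AttributeName": hash_key, "AttributeType": "S"}],
            "KeySchema": [{"AttributeName": hash_key, "KeyType": "HASH"}],
            "BillingMode": "PAY_PER_REQUEST",
        },
    }

template = {
    "AWSTemplateFormatVersion": "2010-09-09",
    "Resources": {
        # model id -> Lambda function reference
        "ModelRegistryTable": dynamodb_table("ModelRegistry", "model_id"),
        # transaction id -> model instance state
        "ModelInstanceTable": dynamodb_table("ModelInstances", "txn_id"),
    },
}

template_body = json.dumps(template, indent=2)
```

Instead of uploading a file to S3, such a body could be passed directly as `client.create_stack(StackName=ddb_stack, TemplateBody=template_body)`.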

In [None]:
cf_bucket = '84101-demo-bucket'
template_name = 'ddb.yml'
ddb_stack = 'sddb'

In [None]:
%%bash -s "$template_name" "$cf_bucket"
# $1 = template file name, $2 = bucket name
echo "$2"
ls "$1"
aws s3 cp "./$1" "s3://$2"

In [None]:
import boto3

client = boto3.client('cloudformation')

In [None]:
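def form_s3_url_prefix(region):
    # us-east-1 uses the global endpoint. Other regions use the
    # region-specific dot form (s3.<region>), which works in all
    # regions; the older dash form (s3-<region>) is only supported
    # in some older regions.
    if region == 'us-east-1':
        return 'https://s3.amazonaws.com'
    return 'https://s3.' + region + '.amazonaws.com'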

In [None]:
import os
region = os.environ['AWS_DEFAULT_REGION']
bucketRoot = form_s3_url_prefix(region) + '/' + cf_bucket
print(bucketRoot)

In [None]:
response = client.create_stack(
    StackName=ddb_stack,
    TemplateURL=bucketRoot + '/' + template_name
)

print(response)

In [None]:
# Wait for it...
waiter = client.get_waiter('stack_create_complete')
waiter.wait(
    StackName=ddb_stack
)

print('stack created')

## Kinesis Stream and Event Delegation

The front end of the event processing runtime is a Kinesis stream with a Lambda function that reads the records written to the stream.
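On the producing side, systems would write events to the stream with the Kinesis `put_record` API. Using the transaction id as the partition key routes all events for one service request to the same shard, where records are ordered by sequence number, which helps the transition function see a transaction's events in order. The helper name and the stream name in the usage line are hypothetical; the real stream is created by `k2l.yml`.

```python
import json

def put_record_request(stream_name, event):
    """kwargs for the boto3 Kinesis client's put_record call."""
    return {
        "StreamName": stream_name,
        "Data": json.dumps(event).encode("utf-8"),
        # Same transaction id -> same shard -> per-transaction ordering
        "PartitionKey": event["txn_id"],
    }
```

Usage would look like `boto3.client('kinesis').put_record(**put_record_request('status-events', evt))`.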

In [None]:
# Stage the CloudFormation template and the zip file containing the Lambda function.
lambda_zip = 'kreader.zip'
front_end_cf = 'k2l.yml'
front_end_stack = 'sfe'

In [None]:
%%bash -s "$cf_bucket" "$front_end_cf" "$lambda_zip"
make
aws s3 cp $2 s3://$1
aws s3 cp $3 s3://$1

In [None]:
response = client.create_stack(
    StackName=front_end_stack,
    TemplateURL=bucketRoot + '/' + front_end_cf,
    Parameters=[
        {
            'ParameterKey': 'CodeBucketName',
            'ParameterValue': cf_bucket
        }
    ],
    Capabilities=[
        'CAPABILITY_IAM',
    ]
)

print(response)

In [None]:
# Wait for it...
waiter = client.get_waiter('stack_create_complete')
waiter.wait(
    StackName=front_end_stack
)

print('stack created')

### Try Out the Event Delegation Lambda Function

The CloudFormation template creates the Lambda function and the Kinesis stream, and ties the two together. From this notebook we can also invoke the function directly.
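The test event used below is assembled by string concatenation. A sketch of an alternative is to build the same Kinesis event shape as a dict and serialize it with `json.dumps`, which avoids hand-escaping; `make_kinesis_test_event` is a hypothetical helper, and the field values are copied from the sample event in this notebook.

```python
import base64
import json

SEQ = "49545115243490985018280067714973144582180062593244200961"

def make_kinesis_test_event(payload, partition_key="partitionKey-3", region="us-east-1"):
    """Wrap a JSON-serializable payload in a sample Kinesis Lambda event."""
    data = base64.b64encode(json.dumps(payload).encode("utf-8")).decode("ascii")
    return {
        "Records": [{
            "eventID": "shardId-000000000000:" + SEQ,
            "eventVersion": "1.0",
            "kinesis": {
                "approximateArrivalTimestamp": 1428537600,
                "partitionKey": partition_key,
                "data": data,  # Kinesis delivers record data base64-encoded
                "kinesisSchemaVersion": "1.0",
                "sequenceNumber": SEQ,
            },
            "invokeIdentityArn": "arn:aws:iam::EXAMPLE",
            "eventName": "aws:kinesis:record",
            "eventSourceARN": "arn:aws:kinesis:EXAMPLE",
            "eventSource": "aws:kinesis",
            "awsRegion": region,
        }]
    }

test_payload = json.dumps(make_kinesis_test_event({"foo": "data for you"}))
```

`test_payload` could then be passed as the `Payload` argument to `lambda_client.invoke`.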

In [None]:
# First get the lambda function that was created in the stack
response = client.list_stack_resources(
    StackName=front_end_stack
)

print(response)

In [None]:
resource_name = 'KinesisReader'
lambda_fn = ''

resource_summaries = response['StackResourceSummaries']

for r in resource_summaries:
    if r['LogicalResourceId'] == resource_name:
        lambda_fn = r['PhysicalResourceId']

print('function created by stack is {}'.format(lambda_fn))

In [None]:
import base64

event_data = base64.b64encode(b'{"foo":"data for you"}').decode('ascii')

In [None]:
event_payload = '''
{
  "Records": [
    {
      "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961",
      "eventVersion": "1.0",
      "kinesis": {
        "approximateArrivalTimestamp": 1428537600,
        "partitionKey": "partitionKey-3",
        "data": "''' + event_data + '''",
        "kinesisSchemaVersion": "1.0",
        "sequenceNumber": "49545115243490985018280067714973144582180062593244200961"
      },
      "invokeIdentityArn": "arn:aws:iam::EXAMPLE",
      "eventName": "aws:kinesis:record",
      "eventSourceARN": "arn:aws:kinesis:EXAMPLE",
      "eventSource": "aws:kinesis",
      "awsRegion": "us-east-1"
    }
  ]
}
'''

print(event_payload)

In [None]:
# Invoke the function synchronously and print the tail of its execution log
import base64

lambda_client = boto3.client('lambda')

response = lambda_client.invoke(
    FunctionName=lambda_fn,
    # The default InvocationType is 'RequestResponse' (synchronous);
    # LogType='Tail' only returns logs for synchronous invocations.
    LogType='Tail',
    ClientContext=base64.b64encode(b'{"ctx":"what, me worry?"}').decode('ascii'),
    Payload=event_payload
)

log_result = base64.b64decode(response['LogResult']).decode('utf-8')
print(log_result)

## Clean Up

This section cleans up the stacks created earlier, deleting the front end stack before the DynamoDB stack.
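The cells below repeat the same delete-then-wait pattern for each stack. As a sketch, that pattern can be folded into one helper that uses the same CloudFormation client calls (`delete_stack` and the `stack_delete_complete` waiter); `delete_stacks` is a hypothetical name.

```python
def delete_stacks(cf_client, stack_names):
    """Delete stacks in the given order, waiting for each deletion to finish."""
    waiter = cf_client.get_waiter("stack_delete_complete")
    for name in stack_names:
        print("destroying {}...".format(name))
        cf_client.delete_stack(StackName=name)
        waiter.wait(StackName=name)
        print("{} destroyed".format(name))
```

Usage would be `delete_stacks(client, [front_end_stack, ddb_stack])`, which matches the order used below.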

In [None]:
response = client.delete_stack(
    StackName=front_end_stack
)

print(response)

In [None]:
print('waiting for destroy of {}...'.format(front_end_stack))
waiter = client.get_waiter('stack_delete_complete')
waiter.wait(
    StackName=front_end_stack
)

print('front end stack destroyed')

In [None]:
print 'destroy {}'.format(ddb_stack)
response = client.delete_stack(
    StackName=ddb_stack
)

print response

print 'waiting for destroy of {}...'.format(ddb_stack)
waiter.wait(
    StackName=ddb_stack
)

print 'ddb_stack stack destroyed'