## Event Sourcing with Amazon Kinesis

This example demonstrates an event sourcing architecture using Amazon Kinesis. An Amazon API Gateway endpoint invokes a Lambda function, which takes the request payload and writes it as a record to an Amazon Kinesis data stream.
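The write path described above could look roughly like the following. This is a minimal sketch, not the deployed function: it assumes an API Gateway proxy integration (the request body arrives as a JSON string in `event['body']`), and the names `handler`, `build_put_record_args`, and the `kinesis_client` parameter, as well as the choice of `accountid` as the partition key, are assumptions made for illustration.

```python
import json

STREAM_NAME = "event_sourcing_kinesis"  # stream name used in this example


def build_put_record_args(order, stream_name=STREAM_NAME):
    """Build the keyword arguments for a Kinesis PutRecord call."""
    return {
        "StreamName": stream_name,
        "Data": json.dumps(order).encode("utf-8"),
        # Assumption: partition by account so one account's events share a shard.
        "PartitionKey": order["accountid"],
    }


def handler(event, context, kinesis_client=None):
    """Hypothetical producer: API Gateway proxy event in, one Kinesis record out."""
    if kinesis_client is None:
        import boto3  # imported lazily so the sketch can be exercised without AWS

        kinesis_client = boto3.client("kinesis")

    order = json.loads(event["body"])["order"]
    result = kinesis_client.put_record(**build_put_record_args(order))
    return {
        "statusCode": 200,
        "body": json.dumps({"sequenceNumber": result["SequenceNumber"]}),
    }
```

Passing the client as an optional argument keeps the handler compatible with Lambda's `(event, context)` invocation while allowing a stub client in local tests.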

Each record written to the stream triggers two additional Lambda functions. The invoice function writes the event object to an Amazon S3 bucket, and the order function writes the event data to Amazon DynamoDB. Use this pattern, or something similar, when you require real-time or near real-time processing of data records.
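Both consumers receive the same batch of stream records, and Lambda delivers each Kinesis payload base64-encoded under `kinesis.data`. The sketch below shows how the two functions might decode a batch and shape their output. It assumes each record holds the order as JSON; the helper names, the S3 key layout, and the item shape are hypothetical, not taken from the deployed functions.

```python
import base64
import json


def decode_orders(event):
    """Return the order dicts carried by a Kinesis-triggered Lambda event."""
    orders = []
    for record in event["Records"]:
        # Each Kinesis payload arrives base64-encoded under kinesis.data.
        payload = base64.b64decode(record["kinesis"]["data"])
        orders.append(json.loads(payload))
    return orders


def invoice_object_key(order):
    """Hypothetical S3 key layout for the invoice function."""
    return f"{order['accountid']}/{order['vendorid']}.json"


def order_item(order):
    """Hypothetical DynamoDB item: key attributes plus the nested details."""
    return {
        "accountid": order["accountid"],
        "vendorid": order["vendorid"],
        "orderdate": order["orderdate"],
        "details": order["details"],
    }


# Build a sample event the way Lambda would deliver it and run both shapers.
sample = {
    "accountid": "a1",
    "vendorid": "v1",
    "orderdate": "2024-01-01 09:00:00",
    "details": {"coffeetype": "Short Black", "quantity": "2"},
}
encoded = base64.b64encode(json.dumps(sample).encode("utf-8")).decode("ascii")
sample_event = {"Records": [{"kinesis": {"data": encoded}}]}

for order in decode_orders(sample_event):
    print(invoice_object_key(order), order_item(order)["details"])
```

Because each function decodes the stream independently, a failure in one consumer does not affect what the other writes.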

Two further functions serve reads: one returns the orders from Amazon DynamoDB, and the other returns objects from the Amazon S3 bucket.


![architecture](../images/architecture_4.png "Architecture")

**Jupyter Notebook Scripts**
1.	The first script creates items (orders) and posts them to an Amazon API Gateway endpoint, which triggers a Lambda function. The Lambda function writes each item to an Amazon Kinesis data stream (event_sourcing_kinesis).

The Kinesis stream triggers two Lambda functions, and both read the records from the stream. The invoice function writes each record as an object to an Amazon S3 bucket. The order function writes each record as an item to an Amazon DynamoDB table.

2.	The second script is a JSON formatter, which renders JSON data in a readable format.

3.	The third script retrieves an item from the event_sourcing_kinesis_order table using the partition key (accountid) and sort key (vendorid).

4.	The fourth script calls Amazon API Gateway, which in turn triggers a function that generates a pre-signed URL for an S3 object.

**Note:** Before running the scripts, set gwid to your API Gateway ID (gwid = '...').

In [None]:
import datetime

import requests

# Set your API Gateway ID
gwid = '...'

url = f'https://{gwid}.execute-api.ap-southeast-2.amazonaws.com/prod/order'

# Post ten sample coffee orders; each request becomes one record on the stream
for i in range(10):
    order = {
        'accountid': f'a{i}',
        'vendorid': f'v{i}',
        'orderdate': str(datetime.datetime.now()),
        'details': {
            'coffeetype': 'Short Black',
            'coffeesize': 'Small',
            'unitprice': str(4.50 * (i + 1)),
            'quantity': str(i + 1),
        },
    }
    response = requests.post(url, json={'order': order})
    # Show the HTTP status of each request
    print(response)

**JSON formatter** - Run the following script to define a class that renders JSON objects in a readable format.

In [None]:
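import json
import uuid

from IPython.display import display_javascript, display_html


class RenderJSON(object):
    """Render JSON data as a collapsible tree in the notebook output."""

    def __init__(self, json_data):
        # Serialize dicts and lists; strings are assumed to already hold JSON text
        if isinstance(json_data, (dict, list)):
            self.json_str = json.dumps(json_data)
        else:
            self.json_str = json_data
        # Unique id for the <div> that will hold the rendered tree
        self.uuid = str(uuid.uuid4())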

    def _ipython_display_(self):
        display_html('<div id="{}" style="height: 600px; width:100%;font: 12px/18px monospace !important;"></div>'.format(self.uuid), raw=True)
        display_javascript("""
        require(["https://cdn.jsdelivr.net/gh/caldwell/renderjson@master/renderjson.js"], function() {
            renderjson.set_show_to_level(2);
            document.getElementById('%s').appendChild(renderjson(%s))
        });
      """ % (self.uuid, self.json_str), raw=True)

**Get item** from the event_sourcing_kinesis_order table using the partition key (accountid) and sort key (vendorid).

In [None]:
# Retrieve one order by partition key (accountid) and sort key (vendorid)

url = f'https://{gwid}.execute-api.ap-southeast-2.amazonaws.com/prod/order'

response_get = requests.get(url, params={'accountid':'a1','vendorid':'v1'})

RenderJSON(response_get.json())
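
Behind the gateway, the function that serves this request presumably looks the order up by its composite key. The following is a minimal sketch under that assumption, using an API Gateway proxy event and the boto3 `Table` resource. The names `get_order` and `handler`, the injectable `table` parameter, and the 400 and 404 responses are hypothetical; only the table name and the key attribute names come from this example.

```python
import json

TABLE_NAME = "event_sourcing_kinesis_order"  # table name used in this example


def get_order(table, accountid, vendorid):
    """Fetch one order by partition key and sort key; return None if absent."""
    response = table.get_item(Key={"accountid": accountid, "vendorid": vendorid})
    return response.get("Item")


def handler(event, context, table=None):
    """Hypothetical read handler behind GET /order."""
    if table is None:
        import boto3  # imported lazily so the sketch can be exercised without AWS

        table = boto3.resource("dynamodb").Table(TABLE_NAME)

    # queryStringParameters is null when the request carries no query string.
    params = event.get("queryStringParameters") or {}
    accountid, vendorid = params.get("accountid"), params.get("vendorid")
    if not accountid or not vendorid:
        message = {"message": "accountid and vendorid are required"}
        return {"statusCode": 400, "body": json.dumps(message)}

    item = get_order(table, accountid, vendorid)
    if item is None:
        return {"statusCode": 404, "body": json.dumps({"message": "order not found"})}
    # default=str covers the Decimal values boto3 returns for DynamoDB numbers.
    return {"statusCode": 200, "body": json.dumps(item, default=str)}
```

Both key attributes are needed because `get_item` requires the full primary key; querying by accountid alone would call for a `query` operation instead.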

**Generate Pre-signed URL**

In [None]:
# Request a pre-signed URL for the invoice object of account a1

url = f'https://{gwid}.execute-api.ap-southeast-2.amazonaws.com/prod/invoice'

response_get = requests.get(url, params={'accountid':'a1'})

print(response_get.json())
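
On the server side, a pre-signed URL is typically produced with the S3 client's `generate_presigned_url` method. The sketch below illustrates that call under stated assumptions: the bucket name, the `INVOICE_BUCKET` environment variable, the key layout derived from accountid, and the five-minute expiry are all hypothetical, as are the function names.

```python
import json
import os


def presign_invoice(s3_client, bucket, key, expires_in=300):
    """Return a time-limited GET URL for one S3 object."""
    return s3_client.generate_presigned_url(
        "get_object",
        Params={"Bucket": bucket, "Key": key},
        ExpiresIn=expires_in,  # lifetime in seconds
    )


def handler(event, context, s3_client=None):
    """Hypothetical invoice handler behind GET /invoice."""
    if s3_client is None:
        import boto3  # imported lazily so the sketch can be exercised without AWS

        s3_client = boto3.client("s3")

    params = event.get("queryStringParameters") or {}
    accountid = params["accountid"]
    # Assumptions: bucket name comes from the environment, one object per account.
    bucket = os.environ.get("INVOICE_BUCKET", "example-invoice-bucket")
    key = f"{accountid}.json"
    url = presign_invoice(s3_client, bucket, key)
    return {"statusCode": 200, "body": json.dumps({"url": url})}
```

The caller can fetch the returned URL with a plain HTTP GET until it expires, without holding AWS credentials of its own.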