In [None]:
import boto3

# Service clients shared by every cell below.
kinesis = boto3.client('kinesis')                # stream create / put / read / delete
iam = boto3.client('iam')                        # role and policy management
kda_client = boto3.client('kinesisanalytics')    # Kinesis Data Analytics application

## Input and Output Streams

In [None]:
# Create the single-shard stream that the input generator writes to.
cis = kinesis.create_stream(StreamName='ExampleInputStream', ShardCount=1)

print(cis)

In [None]:
# Create the single-shard stream that read_stream() later consumes.
cos = kinesis.create_stream(StreamName='ExampleOutputStream', ShardCount=1)

print(cos)

## Input Generator

In [None]:
import datetime
import json
import random
import boto3
import uuid
import time

def get_data():
    """Build one sample tick payload.

    Returns:
        dict: fixed nested ``things`` / ``stuff`` sample values plus
        ``EVENT_TIME`` (ISO-8601 string of the current local time),
        ``TICKER`` (random pick from a fixed symbol list) and ``PRICE``
        (random float in [0, 100], rounded to 2 decimals).
    """
    symbols = ['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']

    # Evaluate in the same order as the dict keys so the random sequence is unchanged.
    event_time = datetime.datetime.now().isoformat()
    ticker = random.choice(symbols)
    price = round(random.random() * 100, 2)

    payload = dict(
        things=[{'a': 1}, {'b': 2}],
        stuff={'key': {'foo': True, 'bar': False}},
        EVENT_TIME=event_time,
        TICKER=ticker,
        PRICE=price,
    )
    return payload

def get_cloud_event():
    """Wrap a fresh ``get_data()`` payload in a CloudEvents-style envelope.

    Returns:
        dict: envelope with fixed ``specversion`` / ``type`` / ``source`` /
        ``subject`` / ``datacontenttype`` fields, a random UUID4 string
        ``id``, the current local time as an ISO-8601 string in ``time``,
        and the payload under ``data``.
    """
    envelope = dict(
        specversion="1.0",
        type="tick",
        source="sample-stream",
        subject="delayed-data",
        id=str(uuid.uuid4()),
        time=datetime.datetime.now().isoformat(),
        datacontenttype="application/json",
        data=get_data(),
    )
    return envelope


def generate(stream_name, kinesis_client):
    """Publish one sample cloud event per second to a Kinesis stream, forever.

    Args:
        stream_name: name of the target Kinesis stream.
        kinesis_client: object exposing ``put_record(StreamName, Data, PartitionKey)``.

    Never returns; interrupt the kernel to stop it. Every record uses the
    same fixed partition key.
    """
    while True:
        event = get_cloud_event()
        print(event)
        serialized = json.dumps(event)
        kinesis_client.put_record(
            StreamName=stream_name, Data=serialized, PartitionKey="partitionkey"
        )
        time.sleep(1)

In [None]:
# Start the generator. This loops forever - interrupt the kernel to stop it.
generate(stream_name="ExampleInputStream", kinesis_client=kinesis)

## IAM Role and Policy

In [None]:
# Trust policy (JSON text) that lets the kinesisanalytics.amazonaws.com service
# principal assume the role via sts:AssumeRole. Passed to iam.create_role below.
assume_role_policy_document="""{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "kinesisanalytics.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
"""

In [None]:
# Create the service role the analytics application runs as, using the
# trust policy defined in assume_role_policy_document.
cr = iam.create_role(
    AssumeRolePolicyDocument=assume_role_policy_document,
    Path='/service-role/',
    RoleName='kda-sample-role',
)

print(cr)

In [None]:
import os
# AWS account id used to build the ARNs below. Must be provided through the
# PA_ACCOUNT_NO environment variable; a missing variable raises KeyError here.
account_no = os.environ['PA_ACCOUNT_NO']

In [None]:
# Permissions policy (JSON text) for the application role. account_no is spliced
# into every account-scoped ARN; the region is hard-coded to us-west-2.
# Statements, in order:
#   ReadCode                 - s3:GetObject / GetObjectVersion on bucket dskdaj-getting-started
#   ListCloudwatchLogGroups  - logs:DescribeLogGroups on all log groups
#   ListCloudwatchLogStreams - logs:DescribeLogStreams on /aws/kinesis-analytics/sample
#   PutCloudwatchLogs        - logs:PutLogEvents on the kinesis-analytics-log-stream stream
#   ReadInputStream          - kinesis:* on ExampleInputStream
#   WriteOutputStream        - kinesis:* on ExampleOutputStream
#   UseLambdaFunction        - invoke / get configuration of postproc-dev-pp:$LATEST
# NOTE(review): the two Kinesis statements grant kinesis:* rather than only
# read or write actions - confirm whether that breadth is intended.
policy_document="""{
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "ReadCode",
                    "Effect": "Allow",
                    "Action": [
                        "s3:GetObject",
                        "s3:GetObjectVersion"
                    ],
                    "Resource": [
                        "arn:aws:s3:::dskdaj-getting-started/*"
                    ]
                },
                {
                    "Sid": "ListCloudwatchLogGroups",
                    "Effect": "Allow",
                    "Action": [
                        "logs:DescribeLogGroups"
                    ],
                    "Resource": [
                        "arn:aws:logs:us-west-2:""" + account_no + """:log-group:*"
                    ]
                },
                {
                    "Sid": "ListCloudwatchLogStreams",
                    "Effect": "Allow",
                    "Action": [
                        "logs:DescribeLogStreams"
                    ],
                    "Resource": [
                        "arn:aws:logs:us-west-2:""" + account_no + """:log-group:/aws/kinesis-analytics/sample:log-stream:*"
                    ]
                },
                {
                    "Sid": "PutCloudwatchLogs",
                    "Effect": "Allow",
                    "Action": [
                        "logs:PutLogEvents"
                    ],
                    "Resource": [
                        "arn:aws:logs:us-west-2:""" + account_no + """:log-group:/aws/kinesis-analytics/sample:log-stream:kinesis-analytics-log-stream"
                    ]
                },
                {
                
                    "Sid": "ReadInputStream",
                    "Effect": "Allow",
                    "Action": [
                        "kinesis:*"
                    ],
                    "Resource": [
                        "arn:aws:kinesis:us-west-2:""" + account_no + """:stream/ExampleInputStream"
                    ]
                },
                {
                    "Sid": "WriteOutputStream",
                    "Effect": "Allow",
                    "Action": "kinesis:*",
                    "Resource": "arn:aws:kinesis:us-west-2:""" + account_no + """:stream/ExampleOutputStream"
                },
                {
                    "Sid": "UseLambdaFunction",
                    "Effect": "Allow",
                    "Action": [
                        "lambda:InvokeFunction",
                        "lambda:GetFunctionConfiguration"
                    ],
                    "Resource": [
                        "arn:aws:lambda:us-west-2:""" + account_no + """:function:postproc-dev-pp:$LATEST"
                    ]
                }
            ]
}"""

In [None]:
# Register policy_document as a managed policy under the /service-role/ path.
cp = iam.create_policy(
    PolicyDocument=policy_document,
    Path='/service-role/',
    PolicyName='kda-sample-app',
)

print(cp)

In [None]:
# Attach the kda-sample-app managed policy to the application role.
ap = iam.attach_role_policy(
    PolicyArn="arn:aws:iam::{}:policy/service-role/kda-sample-app".format(account_no),
    RoleName='kda-sample-role',
)

## Application Definition

In [None]:
# Create the (initially empty) analytics application; input, code and output
# are added in the following cells.
ca = kda_client.create_application(ApplicationName='samplev1')
print(ca)

In [None]:
# Fetch the application description; its ApplicationVersionId is required by
# the add_application_input call that follows.
app_desc = kda_client.describe_application(ApplicationName='samplev1')
print(app_desc)

In [None]:
# Register ExampleInputStream as the application's streaming source and map the
# JSON cloud-event fields onto in-application columns. The envelope fields are
# read from the record root and the tick fields from under $.data.
#
# Fix: "specversion" was typed DECIMAL(1,1), which has scale 1 and precision 1
# (no integer digit) and therefore cannot hold the emitted value "1.0". The
# generator sends it as a JSON string, so it is mapped as VARCHAR instead.
kda_client.add_application_input(
    ApplicationName='samplev1',
    CurrentApplicationVersionId=app_desc['ApplicationDetail']['ApplicationVersionId'],
    Input={
        'NamePrefix': 'SOURCE_SQL_STREAM',
        'KinesisStreamsInput': {
            'ResourceARN': "arn:aws:kinesis:us-west-2:" + account_no + ":stream/ExampleInputStream",
            'RoleARN': 'arn:aws:iam::{}:role/service-role/kda-sample-role'.format(account_no),
        },
        'InputSchema': {
            "RecordFormat": {
                "RecordFormatType": "JSON",
                "MappingParameters": {
                    "JSONMappingParameters": {
                        "RecordRowPath": "$"
                    }
                }
            },
            "RecordEncoding": "UTF-8",
            "RecordColumns": [
                # --- cloud-event envelope fields ---
                {
                    "Name": "specversion",
                    "Mapping": "$.specversion",
                    "SqlType": "VARCHAR(8)"
                },
                {
                    "Name": "type",
                    "Mapping": "$.type",
                    "SqlType": "VARCHAR(4)"
                },
                {
                    "Name": "source",
                    "Mapping": "$.source",
                    "SqlType": "VARCHAR(16)"
                },
                {
                    "Name": "subject",
                    "Mapping": "$.subject",
                    "SqlType": "VARCHAR(16)"
                },
                {
                    "Name": "id",
                    "Mapping": "$.id",
                    "SqlType": "VARCHAR(64)"
                },
                {
                    # Column is named COL_time rather than "time".
                    # NOTE(review): presumably to avoid a reserved SQL word - confirm.
                    "Name": "COL_time",
                    "Mapping": "$.time",
                    "SqlType": "VARCHAR(32)"
                },
                {
                    "Name": "datacontenttype",
                    "Mapping": "$.datacontenttype",
                    "SqlType": "VARCHAR(16)"
                },
                # --- payload fields nested under "data" ---
                {
                    "Name": "EVENT_TIME",
                    "Mapping": "$.data.EVENT_TIME",
                    "SqlType": "VARCHAR(32)"
                },
                {
                    "Name": "TICKER",
                    "Mapping": "$.data.TICKER",
                    "SqlType": "VARCHAR(4)"
                },
                {
                    "Name": "PRICE",
                    "Mapping": "$.data.PRICE",
                    "SqlType": "REAL"
                }
            ]
        },
        "InputParallelism": {
            "Count": 1
        }
    }
)

In [None]:
# SQL run by the application: creates DESTINATION_SQL_STREAM with a single
# integer column and a pump that inserts COUNT(*) of SOURCE_SQL_STREAM_001
# rows, grouped into 10-second tumbling windows on ROWTIME.
app_code="""CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (event_count integer);
CREATE OR REPLACE  PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM COUNT(*) AS "event_count"
FROM "SOURCE_SQL_STREAM_001"
-- Uses a 10-second tumbling time window
GROUP BY FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);
"""

In [None]:
# Re-read the application so the version id passed to the update is current,
# then install app_code as the application's SQL code.
app_desc = kda_client.describe_application(ApplicationName='samplev1')

ua = kda_client.update_application(
    ApplicationName='samplev1',
    CurrentApplicationVersionId=app_desc['ApplicationDetail']['ApplicationVersionId'],
    ApplicationUpdate=dict(ApplicationCodeUpdate=app_code),
)

print(ua)

In [None]:
# Show the application description after the code update.
app_desc = kda_client.describe_application(ApplicationName='samplev1')

print(app_desc)

In [None]:
# Re-read the application for its current version id, then route
# DESTINATION_SQL_STREAM rows, formatted as JSON, to the postproc-dev-pp Lambda.
# NOTE(review): output goes to the Lambda function, not directly to
# ExampleOutputStream - presumably the Lambda forwards records there; confirm.
app_desc = kda_client.describe_application(ApplicationName='samplev1')

ao = kda_client.add_application_output(
    ApplicationName='samplev1',
    CurrentApplicationVersionId=app_desc['ApplicationDetail']['ApplicationVersionId'],
    Output=dict(
        Name='DESTINATION_SQL_STREAM',
        LambdaOutput=dict(
            ResourceARN="arn:aws:lambda:us-west-2:" + account_no + ":function:postproc-dev-pp:$LATEST",
            RoleARN='arn:aws:iam::{}:role/service-role/kda-sample-role'.format(account_no),
        ),
        DestinationSchema=dict(RecordFormatType='JSON'),
    ),
)

print(ao)

In [None]:
# Id of the application's first configured input, taken from the most recently
# fetched app_desc; start_application needs it to set the starting position.
input_id = app_desc['ApplicationDetail']['InputDescriptions'][0]['InputId']
print(input_id)

In [None]:
# Start the application, reading the input from the NOW starting position.
kda_client.start_application(
    ApplicationName='samplev1',
    InputConfigurations=[
        dict(
            Id=input_id,
            InputStartingPositionConfiguration=dict(InputStartingPosition='NOW'),
        ),
    ],
)

In [None]:
# Look up the application's current ApplicationStatus; as the last expression
# of the cell the value is shown as the cell output. Re-run to poll.
kda_client.describe_application(
    ApplicationName='samplev1'
)['ApplicationDetail']['ApplicationStatus']

## Read Output

In [None]:
import time

def read_stream(stream_name):
    """Print records arriving on the first shard of a Kinesis stream.

    Uses the module-level ``kinesis`` client. Reading starts at the LATEST
    position, so only records written after the call begins are shown, and
    only the first shard returned by ``list_shards`` is read.

    Args:
        stream_name: name of the Kinesis stream to tail.

    Returns:
        None. Runs until the shard stops returning a ``NextShardIterator``;
        on an open shard that means until the kernel is interrupted.
    """
    # Fix: honour the stream_name argument instead of a hard-coded name.
    shards = kinesis.list_shards(StreamName=stream_name)

    iterator_response = kinesis.get_shard_iterator(
        StreamName=stream_name,
        ShardIteratorType='LATEST',
        ShardId=shards['Shards'][0]['ShardId']
    )
    shard_iterator = iterator_response['ShardIterator']

    while shard_iterator is not None:
        recs = kinesis.get_records(ShardIterator=shard_iterator)

        # Back off when the poll came back empty.
        if not recs['Records']:
            time.sleep(1)

        for rec in recs['Records']:
            print(rec['Data'])

        # Fix: advance to the next iterator. The original stored it in a
        # different variable, so every poll reused the initial iterator.
        # A missing NextShardIterator ends the loop.
        shard_iterator = recs.get('NextShardIterator')

In [None]:
# Tail the output stream; interrupt the kernel to stop.
read_stream(stream_name='ExampleOutputStream')

## Clean Up

### Application

In [None]:
# Refresh the description; delete_application below needs its CreateTimestamp.
app_desc = kda_client.describe_application(ApplicationName='samplev1')
print(app_desc)

In [None]:
# Delete the application, identified by its name plus creation timestamp.
kda_client.delete_application(
    CreateTimestamp=app_desc['ApplicationDetail']['CreateTimestamp'],
    ApplicationName='samplev1',
)

### IAM

In [None]:
# ARN of the kda-sample-app managed policy in this account; reused by the
# detach and delete cells below.
policy_arn="arn:aws:iam::{}:policy/service-role/kda-sample-app".format(account_no)
print(policy_arn)

In [None]:
# Detach the managed policy from the role.
iam.detach_role_policy(PolicyArn=policy_arn, RoleName='kda-sample-role')

In [None]:
# Remove the managed policy itself.
iam.delete_policy(PolicyArn=policy_arn)

In [None]:
# Remove the application's service role.
iam.delete_role(RoleName='kda-sample-role')

### Streams

In [None]:
# Delete the input stream created at the top of the notebook.
kinesis.delete_stream(StreamName='ExampleInputStream')

In [None]:
# Delete the output stream created at the top of the notebook.
kinesis.delete_stream(StreamName='ExampleOutputStream')