## Kinesis Data Analytics for SQL Sample



In [None]:
import boto3

# Service clients used throughout the notebook. All come from the default
# boto3 session, so they share its credentials and region.
kinesis = boto3.client('kinesis')
iam = boto3.client('iam')
kda2 = boto3.client('kinesisanalyticsv2')
# This name used to hold a second, identical kinesisanalyticsv2 client that
# nothing used; keep it as an alias rather than creating a redundant client.
kda_client = kda2

### Input and Output Streams

In [None]:
cis = kinesis.create_stream(
    StreamName='ExampleInputStream',
    ShardCount=1
)

print(cis)

# create_stream returns while the stream is still being created; block until
# it is ACTIVE so later cells do not try to use it before it is ready.
kinesis.get_waiter('stream_exists').wait(StreamName='ExampleInputStream')

In [None]:
cos = kinesis.create_stream(
    StreamName='ExampleOutputStream',
    ShardCount=1
)

print(cos)

# create_stream returns while the stream is still being created; block until
# it is ACTIVE so later cells do not try to use it before it is ready.
kinesis.get_waiter('stream_exists').wait(StreamName='ExampleOutputStream')

### IAM Role and Policy




In [None]:
import json

# Trust policy allowing the Kinesis Data Analytics service to assume the
# service role created in the next cell. Built from a dict and serialised
# with json.dumps so the document is always well-formed JSON.
assume_role_policy_document = json.dumps({
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "kinesisanalytics.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}, indent=4)

In [None]:
# Create the service role under /service-role/, trusting the Kinesis Data
# Analytics principal from the document defined above.
role_request = dict(
    RoleName='kda-sample-role',
    Path='/service-role/',
    AssumeRolePolicyDocument=assume_role_policy_document,
)
cr = iam.create_role(**role_request)
print(cr)

In [None]:
import os

# AWS account that owns the streams, role and policy; it is interpolated into
# the ARNs built in later cells. The PRODUCER_ACCOUNT_NO environment variable
# wins when set (the original behaviour). Otherwise ask STS for the caller's
# account, so the notebook no longer fails with KeyError when the variable is
# not exported.
account_no = (
    os.environ.get('PRODUCER_ACCOUNT_NO')
    or boto3.client('sts').get_caller_identity()['Account']
)

In [None]:
import json

# Region of the Kinesis client, i.e. where the example streams were created.
# These ARNs used to be hard-coded to us-east-1. That pointed the policy at
# non-existent streams whenever the default session used another region.
region = kinesis.meta.region_name

_logs_arn_prefix = "arn:aws:logs:{}:{}:log-group:".format(region, account_no)
_kinesis_arn_prefix = "arn:aws:kinesis:{}:{}:stream/".format(region, account_no)

# Permissions policy for kda-sample-role. It grants CloudWatch Logs access for
# the application's log stream plus access to the two example streams. The
# policy is built as a dict and serialised with json.dumps, instead of
# splicing account_no into a hand-written JSON string, so it is always valid
# JSON.
policy_document = json.dumps({
    "Version": "2012-10-17",
    "Statement": [
        {
            # NOTE(review): nothing in this SQL notebook reads code from S3;
            # this statement looks left over from another sample - confirm
            # before removing it.
            "Sid": "ReadCode",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:GetObjectVersion"
            ],
            "Resource": [
                "arn:aws:s3:::dskdaj-getting-started/*"
            ]
        },
        {
            "Sid": "ListCloudwatchLogGroups",
            "Effect": "Allow",
            "Action": ["logs:DescribeLogGroups"],
            "Resource": [_logs_arn_prefix + "*"]
        },
        {
            "Sid": "ListCloudwatchLogStreams",
            "Effect": "Allow",
            "Action": ["logs:DescribeLogStreams"],
            "Resource": [
                _logs_arn_prefix + "/aws/kinesis-analytics/sample:log-stream:*"
            ]
        },
        {
            "Sid": "PutCloudwatchLogs",
            "Effect": "Allow",
            "Action": ["logs:PutLogEvents"],
            "Resource": [
                _logs_arn_prefix
                + "/aws/kinesis-analytics/sample:log-stream:kinesis-analytics-log-stream"
            ]
        },
        {
            # NOTE(review): kinesis:* is broader than reading requires; kept
            # unchanged to preserve the sample's behaviour.
            "Sid": "ReadInputStream",
            "Effect": "Allow",
            "Action": ["kinesis:*"],
            "Resource": [_kinesis_arn_prefix + "ExampleInputStream"]
        },
        {
            "Sid": "WriteOutputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": _kinesis_arn_prefix + "ExampleOutputStream"
        }
    ]
}, indent=4)

In [None]:
# Register the permissions policy as a customer-managed policy under
# /service-role/ so the next cell can attach it to the role.
policy_request = {
    'PolicyName': 'kda-sample-app',
    'Path': '/service-role/',
    'PolicyDocument': policy_document,
}
cp = iam.create_policy(**policy_request)
print(cp)

In [None]:
# Attach the customer-managed policy to the service role. The ARN is rebuilt
# from the account number plus the path and name used when it was created.
sample_policy_arn = "arn:aws:iam::{}:policy/service-role/kda-sample-app".format(account_no)
ap = iam.attach_role_policy(PolicyArn=sample_policy_arn, RoleName='kda-sample-role')

### Application Definition

In [None]:
# Application SQL submitted as PLAINTEXT code in create_application.
# It declares an in-application stream DESTINATION_SQL_STREAM with a single
# integer column. A pump continuously inserts into it COUNT(*) of the rows
# read from SOURCE_SQL_STREAM_001, grouped by ROWTIME floored to a 10-second
# boundary (a 10-second tumbling window).
# NOTE(review): SOURCE_SQL_STREAM_001 is presumably the in-application stream
# KDA derives from the 'SOURCE_SQL_STREAM' input NamePrefix - confirm.
app_code="""CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (event_count integer);
CREATE OR REPLACE  PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM COUNT(*) AS "event_count"
FROM "SOURCE_SQL_STREAM_001"
-- Uses a 10-second tumbling time window
GROUP BY FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);
"""

In [None]:
# Echo the SQL that will be submitted as the application code.
print(app_code)

In [None]:
# Region of the Kinesis client, i.e. where the example streams were created.
# These ARNs used to be hard-coded to us-east-1, which breaks whenever the
# default boto3 session points at another region.
region = kinesis.meta.region_name

input_stream_arn = "arn:aws:kinesis:{}:{}:stream/ExampleInputStream".format(region, account_no)
output_stream_arn = "arn:aws:kinesis:{}:{}:stream/ExampleOutputStream".format(region, account_no)
# NOTE(review): no cell in this notebook creates this log group/stream;
# confirm it exists beforehand, or the logging option may be rejected.
log_stream_arn = "arn:aws:logs:{}:{}:log-group:/aws/kinesis-analytics/sample:log-stream:kinesis-analytics-log-stream".format(region, account_no)

# Columns extracted from each JSON record sent by the input generator:
# (column name, JSONPath mapping, SQL type).
_input_columns = [
    # NOTE(review): DECIMAL(1,1) has precision 1 and scale 1, so it may not
    # hold the "1.0" the generator sends - confirm; VARCHAR or DECIMAL(2,1)
    # would be safer.
    ("specversion", "$.specversion", "DECIMAL(1,1)"),
    ("type", "$.type", "VARCHAR(4)"),
    ("source", "$.source", "VARCHAR(16)"),
    ("subject", "$.subject", "VARCHAR(16)"),
    ("id", "$.id", "VARCHAR(64)"),
    ("COL_time", "$.time", "VARCHAR(32)"),
    ("datacontenttype", "$.datacontenttype", "VARCHAR(16)"),
    ("EVENT_TIME", "$.data.EVENT_TIME", "VARCHAR(32)"),
    ("TICKER", "$.data.TICKER", "VARCHAR(4)"),
    ("PRICE", "$.data.PRICE", "REAL"),
]

input_schema = {
    "RecordFormat": {
        "RecordFormatType": "JSON",
        "MappingParameters": {
            "JSONMappingParameters": {"RecordRowPath": "$"}
        },
    },
    "RecordEncoding": "UTF-8",
    "RecordColumns": [
        {"Name": name, "Mapping": mapping, "SqlType": sql_type}
        for name, mapping, sql_type in _input_columns
    ],
}

# Create the SQL application: one JSON input from ExampleInputStream, one JSON
# output (DESTINATION_SQL_STREAM) to ExampleOutputStream, running app_code
# under the service role created earlier.
kca = kda2.create_application(
    ApplicationName='sample',
    ApplicationDescription='Sample getting started application',
    RuntimeEnvironment='SQL-1_0',
    ServiceExecutionRole='arn:aws:iam::{}:role/service-role/kda-sample-role'.format(account_no),
    ApplicationConfiguration={
        'ApplicationCodeConfiguration': {
            'CodeContent': {'TextContent': app_code},
            'CodeContentType': 'PLAINTEXT',
        },
        'SqlApplicationConfiguration': {
            'Inputs': [
                {
                    'NamePrefix': 'SOURCE_SQL_STREAM',
                    'KinesisStreamsInput': {'ResourceARN': input_stream_arn},
                    'InputSchema': input_schema,
                    'InputParallelism': {'Count': 1},
                },
            ],
            'Outputs': [
                {
                    'Name': 'DESTINATION_SQL_STREAM',
                    'KinesisStreamsOutput': {'ResourceARN': output_stream_arn},
                    'DestinationSchema': {'RecordFormatType': 'JSON'},
                },
            ],
        },
    },
    CloudWatchLoggingOptions=[{'LogStreamARN': log_stream_arn}],
)
print(kca)

In [None]:
# Fetch the generated InputId of the application's first (only) input;
# start_application needs it to set the input's starting position.
ad = kda2.describe_application(ApplicationName='sample')

_sql_config = ad['ApplicationDetail']['ApplicationConfigurationDescription']['SqlApplicationConfigurationDescription']
input_id = _sql_config['InputDescriptions'][0]['InputId']
print(input_id)

In [None]:
# Start the application, reading the input stream from TRIM_HORIZON (the
# oldest record still retained in the stream).
starting_position = {'InputStartingPosition': 'TRIM_HORIZON'}
run_configuration = {
    'SqlRunConfigurations': [
        {
            'InputId': input_id,
            'InputStartingPositionConfiguration': starting_position,
        },
    ],
}
sao = kda2.start_application(ApplicationName='sample', RunConfiguration=run_configuration)
print(sao)

In [None]:
# List applications and see their states...
# The cell displays the raw ListApplications response returned by boto3.
kda2.list_applications()

### Input Generator

In [None]:
import datetime
import json
import random
import boto3
import uuid
import time

def get_data():
    """Return one random stock-tick payload.

    Keys: ``EVENT_TIME`` (local, naive ISO-8601 timestamp), ``TICKER`` (one of
    five symbols) and ``PRICE`` (float in [0, 100], rounded to 2 decimals).
    """
    return {
        'EVENT_TIME': datetime.datetime.now().isoformat(),
        'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'PRICE': round(random.random() * 100, 2)}


def get_cloud_event():
    """Wrap a fresh ``get_data()`` payload in a CloudEvents-style envelope.

    Each call gets a new random UUID ``id`` and the current local ``time``.
    """
    return {
        "specversion" : "1.0",
        "type" : "tick",
        "source" : "sample-stream",
        "subject" : "delayed-data",
        "id" : str(uuid.uuid4()),
        "time" : datetime.datetime.now().isoformat(),
        "datacontenttype" : "application/json",
        "data" : get_data()
    }


def generate(stream_name, kinesis_client, count=None, interval=1):
    """Send CloudEvents-style tick records to a Kinesis stream.

    Args:
        stream_name: Name of the target Kinesis stream.
        kinesis_client: boto3 Kinesis client (anything with ``put_record``).
        count: Number of records to send. ``None`` (the default) keeps
            sending until the loop is interrupted, as before.
        interval: Seconds to sleep after each record (default 1, as before).
    """
    sent = 0
    while count is None or sent < count:
        data = get_cloud_event()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            # Constant key: every record hashes to the same shard, which is
            # fine for the single-shard input stream used here.
            PartitionKey="partitionkey")
        sent += 1
        time.sleep(interval)

In [None]:
# Run it
# Sends one CloudEvents-style tick to ExampleInputStream per second, printing
# each event; generate() keeps looping until the cell is interrupted.
generate("ExampleInputStream", kinesis)

### Read From Output

In [None]:
# The output stream was created with one shard; list it to get its ShardId.
shards = kinesis.list_shards(StreamName='ExampleOutputStream')
print(shards)

In [None]:
first_shard_id = shards['Shards'][0]['ShardId']

# TRIM_HORIZON: start reading from the oldest record still retained.
itor = kinesis.get_shard_iterator(
    StreamName='ExampleOutputStream',
    ShardId=first_shard_id,
    ShardIteratorType='TRIM_HORIZON',
)
print(itor)

In [None]:
# Read one batch of records from the output shard, starting at the iterator
# obtained above. The cell displays the raw GetRecords response; each
# record's Data field is a bytes payload.
kinesis.get_records(
    ShardIterator=itor['ShardIterator']
)

## Clean Up

#### Application

In [None]:
# Stop the running 'sample' application before deleting it.
kda2.stop_application(
    ApplicationName='sample'
)

In [None]:
# delete_application (below) must be given the application's creation
# timestamp, so read it from the current description.
da = kda2.describe_application(ApplicationName='sample')
application_detail = da['ApplicationDetail']
create_timestamp = application_detail['CreateTimestamp']

In [None]:
# Render the DescribeApplication response as JSON text. boto3 returns
# timestamps (e.g. CreateTimestamp) as datetime objects, which the stdlib
# encoder rejects, so bson's json_util.default is passed as the fallback.
# NOTE(review): this presumably needs the bson module shipped with pymongo;
# a small stdlib default= hook for datetimes would remove the dependency.
from bson import json_util
import json

described_json = json.dumps(da, default=json_util.default)
described_json

In [None]:
# Inspect the remaining applications (and their status) before deleting.
list_response = kda2.list_applications()
summaries = list_response['ApplicationSummaries']
print(summaries)

In [None]:
# Delete the application, identified by name plus the creation timestamp
# read from describe_application above.
kda2.delete_application(
    ApplicationName='sample',
    CreateTimestamp=create_timestamp
)

#### IAM

In [None]:
# ARN of the customer-managed policy created earlier; the following cells
# detach and then delete it.
policy_template = "arn:aws:iam::{}:policy/service-role/kda-sample-app"
policy_arn = policy_template.format(account_no)
print(policy_arn)

In [None]:
# Detach the policy first: IAM will not delete a role (or a policy) while the
# two are still attached.
iam.detach_role_policy(
    RoleName='kda-sample-role',
    PolicyArn=policy_arn
)

In [None]:
# Delete the now-detached customer-managed policy.
iam.delete_policy(
    PolicyArn=policy_arn
)

In [None]:
# Delete the service role (its policy was detached above).
iam.delete_role(
    RoleName='kda-sample-role'
)

#### Streams

In [None]:
# Delete the input stream and any data still in it.
kinesis.delete_stream(
    StreamName='ExampleInputStream'
)

In [None]:
# Delete the output stream and any data still in it.
kinesis.delete_stream(
    StreamName='ExampleOutputStream'
)

In [None]:
# Check what remains; stream deletion is asynchronous, so the example streams
# may still be listed briefly while they are being deleted.
kinesis.list_streams()