In [None]:
import boto3

# Every ARN assembled later in this notebook hard-codes us-east-1, so the
# regional clients are pinned to that region instead of inheriting whatever
# default region the environment happens to provide.
REGION = 'us-east-1'

kda_client = boto3.client('kinesisanalytics', region_name=REGION)
kinesis = boto3.client('kinesis', region_name=REGION)
iam = boto3.client('iam')  # IAM is a global service; no region pin needed

## Input and Output Streams

In [None]:
# Create the single-shard input stream that the application reads from.
qs = kinesis.create_stream(
    StreamName='QuoteStream',
    ShardCount=1
)

print(qs)

# create_stream returns while the stream is still being created; block until
# it is ACTIVE so that a Run All does not reach the cells that use the stream
# before it is usable.
kinesis.get_waiter('stream_exists').wait(StreamName='QuoteStream')

In [None]:
# Create the single-shard stream that receives the application's output.
cos = kinesis.create_stream(
    StreamName='ExampleOutputStream',
    ShardCount=1
)

print(cos)

# create_stream returns while the stream is still being created; block until
# it is ACTIVE so that a Run All does not reach the cells that use the stream
# before it is usable.
kinesis.get_waiter('stream_exists').wait(StreamName='ExampleOutputStream')

## IAM Role and Policy

In [None]:
# Trust policy (JSON text) allowing the kinesisanalytics.amazonaws.com service
# principal to call sts:AssumeRole. Written one source line per JSON line so
# the exact whitespace of the document is visible.
assume_role_policy_document = (
    '{\n'
    '    "Version": "2012-10-17",\n'
    '    "Statement": [\n'
    '        {\n'
    '            "Effect": "Allow",\n'
    '            "Principal": {\n'
    '                "Service": "kinesisanalytics.amazonaws.com"\n'
    '            },\n'
    '            "Action": "sts:AssumeRole"\n'
    '        }\n'
    '    ]\n'
    '}\n'
)

In [None]:
# Create the kda-sample-role role under /service-role/, using the trust policy
# held in assume_role_policy_document, and show the API response.
cr = iam.create_role(
    AssumeRolePolicyDocument=assume_role_policy_document,
    Path='/service-role/',
    RoleName='kda-sample-role',
)
print(cr)

In [None]:
import os

# AWS account id, read from the PRODUCER_ACCOUNT_NO environment variable
# (KeyError if it is unset). The value is spliced verbatim into every ARN and
# into the IAM policy JSON in this notebook, so surrounding whitespace is
# removed and anything that is not a plain 12-digit id is rejected here
# instead of being sent to AWS inside a malformed ARN.
account_no = os.environ['PRODUCER_ACCOUNT_NO'].strip()
if not (len(account_no) == 12 and account_no.isdigit()):
    raise ValueError(
        "PRODUCER_ACCOUNT_NO must be a 12-digit AWS account id, got {!r}".format(account_no)
    )

In [None]:
# IAM permissions policy (JSON text) for the application role. It is assembled
# by concatenating string fragments with account_no, and every ARN uses the
# us-east-1 region. Statements, in order:
#   ReadCode                 - s3:GetObject / s3:GetObjectVersion on the
#                              dskdaj-getting-started bucket's objects
#   ListCloudwatchLogGroups  - logs:DescribeLogGroups on all log groups
#   ListCloudwatchLogStreams - logs:DescribeLogStreams on the
#                              /aws/kinesis-analytics/sample log group
#   PutCloudwatchLogs        - logs:PutLogEvents on that group's
#                              kinesis-analytics-log-stream
#   ReadInputStream          - kinesis:* on stream/QuoteStream
#   WriteOutputStream        - kinesis:* on stream/ExampleOutputStream
# NOTE(review): the two Kinesis statements grant kinesis:* although their Sids
# describe read-only and write-only access; consider narrowing the actions.
policy_document="""{
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "ReadCode",
                    "Effect": "Allow",
                    "Action": [
                        "s3:GetObject",
                        "s3:GetObjectVersion"
                    ],
                    "Resource": [
                        "arn:aws:s3:::dskdaj-getting-started/*"
                    ]
                },
                {
                    "Sid": "ListCloudwatchLogGroups",
                    "Effect": "Allow",
                    "Action": [
                        "logs:DescribeLogGroups"
                    ],
                    "Resource": [
                        "arn:aws:logs:us-east-1:""" + account_no + """:log-group:*"
                    ]
                },
                {
                    "Sid": "ListCloudwatchLogStreams",
                    "Effect": "Allow",
                    "Action": [
                        "logs:DescribeLogStreams"
                    ],
                    "Resource": [
                        "arn:aws:logs:us-east-1:""" + account_no + """:log-group:/aws/kinesis-analytics/sample:log-stream:*"
                    ]
                },
                {
                    "Sid": "PutCloudwatchLogs",
                    "Effect": "Allow",
                    "Action": [
                        "logs:PutLogEvents"
                    ],
                    "Resource": [
                        "arn:aws:logs:us-east-1:""" + account_no + """:log-group:/aws/kinesis-analytics/sample:log-stream:kinesis-analytics-log-stream"
                    ]
                },
                {
                
                    "Sid": "ReadInputStream",
                    "Effect": "Allow",
                    "Action": [
                        "kinesis:*"
                    ],
                    "Resource": [
                        "arn:aws:kinesis:us-east-1:""" + account_no + """:stream/QuoteStream"
                    ]
                },
                {
                    "Sid": "WriteOutputStream",
                    "Effect": "Allow",
                    "Action": "kinesis:*",
                    "Resource": "arn:aws:kinesis:us-east-1:""" + account_no + """:stream/ExampleOutputStream"
                }
            ]
}"""

In [None]:
# Register policy_document as the managed policy kda-sample-app under
# /service-role/ and show the API response.
cp = iam.create_policy(
    PolicyDocument=policy_document,
    Path='/service-role/',
    PolicyName='kda-sample-app',
)
print(cp)

In [None]:
# Attach the kda-sample-app managed policy (addressed by an ARN built from
# account_no) to the kda-sample-role role.
ap = iam.attach_role_policy(
    PolicyArn="arn:aws:iam::{}:policy/service-role/kda-sample-app".format(account_no),
    RoleName='kda-sample-role',
)

## Application Definition

In [None]:
# Create the application samplev1 (name only; input, code and output are
# added by later calls) and show the API response.
ca = kda_client.create_application(ApplicationName='samplev1')
print(ca)

In [None]:
# Fetch and show the current description of samplev1; app_desc is read by
# later cells for the application version id.
app_desc = kda_client.describe_application(ApplicationName='samplev1')
print(app_desc)

In [None]:
# Columns of the in-application input stream as (name, JSONPath, SQL type).
# specversion is declared VARCHAR: a DECIMAL(1,1) column has a single digit of
# precision, all of it after the decimal point, so it cannot hold a value such
# as 1.0.
# NOTE(review): the field names look like a CloudEvents envelope, where
# specversion is a string - confirm against the producer.
record_columns = [
    ("specversion", "$.specversion", "VARCHAR(8)"),
    ("type", "$.type", "VARCHAR(4)"),
    ("source", "$.source", "VARCHAR(16)"),
    ("subject", "$.subject", "VARCHAR(16)"),
    ("id", "$.id", "VARCHAR(64)"),
    ("COL_time", "$.time", "VARCHAR(32)"),
    ("datacontenttype", "$.datacontenttype", "VARCHAR(16)"),
    ("EVENT_TIME", "$.data.EVENT_TIME", "VARCHAR(32)"),
    ("TICKER", "$.data.TICKER", "VARCHAR(4)"),
    ("PRICE", "$.data.PRICE", "REAL"),
    ("OWNER", "$.owner", "VARCHAR(12)"),
    ("SYMBOL", "$.symbol", "VARCHAR(4)"),
    ("AMOUNT", "$.amount", "REAL"),
]

# Re-read the application immediately before modifying it, as the other
# update cells in this notebook do, so the version id passed below is current
# even when this cell is re-run after another change.
app_desc = kda_client.describe_application(
    ApplicationName='samplev1'
)

# Attach QuoteStream as the application's input: JSON records, UTF-8 encoded,
# mapped to the columns above, read through the kda-sample-role role.
kda_client.add_application_input(
    ApplicationName='samplev1',
    CurrentApplicationVersionId=app_desc['ApplicationDetail']['ApplicationVersionId'],
    Input={
        'NamePrefix': 'SOURCE_SQL_STREAM',
        'KinesisStreamsInput': {
            'ResourceARN': "arn:aws:kinesis:us-east-1:" + account_no + ":stream/QuoteStream",
            'RoleARN': 'arn:aws:iam::{}:role/service-role/kda-sample-role'.format(account_no),
        },
        'InputSchema': {
            "RecordFormat": {
                "RecordFormatType": "JSON",
                "MappingParameters": {
                    "JSONMappingParameters": {
                        "RecordRowPath": "$"
                    }
                }
            },
            "RecordEncoding": "UTF-8",
            "RecordColumns": [
                {"Name": name, "Mapping": mapping, "SqlType": sql_type}
                for name, mapping, sql_type in record_columns
            ]
        },
        "InputParallelism": {
            "Count": 1
        }
    }
)

In [None]:
# Pass-through application code: declares DESTINATION_SQL_STREAM with
# (TICKER, PRICE) and a pump that copies those two columns from
# SOURCE_SQL_STREAM_001. The statements are joined with newlines and the
# text ends with a trailing newline.
app_code = "\n".join([
    'CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (TICKER VARCHAR(4), PRICE REAL);',
    'CREATE OR REPLACE  PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"',
    'SELECT STREAM TICKER,PRICE',
    'FROM "SOURCE_SQL_STREAM_001";',
    '',
])

In [None]:
# Per-ticker minimum and maximum price over 60-second STEP buckets of ROWTIME.
# GROUP BY takes a comma-separated list, so TICKER and the STEP(...) expression
# must be separated by a comma (its absence is what made this statement fail).
# The aggregates are aliased to the destination column names so the mapping
# into DESTINATION_SQL_STREAM is explicit.
app_code="""CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (TICKER VARCHAR(4), MIN_PRICE REAL, MAX_PRICE REAL);
CREATE OR REPLACE  PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM TICKER, MIN(PRICE) AS MIN_PRICE, MAX(PRICE) AS MAX_PRICE
FROM "SOURCE_SQL_STREAM_001"
    GROUP BY TICKER,
    STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
"""

In [None]:
# Application code variant: per-ticker SUM(price), grouped by TICKER and by
# 60-second STEP buckets of the source stream's ROWTIME, written to
# DESTINATION_SQL_STREAM as (TICKER, sum_price).
# Reassigns app_code, so it replaces whatever an earlier cell stored there.
app_code = """CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    TICKER VARCHAR(4), 
    sum_price     DOUBLE);
-- CREATE OR REPLACE PUMP to insert into output
CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
  INSERT INTO "DESTINATION_SQL_STREAM" 
    SELECT STREAM 
        TICKER,
        SUM(price) AS sum_price
    FROM "SOURCE_SQL_STREAM_001"
     GROUP BY TICKER, STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
"""

In [None]:
# Application code variant: per-ticker MIN(price), grouped by TICKER and by
# 60-second STEP buckets of the source stream's ROWTIME, written to
# DESTINATION_SQL_STREAM as (TICKER, min_price).
# Reassigns app_code, so it replaces whatever an earlier cell stored there.
app_code = """CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    TICKER VARCHAR(4), 
    min_price     DOUBLE);
-- CREATE OR REPLACE PUMP to insert into output
CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
  INSERT INTO "DESTINATION_SQL_STREAM" 
    SELECT STREAM 
        TICKER,
        MIN(price) AS min_price
    FROM "SOURCE_SQL_STREAM_001"
     GROUP BY TICKER, STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
"""

In [None]:
# Application code variant with three stages:
#   1. POSITIONS_SQL_STREAM - rows of the source stream whose OWNER is not
#      null, projected to (OWNER, SYMBOL, AMOUNT).
#   2. QUOTES_SQL_STREAM    - rows whose OWNER is null, aggregated to the
#      per-ticker MIN(price) per 60-second STEP bucket of ROWTIME.
#   3. DESTINATION_SQL_STREAM - positions joined to quotes on SYMBOL = TICKER.
# Reassigns app_code, so it replaces whatever an earlier cell stored there.
# NOTE(review): the join in stage 3 declares no window on either stream -
# confirm it produces the intended matches.
# NOTE(review): QUOTES_SQL_STREAM.min_price is DOUBLE while
# DESTINATION_SQL_STREAM.MIN_PRICE is REAL - confirm the narrowing is intended.
app_code = """
CREATE OR REPLACE STREAM "POSITIONS_SQL_STREAM" (OWNER VARCHAR(12), SYMBOL VARCHAR(4), AMOUNT REAL);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "POSITIONS_SQL_STREAM"
SELECT STREAM OWNER, SYMBOL, AMOUNT
FROM "SOURCE_SQL_STREAM_001"
WHERE OWNER IS NOT NULL;

CREATE OR REPLACE STREAM "QUOTES_SQL_STREAM" (
    TICKER VARCHAR(4), 
    min_price     DOUBLE);
CREATE OR REPLACE PUMP "STREAM_PUMP2" AS 
  INSERT INTO "QUOTES_SQL_STREAM" 
    SELECT STREAM 
        TICKER,
        MIN(price) AS MIN_PRICE
    FROM "SOURCE_SQL_STREAM_001"
    WHERE OWNER IS NULL
     GROUP BY TICKER, STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
     
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (OWNER VARCHAR(12), SYMBOL VARCHAR(4), AMOUNT REAL, MIN_PRICE REAL);
CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP" AS 
INSERT INTO "DESTINATION_SQL_STREAM"
  SELECT STREAM p.OWNER, p.SYMBOL, p.AMOUNT, q.MIN_PRICE
    FROM POSITIONS_SQL_STREAM AS p
      JOIN QUOTES_SQL_STREAM AS q
        ON p.SYMBOL = q.TICKER;

"""

In [None]:
# Show the SQL currently held in app_code, i.e. the text the update_application
# call will upload (whichever app_code assignment ran last).
print(app_code)

In [None]:
# Re-read the application so the version id sent with the update is current,
# then replace the application's SQL with app_code and show the response.
app_desc = kda_client.describe_application(ApplicationName='samplev1')

ua = kda_client.update_application(
    ApplicationName='samplev1',
    CurrentApplicationVersionId=app_desc['ApplicationDetail']['ApplicationVersionId'],
    ApplicationUpdate=dict(ApplicationCodeUpdate=app_code),
)
print(ua)

In [None]:
# Refresh app_desc and show the application's description.
app_desc = kda_client.describe_application(ApplicationName='samplev1')
print(app_desc)

In [None]:
# Re-read the application for its current version id, then route the
# in-application stream DESTINATION_SQL_STREAM to ExampleOutputStream as JSON,
# written through the kda-sample-role role. Shows the API response.
app_desc = kda_client.describe_application(ApplicationName='samplev1')

ao = kda_client.add_application_output(
    ApplicationName='samplev1',
    CurrentApplicationVersionId=app_desc['ApplicationDetail']['ApplicationVersionId'],
    Output=dict(
        Name='DESTINATION_SQL_STREAM',
        KinesisStreamsOutput=dict(
            ResourceARN="arn:aws:kinesis:us-east-1:" + account_no + ":stream/ExampleOutputStream",
            RoleARN='arn:aws:iam::{}:role/service-role/kda-sample-role'.format(account_no),
        ),
        DestinationSchema=dict(RecordFormatType='JSON'),
    ),
)
print(ao)

In [None]:
# Id of the application's first configured input, taken from the most recently
# fetched app_desc; start_application needs it to set the starting position.
input_descriptions = app_desc['ApplicationDetail']['InputDescriptions']
input_id = input_descriptions[0]['InputId']
print(input_id)

In [None]:
# Start samplev1, reading the input identified by input_id from starting
# position 'NOW'.
kda_client.start_application(
    ApplicationName='samplev1',
    InputConfigurations=[
        dict(
            Id=input_id,
            InputStartingPositionConfiguration=dict(InputStartingPosition='NOW'),
        ),
    ],
)

In [None]:
# Display the application's current ApplicationStatus (re-run to poll).
kda_client.describe_application(ApplicationName='samplev1')['ApplicationDetail']['ApplicationStatus']

## Clean Up

### Application

In [None]:
# Refresh app_desc before clean-up; the delete call reads CreateTimestamp
# from it.
app_desc = kda_client.describe_application(ApplicationName='samplev1')
print(app_desc)

In [None]:
# Delete samplev1, identified by its name together with the creation
# timestamp recorded in app_desc.
kda_client.delete_application(
    CreateTimestamp=app_desc['ApplicationDetail']['CreateTimestamp'],
    ApplicationName='samplev1',
)

### IAM

In [None]:
# ARN of the kda-sample-app managed policy in this account; reused by the
# detach and delete calls in the clean-up cells.
policy_arn_template = "arn:aws:iam::{}:policy/service-role/kda-sample-app"
policy_arn = policy_arn_template.format(account_no)
print(policy_arn)

In [None]:
# Detach the managed policy from kda-sample-role.
iam.detach_role_policy(PolicyArn=policy_arn, RoleName='kda-sample-role')

In [None]:
# Delete the managed policy identified by policy_arn.
iam.delete_policy(PolicyArn=policy_arn)

In [None]:
# Delete the kda-sample-role role.
iam.delete_role(RoleName='kda-sample-role')

### Streams

In [None]:
# Delete the input stream.
kinesis.delete_stream(StreamName='QuoteStream')

In [None]:
# Delete the output stream.
kinesis.delete_stream(StreamName='ExampleOutputStream')