# Getting Started

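Attempted walkthrough of the [getting started](https://docs.aws.amazon.com/kinesisanalytics/latest/java/get-started-exercise.html) tutorial for Kinesis Data Analytics for Java (KDA-J), with the [cross-account modifications](https://docs.aws.amazon.com/kinesisanalytics/latest/java/examples-cross.html) applied so that the input stream lives in one AWS account (the source) and the application and output stream live in another (the sink).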

## Source Account

In [None]:
import boto3

# Session for the source (producer) account, using the local AWS profile named 'pa'.
source_session = boto3.Session(profile_name='pa')

# Service clients bound to the source-account session.
source_kda_client = source_session.client('kinesisanalyticsv2')
source_kinesis = source_session.client('kinesis')
source_iam = source_session.client('iam')
# NOTE(review): second client for the same 'kinesisanalyticsv2' service as source_kda_client.
source_kda2 = source_session.client('kinesisanalyticsv2')

### Input Stream

In [None]:
# Create the single-shard input stream in the source account.
cis = source_kinesis.create_stream(
    StreamName='ExampleInputStream',
    ShardCount=1
)

# Show the raw API response.
print(cis)

### Input Generator

In [None]:
import datetime
import json
import random
import boto3

def get_data():
    """Build one sample stock-ticker record.

    Returns:
        dict: ``EVENT_TIME`` (ISO-8601 string of the current local time),
        ``TICKER`` (one of five fixed symbols, chosen at random) and
        ``PRICE`` (random float in [0, 100], rounded to two decimals).
    """
    event_time = datetime.datetime.now().isoformat()
    ticker = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
    price = round(random.random() * 100, 2)
    return dict(EVENT_TIME=event_time, TICKER=ticker, PRICE=price)


def generate(stream_name, kinesis_client):
    """Continuously publish sample ticker records to a Kinesis stream.

    Each record produced by ``get_data()`` is printed and then sent as a
    JSON string via ``put_record`` under the fixed partition key
    ``"partitionkey"``. The loop has no exit condition, so this function
    never returns on its own; interrupt it to stop.

    Args:
        stream_name: Name of the stream to write to.
        kinesis_client: Object exposing ``put_record`` (a boto3 Kinesis
            client in this notebook).
    """
    # get_data() never returns None, so this sentinel-form iter() never ends.
    for record in iter(get_data, None):
        print(record)
        payload = json.dumps(record)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=payload,
            PartitionKey="partitionkey")

In [None]:
# Run it
# generate() loops forever, so this cell keeps running until it is interrupted.
generate("ExampleInputStream", source_kinesis)

### Source Policy

In the source account, we need a role that the sink account can assume, with an attached policy that grants read access to the input stream.

**Note:** Do this after the Sink role is created, because the trust relationship below names that role as its principal.

In [None]:
import os
# AWS account IDs come from the environment; a missing variable raises KeyError.
source_account = os.environ['PRODUCER_ACCOUNT_NO']
sink_account = os.environ['CONSUMER_ACCOUNT_NO']

In [None]:
# Trust policy (JSON text) for the source-account role: allows the sink
# account's /service-role/kda-sample-role to call sts:AssumeRole on it.
input_stream_reader_trust_relationship="""{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {
          "AWS": "arn:aws:iam::""" + sink_account + """:role/service-role/kda-sample-role"
        },
        "Action": "sts:AssumeRole"
      }
    ]
}"""

In [None]:
# Permissions policy (JSON text) granting read-side Kinesis actions on
# ExampleInputStream in the source account. The region is fixed to us-east-1.
input_stream_reader_policy="""{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ReadInputStream",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:GetRecords",
                "kinesis:GetShardIterator",
                "kinesis:ListShards"
            ],
            "Resource": 
               "arn:aws:kinesis:us-east-1:""" + source_account + """:stream/ExampleInputStream"
        }
    ]
}"""
# Print the assembled document so it can be checked before it is submitted.
print(input_stream_reader_policy)

In [None]:
# Create the role in the source account, under the /service-role/ path,
# with the trust policy defined above.
cr = source_iam.create_role(
    RoleName='KA-Source-Stream-Role',
    Path='/service-role/',
    AssumeRolePolicyDocument=input_stream_reader_trust_relationship
)

# Show the raw API response.
print(cr)

In [None]:
# Create the managed policy that carries the input-stream read permissions.
cp = source_iam.create_policy(
    PolicyName='kda-stream-reader',
    Path='/service-role/',
    PolicyDocument=input_stream_reader_policy
)

# Show the raw API response.
print(cp)

In [None]:
# Attach the stream-reader policy to the source-account role.
stream_reader_policy_arn = "arn:aws:iam::{}:policy/service-role/kda-stream-reader".format(source_account)
ap = source_iam.attach_role_policy(
    PolicyArn=stream_reader_policy_arn,
    RoleName='KA-Source-Stream-Role',
)

## Sink Account

In [None]:
import boto3

# Session for the sink (consumer) account, using the local AWS profile named 'ca'.
sink_session = boto3.Session(profile_name='ca')

# Service clients bound to the sink-account session.
sink_kda_client = sink_session.client('kinesisanalyticsv2')
sink_kinesis = sink_session.client('kinesis')
sink_iam = sink_session.client('iam')
# NOTE(review): second client for the same 'kinesisanalyticsv2' service as sink_kda_client.
sink_kda2 = sink_session.client('kinesisanalyticsv2')

In [None]:
# Create the single-shard output stream in the sink account.
cos = sink_kinesis.create_stream(
    StreamName='ExampleOutputStream',
    ShardCount=1
)

# Show the raw API response.
print(cos)

### Application Code File

Compile the getting started code, create a bucket, and upload the resulting JAR to that bucket.

e.g.

```
mvn package
aws s3api create-bucket --bucket dskdaj-getting-started-2
aws s3 cp target/kda-java-apps-2-accounts-1.0.jar  s3://dskdaj-getting-started-2/kda-java-apps-2-accounts-1.0.jar
```

In [None]:
# S3 bucket and object key of the application JAR; these match the upload commands above.
code_bucket='dskdaj-getting-started-2'
jar_key='kda-java-apps-2-accounts-1.0.jar'

### IAM Role and Policy



In the sink account, we will need to allow the role associated with the KDA application to assume a specific role in the stream source account.

In [None]:
import os
# Source (producer) account ID from the environment; raises KeyError if unset.
source_account = os.environ['PRODUCER_ACCOUNT_NO']

In [None]:
# ARN of the role the application assumes in the source account. The role is
# created with Path='/service-role/', so the path must appear in the ARN; this
# also matches the Resource used in the application policy.
source_role_arn = "arn:aws:iam::{}:role/service-role/KA-Source-Stream-Role".format(source_account)
print(source_role_arn)

In [None]:
# Trust policy (JSON text) for the application's role: allows the
# kinesisanalytics.amazonaws.com service principal to call sts:AssumeRole.
assume_role_policy_document="""{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "kinesisanalytics.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
"""

In [None]:
# Create the application's execution role in the sink account, under the
# /service-role/ path, with the trust policy defined above.
cr = sink_iam.create_role(
    RoleName='kda-sample-role',
    Path='/service-role/',
    AssumeRolePolicyDocument=assume_role_policy_document
)

# Show the raw API response.
print(cr)

In [None]:
import os
# Sink (consumer) account ID from the environment; raises KeyError if unset.
sink_account_no = os.environ['CONSUMER_ACCOUNT_NO']
print(sink_account_no)

In [None]:
# Permissions policy (JSON text) for the application's role. Statements, in order:
#   - assume KA-Source-Stream-Role in the source account,
#   - read objects from the code bucket,
#   - describe CloudWatch log groups and log streams, and put log events to the
#     application's log stream,
#   - any Kinesis action on ExampleOutputStream in the sink account.
# The region is fixed to us-east-1 in every ARN that carries one.
policy_document="""{
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "AssumeRoleInSourceAccount",
                    "Effect": "Allow",
                    "Action": "sts:AssumeRole",
                    "Resource": "arn:aws:iam::""" + source_account + """:role/service-role/KA-Source-Stream-Role"
                },
                {
                    "Sid": "ReadCode",
                    "Effect": "Allow",
                    "Action": [
                        "s3:GetObject",
                        "s3:GetObjectVersion"
                    ],
                    "Resource": [
                        "arn:aws:s3:::""" + code_bucket + """/*"
                    ]
                },
                {
                    "Sid": "ListCloudwatchLogGroups",
                    "Effect": "Allow",
                    "Action": [
                        "logs:DescribeLogGroups"
                    ],
                    "Resource": [
                        "arn:aws:logs:us-east-1:""" + sink_account_no + """:log-group:*"
                    ]
                },
                {
                    "Sid": "ListCloudwatchLogStreams",
                    "Effect": "Allow",
                    "Action": [
                        "logs:DescribeLogStreams"
                    ],
                    "Resource": [
                        "arn:aws:logs:us-east-1:""" + sink_account_no + """:log-group:/aws/kinesis-analytics/sample:log-stream:*"
                    ]
                },
                {
                    "Sid": "PutCloudwatchLogs",
                    "Effect": "Allow",
                    "Action": [
                        "logs:PutLogEvents"
                    ],
                    "Resource": [
                        "arn:aws:logs:us-east-1:""" + sink_account_no + """:log-group:/aws/kinesis-analytics/sample:log-stream:kinesis-analytics-log-stream"
                    ]
                },
                {
                    "Sid": "WriteOutputStream",
                    "Effect": "Allow",
                    "Action": "kinesis:*",
                    "Resource": "arn:aws:kinesis:us-east-1:""" + sink_account_no + """:stream/ExampleOutputStream"
                }
            ]
}"""

In [None]:
# Print the assembled policy so the substituted account IDs and bucket can be checked.
print(policy_document)

In [None]:
# Create the managed policy that carries the application's permissions.
cp = sink_iam.create_policy(
    PolicyName='kda-sample-app',
    Path='/service-role/',
    PolicyDocument=policy_document
)

# Show the raw API response.
print(cp)

In [None]:
# Attach the application policy to the application's execution role.
sample_app_policy_arn = "arn:aws:iam::{}:policy/service-role/kda-sample-app".format(sink_account_no)
ap = sink_iam.attach_role_policy(
    PolicyArn=sample_app_policy_arn,
    RoleName='kda-sample-role',
)

### Application Definition

The configuration below was taken from creating the application in the console and then dumping it via `describe_application`.


In [None]:
# Confirm the sink account ID that is substituted into the ARNs below.
print(sink_account_no)

In [None]:
# Create the 'sample' Flink application in the sink account. It runs under
# kda-sample-role and loads its code from the JAR in the code bucket.
kca = sink_kda2.create_application(
    ApplicationName='sample',
    ApplicationDescription='Sample getting started application',
    RuntimeEnvironment='FLINK-1_11',
    ServiceExecutionRole='arn:aws:iam::{}:role/service-role/kda-sample-role'.format(sink_account_no),
    ApplicationConfiguration={
        # Where the application code lives in S3.
        'ApplicationCodeConfiguration': {
            'CodeContent': {
                'S3ContentLocation': {
                    'BucketARN': 'arn:aws:s3:::' + code_bucket,
                    'FileKey': jar_key,
                }
            },
            'CodeContentType': 'ZIPFILE'
        },
        # Flink settings: default checkpointing, custom monitoring and parallelism.
        'FlinkApplicationConfiguration': {
            'CheckpointConfiguration': {
                'ConfigurationType': 'DEFAULT'
            },
            'MonitoringConfiguration': {
                'ConfigurationType': 'CUSTOM',
                'MetricsLevel': 'APPLICATION',
                'LogLevel': 'DEBUG'
            },
            'ParallelismConfiguration': {
                'ConfigurationType': 'CUSTOM',
                'Parallelism': 1,
                'ParallelismPerKPU': 1,
                'AutoScalingEnabled': True
            }
        },
        # Property groups handed to the application.
        # NOTE(review): presumably read by the JAR at runtime; confirm the group ID and keys against the application code.
        'EnvironmentProperties': {
            'PropertyGroups': [
                {
                    "PropertyGroupId":"ProducerConfigProperties",
                      "PropertyMap":{
                         "AggregationEnabled":"false",
                         "aws.region":"us-east-1",
                         "flink.inputstream.initpos":"LATEST"
                      }
                }
            ]
        },
        'ApplicationSnapshotConfiguration': {
            'SnapshotsEnabled': False
        }
        
    },
    # NOTE(review): no cell in this notebook creates this log group/stream; confirm it exists before running.
    CloudWatchLoggingOptions=[
        {
            "LogStreamARN":"arn:aws:logs:us-east-1:{}:log-group:/aws/kinesis-analytics/sample:log-stream:kinesis-analytics-log-stream".format(sink_account_no)
        }
    ]
    
)
# Show the raw API response.
print(kca)

In [None]:
# Start the 'sample' application without restoring from a snapshot
# (snapshots are disabled in the application configuration).
sao = sink_kda2.start_application(
    ApplicationName='sample',
    RunConfiguration={
        'FlinkRunConfiguration': {
            'AllowNonRestoredState': False
        },
        'ApplicationRestoreConfiguration': {
            'ApplicationRestoreType': 'SKIP_RESTORE_FROM_SNAPSHOT'
        }
    } 
)
# Show the raw API response.
print(sao)

In [None]:
# List applications and see their states...
# The response is the cell's last expression, so the notebook displays it.
sink_kda2.list_applications()

### Read From Output

In [None]:
# List the shards of the output stream; the next cell uses the first one.
shards = sink_kinesis.list_shards(
    StreamName='ExampleOutputStream'
)
print(shards)

In [None]:
# Get a TRIM_HORIZON iterator for the first shard returned by list_shards.
itor = sink_kinesis.get_shard_iterator(
    StreamName='ExampleOutputStream',
    ShardIteratorType='TRIM_HORIZON',
    ShardId=shards['Shards'][0]['ShardId']
)
print(itor)

In [None]:
# Fetch records with the iterator obtained above. The response is the
# cell's last expression, so the notebook displays it.
sink_kinesis.get_records(
    ShardIterator=itor['ShardIterator']
)

## Clean Up

#### Application

In [None]:
# Stop the 'sample' application, passing Force=True.
sink_kda2.stop_application(
    ApplicationName='sample',
    Force=True
)

In [None]:
# Describe the application to obtain its CreateTimestamp, which the
# delete_application call below passes back.
da = sink_kda2.describe_application(
    ApplicationName='sample'
)
create_timestamp = da['ApplicationDetail']['CreateTimestamp']

In [None]:
from bson import json_util
import json

# Render the describe_application response as JSON text.
# NOTE(review): json_util.default is presumably needed for values json cannot
# serialise natively (e.g. the timestamp fields) - confirm. bson is third-party.
json.dumps(da, default=json_util.default)

In [None]:
# Print the application summaries for the sink account.
summaries = sink_kda2.list_applications()['ApplicationSummaries']
print(summaries)

In [None]:
# Delete the application, passing the CreateTimestamp captured from
# describe_application.
sink_kda2.delete_application(
    ApplicationName='sample',
    CreateTimestamp=create_timestamp
)

#### IAM

In [None]:
# ARN of the application policy in the sink account; the cells below detach
# and delete it. Built from sink_account_no, the variable the sink section
# defines and uses when attaching this same policy, rather than sink_account,
# which is only defined in the Source Policy cell.
policy_arn="arn:aws:iam::{}:policy/service-role/kda-sample-app".format(sink_account_no)
print(policy_arn)

In [None]:
# Detach the application policy from the execution role.
sink_iam.detach_role_policy(
    RoleName='kda-sample-role',
    PolicyArn=policy_arn
)

In [None]:
# Delete the application policy from the sink account.
sink_iam.delete_policy(
    PolicyArn=policy_arn
)

In [None]:
# Delete the application's execution role from the sink account.
sink_iam.delete_role(
    RoleName='kda-sample-role'
)

In [None]:
# Detach the stream-reader policy from the source-account role.
stream_reader_policy_arn = "arn:aws:iam::{}:policy/service-role/kda-stream-reader".format(source_account)
source_iam.detach_role_policy(
    PolicyArn=stream_reader_policy_arn,
    RoleName='KA-Source-Stream-Role',
)

In [None]:
# Delete the stream-reader policy from the source account.
stream_reader_policy_arn = "arn:aws:iam::{}:policy/service-role/kda-stream-reader".format(source_account)
source_iam.delete_policy(PolicyArn=stream_reader_policy_arn)

In [None]:
# Delete the stream-reader role from the source account.
source_iam.delete_role(
    RoleName='KA-Source-Stream-Role'
)

#### Streams

In [None]:
# Delete the input stream from the source account.
source_kinesis.delete_stream(
    StreamName='ExampleInputStream'
)

In [None]:
# Delete the output stream from the sink account.
sink_kinesis.delete_stream(
    StreamName='ExampleOutputStream'
)