# Kinesis Data Stream
* https://github.com/aws-samples/aws-ml-data-lake-workshop
* https://aws.amazon.com/blogs/big-data/snakes-in-the-stream-feeding-and-eating-amazon-kinesis-streams-with-python/

In [None]:
import boto3
import sagemaker
import pandas as pd

# SageMaker session, its default S3 bucket, and the notebook's execution role.
sess = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()

# One boto3 session supplies the region and every service client used below.
boto_session = boto3.Session()
region = boto_session.region_name

sm = boto_session.client(service_name='sagemaker', region_name=region)
kn_data = boto_session.client(service_name='kinesis', region_name=region)
kn_firehose = boto_session.client(service_name='firehose', region_name=region)
sts = boto_session.client(service_name='sts', region_name=region)

In [None]:
# Display the response of the Kinesis ListStreams call for the configured region.
kn_data.list_streams()

## Step 1: Create a Kinesis Data Stream

In [None]:
# Name of the Kinesis data stream created and used throughout this notebook.
stream_name = "dsoaws-data-stream"
# Number of shards requested when the stream is created.
shard_count = 2

In [None]:
# Create the data stream. Re-running the notebook must not fail just because
# the stream is already there, mirroring how the IAM role cell tolerates an
# existing role.
try:
    response = kn_data.create_stream(
        StreamName=stream_name,
        ShardCount=shard_count
    )
except kn_data.exceptions.ResourceInUseException:
    # Raised when a stream with this name already exists (or is still being
    # created/deleted); keep `response` defined for the next cell.
    response = None
    print("Stream already exists or is not yet available")


In [None]:
# Show the raw response of the create_stream call.
print(response)

In [None]:
# create_stream returns immediately while the stream is still being created.
# Block until the stream is ACTIVE so the cells below that write to and read
# from it do not race against stream creation.
kn_data.get_waiter('stream_exists').wait(StreamName=stream_name)

# Fetch the stream description; the stream ARN is read from it below.
data_stream_response = kn_data.describe_stream(
    StreamName=stream_name
)

print(data_stream_response)

In [None]:
# Pull the stream ARN out of the describe_stream response.
stream_description = data_stream_response['StreamDescription']
data_stream_arn = stream_description['StreamARN']
print(data_stream_arn)

In [None]:
# Name of the IAM role created below and passed to the Firehose delivery stream.
iam_kinesis_role_name = 'DSOAWS_Kinesis'

In [None]:
# Service principals that are allowed to assume the role.
trusted_services = ("kinesis.amazonaws.com", "firehose.amazonaws.com")

# Trust policy: one identical "Allow sts:AssumeRole" statement per service.
assume_role_policy_doc = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": service},
            "Action": "sts:AssumeRole",
        }
        for service in trusted_services
    ],
}

In [None]:
import json
import boto3
import time

from botocore.exceptions import ClientError

# Create the IAM role, or fetch it if it already exists.
# The client is built outside the `try` so the `except` branch can always use it.
iam = boto3.client('iam')

try:
    iam_role_kinesis = iam.create_role(
        RoleName=iam_kinesis_role_name,
        AssumeRolePolicyDocument=json.dumps(assume_role_policy_doc),
        Description='DSOAWS Kinesis Role'
    )
except ClientError as e:
    if e.response['Error']['Code'] != 'EntityAlreadyExists':
        # Any other failure leaves `iam_role_kinesis` undefined; stop here
        # instead of letting later cells fail with a confusing NameError.
        print("Unexpected error: %s" % e)
        raise
    iam_role_kinesis = iam.get_role(RoleName=iam_kinesis_role_name)
    print("Role already exists")

# Pause before the role is used (presumably to let IAM changes propagate).
time.sleep(10)

In [None]:
# Read the ARN and name of the role from the create_role/get_role response.
role_details = iam_role_kinesis['Role']

iam_role_kinesis_arn = role_details['Arn']
print(iam_role_kinesis_arn)

iam_role_kinesis_name = role_details['RoleName']
print(iam_role_kinesis_name)

In [None]:
# AWS account id of the caller; used to build resource ARNs for the policy.
caller_identity = sts.get_caller_identity()
account_id = caller_identity['Account']

In [None]:
# Resource ARNs the policy is scoped to.
# NOTE(review): the Firehose ARN is built from `stream_name` (the data stream
# name), not from a delivery stream name -- confirm this is intended.
# NOTE(review): no S3 permissions are granted here -- confirm the role gets
# access to the destination bucket some other way.
firehose_resource_arn = "arn:aws:firehose:{}:{}:deliverystream/{}".format(
    region, account_id, stream_name
)
kinesis_resource_arn = "arn:aws:kinesis:{}:{}:stream/{}".format(
    region, account_id, stream_name
)

firehose_statement = {
    "Effect": "Allow",
    "Action": [
        "firehose:DeleteDeliveryStream",
        "firehose:PutRecord",
        "firehose:PutRecordBatch",
        "firehose:UpdateDestination",
    ],
    "Resource": [firehose_resource_arn],
}

kinesis_statement = {
    "Effect": "Allow",
    "Action": [
        "kinesis:Get*",
        "kinesis:DescribeStream",
        "kinesis:Put*",
        "kinesis:List*",
    ],
    "Resource": [kinesis_resource_arn],
}

# Inline policy attached to the role: Firehose statement first, then Kinesis.
kinesis_policy_doc = {
    "Version": "2012-10-17",
    "Statement": [firehose_statement, kinesis_statement],
}

print(kinesis_policy_doc)

# Attach the Policy to the IAM Role

In [None]:
import time

# Serialize the policy and attach it to the role as an inline policy.
policy_json = json.dumps(kinesis_policy_doc)
response = iam.put_role_policy(RoleName=iam_role_kinesis_name,
                               PolicyName='DSOAWS_KinesisPolicy',
                               PolicyDocument=policy_json)

# Pause before the role is used by the cells below.
time.sleep(10)

In [None]:
# Show the raw response of the put_role_policy call.
print(response)

# Create a Kinesis Data Firehose Delivery Stream with the Data Stream as Its Source

In [None]:
# Settings for the Firehose delivery stream created below.
firehose_name = 'dsoaws-firehose-stream'
# The delivery stream reads from a Kinesis data stream rather than direct puts.
delivery_stream_type = 'KinesisStreamAsSource'
# Destination bucket, expressed as an S3 ARN.
streaming_bucket_name = 'dsoaws-streaming-data'
bucket_arn = 'arn:aws:s3:::{}'.format(streaming_bucket_name)

In [None]:
# Source side: the Kinesis data stream, read with the IAM role created above.
kinesis_source_config = {
    'KinesisStreamARN': data_stream_arn,
    'RoleARN': iam_role_kinesis_arn
}

# Destination side: the S3 bucket, written with the same IAM role.
s3_destination_config = {
    'RoleARN': iam_role_kinesis_arn,
    'BucketARN': bucket_arn
}

firehose_response = kn_firehose.create_delivery_stream(
    DeliveryStreamName=firehose_name,
    DeliveryStreamType=delivery_stream_type,
    KinesisStreamSourceConfiguration=kinesis_source_config,
    ExtendedS3DestinationConfiguration=s3_destination_config
)

In [None]:
# Show the raw response of the create_delivery_stream call.
print(firehose_response)

# Put Records

In [None]:
# Copy the gzipped reviews TSV from S3 into ./data/ with the AWS CLI (IPython shell escape).
!aws s3 cp 's3://amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Software_v1_00.tsv.gz' ./data/

In [None]:
import csv
import pandas as pd

# Load the gzipped, tab-separated reviews file into a DataFrame.
# QUOTE_NONE makes the parser treat quote characters as ordinary text.
reviews_path = './data/amazon_reviews_us_Digital_Software_v1_00.tsv.gz'
df = pd.read_csv(reviews_path, sep='\t', quoting=csv.QUOTE_NONE, compression='gzip')
df.shape

In [None]:
# Preview the first five rows of the loaded reviews.
df.head(5)

In [None]:
# Partition key sent with the record written to the data stream below.
partition_key = '123'
# Example of a tab-separated "star_rating<TAB>review_body" payload:
# '5\tThis is a 5 star review\n1\tThis is a 1 star review\n'

In [None]:
# Keep only the rating and review text, limited to the first 100 rows.
selected_columns = ['star_rating', 'review_body']
df_star_rating_and_review_body = df[selected_columns].head(100)
df_star_rating_and_review_body.shape

In [None]:
# Preview the first rows of the two-column subset.
df_star_rating_and_review_body.head()

In [None]:
# Serialize the subset as tab-separated text without a header row or index column.
tsv_options = {'sep': '\t', 'header': None, 'index': False}
reviews_tsv = df_star_rating_and_review_body.to_csv(**tsv_options)

In [None]:
# Display the serialized tab-separated payload.
reviews_tsv

In [None]:
# Kinesis client used for writing to and reading from the data stream.
data_stream = boto3.Session().client(service_name='kinesis', region_name=region)

# Send the whole TSV payload as a single record under one partition key.
response = data_stream.put_records(
    Records=[
        {
            'Data': reviews_tsv.encode('utf-8'),
            'PartitionKey': partition_key
        },
    ],
    StreamName=stream_name
)

# put_records does not raise when individual records are rejected; failures
# are reported in the response, so surface them instead of ignoring them.
failed_record_count = response['FailedRecordCount']
if failed_record_count:
    failed_codes = [
        record.get('ErrorCode')
        for record in response['Records']
        if 'ErrorCode' in record
    ]
    print("{} record(s) were not written: {}".format(failed_record_count, failed_codes))

# Get Records

In [None]:
# Ids of the two shards read below: "shardId-" plus a 12-digit, zero-padded index.
shard_id_template = 'shardId-{:012d}'
shard_id_1 = shard_id_template.format(0)
shard_id_2 = shard_id_template.format(1)

In [None]:
# Both iterators start at TRIM_HORIZON, i.e. the oldest record still in the shard.
iterator_options = {
    'StreamName': stream_name,
    'ShardIteratorType': 'TRIM_HORIZON',
}

shard_iter_1_response = data_stream.get_shard_iterator(ShardId=shard_id_1, **iterator_options)
shard_iter_1 = shard_iter_1_response['ShardIterator']

shard_iter_2_response = data_stream.get_shard_iterator(ShardId=shard_id_2, **iterator_options)
shard_iter_2 = shard_iter_2_response['ShardIterator']

In [None]:
# Read up to 100 records from the first shard and show the raw response.
records_response_1 = data_stream.get_records(ShardIterator=shard_iter_1, Limit=100)
print(records_response_1)

In [None]:
# Print the payload of the first record read from the first shard.
# A get_records call can return an empty list, so guard the index access
# instead of failing with an IndexError.
records_1 = records_response_1['Records']
if records_1:
    print(records_1[0]['Data'].decode('utf-8'))
else:
    print("No records returned for {}".format(shard_id_1))

In [None]:
# Read up to 100 records from the second shard and show the raw response.
records_response_2 = data_stream.get_records(ShardIterator=shard_iter_2, Limit=100)
print(records_response_2)

In [None]:
# Print the payload of the first record read from the second shard.
# The data above was written under a single partition key, and records that
# share a partition key are routed to the same shard, so one of the two
# shards is expected to be empty; guard the index access accordingly.
records_2 = records_response_2['Records']
if records_2:
    print(records_2[0]['Data'].decode('utf-8'))
else:
    print("No records returned for {}".format(shard_id_2))