# Put Customer Reviews On Kinesis Data Firehose

<img src="img/kinesis-complete.png" width="80%" align="left">

In [None]:
import boto3
import sagemaker
import pandas as pd
import json

# A single boto3 session is used both to discover the region and to build every client.
boto_session = boto3.Session()
region = boto_session.region_name

# SageMaker session, its default bucket, and the execution role of this notebook.
sess = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()

# Service clients, each created for the region resolved above.
sm = boto_session.client(service_name='sagemaker', region_name=region)
firehose = boto_session.client(service_name='firehose', region_name=region)
kinesis_analytics = boto_session.client(service_name='kinesisanalytics', region_name=region)


In [None]:
# Restore `firehose_name` from IPython's %store (presumably saved by an earlier notebook in this section).
%store -r firehose_name

In [None]:
# Warn (without raising) when `firehose_name` is missing from the notebook namespace.
if 'firehose_name' not in globals():
    print('+++++++++++++++++++++++++++++++',
          '[ERROR] Please run the notebooks in this section before you continue.',
          '+++++++++++++++++++++++++++++++',
          sep='\n')

In [None]:
# Show the delivery stream name; raises NameError if the variable was never restored.
print(firehose_name)

In [None]:
# Restore `firehose_arn` from IPython's %store (presumably saved by an earlier notebook in this section).
%store -r firehose_arn

In [None]:
# Warn (without raising) when `firehose_arn` is missing from the notebook namespace.
if 'firehose_arn' not in globals():
    print('+++++++++++++++++++++++++++++++',
          '[ERROR] Please run the notebooks in this section before you continue.',
          '+++++++++++++++++++++++++++++++',
          sep='\n')

In [None]:
# Show the delivery stream ARN; raises NameError if the variable was never restored.
print(firehose_arn)

In [None]:
# Restore `iam_role_kinesis_arn` from IPython's %store (presumably saved by an earlier notebook in this section).
%store -r iam_role_kinesis_arn

In [None]:
# Warn (without raising) when `iam_role_kinesis_arn` is missing from the notebook namespace.
if 'iam_role_kinesis_arn' not in globals():
    print('+++++++++++++++++++++++++++++++',
          '[ERROR] Please run the notebooks in this section before you continue.',
          '+++++++++++++++++++++++++++++++',
          sep='\n')

In [None]:
# Show the Kinesis IAM role ARN; raises NameError if the variable was never restored.
print(iam_role_kinesis_arn)

In [None]:
# Restore `kinesis_data_analytics_app_name` from IPython's %store (presumably saved by an earlier notebook in this section).
%store -r kinesis_data_analytics_app_name

In [None]:
# Warn (without raising) when `kinesis_data_analytics_app_name` is missing from the notebook namespace.
if 'kinesis_data_analytics_app_name' not in globals():
    print('+++++++++++++++++++++++++++++++',
          '[ERROR] Please run the notebooks in this section before you continue.',
          '+++++++++++++++++++++++++++++++',
          sep='\n')

In [None]:
# Show the Kinesis Data Analytics application name; raises NameError if the variable was never restored.
print(kinesis_data_analytics_app_name)

In [None]:
# Restore `lambda_fn_name` from IPython's %store (presumably saved by an earlier notebook in this section).
%store -r lambda_fn_name

In [None]:
# Warn (without raising) when `lambda_fn_name` is missing from the notebook namespace.
if 'lambda_fn_name' not in globals():
    print('+++++++++++++++++++++++++++++++',
          '[ERROR] Please run the notebooks in this section before you continue.',
          '+++++++++++++++++++++++++++++++',
          sep='\n')

In [None]:
# Show the Lambda function name; raises NameError if the variable was never restored.
print(lambda_fn_name)

In [None]:
# Ask Firehose for its DirectPut delivery streams and pretty-print the raw response.
firehoses = firehose.list_delivery_streams(DeliveryStreamType='DirectPut')

firehoses_json = json.dumps(firehoses, default=str, sort_keys=True, indent=4)
print(firehoses_json)

# Download Dataset

In [None]:
# Copy the gzip-compressed Digital Software reviews TSV from S3 into the local ./data/ directory.
!aws s3 cp 's3://amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Software_v1_00.tsv.gz' ./data/

In [None]:
import csv
import pandas as pd

# Tab-separated, gzip-compressed reviews. QUOTE_NONE makes the parser treat quote
# characters inside the review text as ordinary data.
reviews_path = './data/amazon_reviews_us_Digital_Software_v1_00.tsv.gz'
df = pd.read_csv(reviews_path, sep='\t', compression='gzip', quoting=csv.QUOTE_NONE)

# (rows, columns) of the loaded frame
df.shape

In [None]:
# Preview the first five rows of the reviews frame.
df.head(5)

In [None]:
# Display the first review as a single header-less, tab-separated line.
preview_columns = ['review_id', 'star_rating', 'product_category', 'review_body']
df_star_rating_and_review_body = df[preview_columns][0:1]

df_star_rating_and_review_body.to_csv(sep='\t', header=None, index=False)

# Check that Kinesis Data Analytics Application Is Running

In [None]:
# Fetch the current description of the Kinesis Data Analytics application and keep it in `response`.
response = kinesis_analytics.describe_application(ApplicationName=kinesis_data_analytics_app_name)

In [None]:
%%time

import time

app_status = response['ApplicationDetail']['ApplicationStatus']

while app_status != 'RUNNING':
    time.sleep(5)
    response = kinesis_analytics.describe_application(
        ApplicationName=kinesis_data_analytics_app_name)
    app_status = response['ApplicationDetail']['ApplicationStatus']
    print('Application status {}'.format(app_status))

print('Application status {}'.format(app_status))

# _Wait For The Application Status ^^ Running ^^_

# Simulate Producer Application Writing Records to the Stream

# Open Lambda Logs

In [None]:
# Import from the public IPython.display module: importing `display` from
# IPython.core.display is deprecated and no longer works on recent IPython releases.
from IPython.display import display, HTML

# Render a link to the CloudWatch log group of the Lambda function in the notebook's region.
display(HTML('<b>Review <a target="top" href="https://console.aws.amazon.com/cloudwatch/home?region={}#logStream:group=%252Faws%252Flambda%252F{}">Lambda Logs</a></b>'.format(region, lambda_fn_name)))


# _Keep That ^^ Link ^^ Open In Your Browser_

# Open Custom CloudWatch Metrics

In [None]:
# Import from the public IPython.display module: importing `display` from
# IPython.core.display is deprecated and no longer works on recent IPython releases.
from IPython.display import display, HTML

# Render a link to the AVGStarRating custom metric graph. The region appears twice in the
# URL (console region and graph region); both are filled from `region` so the graph is
# not pinned to us-east-1 when the notebook runs elsewhere.
display(HTML("""<b>Review <a target="top" href="https://console.aws.amazon.com/cloudwatch/home?region={}#metricsV2:graph=~(metrics~(~(~'kinesis*2fanalytics*2fAVGStarRating~'AVGStarRating~'Product*20Category~'All))~view~'timeSeries~stacked~false~start~'-PT5M~end~'P0D~region~'{}~liveData~true~stat~'Average~period~1~title~'Avg*20Star*20Rating);query=~'*7bkinesis*2fanalytics*2fAVGStarRating*2c*22Product*20Category*22*7d">CloudWatch Metrics</a></b>""".format(region, region)))


# _Keep That ^^ Link ^^ Open In Your Browser_

# Open Kinesis Data Analytics Console UI

In [None]:
# Import from the public IPython.display module: importing `display` from
# IPython.core.display is deprecated and no longer works on recent IPython releases.
from IPython.display import display, HTML

# Render a link to the Kinesis Data Analytics console editor for this application.
display(HTML('<b>Review <a target="top" href="https://console.aws.amazon.com/kinesisanalytics/home?region={}#/wizard/editor?applicationName={}"> Kinesis Data Analytics App</a></b>'.format(region, kinesis_data_analytics_app_name)))


# _Keep That ^^ Link ^^ Open In Your Browser To See Live Records Coming In After Running Next Cells_

# Put Records onto Firehose

In [None]:
# Look up the delivery stream named by `firehose_name` and pretty-print its description.
firehose_response = firehose.describe_delivery_stream(DeliveryStreamName=firehose_name)

firehose_response_json = json.dumps(firehose_response, default=str, sort_keys=True, indent=4)
print(firehose_response_json)

In [None]:
%%time

step = 1

for start_idx in range(0, 1000, step):
    end_idx = start_idx + step

    df_star_rating_and_review_body = df[['review_id', 
                                         'product_category', 
                                         'review_body']][start_idx:end_idx]

    reviews_tsv = df_star_rating_and_review_body.to_csv(sep='\t',
                                                        header=None,
                                                        index=False)
    
    # print(reviews_tsv.encode('utf-8'))
    
    response = firehose.put_record(        
        Record={
            'Data': reviews_tsv.encode('utf-8')
        },
        DeliveryStreamName=firehose_name
    )

In [None]:
# Import from the public IPython.display module: importing `display` from
# IPython.core.display is deprecated and no longer works on recent IPython releases.
from IPython.display import display, HTML

# Render a link to the Kinesis Data Analytics console editor for this application.
display(HTML('<b>Review <a target="top" href="https://console.aws.amazon.com/kinesisanalytics/home?region={}#/wizard/editor?applicationName={}"> Kinesis Data Analytics App</a></b>'.format(region, kinesis_data_analytics_app_name)))


# Go To Kinesis Analytics UI: 

# _Note: If You See This Error `No rows in source stream`:_

<img src="img/no_rows_in_source_kinesis_firehose_stream.png" width="80%" align="left">

## _Click On `Source` Or `Real-Time analytics` Tab Or Re-Run ^^ Above ^^ Cell `Put Records onto Firehose`_

# Go To Kinesis Analytics UI: 

In [None]:
# Import from the public IPython.display module: importing `display` from
# IPython.core.display is deprecated and no longer works on recent IPython releases.
from IPython.display import display, HTML

# Render a link to the Kinesis Data Analytics console editor for this application.
display(HTML('<b>Go To UI <a target="top" href="https://console.aws.amazon.com/kinesisanalytics/home?region={}#/wizard/editor?applicationName={}"> Kinesis Data Analytics App</a></b>'.format(region, kinesis_data_analytics_app_name)))


# ---- You can see our reviews streaming data coming in under the `Source` tab:

<img src="img/kinesis_analytics_1.png" width="80%" align="left">

# ---- Go to `Real-time analytics` tab and select `AVG_STAR_RATING_SQL_STREAM`

<img src="img/kinesis_analytics_5.png" width="80%" align="left">

# ---- Go to `Real-time analytics` tab and select `APPROXIMATE_COUNT_SQL_STREAM`

<img src="img/kinesis_analytics_4.png" width="80%" align="left">

# Go To Kinesis Analytics UI and Check Anomaly Detection Score

In [None]:
# Import from the public IPython.display module: importing `display` from
# IPython.core.display is deprecated and no longer works on recent IPython releases.
from IPython.display import display, HTML

# Render a link to the Kinesis Data Analytics console editor for this application.
display(HTML('<b>Go To <a target="top" href="https://console.aws.amazon.com/kinesisanalytics/home?region={}#/wizard/editor?applicationName={}"> Kinesis Data Analytics App</a></b>'.format(region, kinesis_data_analytics_app_name)))


# Create and Put Anomaly Data Onto Stream
Here, we are hard-coding a bad review to trigger an anomaly.

In [None]:
%%time

import time

anomaly_step = 1

for start_idx in range(0, 1000, anomaly_step):
    timestamp = int(time.time())

    df_anomalies = pd.DataFrame([
        {'review_id': str(timestamp), 
         'product_category': 'Digital_Software', 
         'review_body': 'This is an awful waste of time.'},     
    ], columns=['review_id', 'star_rating', 'product_category', 'review_body'])

    reviews_tsv_anomalies = df_anomalies.to_csv(sep='\t',
                                                header=None,
                                                index=False)
    
    response = firehose.put_record(           
        Record={
            'Data': reviews_tsv_anomalies.encode('utf-8')
        },
        DeliveryStreamName=firehose_name
    )

In [None]:
# Import from the public IPython.display module: importing `display` from
# IPython.core.display is deprecated and no longer works on recent IPython releases.
from IPython.display import display, HTML

# Render a link to the Kinesis Data Analytics console editor for this application.
display(HTML('<b>Go To <a target="top" href="https://console.aws.amazon.com/kinesisanalytics/home?region={}#/wizard/editor?applicationName={}"> Kinesis Data Analytics App</a></b>'.format(region, kinesis_data_analytics_app_name)))


# ---- Go to `Real-time analytics` tab and select `ANOMALY_SCORE_SQL_STREAM`

<img src="img/kinesis_analytics_3.png" width="80%" align="left">

In [None]:
#%%javascript
#Jupyter.notebook.save_checkpoint();
#Jupyter.notebook.session.delete();