# Customer Review Processing Pipeline with Firehose Data Streams
1. Reviews are submitted to a Firehose delivery stream
2. Firehose transforms each record using a Lambda function
3. Lambda invokes Comprehend to assess sentiment and adds the sentiment to the JSON record
4. Firehose then collects the transformed records and stores them in S3
5. With this pipeline, Firehose can ingest streaming data continuously, process it, and deliver it to S3
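
The transformation step (steps 2 to 4) could look roughly like the sketch below. This is not the lab's actual Lambda function; it is a minimal illustration of the Firehose transformation contract, in which each incoming record carries a `recordId` and base64-encoded `data`, and each returned record carries the same `recordId`, a `result` status, and re-encoded `data`. The output field names `sentiment` and `sentiment_score`, the `MAX_TEXT_BYTES` limit, and the helper names are assumptions made for illustration.

```python
import base64
import json

# Assumption: Comprehend's synchronous API limits input size (about 5 KB of
# UTF-8 text), so long reviews are truncated before the call.
MAX_TEXT_BYTES = 4500


def truncate_utf8(text, max_bytes=MAX_TEXT_BYTES):
    """Cut text to at most max_bytes of UTF-8 without splitting a character."""
    return text.encode('utf-8')[:max_bytes].decode('utf-8', errors='ignore')


def transform_record(record, comprehend):
    """Decode one Firehose record, add sentiment fields, and re-encode it."""
    try:
        review = json.loads(base64.b64decode(record['data']))
        text = truncate_utf8(review.get('review_body') or ' ')
        result = comprehend.detect_sentiment(Text=text, LanguageCode='en')
        # Hypothetical output field names, chosen for this sketch
        review['sentiment'] = result['Sentiment']
        review['sentiment_score'] = result['SentimentScore']
        # Trailing newline keeps one JSON record per line in S3
        data = (json.dumps(review) + '\n').encode('utf-8')
        return {'recordId': record['recordId'],
                'result': 'Ok',
                'data': base64.b64encode(data).decode('utf-8')}
    except Exception as exc:
        # Return the original data and mark the record as failed
        print('Failed record', record['recordId'], exc)
        return {'recordId': record['recordId'],
                'result': 'ProcessingFailed',
                'data': record['data']}


def lambda_handler(event, context, comprehend=None):
    """Entry point; the optional comprehend argument exists only for local testing."""
    if comprehend is None:
        import boto3  # imported lazily so the helpers can be tested without AWS
        comprehend = boto3.client('comprehend')
    return {'records': [transform_record(r, comprehend) for r in event['records']]}
```

Passing the Comprehend client as an argument makes it possible to exercise the handler locally with a stub object before deploying it.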

Objective: Use Comprehend Service to detect sentiment

Input: Customer Review
Output: Overall sentiment and scores for Positive, Negative, Neutral, Mixed  

https://docs.aws.amazon.com/comprehend/latest/dg/how-sentiment.html  
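
As a rough illustration of the input and output described above, the sketch below wraps Comprehend's `detect_sentiment` call and returns the overall label together with the four scores. The function name `review_sentiment` is hypothetical, and the language is assumed to be English.

```python
def review_sentiment(text, comprehend):
    """Return (overall sentiment label, per-class scores) for one review.

    `comprehend` is expected to be a boto3 Comprehend client, for example:
        boto3.Session(region_name='us-east-1').client('comprehend')
    """
    result = comprehend.detect_sentiment(Text=text, LanguageCode='en')
    # 'Sentiment' is one of POSITIVE, NEGATIVE, NEUTRAL, MIXED;
    # 'SentimentScore' maps Positive/Negative/Neutral/Mixed to confidences.
    return result['Sentiment'], result['SentimentScore']
```

Calling `review_sentiment('worked great', comprehend)` against the real service incurs a charge per request, so it is worth trying on a single review first.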

Dataset and Problem Description:
https://s3.amazonaws.com/amazon-reviews-pds/readme.html  
https://s3.console.aws.amazon.com/s3/buckets/amazon-reviews-pds/?region=us-east-2  

File: s3://amazon-reviews-pds/tsv/amazon_reviews_us_Major_Appliances_v1_00.tsv.gz

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import re

### Customer Reviews for Major Appliances

In [2]:
!aws s3 cp s3://amazon-reviews-pds/tsv/amazon_reviews_us_Major_Appliances_v1_00.tsv.gz .

download: s3://amazon-reviews-pds/tsv/amazon_reviews_us_Major_Appliances_v1_00.tsv.gz to ./amazon_reviews_us_Major_Appliances_v1_00.tsv.gz


### Load the Review Data

In [3]:
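# Note: error_bad_lines and warn_bad_lines were deprecated in pandas 1.3 and
# removed in 2.0; on newer versions use on_bad_lines='warn' instead.
df = pd.read_csv('amazon_reviews_us_Major_Appliances_v1_00.tsv.gz',
                 sep='\t', error_bad_lines=False, warn_bad_lines=True)  # , nrows=1000)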

b'Skipping line 5583: expected 15 fields, saw 22\nSkipping line 22814: expected 15 fields, saw 22\nSkipping line 22883: expected 15 fields, saw 22\nSkipping line 29872: expected 15 fields, saw 22\nSkipping line 37242: expected 15 fields, saw 22\nSkipping line 59693: expected 15 fields, saw 22\n'


In [4]:
print('Rows: {0}, Columns: {1}'.format(df.shape[0],df.shape[1]))

Rows: 96834, Columns: 15


In [5]:
df.head()

Unnamed: 0,marketplace,customer_id,review_id,product_id,product_parent,product_title,product_category,star_rating,helpful_votes,total_votes,vine,verified_purchase,review_headline,review_body,review_date
0,US,16199106,R203HPW78Z7N4K,B0067WNSZY,633038551,"FGGF3032MW Gallery Series 30"" Wide Freestandin...",Major Appliances,5,0,0,N,Y,"If you need a new stove, this is a winner.",What a great stove. What a wonderful replacem...,2015-08-31
1,US,16374060,R2EAIGVLEALSP3,B002QSXK60,811766671,Best Hand Clothes Wringer,Major Appliances,5,1,1,N,Y,Five Stars,worked great,2015-08-31
2,US,15322085,R1K1CD73HHLILA,B00EC452R6,345562728,Supco SET184 Thermal Cutoff Kit,Major Appliances,5,0,0,N,Y,Fast Shipping,Part exactly what I needed. Saved by purchasi...,2015-08-31
3,US,32004835,R2KZBMOFRMYOPO,B00MVVIF2G,563052763,Midea WHS-160RB1 Compact Single Reversible Doo...,Major Appliances,5,1,1,N,Y,Five Stars,Love my refrigerator! ! Keeps everything cold...,2015-08-31
4,US,25414497,R6BIZOZY6UD01,B00IY7BNUW,874236579,Avalon Bay Portable Ice Maker,Major Appliances,5,0,0,N,Y,Five Stars,No more running to the store for ice! Works p...,2015-08-31


In [6]:
df['review_headline'] = df['review_headline'].fillna(' ')
df['review_body'] = df['review_body'].fillna(' ')

In [7]:
# Matches runs of embedded newlines, tabs, and carriage returns (replaced with a space later)
pattern = r'[\n\t\r]+'
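
A small sketch of what this pattern does, using a made-up review rather than a row from the dataset. It also shows that JSON serialization already escapes control characters, so a serialized record stays on one line either way; cleaning the text field itself is what removes them from the stored review.

```python
import json
import re

pattern = r'[\n\t\r]+'

# Made-up example record, not taken from the dataset
sample = {'review_body': 'Great stove.\r\nHeats fast.\tWould buy again.'}

# Each run of newline, tab, or carriage-return characters becomes one space
cleaned = re.sub(pattern, ' ', sample['review_body'])
print(cleaned)

# json.dumps escapes control characters (a real newline is written as the
# two characters backslash and n), so the serialized text has no raw newline
serialized = json.dumps(sample)
print('\n' in serialized)
```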

### Submit review to Firehose Stream

In [8]:
import boto3

In [9]:
session = boto3.Session(region_name='us-east-1')

In [10]:
client_firehose = session.client('firehose')

In [11]:
kinesis_delivery_stream_name = 'CustomerReviewStream'

### Warning: Sending all of the roughly 100,000 reviews would cost about USD 65 for sentiment analysis.
### In this lab, we send only the first 10 reviews.
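
To get a feel for where such a figure comes from, the sketch below estimates the cost from review lengths. The billing constants are assumptions (units of 100 characters, a 3-unit minimum per request, and USD 0.0001 per unit) and should be checked against the current Comprehend pricing page; the function name is hypothetical.

```python
import math

# Assumed billing model; verify against the current Comprehend pricing page
UNIT_CHARS = 100         # characters per billing unit
MIN_UNITS = 3            # minimum units charged per request
PRICE_PER_UNIT = 0.0001  # USD per unit


def estimate_sentiment_cost(texts, price_per_unit=PRICE_PER_UNIT):
    """Estimate the USD cost of one sentiment request per text."""
    units = sum(max(MIN_UNITS, math.ceil(len(t) / UNIT_CHARS)) for t in texts)
    return units * price_per_unit
```

For example, `estimate_sentiment_cost(df['review_body'].head(10))` would give a rough estimate for the ten reviews sent in this lab.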

In [12]:
# Push reviews to Firehose
# Background on reading the JSON that Firehose writes to S3:
# https://stackoverflow.com/questions/34468319/reading-the-data-written-to-s3-by-amazon-kinesis-firehose-stream/49417680#49417680

for i in range(10):
    # Replace any newline, tab, or carriage return in the JSON payload with a space.
    # Append a newline so that Firehose places each JSON record on its own line;
    # without it, Firehose writes all records on a single line in S3.
    payload = re.sub(pattern, ' ', df.iloc[i].to_json()) + "\n"

    print(payload)
    response = client_firehose.put_record(
        DeliveryStreamName=kinesis_delivery_stream_name,
        Record={
            'Data': payload
        }
    )

    # HTTP 200 only confirms that Firehose accepted the record
    print('Response', response['ResponseMetadata']['HTTPStatusCode'])
    print()

{"marketplace":"US","customer_id":16199106,"review_id":"R203HPW78Z7N4K","product_id":"B0067WNSZY","product_parent":633038551,"product_title":"FGGF3032MW Gallery Series 30\" Wide Freestanding Gas Range 5 Sealed Burners Easy Temperature Probe Express-Select Controls One-Touch Self Clean:","product_category":"Major Appliances","star_rating":5,"helpful_votes":0,"total_votes":0,"vine":"N","verified_purchase":"Y","review_headline":"If you need a new stove, this is a winner.","review_body":"What a great stove.  What a wonderful replacement for my sort of antique.  Enjoy it every day.","review_date":"2015-08-31"}

Response 200

{"marketplace":"US","customer_id":16374060,"review_id":"R2EAIGVLEALSP3","product_id":"B002QSXK60","product_parent":811766671,"product_title":"Best Hand Clothes Wringer","product_category":"Major Appliances","star_rating":5,"helpful_votes":1,"total_votes":1,"vine":"N","verified_purchase":"Y","review_headline":"Five Stars","review_body":"worked great","review_date":"2015-

### Verify the CloudWatch logs for the Lambda function to confirm that the reviews were processed,
### and check the S3 bucket for the delivered data
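
One way to check the delivered data is to read the objects back and parse them line by line. The sketch below assumes Firehose delivers uncompressed objects with one JSON record per line; the bucket name and prefix are placeholders to be supplied by the reader, and both function names are hypothetical.

```python
import json


def parse_delivered_object(body):
    """Parse newline-delimited JSON text into a list of dicts."""
    return [json.loads(line) for line in body.split('\n') if line.strip()]


def read_delivered_reviews(bucket, prefix=''):
    """Read every object under bucket/prefix and return the parsed records.

    Assumes uncompressed, newline-delimited JSON objects.
    """
    import boto3  # imported lazily so the parser can be tested without AWS
    s3 = boto3.client('s3')
    reviews = []
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            body = s3.get_object(Bucket=bucket, Key=obj['Key'])['Body'].read()
            reviews.extend(parse_delivered_object(body.decode('utf-8')))
    return reviews
```

Firehose buffers records before writing to S3, so the objects may take a minute or more to appear after the reviews are submitted.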