# Customer Review Processing Pipeline with Kinesis Data Firehose


In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import re

### Download Customer Reviews for Cameras

In [None]:
# Copy the gzipped, tab-separated camera-review file from S3 into the current directory.
!aws s3 cp s3://amazon-reviews-pds/tsv/amazon_reviews_us_Camera_v1_00.tsv.gz .

### Load and Clean the Review Data

In [None]:
# Load the gzipped (compression inferred from the .gz suffix), tab-separated
# review file, silently skipping malformed lines.
# `on_bad_lines='skip'` replaces the old `error_bad_lines=False` /
# `warn_bad_lines=False` pair, which was deprecated in pandas 1.3 and removed
# in pandas 2.0 (passing it there raises a TypeError).
df = pd.read_csv('amazon_reviews_us_Camera_v1_00.tsv.gz',
                 sep='\t', on_bad_lines='skip')


In [None]:
# Report how many rows and columns were loaded.
n_rows, n_cols = df.shape
print('Rows: {0}, Columns: {1}'.format(n_rows, n_cols))

In [None]:
# Preview the first five rows (last expression in the cell, so the notebook renders it).
df.head(n=5)

In [None]:
# Fill missing review headlines and bodies with a single space.
for text_col in ('review_headline', 'review_body'):
    df[text_col] = df[text_col].fillna(' ')

# Regex matching runs of embedded newlines, tabs and carriage returns.
# It is passed to re.sub when the Firehose payloads are built below.
pattern = r'[\n\t\r]+'

## Pushing review data to Kinesis Data Firehose

In [None]:
import boto3

In [None]:
# boto3 session pinned to us-east-1; the Firehose client is created from it.
aws_region = 'us-east-1'
session = boto3.Session(region_name=aws_region)

In [None]:
# Kinesis Data Firehose client bound to the session's region.
client_firehose = session.client(service_name='firehose')

In [None]:
# Firehose delivery stream that receives the review records.
kinesis_delivery_stream_name = "CustomerReviewStream"

In [None]:
# Send the first `num_records` reviews to Firehose, one JSON document per record.
num_records = 100  # capped at the frame's length so short frames don't raise IndexError

for i in range(min(num_records, len(df))):
    # Scrub embedded newlines/tabs/carriage returns from the row's string
    # values *before* serializing. `to_json` escapes these characters (a raw
    # newline becomes the two characters `\n`), so running the regex over the
    # JSON text afterwards never touched the review text. Non-string values
    # are left unchanged by a regex replace.
    row = df.iloc[i].replace(pattern, ' ', regex=True)

    # Newline-terminate each record; presumably so records delivered together
    # can be split line by line downstream.
    payload = row.to_json() + "\n"

    print(payload)
    response = client_firehose.put_record(
        DeliveryStreamName=kinesis_delivery_stream_name,
        Record={
            # Record data is a blob; encode explicitly rather than relying on
            # implicit str -> bytes conversion.
            'Data': payload.encode('utf-8')
        }
    )

    print('Response', response['ResponseMetadata']['HTTPStatusCode'])
   