# Amazon Review Data Warehouse

### Data Engineering Capstone Project

#### Project Summary

![](https://cdn.vox-cdn.com/thumbor/UJ_Tk3JEUG4RQWzlhrDyzou4cjk=/0x0:2040x1360/1200x800/filters:focal(857x517:1183x843)/cdn.vox-cdn.com/uploads/chorus_image/image/66674578/acastro_190920_1777_amazon_0002.0.0.jpg)

* [Data Reference](http://jmcauley.ucsd.edu/data/amazon/)

* [project rubric](https://review.udacity.com/#!/rubrics/2497/view)


#### Load Modules 

In [1]:
%matplotlib inline
import os
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql import types as T
from configparser import ConfigParser
import psycopg2
from datetime import datetime
import matplotlib.pyplot as plt

config = ConfigParser()
config.read("../aws.cfg")

os.environ['AWS_ACCESS_KEY_ID']=config["CREDENTIALS"]['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config["CREDENTIALS"]['AWS_SECRET_ACCESS_KEY']

AWS_ACCESS_KEY=config["CREDENTIALS"]['AWS_ACCESS_KEY_ID']
AWS_SECRET_KEY=config["CREDENTIALS"]['AWS_SECRET_ACCESS_KEY']

#### Create Spark Session

In [2]:
def create_spark_session():
    # S3A reads its credentials from fs.s3a.access.key / fs.s3a.secret.key;
    # the "spark.hadoop." prefix passes them through to the Hadoop configuration.
    return (
        SparkSession.builder
        .config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY)
        .config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_KEY)
        .config("spark.jars.packages", 
                "org.apache.hadoop:hadoop-aws:2.7.3"
                ",org.postgresql:postgresql:42.2.6")
        .getOrCreate())

In [3]:
spark = create_spark_session()

## Step 1: Scope the Project and Gather Data

### Scope

Amazon Customer Reviews is one of Amazon's iconic products. In the more than two decades since the first review in 1995, millions of Amazon customers have contributed over a hundred million reviews expressing opinions and describing their experiences with products on the Amazon.com website. The dataset was constructed to represent a sample of customer evaluations and opinions, variation in the perception of a product across geographical regions, and promotional intent or bias in reviews.

We build a data warehouse for analyzing customer responses, using the 'Books' category of the Amazon review data.



### Describe and Gather Data

1. **meta data**: product metadata (descriptions, category information, price, brand, and image features)
2. **review data**: reviews (ratings, text, helpfulness votes)

#### 1. meta-data

| columns    | description |
| :--------  | :---- |
| asin       | ID of the product, e.g. 000031852 | 
| title      | name of the product |
| feature    | bullet-point format features of the product |
| description | description of the product |
| price      | price in US dollars (at time of crawl) |
| image      | url of the product image |
| also_buy    | related products (also buy) |
| also_view    | related products (also view) |
| rank  | sales rank information |
| brand      | brand name |
| category | list of categories the product belongs to |

In [118]:
metaSchema = T.StructType([
    T.StructField('asin', T.StringType()),
    T.StructField('title', T.StringType()),
    T.StructField('feature', T.ArrayType(T.StringType())),
    T.StructField('description', T.ArrayType(T.StringType())),
    T.StructField('price', T.StringType()),
    T.StructField('image', T.ArrayType(T.StringType())),
    T.StructField('also_buy', T.ArrayType(T.StringType())),
    T.StructField('also_view', T.ArrayType(T.StringType())),    
    T.StructField('rank', T.StringType()),
    T.StructField('brand', T.StringType()),
    T.StructField('category', T.ArrayType(T.StringType()))
])

metaDf = (
    spark.read
    .schema(metaSchema)
    .option('mode','DROPMALFORMED')
    .json("s3a://udacity-capstone-craftsangjae/meta_Books.json.gz"))

In [119]:
metaDf.limit(10).toPandas()

Unnamed: 0,asin,title,feature,description,price,image,also_buy,also_view,rank,brand,category
0,0000092878,Biology Gods Living Creation Third Edition 10 ...,,[It is a biology book with God's perspective.],$39.94,,"[0669009075, B000K2P5SA, B00MD4G2N0, B000ASIPT...","[0019777701, B000AUCX7I, B000K2P5SA, B001CK63X...","1,349,781inBooks(",Keith Graham,
1,000047715X,Mksap 16 Audio Companion: Medical Knowledge Se...,,,,,,"[B01MUCYEV7, B01KUGTY6O]","1,702,625inBooks(",Acp,"[Books, New, Used & Rental Textbooks, Medicine..."
2,0000004545,"Flex! Discography of North American Punk, Hard...",,"[Discography of American Punk, Hardcore, and P...",$199.99,,,,"6,291,012inBooks(",Burkhard Jarisch,"[Books, Arts & Photography, Music]"
3,0000013765,Heavenly Highway Hymns: Shaped-Note Hymnal,,[This is a collection of classic gospel hymns ...,,,,"[0006180116, 0996092730, B000QFOGY0, B06WWKNDL...","2,384,057inBooks(",Stamps/Baxter,"[Books, Arts & Photography, Music]"
4,0000000116,Georgina Goodman Nelson Womens Size 8.5 Purple...,,,$164.10,,,,"11,735,726inBooks(",,
5,0000555010,Principles of Analgesic Use in the Treatment o...,,[Brand new; never used.],,,"[0323056962, 0123979285]","[0323056962, 0521879272]","2,906,939inBooks(",American Pain Society,"[Books, New, Used & Rental Textbooks, Medicine..."
6,0000477141,MKSAP 15 Audio Companion,,[Flash cards used with accompany MKSAP 15 audi...,,,,,"2,236,549inBooks(",ACP,"[Books, Medical Books, Medicine]"
7,0000230022,The Simple Truths of Service: Inspired by John...,,[Simple Truths of Service: Inspired by Jonny t...,,,"[1492630519, 0071819045, 0688123163, 160810640...","[0692842004, 1492630519, 1978489552, 160810640...","2,566,783inBooks(",Visit Amazon's Ken Blanchard Page,"[Books, New, Used & Rental Textbooks, Business..."
8,0000038504,Double-Speak: From Revenue Enhancement to Term...,,"[This book will alert, amuse and appall you as...",$198.70,,,"[0060171340, 0060161345, 0062734121]","2,505,873inBooks(",William Lutz,"[Books, Education & Teaching, Schools & Teaching]"
9,0000001589,LJ Classique Interchangeable Ladies Gift Set W...,,,,,,,"4,368,310inBooks(",,


#### 2. review data

| columns    | description |
| :--------  | :---- |
| reviewerID       | ID of the reviewer, e.g. A2SUAM1J3GNN3B | 
| asin      |  ID of the product, e.g. 0000013714 |
| reviewerName    | name of the reviewer |
| vote  | helpful votes of the review |
| reviewText      | text of the review |
| overall    | rating of the product |
| summary  | summary of the review |
| unixReviewTime      | time of the review (unix time) |
| reviewTime | time of the review (raw) |
| image | images that users post after they have received the product |

In [120]:
reviewSchema = T.StructType([
    T.StructField('reviewerID', T.StringType()),    
    T.StructField('asin', T.StringType()),
    T.StructField('reviewerName', T.StringType()),    
    T.StructField('vote', T.StringType()),    
    T.StructField('reviewText', T.StringType()),    
    T.StructField('overall', T.FloatType()),
    T.StructField('summary', T.StringType()),
    T.StructField('unixReviewTime', T.LongType()),
    T.StructField('reviewTime', T.StringType()),    
    T.StructField('image', T.ArrayType(T.StringType()))])

reviewDf = (
    spark.read
    .schema(reviewSchema)
    .json("s3a://udacity-capstone-craftsangjae/Books.json.gz"))

In [121]:
reviewDf.limit(10).toPandas()

Unnamed: 0,reviewerID,asin,reviewerName,vote,reviewText,overall,summary,unixReviewTime,reviewTime,image
0,A1C6M8LCIX4M6M,1713353,June Bug,,This book is a winner with both of my boys. T...,5.0,Children's favorite,1123804800,"08 12, 2005",
1,A1REUF3A1YCPHM,1713353,TW Ervin II,,"The King, the Mice and the Cheese by Nancy Gur...",5.0,A story children will love and learn from,1112140800,"03 30, 2005",
2,A1YRBRK2XM5D5,1713353,Rebecca L. Menner,5.0,My daughter got her first copy from her great-...,5.0,Third copy,1081036800,"04 4, 2004",
3,A1V8ZR5P78P4ZU,1713353,Mindy Stone,,I remember this book from when I was a child a...,5.0,Graphically Wonderful!,1077321600,"02 21, 2004",
4,A2ZB06582NXCIV,1713353,B. Deniger,,"Just as I remembered it, one of my favorites f...",5.0,"Great condition, very happy to have this to sh...",1475452800,"10 3, 2016",
5,ACPQVNRD3Z09X,1713353,Terri Dickson,,It is a very cute book with great illustration...,5.0,Five Stars,1469750400,"07 29, 2016",
6,AVP0HXC9FG790,1713353,Amazon Customer,,The kids loved it!,5.0,Five Stars,1466380800,"06 20, 2016",
7,A32MQTLQQN44WW,1713353,jackie hogan,,"I was just so hapoy to have found it, thank yo...",5.0,Got a special part of my childhood back aand a...,1461456000,"04 24, 2016",
8,A13CHIJPFCEP2M,1713353,Janice Cunningham,,good comdition,5.0,Five Stars,1455408000,"02 14, 2016",
9,A324TTUBKTN73A,1713353,Tekla Borner,,My students (3 & 4 year olds) loved this book!...,5.0,Five Stars,1453593600,"01 24, 2016",


## Step 2: Explore and Assess the Data

* Explore the data to identify data quality issues, like missing values, duplicate data, etc.
* Document steps necessary to clean the data

### Check the number of rows in the given data

####  - meta data

In [25]:
meta_data_size = metaDf.count()

In [28]:
print(f"metaDf's rows : {meta_data_size:,}")

metaDf's rows : 2,935,525


#### - review data

In [29]:
review_size = reviewDf.count()

In [30]:
print(f"reviewDf's rows : {review_size:,}")

reviewDf's rows : 51,311,621


### Check for null values

In [31]:
from pyspark.sql import functions as F

In [34]:
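def countNullValues(df):
    """Return a one-row pandas DataFrame with the null count of each column."""
    # F.when yields the (non-null) literal column name for null cells and
    # NULL otherwise, so F.count counts exactly the null cells.
    return (
        df
        .select(
            [F.count(F.when(F.isnull(c), c))
             .alias(c) for c in df.columns])
        .toPandas()
    )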

#### - Metadata

In [35]:
null_in_meta = countNullValues(metaDf)
null_in_meta

Unnamed: 0,asin,title,feature,description,price,image,also_buy,also_view,rank,brand,category
0,0,828,2933983,551164,1058084,2933938,1590876,1730233,24256,101021,389619


In [37]:
# percentage
null_in_meta / meta_data_size

Unnamed: 0,asin,title,feature,description,price,image,also_buy,also_view,rank,brand,category
0,0.0,0.000282,0.999475,0.187757,0.360441,0.999459,0.541939,0.589412,0.008263,0.034413,0.132725


#### - Review Data

In [38]:
null_in_review = countNullValues(reviewDf)
null_in_review

Unnamed: 0,reviewerID,asin,reviewerName,vote,reviewerText,overall,summary,unixReviewTime,reviewTime,image
0,10457215,10457215,10458745,51311621,51311621,10457215,10470166,10457215,10457215,51206718


In [39]:
# percentage
null_in_review / review_size

Unnamed: 0,reviewerID,asin,reviewerName,vote,reviewerText,overall,summary,unixReviewTime,reviewTime,image
0,0.203798,0.203798,0.203828,1.0,1.0,0.203798,0.204051,0.203798,0.203798,0.997956


### Check for duplicates

In [44]:
# metaDF
meta_data_size == (
    metaDf
    .dropDuplicates(['asin'])
    .count()
)

False

In [45]:
# reviewDf: a review is identified by (reviewerID, asin, unixReviewTime)
review_size == (
    reviewDf
    .dropDuplicates(
        ["reviewerID", "asin", "unixReviewTime"])
    .count()
)

False

### Cleansing Steps

* Drop rows whose ID columns (`asin`, `reviewerID`) are null, and remove duplicate rows
* Add an `ID` column to `reviewDf`
* Cache the cleansed DataFrames in Parquet format

In [132]:
os.makedirs("../data",exist_ok=True)
(
    metaDf
    .filter(metaDf.asin.isNotNull())
    .dropDuplicates(['asin'])
    .write
    .parquet(os.path.join("../data", "meta.parquet"), 'overwrite')
)

(
    reviewDf
    .filter(reviewDf.asin.isNotNull())
    .filter(reviewDf.reviewerID.isNotNull())
    .dropDuplicates(["reviewerID", "asin", "unixReviewTime"])
    .withColumn('ID', F.monotonically_increasing_id())
    .write    
    .parquet(os.path.join("../data", "review.parquet"), 'overwrite')
)
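
The `ID` values written above are unique but not consecutive. Spark documents that the current implementation of `monotonically_increasing_id` puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits, which is why IDs such as 214748364800 show up in the loaded tables. A minimal sketch for decoding such an ID follows; the helper name `decode_monotonic_id` is hypothetical, and the bit layout is an implementation detail that may differ in other Spark versions.

```python
RECORD_BITS = 33  # lower 33 bits hold the record number within a partition


def decode_monotonic_id(value: int) -> tuple:
    """Split an ID into (partition_id, record_number)."""
    partition_id = value >> RECORD_BITS
    record_number = value & ((1 << RECORD_BITS) - 1)
    return partition_id, record_number


print(decode_monotonic_id(214748364800))   # (25, 0)
print(decode_monotonic_id(1623497637889))  # (189, 1)
```

Because the IDs depend on how the data is partitioned, they are stable only as long as the cached Parquet files are reused rather than regenerated.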

### Load Cached Data

In [195]:
metaDf = (
    spark.read
    .schema(metaSchema)
    .parquet("../data/meta.parquet")
)

reviewSchema = T.StructType([
    T.StructField('ID', T.LongType()),
    T.StructField('reviewerID', T.StringType()),    
    T.StructField('asin', T.StringType()),
    T.StructField('reviewerName', T.StringType()),    
    T.StructField('vote', T.StringType()),    
    T.StructField('reviewText', T.StringType()),    
    T.StructField('overall', T.FloatType()),
    T.StructField('summary', T.StringType()),
    T.StructField('unixReviewTime', T.LongType()),
    T.StructField('reviewTime', T.StringType()),    
    T.StructField('image', T.ArrayType(T.StringType()))])

reviewDf = (
    spark.read
    .schema(reviewSchema)
    .parquet("../data/review.parquet")
)

In [197]:
cleansed_meta_size = metaDf.count()
cleansed_review_size = reviewDf.count()

print(
    f"the num of meta data cleansed : {cleansed_meta_size:,}")
print(
    f"the num of review data cleansed : {cleansed_review_size:,}")

the num of meta data cleansed : 2,930,600
the num of review data cleansed : 51,161,643


## Step 3: Define the Data Model

### 3.1 Conceptual Data Model

![](https://imgur.com/ivCE1lk.png)


We derived the conceptual data model above from the two datasets. It takes the form of a star schema: the `reviewlog` fact table, which holds the ratings given by customers, sits at the center, and the user, product, review text, and time information are arranged in dimension tables around it. With this structure you can quickly find which products received the highest ratings from customers. The fact table can also serve as the input to a recommendation algorithm such as collaborative filtering.
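
As a rough illustration of the "highest-rated products" query, the sketch below joins a fact table to one dimension table and averages the ratings. It is not the project's pipeline: an in-memory SQLite database stands in for PostgreSQL, only a subset of the columns is created, and the rows are made up for the example.

```python
import sqlite3

# In-memory SQLite stands in for the PostgreSQL warehouse (assumption).
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE product (productID TEXT PRIMARY KEY, title TEXT);
CREATE TABLE reviewlog (
    reviewID INTEGER NOT NULL,
    userID TEXT NOT NULL,
    productID TEXT NOT NULL,
    overall REAL
);
""")

# Made-up rows, for illustration only.
conn.executemany("INSERT INTO product VALUES (?, ?)",
                 [("P1", "Book A"), ("P2", "Book B")])
conn.executemany("INSERT INTO reviewlog VALUES (?, ?, ?, ?)",
                 [(1, "U1", "P1", 5.0), (2, "U2", "P1", 4.0),
                  (3, "U1", "P2", 3.0), (4, "U3", "P2", 2.0)])

# Fact table joined to the product dimension, ranked by average rating.
top_rated = conn.execute("""
    SELECT p.title, AVG(r.overall) AS avg_rating, COUNT(*) AS n_reviews
    FROM reviewlog AS r
    JOIN product AS p ON p.productID = r.productID
    GROUP BY p.productID, p.title
    ORDER BY avg_rating DESC
""").fetchall()
conn.close()

print(top_rated)
```

Returning the review count next to the average makes it easy to ignore products with too few reviews for the average to be meaningful.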

### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. Convert the price and vote fields from strings to float and int types.
2. Split the unix time into year, month, day, hour, and minute.
3. Split the data into dimension tables and a fact table.
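
Steps 1 and 2 can be sketched in plain Python before wrapping them in Spark UDFs. The function names below are hypothetical, the unparseable-price handling is an assumption (this variant returns `None`), and the timestamp is interpreted in UTC, whereas a bare `datetime.fromtimestamp` uses the local time zone of the machine.

```python
from datetime import datetime, timezone
from typing import Optional, Tuple


def parse_price(raw: Optional[str]) -> Optional[float]:
    """Convert a price string such as '$1,199.99' to a float; None if unparseable."""
    if raw is None:
        return None
    try:
        return float(raw.strip().lstrip("$").replace(",", ""))
    except ValueError:
        return None


def parse_vote(raw: Optional[str]) -> int:
    """Convert a vote string such as '1,024' to an int; missing votes count as 0."""
    if raw is None:
        return 0
    return int(raw.replace(",", ""))


def split_unix_time(ts: int) -> Tuple[int, int, int, int, int]:
    """Return (year, month, day, hour, minute) for a Unix timestamp, in UTC."""
    dt = datetime.fromtimestamp(ts, tz=timezone.utc)
    return dt.year, dt.month, dt.day, dt.hour, dt.minute


print(parse_price("$39.94"))        # 39.94
print(parse_vote("1,024"))          # 1024
print(split_unix_time(1440201600))  # (2015, 8, 22, 0, 0)
```

Fixing the time zone explicitly keeps the derived year, month, and day identical no matter where the pipeline runs.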

## Step 4: Run Pipelines to Model the Data 

### 4.1 Create the data model
Build the data pipelines to create the data model.

#### CREATE DATABASES

In [150]:
conn = psycopg2.connect("host=postgres user=postgres password=postgres")
conn.set_session(autocommit=True)

cur = conn.cursor()
cur.execute('DROP DATABASE IF EXISTS amazonreview;')
cur.execute('CREATE DATABASE amazonreview;')

cur.close()
conn.close()

#### CREATE TABLES

* Every dimension table has a primary key
* The key columns of the fact table carry a `NOT NULL` constraint

In [156]:
product_table_create = '''
CREATE TABLE IF NOT EXISTS product (
    productID varchar(12) PRIMARY KEY,
    title text,
    feature text [],
    description text [],
    price float,
    image text [],
    also_buy text [],
    also_view text [],
    rank text,
    brand text,
    category text []
);
'''

time_table_create = '''
CREATE TABLE IF NOT EXISTS time (
    unixTime bigint PRIMARY KEY,
    year int,
    month int,
    day int,
    hour int,
    minute int
);
'''

reviewuser_table_create = '''
CREATE TABLE IF NOT EXISTS reviewuser (
    userID varchar(20) PRIMARY KEY,
    userName text
    );
'''

review_table_create = '''
CREATE TABLE IF NOT EXISTS review (
    reviewID bigint PRIMARY KEY,
    reviewText text,
    summary text,
    image text []
);
'''

reviewlog_table_create = '''
CREATE TABLE IF NOT EXISTS reviewlog (
    reviewID bigint NOT NULL,
    userID varchar(20) NOT NULL,
    productID varchar(12) NOT NULL,
    vote INT,
    overall FLOAT,
    unixTime bigint NOT NULL
);
'''

create_table_queries = [
    product_table_create,
    time_table_create,
    reviewuser_table_create,
    review_table_create,
    reviewlog_table_create
]

In [158]:
with psycopg2.connect("host=postgres dbname=amazonreview user=postgres password=postgres") as conn:
    cur = conn.cursor()
    for query in create_table_queries:
        cur.execute(query)

### Load data into a table

In [160]:
url = "jdbc:postgresql://postgres/amazonreview"

db_configs = {
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver"
}

#### (1)  product Dimension Table 

In [229]:
table_name = 'product'

# Strip the leading "$" and the thousands separators; missing prices become -1.
price2float = F.udf(
    lambda x : float(x[1:].replace(',',"")) 
    if x is not None else -1., T.FloatType())

(
    metaDf
    .withColumnRenamed("asin", "productID")
    .withColumn("price", price2float('price'))
    .select(['productID', 'title', 'feature', 
             'description', 'price', 'image', 
             'also_buy', 'also_view', 'rank', 
             'brand', 'category'])
    .write
    .jdbc(url=url, table=table_name, 
          properties=db_configs, mode='append')
)

In [230]:
with psycopg2.connect("host=postgres dbname=amazonreview "
                      "user=postgres password=postgres") as conn:
    cur = conn.cursor()
    
    cur.execute(f"SELECT * FROM {table_name} limit 10;")
    print(cur.fetchall())

[('0544108620', "Beauty's Daughter: The Story of Hermione and Helen of Troy", None, ['Meyers latest historical novel delves into Greek mythology with a retelling of the Trojan War from the point of view of Hermione, the daughter of Helen and Menelaus of Sparta. Hermione lives in the shadow of her vainglorious mother, whose beauty bewitches all men. Though her mother is often critical or neglectful, Hermione is hurt when Helen runs away with Paris, abandoning her and her father but taking her brother. Menelaus and the Greek army follow in the lovers wake, with Hermione as a stowaway. As the war wages, Hermione grows into a woman with her own romantic and political prospects, including an arranged marriage to a hated man. The dry litany of Greek figures and myths that Hermione recites throughout the story may provide context, but they overshadow the scant details she offers about her own life. Still, the sheer scope and brutality of the war and the complications caused by the gods meddli

#### (2) reviewUser Dimension Table

In [198]:
table_name = 'reviewuser'
(
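    reviewDf
    .select(['reviewerID', 'reviewerName'])
    # only the key must be present; a bare dropna() would also discard
    # users whose reviews have a null vote or image
    .dropna(subset=['reviewerID'])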
    .withColumnRenamed('reviewerID','userID')
    .withColumnRenamed('reviewerName','userName')
    .dropDuplicates(['userID'])
    .write
    .jdbc(url=url, table=table_name, 
          properties=db_configs, mode='append')
)

In [232]:
with psycopg2.connect("host=postgres dbname=amazonreview "
                      "user=postgres password=postgres") as conn:
    cur = conn.cursor()
    
    cur.execute(f"SELECT * FROM {table_name} limit 10;")
    print(cur.fetchall())

[('A10R2Z2FQLPYEY', 'Buys Too Much Stuff'), ('A11T9RH986WFWA', 'Siamese MMA'), ('A126XB22Y6RXYW', 'Lucy'), ('A12LJXFVA2DT8C', 'Shannon'), ('A12UL2KGFLQP9E', 'Lynda&#039;s AWOL'), ('A12YSWLNZXMZVM', 'K.Brooklyn'), ('A131JP42GCM8Z8', 'Amazon Customer'), ('A13FGQDHTDRV14', 'John Wheeler'), ('A13O3ZQKG7NWFR', 'Annalisa'), ('A149KMYLTU33M2', 'Hullabaloo Brew Co')]


#### (3) review Dimension Table

In [233]:
table_name = 'review'
(
    reviewDf
    .select(['ID', 'reviewText', 'summary', 'image'])
    .withColumn('reviewText', F.regexp_replace('reviewText', u'\u0000', ""))
    .withColumn('summary', F.regexp_replace('summary', u'\u0000', ""))    
    .withColumnRenamed('ID','reviewID')    
    .write
    .jdbc(url=url, table=table_name, 
          properties=db_configs, mode='append')
)

In [234]:
with psycopg2.connect("host=postgres dbname=amazonreview "
                      "user=postgres password=postgres") as conn:
    cur = conn.cursor()
    
    cur.execute(f"SELECT * FROM {table_name} limit 10;")
    print(cur.fetchall())

[(1623497637888, 'Immensely readable as is typical with John Grisham.  The protagonists were not particularly likable it their story was pretty enjoyable. I look forward to the next novel by Mr. Grisham.', 'Not his best but pretty good', None), (1623497637889, 'Love it ! Love it ! Love it ! After getting this book, I can give it a glowing recommendation to anyone looking to expand to or are already involved in foreign investment in any capacity. The avenues to bring about your desired result are new and and exciting as advertised.', 'Love it! Love it', None), (1623497637890, 'I absolutely did not want to put this book down, and I was disappointed when it was over. The author has a way of bringing out each personality and remaining consistent throughout the book. I could actually see the landscapes and picture in my mind where each place was when described. I never thought I would want to go to Alaska, but now I do!', 'Exciting and adventurous, with very clear descriptions of both peopl

#### (4) time Dimension Table

In [237]:
table_name = 'time'

time2year = F.udf(
    lambda x : datetime.fromtimestamp(x).year, T.IntegerType())
time2month = F.udf(
    lambda x : datetime.fromtimestamp(x).month, T.IntegerType())
time2day = F.udf(
    lambda x : datetime.fromtimestamp(x).day, T.IntegerType())
time2hour = F.udf(
    lambda x : datetime.fromtimestamp(x).hour, T.IntegerType())
time2minute = F.udf(
    lambda x : datetime.fromtimestamp(x).minute, T.IntegerType())

(
    reviewDf
    .select(['unixReviewTime'])
    .dropDuplicates(['unixReviewTime'])
    .withColumnRenamed('unixReviewTime', 'unixTime')
    .withColumn('year', time2year('unixTime'))
    .withColumn('month',time2month('unixTime'))
    .withColumn('day', time2day('unixTime'))
    .withColumn('hour',time2hour('unixTime'))
    .withColumn('minute',time2minute('unixTime'))
    .write
    .jdbc(url=url, table=table_name, 
          properties=db_configs, mode='append')
    
)

In [238]:
with psycopg2.connect("host=postgres dbname=amazonreview "
                      "user=postgres password=postgres") as conn:
    cur = conn.cursor()
    
    cur.execute(f"SELECT * FROM {table_name} limit 10;")
    print(cur.fetchall())

[(1440201600, 2015, 8, 22, 0, 0), (1515628800, 2018, 1, 11, 0, 0), (1509926400, 2017, 11, 6, 0, 0), (1429142400, 2015, 4, 16, 0, 0), (1482278400, 2016, 12, 21, 0, 0), (1458000000, 2016, 3, 15, 0, 0), (1292371200, 2010, 12, 15, 0, 0), (1420243200, 2015, 1, 3, 0, 0), (1412121600, 2014, 10, 1, 0, 0), (1201651200, 2008, 1, 30, 0, 0)]


#### (5) reviewlog Fact Table

In [266]:
# Remove thousands separators; a missing vote count is stored as 0.
vote2int = F.udf(
    lambda x : int(x.replace(',','')) if x is not None else 0, T.IntegerType())

table_name = 'reviewlog'
(
    reviewDf
    .select(['ID', 'reviewerID', 'asin', 
             'vote', 'overall', 'unixReviewTime'])
    .withColumn("vote", vote2int('vote'))
    .withColumnRenamed('ID','reviewID')    
    .withColumnRenamed('reviewerID','userID')
    .withColumnRenamed('asin','productID')
    .withColumnRenamed('unixReviewTime','unixTime')
    .write
    .jdbc(url=url, table=table_name, 
          properties=db_configs, mode='append')
)

In [222]:
with psycopg2.connect("host=postgres dbname=amazonreview "
                      "user=postgres password=postgres") as conn:
    cur = conn.cursor()
    
    cur.execute(f"SELECT * FROM {table_name} limit 10;")
    print(cur.fetchall())

[(214748364800, 'A00463782V7TKAP9EMNL', '1978447221', 0, 4.0, 1512604800), (214748364801, 'A0061296P44YI9YQBDHH', '1978393296', 0, 5.0, 1508889600), (214748364802, 'A0061296P44YI9YQBDHH', '1981380043', 0, 5.0, 1512604800), (214748364803, 'A0089073KZNVNZG94KE7', '0345804406', 0, 5.0, 1420416000), (214748364804, 'A00929559H7AYKDCV7UE', '1451649320', 0, 4.0, 1485388800), (214748364805, 'A0099735VDZ3HDCAAYKL', '1540772888', 0, 5.0, 1493078400), (214748364806, 'A0110125LPMFR5Q4Z44R', '0811821846', 2, 3.0, 1433116800), (214748364807, 'A0137974MNNUR2E8S9SH', '0615937284', 2, 5.0, 1391212800), (214748364808, 'A01423305PUDQS2VV7QX', '1508715041', 0, 5.0, 1428019200), (214748364809, 'A0149534Z6YNFOGAJD5E', '0307881393', 0, 4.0, 1366848000)]


### 4.2 Data Quality Checks
The checks below confirm that the pipeline ran as expected by comparing the row counts of the loaded `product` and `reviewlog` tables with the row counts of the cleansed source data.

#### `product` table check

In [269]:
table_name = 'product'
with psycopg2.connect("host=postgres dbname=amazonreview "
                      "user=postgres password=postgres") as conn:
    cur = conn.cursor()
    
    cur.execute(f"SELECT COUNT(*) FROM {table_name};")
    nums = cur.fetchall()[0][0]

In [270]:
assert nums == cleansed_meta_size, "product row count in the database must equal the cleansed row count"

#### `reviewlog` table check

In [267]:
table_name = 'reviewlog'
with psycopg2.connect("host=postgres dbname=amazonreview "
                      "user=postgres password=postgres") as conn:
    cur = conn.cursor()
    
    cur.execute(f"SELECT COUNT(*) FROM {table_name};")
    nums = cur.fetchall()[0][0]

In [268]:
assert nums == cleansed_review_size
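
The two checks above repeat the same pattern, so they could be folded into one helper. The sketch below is one possible shape, not the notebook's own code: `check_row_count` and `ALLOWED_TABLES` are hypothetical names, and the demonstration runs against an in-memory SQLite database so that it works without a PostgreSQL server. The helper only relies on `execute` and `fetchone`, which a psycopg2 cursor also provides.

```python
import sqlite3

# Table names are interpolated into SQL, so restrict them to known tables.
ALLOWED_TABLES = {"product", "time", "reviewuser", "review", "reviewlog"}


def check_row_count(cur, table_name: str, expected: int) -> int:
    """Raise ValueError if the table's row count differs from `expected`."""
    if table_name not in ALLOWED_TABLES:
        raise ValueError(f"unknown table: {table_name}")
    cur.execute(f"SELECT COUNT(*) FROM {table_name};")
    actual = cur.fetchone()[0]
    if actual != expected:
        raise ValueError(
            f"{table_name}: expected {expected:,} rows, found {actual:,}")
    return actual


# Demonstration with made-up rows in SQLite (assumption, see above).
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE product (productID TEXT PRIMARY KEY);")
cur.executemany("INSERT INTO product VALUES (?);", [("A",), ("B",), ("C",)])
print(check_row_count(cur, "product", 3))
```

Raising an exception with both counts in the message gives more context than a bare `assert` when a load is incomplete.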

### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.




#### `product` table

| columns    | description |
| :--------  | :---- |
| productID       | ID of the product, e.g. 000031852 | 
| title      | name of the product |
| feature    | bullet-point format features of the product |
| description | description of the product |
| price      | price in US dollars (at time of crawl); -1 when missing |
| image      | url of the product image |
| also_buy    | related products (also buy) |
| also_view    | related products (also view) |
| rank  | sales rank information |
| brand      | brand name |
| category | list of categories the product belongs to |

#### `review` table

| columns    | description |
| :--------  | :---- |
| reviewID   | ID of the review |
| reviewText | text of the review |
| summary    | summary of the review |
| image      | images that users post after they have received the product |

#### `reviewuser` table

| columns    | description |
| :--------  | :---- |
| userID       | ID of the reviewer, e.g. A2SUAM1J3GNN3B | 
| userName    | name of the reviewer |

#### `time` table

| columns    | description |
| :--------  | :---- |
| unixTime   | time of the review (unix time) | 
| year       |  year of unixTime |
| month      | month of unixTime |
| day        | day of unixTime |
| hour       | hour of unixTime |
| minute     | minute of unixTime |

#### `reviewlog` table

| columns    | description |
| :--------  | :---- |
| reviewID       | ID of the review | 
| productID      |  ID of the product, e.g. 0000013714 |
| userID    | ID of the reviewer |
| vote  | helpful votes of the review (0 when missing) |
| overall    | rating of the product |
| unixTime | time of the review (unix time) |

## Step 5: Complete Project Write Up


#### Environment Configuration

1. DataLake : Amazon S3 
2. ETL Process : Spark (by Docker image)
3. Data Warehouse : Postgresql-Cstore (by Docker image)

#### Scenarios

In the development environment, the Docker images are configured to run on a single machine. As the volume of data grows, the ETL process can be moved to Amazon EMR and the Postgresql-Cstore warehouse to Amazon Redshift. Both Redshift and Amazon EMR perform distributed computing and can scale up and down to meet demand, so the warehouse can keep serving queries even when 100+ people access it.

To run the pipeline periodically (for example, every day at 7 am), the ETL pipeline currently written as a notebook can be converted to an Apache Airflow DAG. Books are not very sensitive to fashion and trends, so a daily update cycle is sufficient.
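
To make the daily 7 am schedule concrete, the sketch below computes the next run time from a given moment using only the standard library. It is not Airflow code: in a scheduler the same schedule would typically be written as the cron expression `0 7 * * *`. The function name `next_daily_run` is hypothetical, and naive (time-zone-free) datetimes are assumed.

```python
from datetime import datetime, time, timedelta

RUN_AT = time(hour=7)  # daily run time named in the text


def next_daily_run(now: datetime, run_at: time = RUN_AT) -> datetime:
    """Return the first scheduled run strictly after `now`."""
    candidate = datetime.combine(now.date(), run_at)
    if candidate <= now:
        # Today's slot has already passed, so schedule for tomorrow.
        candidate += timedelta(days=1)
    return candidate


print(next_daily_run(datetime(2020, 1, 1, 6, 0)))    # 2020-01-01 07:00:00
print(next_daily_run(datetime(2020, 1, 1, 23, 30)))  # 2020-01-02 07:00:00
```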


