# Project Title
### Data Engineering Capstone Project

#### Project Summary
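This project builds a data warehouse on Amazon Redshift from the Yelp business and review datasets. The raw JSON files are explored with Spark, loaded into staging tables, and transformed into a star schema of business and review tables.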

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [4]:
```python
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
import os
import psycopg2
import configparser
import boto3

# Set up Configuration file
config = configparser.ConfigParser()
config.read('dl.cfg')
os.environ["AWS_ACCESS_KEY_ID"] = config['AWS_ACCESS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"] = config['AWS_ACCESS']['AWS_SECRET_ACCESS_KEY']
```

In [5]:
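```python
# Values are filled in positionally, so the CLUSTER section of dl.cfg must list
# host, dbname, user, password and port in exactly that order.
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
cur = conn.cursor()
```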
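The connection string above depends on the order of the keys in `dl.cfg`. The following is a minimal sketch of a name-based alternative. The real `dl.cfg` is not shown, so the key names `HOST`, `DB_NAME`, `DB_USER`, `DB_PASSWORD` and `DB_PORT` and all of the sample values are hypothetical.

```python
import configparser

# Hypothetical dl.cfg contents; the real key names are not shown in this notebook.
SAMPLE_CFG = """
[CLUSTER]
HOST = my-cluster.example.us-west-2.redshift.amazonaws.com
DB_NAME = yelp
DB_USER = awsuser
DB_PASSWORD = secret
DB_PORT = 5439
"""


def build_dsn(config):
    """Build a libpq connection string by key name instead of by key order."""
    cluster = config["CLUSTER"]
    # configparser lower-cases option names on write and read, so lookups are case-insensitive.
    return "host={} dbname={} user={} password={} port={}".format(
        cluster["HOST"],
        cluster["DB_NAME"],
        cluster["DB_USER"],
        cluster["DB_PASSWORD"],
        cluster["DB_PORT"],
    )


config = configparser.ConfigParser()
config.read_string(SAMPLE_CFG)
dsn = build_dsn(config)
print(dsn)
# conn = psycopg2.connect(dsn)  # the same call as in the cell above
```

If a key is renamed or missing, this version fails with a `KeyError` that names the key. The positional version would instead silently pass the wrong value to the wrong parameter.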

In [6]:
```python
# hadoop-aws provides the S3 filesystem connectors, so Spark can also read from S3
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
```

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.

Yelp's dataset contains business and review data across eleven metropolitan areas in four countries.

I am going to build pipelines that load this data into the tables of a star schema in a Redshift database.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

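The data comes from the Yelp dataset published on Kaggle, which supports many kinds of analysis. This project uses two of its JSON files: `yelp_academic_dataset_business.json` (businesses) and `yelp_academic_dataset_review.json` (reviews).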


#### Load Data
The first step is to load the data and check what it looks like.

In [7]:
```python
# Read in the data here
def read_data(name):
    """Read a line-delimited JSON file from the local Yelp_Data/ folder."""
    MainLocation = "Yelp_Data/"
    return spark.read.json(MainLocation + name)
```

In [8]:
```python
business_data = read_data("yelp_academic_dataset_business.json")
```

In [9]:
```python
review_data = read_data("yelp_academic_dataset_review.json")
```

## Explore data

In [10]:
```python
business_data.limit(2).toPandas()
```

| | _corrupt_record | address | attributes | business_id | categories | city | hours | is_open | latitude | longitude | name | postal_code | review_count | stars | state |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | | 2818 E Camino Acequia Drive | (None, None, None, None, None, None, None, Non... | 1SWheh84yJXfytovILXOAQ | Golf, Active Life | Phoenix | | 0 | 33.522143 | -112.018481 | Arizona Biltmore Golf Club | 85016 | 5 | 3.0 | AZ |
| 1 | | 30 Eglinton Avenue W | (None, None, u'full_bar', {'romantic': False, ... | QXAEGFB4oINsVuTFxEYKFQ | Specialty Food, Restaurants, Dim Sum, Imported... | Mississauga | (9:0-1:0, 9:0-0:0, 9:0-1:0, 9:0-0:0, 9:0-0:0, ... | 1 | 43.605499 | -79.652289 | Emerald Chinese Restaurant | L5R 3E7 | 128 | 2.5 | ON |
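Spark's `read.json` expects JSON Lines by default, meaning one object per line. In its default PERMISSIVE mode, Spark keeps lines it cannot parse and puts their raw text in a `_corrupt_record` column. The presence of that column in the output above therefore suggests that at least one line of the source file failed to parse, even though these two rows are clean.

The sketch below does the same split in plain Python, which is useful for counting bad lines before loading. The helper name `split_json_lines` is hypothetical, and treating non-object JSON as corrupt is this sketch's own choice.

```python
import json


def split_json_lines(lines):
    """Parse JSON Lines text, separating good records from lines that fail to parse.

    Roughly what Spark's PERMISSIVE mode does when it routes unparsable lines
    into the `_corrupt_record` column.
    """
    records, corrupt = [], []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        try:
            obj = json.loads(line)
        except json.JSONDecodeError:
            corrupt.append(line)
            continue
        if isinstance(obj, dict):
            records.append(obj)
        else:
            corrupt.append(line)  # valid JSON, but not a record (object)
    return records, corrupt


sample = [
    '{"business_id": "1SWheh84yJXfytovILXOAQ", "city": "Phoenix"}',
    '{"business_id": "QXAEGFB4oINsVuTFxEYKFQ", "city": "Mississauga"',  # truncated line
    "",
]
good, bad = split_json_lines(sample)
print(len(good), len(bad))  # 1 1
```

In Spark itself, the bad rows can be inspected with `business_data.filter(F.col("_corrupt_record").isNotNull())`. Since Spark 2.3, a query on raw JSON that references only this internal column is rejected, so cache the DataFrame first.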


In [11]:
```python
review_data.limit(2).toPandas()
```

| | _corrupt_record | business_id | cool | date | funny | review_id | stars | text | useful | user_id |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | | ujmEBvifdJM6h6RLv4wQIg | 0 | 2013-05-07 04:34:36 | 1 | Q1sbwvVQXV2734tPgoKj4Q | 1.0 | Total bill for this horrible service? Over $8G... | 6 | hG7b0MtEbXx5QzbzE6C_VA |
| 1 | | NZnhc2sEQy3RmzKTZnqtwQ | 0 | 2017-01-14 21:30:33 | 0 | GJXCdrto3ASJOqKeVWPi6Q | 5.0 | I \*adore\* Travis at the Hard Rock's new Kelly ... | 0 | yXQM5uF2jS6es16SJzNHfg |


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data
Identify missing values, duplicate columns, primary keys, etc., which help in designing the tables.

In [12]:
```python
print(business_data.select('business_id').distinct().count(), business_data.count())
```

```
140271 140271
```


Since the count of unique business_id values equals the overall row count, we can infer that there are no duplicate businesses in the raw data.
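Comparing the distinct count with the row count answers "are there duplicates?" but not "which keys repeat?". Here is a small stdlib sketch that returns the offending keys. The helper name `find_duplicate_keys` is hypothetical. In Spark the equivalent would be `business_data.groupBy('business_id').count().filter(F.col('count') > 1)`.

```python
from collections import Counter


def find_duplicate_keys(keys):
    """Return {key: occurrences} for every key that appears more than once."""
    counts = Counter(keys)
    return {key: n for key, n in counts.items() if n > 1}


ids = ["1SWheh84yJXfytovILXOAQ", "QXAEGFB4oINsVuTFxEYKFQ", "1SWheh84yJXfytovILXOAQ"]
print(len(set(ids)) == len(ids))  # False: distinct count differs from row count
print(find_duplicate_keys(ids))   # {'1SWheh84yJXfytovILXOAQ': 2}
```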

Now we are going to look at specific columns. Since the target is a relational database, I would not include columns like attributes, which hold JSON embedded inside the JSON file. This also poses a good question: how should JSON values be stored in a database?
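One relational answer is to flatten such a column into a long key/value table with the columns (business_id, attribute, value). Redshift's SUPER type for semi-structured data is another option.

The attribute values in the sample output above look like Python literals (`u'full_bar'`, `{'romantic': False, ...}`) rather than JSON, so the sketch below parses them with `ast.literal_eval`. It assumes the attributes arrive as a dict of such strings, and the helper names and attribute names are illustrative.

```python
import ast


def parse_value(raw):
    """Parse a Python-literal string such as "u'full_bar'"; return other input unchanged."""
    if not isinstance(raw, str):
        return raw
    try:
        return ast.literal_eval(raw)
    except (ValueError, SyntaxError):
        return raw  # plain text that is not a Python literal


def flatten_attributes(business_id, attributes):
    """Flatten an attributes dict into (business_id, attribute, value) rows.

    Nested dicts such as Ambience become dotted names like 'Ambience.romantic'.
    """
    rows = []
    for name, raw in sorted(attributes.items()):
        if raw is None:
            continue  # attribute not reported for this business
        value = parse_value(raw)
        if isinstance(value, dict):
            for sub_name, sub_value in sorted(value.items()):
                rows.append((business_id, f"{name}.{sub_name}", str(sub_value)))
        else:
            rows.append((business_id, name, str(value)))
    return rows


attrs = {
    "Alcohol": "u'full_bar'",
    "Ambience": "{'romantic': False, 'casual': True}",
    "BikeParking": None,
}
rows = flatten_attributes("QXAEGFB4oINsVuTFxEYKFQ", attrs)
for row in rows:
    print(row)
```

The long layout keeps the schema fixed no matter how many attribute names appear. The cost is that every value is stored as text.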

In [19]:
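```python
# Note: limit(5) keeps only 5 businesses; each comma-separated category becomes its own row
category_store = (
    business_data.select('business_id', 'categories')
    .limit(5)
    .withColumn("categories", F.explode(F.split("categories", ",")))
    .toPandas()
)
```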

In [20]:
```python
category_store.head(5)
```

| | business_id | categories |
|---|---|---|
| 0 | 1SWheh84yJXfytovILXOAQ | Golf |
| 1 | 1SWheh84yJXfytovILXOAQ | Active Life |
| 2 | QXAEGFB4oINsVuTFxEYKFQ | Specialty Food |
| 3 | QXAEGFB4oINsVuTFxEYKFQ | Restaurants |
| 4 | QXAEGFB4oINsVuTFxEYKFQ | Dim Sum |


The first step splits the categories string into one row per category. The second step one-hot encodes those rows into a table of dummy columns, so that each category appears once, as a column, instead of being repeated in the categories column.

In [24]:
```python
category_table = pd.get_dummies(category_store, columns=['categories'])
category_table.head(5)
```

| | business_id | `categories_ Active Life` | `categories_ Chinese` | `categories_ Dim Sum` | `categories_ Ethnic Food` | `categories_ Financial Services` | `categories_ Food` | `categories_ Home & Garden` | `categories_ Home Services` | `categories_ Imported Food` | ... | `categories_ Local Services` | `categories_ Restaurants` | `categories_ Seafood` | `categories_ Shopping` | `categories_ Water Heater Installation/Repair` | `categories_Golf` | `categories_Insurance` | `categories_Plumbing` | `categories_Specialty Food` | `categories_Sushi Bars` |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1SWheh84yJXfytovILXOAQ | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 |
| 1 | 1SWheh84yJXfytovILXOAQ | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 2 | QXAEGFB4oINsVuTFxEYKFQ | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 |
| 3 | QXAEGFB4oINsVuTFxEYKFQ | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | ... | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 4 | QXAEGFB4oINsVuTFxEYKFQ | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
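The dummy columns show `categories_ Active Life` next to `categories_Golf`. Splitting on "," keeps the space after each comma, so the same category can become two different columns depending on whether it came first in a business's list. In Spark, `F.split("categories", ",\\s*")` (the pattern is a regex) or wrapping the exploded value in `F.trim` would remove that space.

The stdlib sketch below shows trimmed splitting plus a `category_id` lookup matching the Business_Category (category_id, category_name) table in Step 3. The helper names, the alphabetical ID scheme, and the (business_id, category_id) bridge rows are assumptions. The data model does not say how businesses link to categories. The sample strings are shortened from the output above.

```python
def split_categories(raw):
    """Split a Yelp categories string on commas, stripping the space after each comma."""
    if not raw:
        return []
    return [part.strip() for part in raw.split(",") if part.strip()]


def build_category_lookup(category_strings):
    """Give every distinct category name an integer category_id.

    IDs follow alphabetical order, so rerunning on the same data gives the same IDs.
    """
    names = sorted({name for raw in category_strings for name in split_categories(raw)})
    return {name: category_id for category_id, name in enumerate(names, start=1)}


def business_category_pairs(businesses, lookup):
    """List (business_id, category_id) rows linking businesses to categories."""
    return [
        (business_id, lookup[name])
        for business_id, raw in businesses
        for name in split_categories(raw)
    ]


businesses = [
    ("1SWheh84yJXfytovILXOAQ", "Golf, Active Life"),
    ("QXAEGFB4oINsVuTFxEYKFQ", "Specialty Food, Restaurants, Dim Sum"),
]
print("Golf, Active Life".split(","))  # ['Golf', ' Active Life']: the space survives
lookup = build_category_lookup(raw for _, raw in businesses)
pairs = business_category_pairs(businesses, lookup)
print(lookup)
print(pairs)
```

Note also that the cells above work on `limit(5)` rows. A table covering every business would need the limit removed.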


After one-hot encoding, each category has its own column, but business_id is still duplicated (one row per business and category). Grouping by business_id and summing the dummy columns aggregates the table to one row per business.

In [26]:
```python
category_table = category_table.groupby(['business_id']).sum()
```

In [27]:
```python
category_table.head(5)
```

| business_id | `categories_ Active Life` | `categories_ Chinese` | `categories_ Dim Sum` | `categories_ Ethnic Food` | `categories_ Financial Services` | `categories_ Food` | `categories_ Home & Garden` | `categories_ Home Services` | `categories_ Imported Food` | `categories_ Japanese` | ... | `categories_ Local Services` | `categories_ Restaurants` | `categories_ Seafood` | `categories_ Shopping` | `categories_ Water Heater Installation/Repair` | `categories_Golf` | `categories_Insurance` | `categories_Plumbing` | `categories_Specialty Food` | `categories_Sushi Bars` |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1SWheh84yJXfytovILXOAQ | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 |
| HhyxOkGAM07SRYtlQ4wMFQ | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | ... | 1 | 0 | 0 | 1 | 1 | 0 | 0 | 1 | 0 | 0 |
| QXAEGFB4oINsVuTFxEYKFQ | 0 | 1 | 1 | 1 | 0 | 1 | 0 | 0 | 1 | 0 | ... | 0 | 1 | 1 | 0 | 0 | 0 | 0 | 0 | 1 | 0 |
| gnKjwL_1w79qoiV3IC_xQQ | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | ... | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 |
| xvX2CttrVhyG2z1dFg_0xw | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 0 |


Now we have the category table we need, and it is saved to a file on the local drive.
In this notebook, all other files are loaded directly with Python scripts, but for the category table I would suggest an ETL tool. The table has one column per category, so its column names would be tedious to create by hand; with a bulk insert, we don't need to spell out each column name.
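Column names such as `categories_ Home & Garden` or `categories_ Water Heater Installation/Repair` contain spaces and punctuation, which is what makes writing the CREATE TABLE statement by hand awkward. The following is a minimal sketch that generates the DDL from the DataFrame's column list instead.

The table name `business_category_flags`, the SMALLINT flag type, and the `categories_ Golf` duplicate are hypothetical. Keep in mind that Redshift allows at most 1,600 columns per table, so a wide layout stops fitting once the data has more categories than that.

```python
import re


def sanitize_column(name):
    """Turn a dummy column name into a lower-case SQL identifier.

    Runs of spaces, '&', '/', '_' and other non-alphanumerics collapse to one '_'.
    """
    return re.sub(r"[^0-9a-zA-Z]+", "_", name).strip("_").lower()


def create_table_ddl(table, key_column, flag_columns):
    """Build a CREATE TABLE statement with one SMALLINT flag column per dummy column."""
    used = {key_column}
    column_defs = [f"{key_column} VARCHAR(22) NOT NULL"]  # Yelp IDs above are 22 characters
    for original in flag_columns:
        base = sanitize_column(original)
        name, suffix = base, 2
        # Names that differ only by spaces or punctuation would collide; add a suffix.
        while name in used:
            name = f"{base}_{suffix}"
            suffix += 1
        used.add(name)
        column_defs.append(f"{name} SMALLINT DEFAULT 0")
    body = ",\n    ".join(column_defs)
    return f"CREATE TABLE IF NOT EXISTS {table} (\n    {body}\n);"


columns = [
    "categories_ Active Life",
    "categories_ Home & Garden",
    "categories_ Water Heater Installation/Repair",
    "categories_Golf",
    "categories_ Golf",  # hypothetical duplicate created by the leading space
]
ddl = create_table_ddl("business_category_flags", "business_id", columns)
print(ddl)
```

Trimming the categories before one-hot encoding avoids the suffixed duplicates in the first place.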

In [23]:
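```python
# Keep business_id as a field (groupby moved it into the index) and write one
# JSON object per line, the layout spark.read.json expects by default
category_table.reset_index().to_json("Yelp_Data/category_table.json", orient="records", lines=True)
```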

### Analysis of the review_staging table

First, have a look at the data.

In [28]:
```python
review_data.limit(5).toPandas()
```

| | _corrupt_record | business_id | cool | date | funny | review_id | stars | text | useful | user_id |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | | ujmEBvifdJM6h6RLv4wQIg | 0 | 2013-05-07 04:34:36 | 1 | Q1sbwvVQXV2734tPgoKj4Q | 1.0 | Total bill for this horrible service? Over $8G... | 6 | hG7b0MtEbXx5QzbzE6C_VA |
| 1 | | NZnhc2sEQy3RmzKTZnqtwQ | 0 | 2017-01-14 21:30:33 | 0 | GJXCdrto3ASJOqKeVWPi6Q | 5.0 | I \*adore\* Travis at the Hard Rock's new Kelly ... | 0 | yXQM5uF2jS6es16SJzNHfg |
| 2 | | WTqjgwHlXbSFevF32_DJVw | 0 | 2016-11-09 20:09:03 | 0 | 2TzJjDVDEuAW6MR5Vuc1ug | 5.0 | I have to say that this office really has it t... | 3 | n6-Gk65cPZL6Uz8qRm3NYw |
| 3 | | ikCg8xy5JIg_NGPx-MSIDA | 0 | 2018-01-09 20:56:38 | 0 | yi0R0Ugj_xUx_Nek0-_Qig | 5.0 | Went in for a lunch. Steak sandwich was delici... | 0 | dacAIZ6fTM6mqwW5uxkskg |
| 4 | | b1b1eb3uo-w561D0ZfCEiQ | 0 | 2018-01-30 23:07:38 | 0 | 11a8sVPMUFtaC7_ABRkmtw | 1.0 | Today was my second out of three sessions I ha... | 7 | ssoyf2_x0EQMed6fgHeMyQ |


In [29]:
```python
print(review_data.select('review_id').distinct().count(), review_data.count())
```

```
87062 87062
```


Here too, the count of distinct review_id values equals the row count, so we can safely infer that review_id is unique and can serve as the table's primary key.

We are not going to build a relationship with the user table, so user_id is skipped at this step.
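Redshift accepts PRIMARY KEY and UNIQUE constraints but does not enforce them; they only inform the query planner. If review_id becomes the primary key of review_text, a repeated load would therefore insert duplicates without any error, so uniqueness is worth re-checking after loading.

The following is a sketch of that check. Python's built-in sqlite3 stands in for Redshift so it runs locally, and the demo table is declared without a primary key to imitate the non-enforcement. The SQL itself is plain and should run unchanged through `cur.execute` on Redshift.

```python
import sqlite3

DUPLICATE_CHECK = """
    SELECT review_id, COUNT(*) AS n
    FROM review_text
    GROUP BY review_id
    HAVING COUNT(*) > 1
"""

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
# No PRIMARY KEY here, to imitate Redshift, which does not enforce one.
cur.execute("CREATE TABLE review_text (review_id TEXT, business_id TEXT, stars REAL)")
cur.executemany(
    "INSERT INTO review_text VALUES (?, ?, ?)",
    [
        ("Q1sbwvVQXV2734tPgoKj4Q", "ujmEBvifdJM6h6RLv4wQIg", 1.0),
        ("GJXCdrto3ASJOqKeVWPi6Q", "NZnhc2sEQy3RmzKTZnqtwQ", 5.0),
        ("Q1sbwvVQXV2734tPgoKj4Q", "ujmEBvifdJM6h6RLv4wQIg", 1.0),  # same row loaded twice
    ],
)
cur.execute(DUPLICATE_CHECK)
duplicates = cur.fetchall()
print(duplicates)  # [('Q1sbwvVQXV2734tPgoKj4Q', 2)]
```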

In [30]:
```python
review_data.select('funny').distinct().count()
```

```
50
```

## Staging Table

##### Load files as staging tables into Redshift

Import the necessary packages and the SQL queries first.

In [41]:
```python
import configparser
import psycopg2
import numpy as np
from sql_queries import (drop_table_queries, create_staging_table_queries, copy_table_queries,
                         staging_business_copy, create_fact_dimension_table_queries, insert_table_queries)
```

In [34]:
```python
# Drop all tables before we go ahead
def drop_tables(cur, conn):
    for query in drop_table_queries:
        cur.execute(query)
        conn.commit()

drop_tables(cur, conn)
```

In [35]:
```python
def create_staging_tables(cur, conn):
    for query in create_staging_table_queries:
        cur.execute(query)
        conn.commit()

create_staging_tables(cur, conn)

def load_staging_tables(cur, conn):
    for query in copy_table_queries:
        cur.execute(query)
        conn.commit()

load_staging_tables(cur, conn)
```

Staging tables are successfully loaded into database!
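The COPY statements themselves live in `sql_queries.py`, which is not shown here. For reference, the sketch below templates one such statement for JSON files on S3. The bucket, object key, IAM role ARN, region default, and the helper name `copy_json_query` are all hypothetical placeholders.

With `FORMAT AS JSON 'auto'`, Redshift matches each object's keys to the target table's column names. In that case the column names do matter.

```python
from string import Template

# Hypothetical placeholders: the real bucket, prefixes and role live in dl.cfg / sql_queries.py.
COPY_JSON_TEMPLATE = Template("""
    COPY $table
    FROM '$s3_path'
    IAM_ROLE '$iam_role'
    FORMAT AS JSON 'auto'
    REGION '$region';
""")


def copy_json_query(table, s3_path, iam_role, region="us-west-2"):
    """Render a Redshift COPY statement for line-delimited JSON files on S3."""
    if not s3_path.startswith("s3://"):
        raise ValueError(f"expected an s3:// path, got {s3_path!r}")
    return COPY_JSON_TEMPLATE.substitute(
        table=table, s3_path=s3_path, iam_role=iam_role, region=region
    )


query = copy_json_query(
    "business_staging",
    "s3://example-bucket/yelp/yelp_academic_dataset_business.json",
    "arn:aws:iam::123456789012:role/example-redshift-role",
)
print(query)
```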

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

For my design, I split the business data into three tables (Business_location, Business_Category and Business_operation) and the review data into two (Review_Text and Review_Date).

- Business_location: business_id, name, city, address, latitude, longitude, postal_code, state 
- Business_Category: category_id, category_name
- Business_operation: business_id, review_count, stars, is_open

- Review_Text: review_id, business_id, cool, funny, stars, text, useful
- Review_Date: date, hour, day, week, month, year, weekday
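Review_Date breaks each review timestamp into its parts. Below is a stdlib sketch of that derivation, assuming timestamps formatted like the `date` column in the review output ("2013-05-07 04:34:36"). The helper name is hypothetical.

"week" is taken as the ISO week and "weekday" as ISO (Monday = 1 ... Sunday = 7). Conventions differ across tools: Spark's `dayofweek` returns 1 for Sunday, and Redshift's `EXTRACT(DOW ...)` returns 0 for Sunday.

```python
from datetime import datetime


def review_date_parts(timestamp):
    """Split a review timestamp string into the Review_Date columns."""
    ts = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
    _, iso_week, iso_weekday = ts.isocalendar()
    return {
        "date": timestamp,
        "hour": ts.hour,
        "day": ts.day,
        # ISO-8601 week; dates near New Year can fall in a week of the neighbouring year
        "week": iso_week,
        "month": ts.month,
        "year": ts.year,
        "weekday": iso_weekday,  # 1 = Monday ... 7 = Sunday
    }


print(review_date_parts("2013-05-07 04:34:36"))
# {'date': '2013-05-07 04:34:36', 'hour': 4, 'day': 7, 'week': 19, 'month': 5, 'year': 2013, 'weekday': 2}
```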

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [38]:
```python
def create_fact_dimension_tables(cur, conn):
    for query in create_fact_dimension_table_queries:
        cur.execute(query)
        conn.commit()

create_fact_dimension_tables(cur, conn)
```

In [39]:
```python
def insert_tables(cur, conn):
    for query in insert_table_queries:
        cur.execute(query)
        conn.commit()

insert_tables(cur, conn)
```

All values have been inserted into tables!
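`drop_tables`, `create_staging_tables`, `load_staging_tables`, `create_fact_dimension_tables` and `insert_tables` all run the same loop over a list of queries. Here is a minimal sketch of one shared helper that also rolls back and reports which statement failed. The name `run_queries` and its `label` argument are hypothetical, and the demo uses Python's built-in sqlite3 in place of Redshift so it runs without a cluster.

```python
import sqlite3


def run_queries(cur, conn, queries, label="query"):
    """Execute each SQL statement in order, committing after every one.

    Same behaviour as the notebook's loop helpers, plus a clearer error message.
    """
    for i, query in enumerate(queries, start=1):
        try:
            cur.execute(query)
        except Exception as exc:
            conn.rollback()  # clear the aborted transaction before re-raising
            raise RuntimeError(f"{label} #{i} failed: {query.strip()[:80]}") from exc
        conn.commit()


# Demo against an in-memory SQLite database instead of Redshift.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
run_queries(cur, conn, [
    "DROP TABLE IF EXISTS review_date",
    "CREATE TABLE review_date (date TEXT, hour INTEGER)",
    "INSERT INTO review_date VALUES ('2013-05-07 04:34:36', 4)",
], label="setup")
cur.execute("SELECT COUNT(*) FROM review_date")
row_count = cur.fetchone()[0]
print(row_count)  # 1
```

In the notebook, each helper call would become something like `run_queries(cur, conn, drop_table_queries, label="drop")`. Committing after every statement mirrors the original functions. A single commit at the end would instead make each step all-or-nothing.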

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [50]:
```python
cur.execute("""
select count(*) from business_location where business_id is null
""")
np.array(cur.fetchall())
```

```
array([[0]])
```

In [51]:
```python
cur.execute("""
select count(*) from business_operation where business_id is null
""")
np.array(cur.fetchall())
```

```
array([[0]])
```

In [52]:
```python
cur.execute("""
select count(*) from review_date where date is null
""")
np.array(cur.fetchall())
```

```
array([[0]])
```

In [53]:
```python
cur.execute("""
select count(*) from review_text where review_id is null
""")
np.array(cur.fetchall())
```

```
array([[0]])
```

In [48]:
```python
cur.execute("""
select count(*) from business_staging
""")
np.array(cur.fetchall())
```

```
array([[192609]])
```

The key columns contain no null values!
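The four null checks above repeat the same pattern, and the last cell prints a row count without comparing it to anything. For example, `business_staging` holds 192609 rows, while Spark counted 140271 rows in the business file earlier. The staging COPY is not shown, so it is unclear whether that gap is expected, and an explicit count check would surface it.

Below is a sketch that runs both kinds of check and collects failures, again using sqlite3 as a stand-in for Redshift. The (table, column) pairs come from the cells above. The expected counts in the demo are made up, and the helper name is hypothetical.

```python
import sqlite3

NULL_CHECKS = [
    ("business_location", "business_id"),
    ("business_operation", "business_id"),
    ("review_date", "date"),
    ("review_text", "review_id"),
]


def run_quality_checks(cur, null_checks, expected_counts):
    """Return a list of failure messages; an empty list means every check passed.

    Table and column names come from fixed in-code lists, not user input,
    so formatting them into the SQL is safe here.
    """
    failures = []
    for table, column in null_checks:
        cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {column} IS NULL")
        nulls = cur.fetchone()[0]
        if nulls:
            failures.append(f"{table}.{column}: {nulls} null value(s)")
    for table, expected in expected_counts.items():
        cur.execute(f"SELECT COUNT(*) FROM {table}")
        actual = cur.fetchone()[0]
        if actual != expected:
            failures.append(f"{table}: expected {expected} rows, found {actual}")
    return failures


# Demo with tiny in-memory tables, one of them deliberately broken.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
for table, column in NULL_CHECKS:
    cur.execute(f"CREATE TABLE {table} ({column} TEXT)")
cur.execute("INSERT INTO business_location VALUES ('1SWheh84yJXfytovILXOAQ')")
cur.execute("INSERT INTO review_text VALUES (NULL)")  # bad row: missing review_id
failures = run_quality_checks(cur, NULL_CHECKS, {"business_location": 1, "review_text": 2})
print(failures)
# ['review_text.review_id: 1 null value(s)', 'review_text: expected 2 rows, found 1']
```

Raising an exception when `failures` is non-empty turns these checks into a hard stop for the pipeline.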

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

The data comes from the Yelp dataset on Kaggle, which can be used for many purposes, from natural language processing to data visualization.
The original dataset contains five files; to build this data model, I only use two of them. Anyone interested in further analysis is welcome to download the full dataset.

Business data is from Yelp_Business file:

- Business_location: business_id, name, city, address, latitude, longitude, postal_code, state 
- Business_Category: category_id, category_name
- Business_operation: business_id, review_count, stars, is_open

Review data is from the Yelp_Review file:

- Review_Text: review_id, business_id, cool, funny, stars, text, useful
- Review_Date: date, hour, day, week, month, year, weekday

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.