### Data Engineering Capstone

### Project Summary

* This project aims to build datawarehouse (star schema) consists fact table and dimensions tables to be able to make far analysis and reporting and dashboards. 
    * so we can answer a questions like which product category has a large number of orders due to specified year?
    * how reviews can affect in product selling? etc..
* I've userd Ecommerce data by Olist store what I've liked that is real commercial data, has a 100k orders and also a wide range of time for better analysis from 2016 to 2018.
* Data URL: https://www.kaggle.com/datasets/olistbr/brazilian-ecommerce?resource=download
* Using Apache spark for processing and handling data cleaning and writing data final output in a files (parquet).

### Data dictionary:- 

* olist_customers_dataset.csv --> This dataset has information about the customer and its location
* olist_order_items_dataset.csv --> This dataset includes data about the items purchased within each order.
* olist_order_payments_dataset.csv --> This dataset includes data about the orders payment options.
* olist_order_reviews_dataset.csv --> This dataset includes data about the reviews made by the customers.
* olist_orders_dataset.csv --> This is the core dataset.
* olist_products_dataset.csv --> This dataset includes data about the products sold by Olist.
* olist_sellers_dataset.csv --> This dataset includes data about the sellers that fulfilled orders made at Olist.
* product_category_name_translation.csv --> Category Name Translation to ENG

In [11]:
## Imports
import psycopg2
import psycopg2
import configparser
import os
import pandas as pd 
from pyspark.sql import SparkSession
from helper_functions import get_number_of_nulls_in_df,drop_columns_from_df,check_duplicate_of_column,add_surrogate_key_to_df

### Creating Spark Session

In [2]:
spark = SparkSession.builder.appName("processing_Ecommerce_data").config("spark.sql.caseSensitive", "True").getOrCreate()
spark

In [3]:

config = configparser.ConfigParser()
config.read('config.cfg')

# os.environ['AWS_ACCESS_KEY_ID']=config['aws']['AWS_ACCESS_KEY_ID']
# os.environ['AWS_SECRET_ACCESS_KEY']=config['aws']['AWS_SECRET_ACCESS_KEY']


def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark



In [5]:
# spark._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", config['aws']['AWS_ACCESS_KEY_ID'])
# spark._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", config['aws']['AWS_SECRET_ACCESS_KEY'])

### Defining Data paths

In [7]:
customers = spark.read.csv('data//olist_customers_dataset.csv',header=True)
orderItem = spark.read.csv('data//olist_order_items_dataset.csv',header=True)
orderPayments = spark.read.csv('data//olist_order_payments_dataset.csv',header=True)
orderReview = spark.read.csv('data//olist_order_reviews_dataset.csv',header=True)
orders = spark.read.csv('data//olist_orders_dataset.csv',header=True)
products = spark.read.csv('data//olist_products_dataset.csv',header=True)
sellers = spark.read.csv('data//olist_sellers_dataset.csv',header=True)
translations = spark.read.csv('data//product_category_name_translation.csv',header=True)

In [9]:
os.getcwd()

'e:\\Data Engineering NanoDegree Udacity\\Git\\dataEngineering_nanoDegree\\data_engineer_capstone'

___
## Step 1: Scope the Project and Gather Data
___

First we've to explore the datasets that we have, all data coming from kaggle.

#### Customers
___

In [17]:
customers.limit(5).toPandas()

Unnamed: 0,customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state
0,06b8999e2fba1a1fbc88172c00ba8bc7,861eff4711a542e4b93843c6dd7febb0,14409,franca,SP
1,18955e83d337fd6b2def6b18a428ac77,290c77bc529b7ac935b93aa66c333dc3,9790,sao bernardo do campo,SP
2,4e7b3e00288586ebd08712fdd0374a03,060e732b5b29e8181a18229c7b0b2b5e,1151,sao paulo,SP
3,b2b6027bc5c5109e529d4dc6358b12c3,259dac757896d24d7702b9acbbff3f3c,8775,mogi das cruzes,SP
4,4f2d8ab171c80ec8364f7c12e35b23ad,345ecd01c38d18a9036ed96c73b8d066,13056,campinas,SP


<b><i>customers data</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Column</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">customer_id</td><td class="tg-0pky">customer id</td>
 <tr><td class="tg-0pky">customer_unique_id</td><td class="tg-0pky">unique customer id accross all data</td>
 <tr><td class="tg-0pky">customer_zip_code_prefix</td><td class="tg-0pky">zip code</td>
 <tr><td class="tg-0pky">customer_city</td><td class="tg-0pky">customer city</td>
 <tr><td class="tg-0pky">customer_state</td><td class="tg-0pky">customer state</td>
</table>

### Order Item
___

In [18]:
orderItem.limit(5).toPandas()

Unnamed: 0,order_id,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value
0,00010242fe8c5a6d1ba2dd792cb16214,1,4244733e06e7ecb4970a6e2683c13e61,48436dade18ac8b2bce089ec2a041202,2017-09-19 09:45:35,58.9,13.29
1,00018f77f2f0320c557190d7a144bdd3,1,e5f2d52b802189ee658865ca93d83a8f,dd7ddc04e1b6c2c614352b383efe2d36,2017-05-03 11:05:13,239.9,19.93
2,000229ec398224ef6ca0657da4fc703e,1,c777355d18b72b67abbeef9df44fd0fd,5b51032eddd242adc84c38acab88f23d,2018-01-18 14:48:30,199.0,17.87
3,00024acbcdf0a6daa1e931b038114c75,1,7634da152a4610f1595efa32f14722fc,9d7a1d34a5052409006425275ba1c2b4,2018-08-15 10:10:18,12.99,12.79
4,00042b26cf59d7ce69dfabb4e55b4fd9,1,ac6c3623068f30de03045865e4e10089,df560393f3a51e74553ab94004ba5c87,2017-02-13 13:57:51,199.9,18.14


<b><i>Order items data</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Column</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">order_id</td><td class="tg-0pky">order id</td>
 <tr><td class="tg-0pky">order_item_id</td><td class="tg-0pky">unique id for each item in the order</td>
 <tr><td class="tg-0pky">product_id</td><td class="tg-0pky">product unique identifier</td>
 <tr><td class="tg-0pky">seller_id</td><td class="tg-0pky">seller unique identifier</td>
 <tr><td class="tg-0pky">shipping_limit_date</td><td class="tg-0pky">seller shipping limit date</td>
 <tr><td class="tg-0pky">price</td><td class="tg-0pky">item price</td>
 <tr><td class="tg-0pky">freight_value</td><td class="tg-0pky">freight</td>
</table>

### Order payments
___

In [19]:
orderPayments.limit(5).toPandas()

Unnamed: 0,order_id,payment_sequential,payment_type,payment_installments,payment_value
0,b81ef226f3fe1789b1e8b2acac839d17,1,credit_card,8,99.33
1,a9810da82917af2d9aefd1278f1dcfa0,1,credit_card,1,24.39
2,25e8ea4e93396b6fa0d3dd708e76c1bd,1,credit_card,1,65.71
3,ba78997921bbcdc1373bb41e913ab953,1,credit_card,8,107.78
4,42fdf880ba16b47b59251dd489d4441a,1,credit_card,2,128.45


<b><i>Order payments data</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Column</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">order_id</td><td class="tg-0pky">order id</td>
 <tr><td class="tg-0pky">payment_sequential</td><td class="tg-0pky">sequention num of payment</td>
 <tr><td class="tg-0pky">payment_installments</td><td class="tg-0pky">number of months to installment</td>
 <tr><td class="tg-0pky">payment_value</td><td class="tg-0pky">Value or amount of payment</td>
</table>

### Order Reviews data
___

In [20]:
orderReview.limit(5).toPandas()

Unnamed: 0,review_id,order_id,review_score,review_comment_title,review_comment_message,review_creation_date,review_answer_timestamp
0,7bc2406110b926393aa56f80a40eba40,73fc7af87114b39712e6da79b0a377eb,4,,,2018-01-18 00:00:00,2018-01-18 21:46:59
1,80e641a11e56f04c1ad469d5645fdfde,a548910a1c6147796b98fdf73dbeba33,5,,,2018-03-10 00:00:00,2018-03-11 03:05:13
2,228ce5500dc1d8e020d8d1322874b6f0,f9e4b658b201a9f2ecdecbb34bed034b,5,,,2018-02-17 00:00:00,2018-02-18 14:36:24
3,e64fb393e7b32834bb789ff8bb30750e,658677c97b385a9be170737859d3511b,5,,Recebi bem antes do prazo estipulado.,2017-04-21 00:00:00,2017-04-21 22:02:06
4,f7c4243c7fe1938f181bec41a392bdeb,8e6bfb81e283fa7e4f11123a3fb894f1,5,,Parabéns lojas lannister adorei comprar pela I...,2018-03-01 00:00:00,2018-03-02 10:26:53


<b><i>Order Reviews data</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Column</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">review_id</td><td class="tg-0pky">unique review identifier</td>
 <tr><td class="tg-0pky">order_id</td><td class="tg-0pky">unique order identifier</td>
 <tr><td class="tg-0pky">review_score</td><td class="tg-0pky"> 1 to 5 given satisfaction survey</td>
 <tr><td class="tg-0pky">review_comment_title</td><td class="tg-0pky">Comment title</td>
 <tr><td class="tg-0pky">review_comment_message</td><td class="tg-0pky">Comment message</td>
 <tr><td class="tg-0pky">review_creation_date</td><td class="tg-0pky">date satisfaction survey sent to customer</td>
 <tr><td class="tg-0pky">review_answer_timestamp</td><td class="tg-0pky">satisfaction survey answer timestamp</td>
</table>

### Order data
___

In [21]:
orders.limit(5).toPandas()

Unnamed: 0,order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
0,e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00
1,53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13 00:00:00
2,47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-08 13:50:00,2018-08-17 18:06:29,2018-09-04 00:00:00
3,949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,delivered,2017-11-18 19:28:06,2017-11-18 19:45:59,2017-11-22 13:39:59,2017-12-02 00:28:42,2017-12-15 00:00:00
4,ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,delivered,2018-02-13 21:18:39,2018-02-13 22:20:29,2018-02-14 19:46:34,2018-02-16 18:17:02,2018-02-26 00:00:00


<b><i>Order data</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Column</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">order_id</td><td class="tg-0pky">unique identifier of the order.</td>
 <tr><td class="tg-0pky">customer_id</td><td class="tg-0pky">Each order has a unique customer_id</td>
 <tr><td class="tg-0pky">order_status</td><td class="tg-0pky">Reference to the order status (delivered, shipped, etc).</td>
 <tr><td class="tg-0pky">order_purchase_timestamp</td><td class="tg-0pky">Shows the purchase timestamp.</td>
 <tr><td class="tg-0pky">order_approved_at</td><td class="tg-0pky"> payment approval timestamp.</td>
 <tr><td class="tg-0pky">order_delivered_carrier_date</td><td class="tg-0pky">order posting timestamp</td>
 <tr><td class="tg-0pky">order_delivered_customer_date</td><td class="tg-0pky">Shows the actual order delivery date to the customer</td>
 <tr><td class="tg-0pky">order_estimated_delivery_date</td><td class="tg-0pky">Shows the estimated delivery date</td>
</table>

### products
___

In [22]:
products.limit(5).toPandas()

Unnamed: 0,product_id,product_category_name,product_name_lenght,product_description_lenght,product_photos_qty,product_weight_g,product_length_cm,product_height_cm,product_width_cm
0,1e9e8ef04dbcff4541ed26657ea517e5,perfumaria,40,287,1,225,16,10,14
1,3aa071139cb16b67ca9e5dea641aaa2f,artes,44,276,1,1000,30,18,20
2,96bd76ec8810374ed1b65e291975717f,esporte_lazer,46,250,1,154,18,9,15
3,cef67bcfe19066a932b7673e239eb23d,bebes,27,261,1,371,26,4,26
4,9dc1a7de274444849c219cff195d0b71,utilidades_domesticas,37,402,4,625,20,17,13


<b><i>products data</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Column</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">product_id</td><td class="tg-0pky">unique product identifier</td>
 <tr><td class="tg-0pky">product_category_name</td><td class="tg-0pky">root category of product</td>
 <tr><td class="tg-0pky">product_name_lenght</td><td class="tg-0pky">number of characters extracted from the product name</td>
 <tr><td class="tg-0pky">product_description_lenght</td><td class="tg-0pky">number of characters extracted from the product description.</td>
 <tr><td class="tg-0pky">product_photos_qty</td><td class="tg-0pky">number of product published photos</td>
 <tr><td class="tg-0pky">product_weight_g</td><td class="tg-0pky">product weight measured in grams.</td>
 <tr><td class="tg-0pky">product_length_cm</td><td class="tg-0pky">product length measured in centimeters.</td>
 <tr><td class="tg-0pky">product_height_cm</td><td class="tg-0pky">product height measured in centimeters.</td>
 <tr><td class="tg-0pky">product_width_cm</td><td class="tg-0pky">product width measured in centimeters.</td>
</table>

### Sellers data
___

In [23]:
sellers.limit(5).toPandas()

Unnamed: 0,seller_id,seller_zip_code_prefix,seller_city,seller_state
0,3442f8959a84dea7ee197c632cb2df15,13023,campinas,SP
1,d1b65fc7debc3361ea86b5f14c68d2e2,13844,mogi guacu,SP
2,ce3ad9de960102d0677a81f5d0bb7b2d,20031,rio de janeiro,RJ
3,c0f3eea2e14555b6faeea3dd58c1b1c3,4195,sao paulo,SP
4,51a04a8a6bdcb23deccc82b0b80742cf,12914,braganca paulista,SP


<b><i>sellers data</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Column</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">seller_id</td><td class="tg-0pky">seller unique identifier</td>
 <tr><td class="tg-0pky">seller_zip_code_prefix</td><td class="tg-0pky">first 5 digits of seller zip code</td>
 <tr><td class="tg-0pky">seller_city</td><td class="tg-0pky">seller city name</td>
 <tr><td class="tg-0pky">seller_state</td><td class="tg-0pky">seller state</td>
</table>

### Translations lookup
___

In [24]:
translations.limit(5).toPandas()

Unnamed: 0,product_category_name,product_category_name_english
0,beleza_saude,health_beauty
1,informatica_acessorios,computers_accessories
2,automotivo,auto
3,cama_mesa_banho,bed_bath_table
4,moveis_decoracao,furniture_decor


<b><i>translations lookup data</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Column</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">product_category_name</td><td class="tg-0pky">category name in Portuguese</td>
 <tr><td class="tg-0pky">product_category_name_english</td><td class="tg-0pky">category name in English</td>
</table>

___
## Step 2: Exploration of datasets (preprocessing)
___

In [25]:
## Getting percentage of null values in each column
def_result = get_number_of_nulls_in_df(customers)
def_result.show()


+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|      0.0 %|             0.0 %|                   0.0 %|        0.0 %|         0.0 %|
+-----------+------------------+------------------------+-------------+--------------+



As we see in customers data has no nulls so we don't need to drop any colmns

In [26]:
## Getting percentage of null values in each column
def_result = get_number_of_nulls_in_df(orderItem)
def_result.show()


+--------+-------------+----------+---------+-------------------+-----+-------------+
|order_id|order_item_id|product_id|seller_id|shipping_limit_date|price|freight_value|
+--------+-------------+----------+---------+-------------------+-----+-------------+
|   0.0 %|        0.0 %|     0.0 %|    0.0 %|              0.0 %|0.0 %|        0.0 %|
+--------+-------------+----------+---------+-------------------+-----+-------------+



As we see in orderItem data has no nulls so we don't need to drop any colmns

In [325]:
## Getting percentage of null values in each column
def_result = get_number_of_nulls_in_df(orderPayments)
def_result.show()


+--------+------------------+------------+--------------------+-------------+
|order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------+------------------+------------+--------------------+-------------+
|   0.0 %|             0.0 %|       0.0 %|               0.0 %|        0.0 %|
+--------+------------------+------------+--------------------+-------------+



 As we see in orderPayments data has no nulls so we don't need to drop any colmns

In [326]:
## Getting percentage of null values in each column
def_result = get_number_of_nulls_in_df(orderReview)
def_result.show()


+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+
|review_id|order_id|review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|
+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+
|    0.0 %|  2.15 %|      2.28 %|             88.47 %|               60.56 %|              8.41 %|                 8.43 %|
+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+



* Here we see that both review_comment_title and review_comment_message have significance level of null values 
* so both of the will be dropped in our data model 

In [327]:
## Dropping null colummns
columns_to_drop =  ["review_comment_title","review_comment_message"] 
orderReview = drop_columns_from_df(orderReview,columns_to_drop)


In [328]:
orderReview.show(5)

+--------------------+--------------------+------------+--------------------+-----------------------+
|           review_id|            order_id|review_score|review_creation_date|review_answer_timestamp|
+--------------------+--------------------+------------+--------------------+-----------------------+
|7bc2406110b926393...|73fc7af87114b3971...|           4| 2018-01-18 00:00:00|    2018-01-18 21:46:59|
|80e641a11e56f04c1...|a548910a1c6147796...|           5| 2018-03-10 00:00:00|    2018-03-11 03:05:13|
|228ce5500dc1d8e02...|f9e4b658b201a9f2e...|           5| 2018-02-17 00:00:00|    2018-02-18 14:36:24|
|e64fb393e7b32834b...|658677c97b385a9be...|           5| 2017-04-21 00:00:00|    2017-04-21 22:02:06|
|f7c4243c7fe1938f1...|8e6bfb81e283fa7e4...|           5| 2018-03-01 00:00:00|    2018-03-02 10:26:53|
+--------------------+--------------------+------------+--------------------+-----------------------+
only showing top 5 rows



In [329]:
## Getting percentage of null values in each column
def_result = get_number_of_nulls_in_df(orders)
def_result.show()


+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|order_id|customer_id|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|   0.0 %|      0.0 %|       0.0 %|                   0.0 %|           0.16 %|                      1.79 %|                       2.98 %|                        0.0 %|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+



As we see in orders data nulls data but it not significant

In [330]:
## Getting percentage of null values in each column
def_result = get_number_of_nulls_in_df(products)
def_result.show()


+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|     0.0 %|               1.85 %|             1.85 %|                    1.85 %|            1.85 %|          0.01 %|           0.01 %|           0.01 %|          0.01 %|
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+



As we see in products data nulls data but it not significant

In [331]:
## Getting percentage of null values in each column
def_result = get_number_of_nulls_in_df(sellers)
def_result.show()


+---------+----------------------+-----------+------------+
|seller_id|seller_zip_code_prefix|seller_city|seller_state|
+---------+----------------------+-----------+------------+
|    0.0 %|                 0.0 %|      0.0 %|       0.0 %|
+---------+----------------------+-----------+------------+



As we see in sellers data has no nulls data 

In [332]:
## Getting percentage of null values in each column
def_result = get_number_of_nulls_in_df(translations)
def_result.show()


+---------------------+-----------------------------+
|product_category_name|product_category_name_english|
+---------------------+-----------------------------+
|                0.0 %|                        0.0 %|
+---------------------+-----------------------------+



As we see in translations data has no nulls data 

* **Finally we need to check the duplicates on the primary keys of tables**
* **If there are duplicates we drop duplicates**


In [333]:
customers, had_duplicates_flag,num_of_dropped_cols,counts_before_drop,counts_after_drop = check_duplicate_of_column(customers,"customer_id")
print("Dataframe had duplicates flag :- ",had_duplicates_flag)
print("number of dropped columns in df :- ",num_of_dropped_cols)
print("number of records before drop in df :- ",counts_before_drop)
print("number of records after drop in df :- ",counts_after_drop)

Dataframe had duplicates flag :-  False
number of dropped columns in df :-  0
number of records before drop in df :-  99441
number of records after drop in df :-  99441


In [334]:
orderItem, had_duplicates_flag,num_of_dropped_cols,counts_before_drop,counts_after_drop = check_duplicate_of_column(orderItem,"order_id")
print("Dataframe had duplicates flag :- ",had_duplicates_flag)
print("number of dropped columns in df :- ",num_of_dropped_cols)
print("number of records before drop in df :- ",counts_before_drop)
print("number of records after drop in df :- ",counts_after_drop)

Dataframe had duplicates flag :-  True
number of dropped columns in df :-  13984
number of records before drop in df :-  112650
number of records after drop in df :-  98666


In [335]:
orderPayments, had_duplicates_flag,num_of_dropped_cols,counts_before_drop,counts_after_drop = check_duplicate_of_column(orderPayments,"order_id")
print("Dataframe had duplicates flag :- ",had_duplicates_flag)
print("number of dropped columns in df :- ",num_of_dropped_cols)
print("number of records before drop in df :- ",counts_before_drop)
print("number of records after drop in df :- ",counts_after_drop)

Dataframe had duplicates flag :-  True
number of dropped columns in df :-  4446
number of records before drop in df :-  103886
number of records after drop in df :-  99440


In [336]:
orderReview, had_duplicates_flag,num_of_dropped_cols,counts_before_drop,counts_after_drop = check_duplicate_of_column(orderReview,"review_id")
print("Dataframe had duplicates flag :- ",had_duplicates_flag)
print("number of dropped columns in df :- ",num_of_dropped_cols)
print("number of records before drop in df :- ",counts_before_drop)
print("number of records after drop in df :- ",counts_after_drop)

Dataframe had duplicates flag :-  True
number of dropped columns in df :-  1204
number of records before drop in df :-  104162
number of records after drop in df :-  102958


In [337]:
orders, had_duplicates_flag,num_of_dropped_cols,counts_before_drop,counts_after_drop = check_duplicate_of_column(orders,"order_id")
print("Dataframe had duplicates flag :- ",had_duplicates_flag)
print("number of dropped columns in df :- ",num_of_dropped_cols)
print("number of records before drop in df :- ",counts_before_drop)
print("number of records after drop in df :- ",counts_after_drop)

Dataframe had duplicates flag :-  False
number of dropped columns in df :-  0
number of records before drop in df :-  99441
number of records after drop in df :-  99441


In [338]:
products, had_duplicates_flag,num_of_dropped_cols,counts_before_drop,counts_after_drop = check_duplicate_of_column(products,"product_id")
print("Dataframe had duplicates flag :- ",had_duplicates_flag)
print("number of dropped columns in df :- ",num_of_dropped_cols)
print("number of records before drop in df :- ",counts_before_drop)
print("number of records after drop in df :- ",counts_after_drop)

Dataframe had duplicates flag :-  False
number of dropped columns in df :-  0
number of records before drop in df :-  32951
number of records after drop in df :-  32951


In [339]:
sellers, had_duplicates_flag,num_of_dropped_cols,counts_before_drop,counts_after_drop = check_duplicate_of_column(sellers,"seller_id")
print("Dataframe had duplicates flag :- ",had_duplicates_flag)
print("number of dropped columns in df :- ",num_of_dropped_cols)
print("number of records before drop in df :- ",counts_before_drop)
print("number of records after drop in df :- ",counts_after_drop)

Dataframe had duplicates flag :-  False
number of dropped columns in df :-  0
number of records before drop in df :-  3095
number of records after drop in df :-  3095


In [340]:
translations, had_duplicates_flag,num_of_dropped_cols,counts_before_drop,counts_after_drop = check_duplicate_of_column(translations,"product_category_name")
print("Dataframe had duplicates flag :- ",had_duplicates_flag)
print("number of dropped columns in df :- ",num_of_dropped_cols)
print("number of records before drop in df :- ",counts_before_drop)
print("number of records after drop in df :- ",counts_after_drop)

Dataframe had duplicates flag :-  False
number of dropped columns in df :-  0
number of records before drop in df :-  71
number of records after drop in df :-  71


___

___
## Step 3: Define the Data Model
___


#### Star Schema model:

* Dimension tables:- 
    * customer_dim 
    * order_payments_dim    
    * order_review_dim
    * order_date_key
    * product_dim
    * seller_dm
    * order_item_dim
* Fact table:-     
    * order_trx_fact

<p align="center">
  <img src="images/ecommerce_data_model.png" />
</p>

___
### Step 4: Run ETL to Model the Data
___

#### Adding surrogate key to all dimensions table then doing the transformations on the fact table

In [341]:
## Creating customers dim
customers = add_surrogate_key_to_df(customers,"customer_key")
customers.show(5)

+------------+--------------------+--------------------+------------------------+-------------------+--------------+
|customer_key|         customer_id|  customer_unique_id|customer_zip_code_prefix|      customer_city|customer_state|
+------------+--------------------+--------------------+------------------------+-------------------+--------------+
|           0|00050bf6e01e69d5c...|e3cf594a99e810f58...|                   98700|               ijui|            RS|
|           1|000598caf2ef41174...|7e0516b486e92ed3f...|                   35540|           oliveira|            MG|
|           2|0013cd8e350a7cc76...|334fed5abcee3aa96...|                   03585|          sao paulo|            SP|
|           3|0015bc9fd2d539544...|490c854539b21598c...|                   12233|sao jose dos campos|            SP|
|           4|001f150aebb5d897f...|0f88eb431888ffb9d...|                   79031|       campo grande|            MS|
+------------+--------------------+--------------------+--------

In [342]:
## creating order Payments dim
orderPayments = add_surrogate_key_to_df(orderPayments,"payment_key")
orderPayments.show(4)

+-----------+--------------------+------------------+------------+--------------------+-------------+
|payment_key|            order_id|payment_sequential|payment_type|payment_installments|payment_value|
+-----------+--------------------+------------------+------------+--------------------+-------------+
|          0|00018f77f2f0320c5...|                 1| credit_card|                   3|       259.83|
|          1|00042b26cf59d7ce6...|                 1| credit_card|                   3|       218.04|
|          2|00054e8431b9d7675...|                 1| credit_card|                   1|        31.75|
|          3|0006ec9db01a64e59...|                 1| credit_card|                   4|        97.32|
+-----------+--------------------+------------------+------------+--------------------+-------------+
only showing top 4 rows



In [343]:
## creating order Review dim
orderReview = add_surrogate_key_to_df(orderReview,"review_key")
orderReview.show(3)

+----------+--------------------+-------------------+-------------------+--------------------+-----------------------+
|review_key|           review_id|           order_id|       review_score|review_creation_date|review_answer_timestamp|
+----------+--------------------+-------------------+-------------------+--------------------+-----------------------+
|         0| Ao se colocar o ...|           na CAPA |         os BOTOES | 2018-04-04 00:00:00|    2018-04-06 16:36:40|
|         1| Gostaria que a e...|2017-09-24 00:00:00|2017-09-25 11:04:39|                null|                   null|
|         2| Mas chegou dentr...|2018-08-10 00:00:00|2018-08-13 11:02:33|                null|                   null|
+----------+--------------------+-------------------+-------------------+--------------------+-----------------------+
only showing top 3 rows



In [344]:
## creating order date key
columns_to_select = ["order_id","order_purchase_timestamp","order_approved_at","order_delivered_carrier_date","order_delivered_customer_date","order_estimated_delivery_date"]	
order_date_key_df = orders.select(columns_to_select)
order_date_key_df = add_surrogate_key_to_df(order_date_key_df,"date_key")
order_date_key_df.show(3)

+--------+--------------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|date_key|            order_id|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------+--------------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|       0|00018f77f2f0320c5...|     2017-04-26 10:53:06|2017-04-26 11:05:13|         2017-05-04 14:35:00|          2017-05-12 16:04:24|          2017-05-15 00:00:00|
|       1|00042b26cf59d7ce6...|     2017-02-04 13:57:51|2017-02-04 14:10:13|         2017-02-16 09:46:09|          2017-03-01 16:42:31|          2017-03-17 00:00:00|
|       2|00054e8431b9d7675...|     2017-12-10 11:53:48|2017-12-10 12:10:31|         2017-12-12 01:07:48|          2017-12-18 22:03:38|          2018-01-04 00:00:00|
+---

In [345]:
## creating products
products = add_surrogate_key_to_df(products,"product_key")
products.limit(3).toPandas()

Unnamed: 0,product_key,product_id,product_category_name,product_name_lenght,product_description_lenght,product_photos_qty,product_weight_g,product_length_cm,product_height_cm,product_width_cm
0,0,00066f42aeeb9f3007548bb9d3f33c38,perfumaria,53,596,6,300,20,16,16
1,1,00088930e925c41fd95ebfe695fd2655,automotivo,56,752,4,1225,55,10,26
2,2,0011c512eb256aa0dbbb544d8dffcf6e,automotivo,58,177,1,100,16,15,16


In [346]:
## creating sellers dim
sellers = add_surrogate_key_to_df(sellers,"seller_key")
sellers.limit(3).toPandas()

Unnamed: 0,seller_key,seller_id,seller_zip_code_prefix,seller_city,seller_state
0,0,0015a82c2db000af6aaaf3ae2ecb0532,9080,santo andre,SP
1,1,001cca7ae9ae17fb1caed9dfb1094831,29156,cariacica,ES
2,2,001e6ad469a905060d959994f1b41e4f,24754,sao goncalo,RJ


In [347]:
## creating order item dim
cols_to_select  = ["order_id","order_item_id","product_id","seller_id","shipping_limit_date","price"]
order_item_df = orderItem.select(cols_to_select)

order_item_df = order_item_df.join(products.select("product_key","product_id"),on="product_id",how="inner")
order_item_df = order_item_df.join(sellers.select("seller_key","seller_id"),on="seller_id",how="inner")
order_item_df = add_surrogate_key_to_df(order_item_df,"order_item_key")
cols_to_select =  ["order_id","order_item_id","product_key","seller_key"]
order_item_dim_df = order_item_df.select(cols_to_select)

In [348]:
## Creating orders_trx_fact_df fact table

orders_trx_fact_df = orders.join(customers.select("customer_id","customer_key"),on="customer_id",how="inner")\
                            .select("order_id","customer_key","order_status")
orders_trx_fact_df = orders_trx_fact_df.join(orderPayments.select("order_id","payment_key"),on="order_id",how="inner")
orders_trx_fact_df = orders_trx_fact_df.join(orderReview.select("order_id","review_key","review_score"),on="order_id",how="inner")
orders_trx_fact_df = orders_trx_fact_df.join(order_date_key_df.select("order_id","date_key"),on="order_id",how="inner")
orders_trx_fact_df = orders_trx_fact_df.join(order_item_df.select("order_id","order_item_key","product_key","seller_key","price","shipping_limit_date"),on="order_id",how="inner")
orders_trx_fact_df = add_surrogate_key_to_df(orders_trx_fact_df,"trx_key")

cols_to_select = ["trx_key","customer_key","payment_key","review_key","product_key","seller_key","order_item_key",
                    "date_key","price","shipping_limit_date","review_score","order_status"]
                    
orders_trx_fact_df  = orders_trx_fact_df.select(cols_to_select)   
orders_trx_fact_df.limit(5).toPandas()                 


Unnamed: 0,trx_key,customer_key,payment_key,review_key,product_key,seller_key,order_item_key,date_key,price,shipping_limit_date,review_score,order_status
0,0,9640,5483,42949686297,22062,1044,0,21644,239.9,2017-05-03 11:05:13,4,delivered
1,1,8589960862,5629,42949673700,3913,1719,1,26914,19.9,2017-12-14 12:10:31,4,delivered
2,2,17179875141,4871,25769803994,20828,87,2,26843,144.0,2018-05-16 20:57:03,1,delivered
3,3,17179872654,2704,17179877404,22394,614,3,20772,39.9,2018-08-17 15:04:25,1,canceled
4,4,7066,1135,34359741493,13370,665,4,20222,159.0,2018-03-15 19:28:51,1,delivered


In [349]:
orders_trx_fact_df.count()

97684

In [None]:
output_data =config['s3']['output_path']

In [None]:
customers.write.parquet(output_data+'output_capstone/customers.parquet', 'overwrite')
orderPayments.write.parquet(output_data+'output_capstone/orderPayments.parquet', 'overwrite')
orderReview.write.parquet(output_data+'output_capstone/orderReview.parquet', 'overwrite')
order_date_key_df.write.parquet(output_data+'output_capstone/order_date_key_df.parquet', 'overwrite')
products.write.parquet(output_data+'output_capstone/products.parquet', 'overwrite')
sellers.write.parquet(output_data+'output_capstone/sellers.parquet', 'overwrite')
order_item_dim_df.write.parquet(output_data+'output_capstone/order_item_dim_df.parquet', 'overwrite')
orders_trx_fact_df.write.parquet(output_data+'output_capstone/orders_trx_fact_df.parquet', 'overwrite')

## Data Quality

In [None]:
# Connect to redshift cluster directly


conn = psycopg2.connect(
    host='',
    user='',
    port='',
    password='',
    dbname='')

In [None]:
cur = conn.cursor()

In [None]:
cur.execute("SELECT COUNT(*) FROM customer_dim")
conn.commit()
if cur.rowcount < 1:
    print("Empty table :-  customer_dim")
#######################################################
cur.execute("SELECT COUNT(*) FROM order_payments_dim")
conn.commit()
if cur.rowcount < 1:
    print("Empty table :-  order_payments_dim")
#######################################################
cur.execute("SELECT COUNT(*) FROM order_review_dim")
conn.commit()
if cur.rowcount < 1:
    print("Empty table :-  order_review_dim")
#######################################################
cur.execute("SELECT COUNT(*) FROM order_date_key_dim")
conn.commit()
if cur.rowcount < 1:
    print("Empty table :-  order_date_key_dim")
#######################################################
cur.execute("SELECT COUNT(*) FROM products_dim")
conn.commit()
if cur.rowcount < 1:
    print("Empty table :-  products_dim")
#######################################################
cur.execute("SELECT COUNT(*) FROM seller")
conn.commit()
if cur.rowcount < 1:
    print("Empty table :-  seller")
#######################################################
cur.execute("SELECT COUNT(*) FROM order_item")
conn.commit()
if cur.rowcount < 1:
    print("Empty table :-  order_item")
#######################################################
cur.execute("SELECT COUNT(*) FROM orders_trx_fact")
conn.commit()
if cur.rowcount < 1:
    print("Empty table :-  orders_trx_fact")

#### Step 5: Complete Project Write Up


* I've used mainly Pyspark for processing so we can able to scale, and distribute the processing on multiple nodes (dynamically) using the configurations
with larger data we can scale up machines, Redshift for DWH, writting final output files to S3 AWS so we can load it into the tables on redshift.
* data is transactional so it should be daily ETL jobs running We can use Apache Airflow to schedule the jobs every 24H to run taking the file and process it then load in Redshift table, here we should partition by day so we can have history.
What If?

 * The data was increased by 100x: the processing is using pyspark so we can increase number of nodes (instances) which by means scalling the cluster up.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day: Using apache Airflow to do the ETL pipelines then we can create a views so it could be auto refreshed on the tables on RedShift
 * The database needed to be accessed by 100+ people: by using scalable datawarehouse like Redshift so we can scale and serve more clients.