<a href="https://colab.research.google.com/github/William9923/AutoML-Package/blob/master/notebooks/ETLPipeline%F0%9F%A6%84_05_03_2021.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ETL Pipeline Design

Goal : 
* Creating PoC for ETL Pipeline using pandas & pygrametl
* Extract -> check -> clean -> transform -> load into staging area

Done : 
* Date_dim
* Time_dim

Todo :
* User_dim
* Seller_dim
* Payment_dim
* Product_dim
* Feedback_dim
* Fact_OrderItems

Requirements : 
* pygrametl - 2.6
*pandas


In [2]:
#ignore warnings
import warnings
warnings.filterwarnings('ignore')

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

import pandas as pd

# Load data

print("Loading Dataset ...")

data_folder = "data/raw/"

user = pd.read_csv(data_folder + "user_dataset.csv")
order = pd.read_csv(data_folder + "order_dataset.csv")
order_item = pd.read_csv(data_folder + "order_item_dataset.csv")
payment = pd.read_csv(data_folder + "payment_dataset.csv")
products = pd.read_csv(data_folder + "products_dataset.csv")
seller = pd.read_csv(data_folder + "seller_dataset.csv")
feedback = pd.read_csv(data_folder + "feedback_dataset.csv")

print("Finish...")

Loading Dataset ...
Finish...


## ETL Implementation on Staging

In [4]:
from dotenv import load_dotenv
import os
load_dotenv()

# OR, the same with increased verbosity
load_dotenv(verbose=True)

DBNAME= os.getenv("DBNAME")
HOSTNAME= os.getenv("HOSTNAME")
USER= os.getenv("USER")
PASS= os.getenv("PASS")
STGDBNAME = os.getenv("STAGING_DBNAME")

True

True

In [59]:
import psycopg2
import pandas as pd
from sqlalchemy import create_engine

# Create an engine instance
alchemyEngine = create_engine(f'postgresql+psycopg2://{USER}:{PASS}@{HOSTNAME}/{STGDBNAME}', pool_recycle=3600);

# Connect to PostgreSQL server
conn  = alchemyEngine.connect();

## Date_dim ETL

### Utility Function

In [60]:
def create_date_dimension(start='2015-01-01', end='2020-12-31'):
  weekday = {
      0:"Monday",
      1:"Tuesday",
      2:"Wednesday",
      3:"Thursday",
      4:"Friday",
      5:"Saturday",
      6:"Sunday" 
  }

  df = pd.DataFrame({"date": pd.date_range(start, end)})
  df["day_name"] = df.date.dt.dayofweek.map(weekday)
  df["day"] = df.date.dt.day
  df["month"] = df.date.dt.month
  df["week"] = df.date.dt.weekofyear
  df["quarter"] = df.date.dt.quarter
  df["year"] = df.date.dt.year
  df["isWeekDay"] = (df.date.dt.weekday > 4) & (df.date.dt.weekday <= 6)
  df.insert(0, 'date_id', (df.year.astype(str) + df.month.astype(str).str.zfill(2) + df.day.astype(str).str.zfill(2)).astype(int))
  return df

In [70]:
dates = create_date_dimension()
dates.drop(['date'], axis=1).to_sql('Date_dim', con=conn, if_exists="append", index=False,method="multi")

## Geo_dim ETL
---
Obtained from : 
- user_dataset
- seller_dataset

In [71]:
# User
geolocation_user = user.drop(['user_name'], axis=1)
geolocation_user.drop_duplicates(subset=['customer_zip_code'], keep="last", 
                                 inplace=True)

In [73]:
# Seller
geolocation_seller = seller.drop(['seller_id'], axis=1)
geolocation_seller.drop_duplicates(subset=['seller_zip_code'], keep="last", 
                                   inplace=True)

In [77]:
geolocation_user.rename(columns= 
                        {'customer_zip_code' : 'zip_code', 
                         'customer_city' : 'city', 
                         'customer_state' : 'state'}
                        , inplace=True)
geolocation_seller.rename(columns= 
                        {'seller_zip_code' : 'zip_code', 
                         'seller_city' : 'city', 
                         'seller_state' : 'state'}
                        , inplace=True)
geolocation = pd.concat([geolocation_user, geolocation_seller])

In [80]:
# Data checking
geolocation.drop_duplicates(subset=['zip_code'], keep="last", inplace=True)

In [81]:
geolocation.head()

Unnamed: 0,zip_code,city,state
58,85808,KABUPATEN BREBES,JAWA TENGAH
85,2175,KOTA TANGERANG,BANTEN
103,62016,KOTA JAKARTA SELATAN,DKI JAKARTA
175,4723,KOTA TANGERANG,BANTEN
176,65430,KOTA SURABAYA,JAWA TIMUR


In [82]:
geolocation.to_sql('Geo_dim', con=conn, if_exists="append", index=False,
                   method="multi")

## Question (Customer Subject)
---
1. Who is the most valuable customer and what is their lifetime value?
2. Who is the repeated customer ? What is the characteristic of the repeated customer

Fact Table Design : 
* user_name (identifier)
* total_spending (metric)
* num_order(metric)
* num_order_place (metric)
* lifetime_feedback_score (metric)
* last_order (metric)

### Customer Fact Table

In [None]:
# create base fact
cust_fact = pd.DataFrame({
    'user_name' : user.user_name
})
cust_fact.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 96096 entries, 0 to 96095
Data columns (total 1 columns):
 #   Column     Non-Null Count  Dtype 
---  ------     --------------  ----- 
 0   user_name  96096 non-null  object
dtypes: object(1)
memory usage: 750.9+ KB


In [None]:
order_payment = pd.merge(order, payment, how="left", left_on='order_id', right_on='order_id')
total_spending = order_payment[['user_name', 'payment_value']].groupby(by="user_name").mean()

In [None]:
num_ordering = order.user_name.value_counts().rename_axis('user_name').to_frame('counts')
num_ordering

Unnamed: 0_level_0,counts
user_name,Unnamed: 1_level_1
8d50f5eadf50201ccdcedfb9e2ac8455,17
3e43e6105506432c953e165fb2acf44c,9
1b6c7548a2a1f9037c1fd3ddfed95f33,7
ca77025e7201e3b30c44b472ff346268,7
6469f99c1f9dfae7733b25662e7f1782,7
...,...
778a607e77be028910e557f6f608611d,1
4a4978eb43b834cdf1b044281ac943fd,1
6ff1fffdc0177147be8352ef9c3d56ea,1
302754b2edd8002c9916b2e3f119bc67,1


In [None]:
num_ordering_place = order[['user_name', 'send_city']].groupby(by="user_name").count()
num_ordering_place

Unnamed: 0_level_0,send_city
user_name,Unnamed: 1_level_1
0000366f3b9a7992bf8c76cfdf3221e2,1
0000b849f77a49e4a4ce2b2a4ca5be3f,1
0000f46a3911fa3c0805444483337064,1
0000f6ccb0745a6a4b88665a16c9f078,1
0004aac84e0df4da2b147fca70cf8255,1
...,...
fffcf5a5ff07b0908bd4e2dbc735a684,1
fffea47cd6d3cc0a88bd621562a9d061,1
ffff371b4d645b6ecea244b27531430a,1
ffff5962728ec6157033ef9805bacc48,1


In [None]:
order_feedback = pd.merge(order, feedback, how="left", left_on='order_id', right_on='order_id')
feedback_user_score = order_feedback[['user_name', 'feedback_score']].groupby(by="user_name").mean()

In [None]:
cust_fact_updated = pd.merge(cust_fact, total_spending, how="left", left_on="user_name", right_on="user_name")
cust_fact_updated = pd.merge(cust_fact_updated, num_ordering, how="left", left_on="user_name", right_on="user_name")
cust_fact_updated = pd.merge(cust_fact_updated, num_ordering_place, how="left", left_on="user_name", right_on="user_name")
cust_fact_updated = pd.merge(cust_fact_updated, feedback_user_score, how="left", left_on="user_name", right_on="user_name")
cust_fact_updated.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 96096 entries, 0 to 96095
Data columns (total 5 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   user_name       96096 non-null  object 
 1   payment_value   96095 non-null  float64
 2   counts          96096 non-null  int64  
 3   send_city       96096 non-null  int64  
 4   feedback_score  96096 non-null  float64
dtypes: float64(2), int64(2), object(1)
memory usage: 4.4+ MB


In [None]:
cust_fact_updated.head(10)

Unnamed: 0,user_name,payment_value,counts,send_city,feedback_score
0,861eff4711a542e4b93843c6dd7febb0,146870.0,1,1,4.0
1,290c77bc529b7ac935b93aa66c333dc3,335480.0,1,1,5.0
2,060e732b5b29e8181a18229c7b0b2b5e,157730.0,1,1,5.0
3,259dac757896d24d7702b9acbbff3f3c,173300.0,1,1,5.0
4,345ecd01c38d18a9036ed96c73b8d066,252250.0,1,1,5.0
5,addec96d2e059c80c30fe6871d30d177,22770.0,1,1,5.0
6,57b2a98a409812fe9618067b6b8ebe4f,36010.0,1,1,3.0
7,9afe194fb833f79e300e37e580171f22,122470.0,1,1,5.0
8,2a7745e1ed516b289ed9b29c7d0539a5,40400.0,1,1,5.0
9,2a46fb94aef5cbeeb850418118cee090,153130.0,1,1,5.0


## Question (Product Subject)
1. Which product is the most popular ? 
2. What product type that associate each other best? 
3. How long a product is being shipped?

Fact Table Design : 
* product_id (identifier
* num_bought
* avg_price_per_unit
* avg_shipping_time
* avg_rating
* num_return

### Product Fact Table

In [None]:
# create base fact
prod_fact = pd.DataFrame({
    'product_id' : products.product_id
})
prod_fact.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 32951 entries, 0 to 32950
Data columns (total 1 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   product_id  32951 non-null  object
dtypes: object(1)
memory usage: 257.6+ KB


In [None]:
num_bought = order_item[['order_item_id','product_id']].groupby('product_id').count()
num_bought.rename(columns={'order_item_id':'num_bought'}, inplace=True)
num_bought.head()

Unnamed: 0_level_0,num_bought
product_id,Unnamed: 1_level_1
00066f42aeeb9f3007548bb9d3f33c38,1
00088930e925c41fd95ebfe695fd2655,1
0009406fd7479715e4bef61dd91f2462,1
000b8f95fcb9e0096488278317764d19,2
000d9be29b5207b54e86aa1b1ac54872,1


In [None]:
prod_price = order_item[['product_id', 'price']].groupby('product_id').mean()
prod_price.rename(columns={'price':'avg_price'}, inplace=True)
prod_price.head()

Unnamed: 0_level_0,avg_price
product_id,Unnamed: 1_level_1
00066f42aeeb9f3007548bb9d3f33c38,101650.0
00088930e925c41fd95ebfe695fd2655,129900.0
0009406fd7479715e4bef61dd91f2462,229000.0
000b8f95fcb9e0096488278317764d19,58900.0
000d9be29b5207b54e86aa1b1ac54872,199000.0


In [None]:
from datetime import datetime
dateparse = lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S')

merged = pd.merge(order, order_item, how="right", left_on='order_id', right_on='order_id')

merged['isReturned'] = merged['delivered_date'].isnull()
merged['order_date_parsed'] = merged.apply(
    lambda x : dateparse(x['order_date']),
    axis=1
)

merged['delivered_date_parsed'] = merged.apply(
    lambda x : x['delivered_date'] if pd.isnull(x['delivered_date']) else dateparse(x['delivered_date']),
    axis=1
)
merged['estimated_time_delivery_parsed'] = merged.apply(
    lambda x : dateparse(x['estimated_time_delivery']),
    axis=1
)

In [None]:
import numpy as np
merged['order_duration'] = merged.apply(
    lambda x : x['delivered_date_parsed'] - x['order_date_parsed'] if not x['isReturned'] else 0,
    axis = 1
)

merged['order_duration'] = merged['order_duration'].astype('timedelta64[D]') / np.timedelta64(1, 'h')

In [None]:
order_feedback = pd.merge(merged, feedback, how="left", left_on='order_id', right_on='order_id')
feedback_product_score = order_feedback[['product_id', 'feedback_score']].groupby(by="product_id").mean()

In [None]:
aggr_merged = merged[['product_id','order_duration', 'isReturned']].groupby(by='product_id') \
  .agg({'order_duration' : 'mean', 'isReturned' : 'sum'}) \
  .rename(columns={'order_duration':'avg_shipping_time', 'isReturned' : 'num_return' })

In [None]:
prod_fact_updated = pd.merge(prod_fact, num_bought, how="left", left_on="product_id", right_on="product_id")
prod_fact_updated = pd.merge(prod_fact_updated, prod_price, how="left", left_on="product_id", right_on="product_id")
prod_fact_updated = pd.merge(prod_fact_updated, aggr_merged, how="left", left_on="product_id", right_on="product_id")
prod_fact_updated = pd.merge(prod_fact_updated, feedback_product_score, how="left", left_on="product_id", right_on="product_id")
prod_fact_updated.head()

Unnamed: 0,product_id,num_bought,avg_price,avg_shipping_time,num_return,feedback_score
0,1e9e8ef04dbcff4541ed26657ea517e5,1,10910.0,72.0,0,5.0
1,3aa071139cb16b67ca9e5dea641aaa2f,1,248000.0,360.0,0,5.0
2,96bd76ec8810374ed1b65e291975717f,1,79800.0,48.0,0,5.0
3,cef67bcfe19066a932b7673e239eb23d,1,112300.0,72.0,0,1.0
4,9dc1a7de274444849c219cff195d0b71,1,37900.0,48.0,0,5.0


## Question (Seller Subject)
1. Who is the most popular seller?
2. Who is the best rated seller?
3. How many item a seller sell (lifetime value) ? 

Fact Table Design:
* seller_id (identifier)
* num_sell 
* avg_approve_time
* total_sale
* total_cancel

### Seller Fact Table

In [None]:
# create base fact
seller_fact = pd.DataFrame({
    'seller_id' : seller.seller_id
})
seller_fact.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3095 entries, 0 to 3094
Data columns (total 1 columns):
 #   Column     Non-Null Count  Dtype 
---  ------     --------------  ----- 
 0   seller_id  3095 non-null   object
dtypes: object(1)
memory usage: 24.3+ KB


In [None]:
num_sell = order_item[['product_id', 'seller_id' ]].groupby('seller_id').count().rename(columns=({'product_id' : 'num_sell'}))
num_sell.head()

Unnamed: 0_level_0,num_sell
seller_id,Unnamed: 1_level_1
0015a82c2db000af6aaaf3ae2ecb0532,3
001cca7ae9ae17fb1caed9dfb1094831,239
001e6ad469a905060d959994f1b41e4f,1
002100f778ceb8431b7a1020ff7ab48f,55
003554e2dce176b5555353e4f3555ac8,1


In [None]:
from datetime import datetime
import numpy as np
dateparse = lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S')

merged = pd.merge(order, order_item, how="right", left_on='order_id', right_on='order_id')

merged['isCanceled'] = merged['order_approved_date'].isnull()

merged['order_date_parsed'] = merged.apply(
    lambda x : dateparse(x['order_date']),
    axis=1
)

merged['order_approved_date_parsed'] = merged.apply(
    lambda x : x['order_approved_date'] if pd.isnull(x['order_approved_date']) else dateparse(x['order_approved_date']),
    axis=1
)

merged['approve_time'] = merged.apply(
    lambda x : x['order_approved_date_parsed'] - x['order_date_parsed'] if not x['isCanceled'] else 0,
    axis = 1
)

merged['approve_time'] = merged['approve_time'].astype('timedelta64[D]') / np.timedelta64(1, 'h')

In [None]:
aggr_merged = merged[['seller_id','approve_time', 'isCanceled', 'price']].groupby(by='seller_id') \
  .agg({'approve_time' : 'mean', 'isCanceled' : 'sum', 'price' : 'sum'}) \
  .rename(columns={'approve_time':'avg_approve_time', 'isCanceled' : 'num_cancel', 'price' : 'total_sales' })

aggr_merged.head()

Unnamed: 0_level_0,avg_approve_time,num_cancel,total_sales
seller_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0015a82c2db000af6aaaf3ae2ecb0532,8.0,0,2685000.0
001cca7ae9ae17fb1caed9dfb1094831,6.828452,0,25080030.0
001e6ad469a905060d959994f1b41e4f,0.0,0,250000.0
002100f778ceb8431b7a1020ff7ab48f,17.454545,0,1234500.0
003554e2dce176b5555353e4f3555ac8,0.0,0,120000.0


In [None]:
seller_fact_updated = pd.merge(seller_fact, num_sell, how="left", left_on="seller_id", right_on="seller_id")
seller_fact_updated = pd.merge(seller_fact_updated, aggr_merged, how="left", left_on="seller_id", right_on="seller_id")
seller_fact_updated.head()

Unnamed: 0,seller_id,num_sell,avg_approve_time,num_cancel,total_sales
0,3442f8959a84dea7ee197c632cb2df15,3,0.0,0,218700.0
1,d1b65fc7debc3361ea86b5f14c68d2e2,41,7.609756,0,11703070.0
2,ce3ad9de960102d0677a81f5d0bb7b2d,1,0.0,0,158000.0
3,c0f3eea2e14555b6faeea3dd58c1b1c3,1,0.0,0,79990.0
4,51a04a8a6bdcb23deccc82b0b80742cf,1,0.0,0,167990.0


## Datamart Output

In [None]:
cust_fact_updated.to_csv("data/processed/customer_fact.csv", index=False)
prod_fact_updated.to_csv("data/processed/product_fact.csv", index=False)
seller_fact_updated.to_csv("data/processed/seller_fact.csv", index=False)