# Extract Transfrom Load(ETL) Pipeline 

In [6]:
import pandas as pd
import matplotlib.pyplot as plt

pd.options.mode.chained_assignment = None

## 01 Data Extraction

In [18]:
#Get datasets
holidays=pd.read_csv('data/holidays_events.csv')
oil=pd.read_csv('data/oil.csv')
stores=pd.read_csv('data/stores.csv')
test=pd.read_csv('data/test.csv')
train=pd.read_csv('data/train.csv')
transactions=pd.read_csv('data/transactions.csv')
predictions=pd.read_csv('results/predictions.csv')

In [3]:
holidays.head(2)

Unnamed: 0,date,type,locale,locale_name,description,transferred
0,2012-03-02,Holiday,Local,Manta,Fundacion de Manta,False
1,2012-04-01,Holiday,Regional,Cotopaxi,Provincializacion de Cotopaxi,False


In [5]:
stores.head(2)

Unnamed: 0,store_nbr,city,state,type,cluster
0,1,Quito,Pichincha,D,13
1,2,Quito,Pichincha,D,13


In [6]:
test.head(2)

Unnamed: 0,id,date,store_nbr,family,onpromotion
0,3000888,2017-08-16,1,AUTOMOTIVE,0
1,3000889,2017-08-16,1,BABY CARE,0


In [3]:
train.head(2)

Unnamed: 0,id,date,store_nbr,family,sales,onpromotion
0,0,2013-01-01,1,AUTOMOTIVE,0.0,0
1,1,2013-01-01,1,BABY CARE,0.0,0


In [4]:
sum(train['sales'])

1073644952.2030932

In [8]:
transactions.head(2)

Unnamed: 0,date,store_nbr,transactions
0,2013-01-01,25,770
1,2013-01-02,1,2111


## 02 Data Transformation

In [31]:
products = train[['family']]
products.drop_duplicates(keep='first', inplace=True)
products.reset_index()
products['prod_id'] = products.index.astype(str)

def product_key(value):
    key = 'prod_' + value
    return key

products['prod_id'] = products[['prod_id']].apply(lambda x: [product_key(i) for i in x])
products = products[['prod_id','family']]
products.head()

Unnamed: 0,prod_id,family
0,prod_0,AUTOMOTIVE
1,prod_1,BABY CARE
2,prod_2,BEAUTY
3,prod_3,BEVERAGES
4,prod_4,BOOKS


In [50]:
oil = oil.dropna()
oil.head(2)

Unnamed: 0,date,dcoilwtico
1,2013-01-02,93.14
2,2013-01-03,92.97


In [52]:
#Add oil primary key
oil.drop_duplicates(keep='first', inplace=True)
oil.reset_index()
oil['oil_id'] = oil.index.astype(str)

def oil_key(value):
    key = 'oil_' + value
    return key

oil['oil_id'] = oil[['oil_id']].apply(lambda x: [oil_key(i) for i in x])
oil = oil[['oil_id','date','dcoilwtico']]
oil.head()

Unnamed: 0,oil_id,date,dcoilwtico
1,oil_1,2013-01-02,93.14
2,oil_2,2013-01-03,92.97
3,oil_3,2013-01-04,93.12
4,oil_4,2013-01-07,93.2
5,oil_5,2013-01-08,93.21


In [77]:
oil.drop_duplicates(keep='first', inplace=True)

In [56]:
holidays.head()

Unnamed: 0,date,type,locale,locale_name,description,transferred
0,2012-03-02,Holiday,Local,Manta,Fundacion de Manta,False
1,2012-04-01,Holiday,Regional,Cotopaxi,Provincializacion de Cotopaxi,False
2,2012-04-12,Holiday,Local,Cuenca,Fundacion de Cuenca,False
3,2012-04-14,Holiday,Local,Libertad,Cantonizacion de Libertad,False
4,2012-04-21,Holiday,Local,Riobamba,Cantonizacion de Riobamba,False


In [57]:
#Add oil primary key
holidays.drop_duplicates(keep='first', inplace=True)
holidays.reset_index()
holidays['holidays_id'] = holidays.index.astype(str)

def holidays_key(value):
    key = 'holidays_' + value
    return key

holidays['holidays_id'] = holidays[['holidays_id']].apply(lambda x: [holidays_key(i) for i in x])
holidays = holidays[['holidays_id','date','type','locale','locale_name','description','transferred']]
holidays.head()

Unnamed: 0,holidays_id,date,type,locale,locale_name,description,transferred
0,holidays_0,2012-03-02,Holiday,Local,Manta,Fundacion de Manta,False
1,holidays_1,2012-04-01,Holiday,Regional,Cotopaxi,Provincializacion de Cotopaxi,False
2,holidays_2,2012-04-12,Holiday,Local,Cuenca,Fundacion de Cuenca,False
3,holidays_3,2012-04-14,Holiday,Local,Libertad,Cantonizacion de Libertad,False
4,holidays_4,2012-04-21,Holiday,Local,Riobamba,Cantonizacion de Riobamba,False


In [35]:
sales = train.copy()
sales.head(2)

Unnamed: 0,id,date,store_nbr,family,sales,onpromotion
0,0,2013-01-01,1,AUTOMOTIVE,0.0,0
1,1,2013-01-01,1,BABY CARE,0.0,0


In [33]:
def add_product_key(value):
    key = products.loc[products['family'] == value, 'prod_id'].iloc[0]
    return key

'prod_28'

In [36]:
sales['prod_id'] = sales[['family']].apply(lambda x: [add_product_key(i) for i in x])

In [40]:
sales.head()

Unnamed: 0,id,date,store_nbr,family,sales,onpromotion,prod_id
0,0,2013-01-01,1,AUTOMOTIVE,0.0,0,prod_0
1,1,2013-01-01,1,BABY CARE,0.0,0,prod_1
2,2,2013-01-01,1,BEAUTY,0.0,0,prod_2
3,3,2013-01-01,1,BEVERAGES,0.0,0,prod_3
4,4,2013-01-01,1,BOOKS,0.0,0,prod_4


In [66]:
sales = sales.drop('family', 1)

  sales = sales.drop('family', 1)


## 03 Data Loading

### Connect to SQL SERVER Data Warehouse

In [26]:
import pyodbc 

conn = pyodbc.connect('Driver={ODBC Driver 17 for SQL Server};'
                      'Server=DESKTOP-HMHN4IF;'
                      'Database=Stores_db;'
                      'Trusted_Connection=yes;')

cursor = conn.cursor()

### Create Dimension Tables and Add Records

In [46]:
# Create products dimension table
tblProducts = """CREATE TABLE products ( 
prod_id VARCHAR(20) PRIMARY KEY, 
family VARCHAR(50));"""
  
# execute the statement
cursor.execute(tblProducts)

<pyodbc.Cursor at 0x1934e289a30>

In [80]:
# Insert DataFrame records.
for i,row in products.iterrows():
    sql = "INSERT INTO products VALUES (?,?)"
    cursor.execute(sql, row.prod_id, row.family)

    # commit to save our changes
    conn.commit()

In [55]:
# Create oil dimension table
tblOil = """CREATE TABLE oil ( 
oil_id VARCHAR(20) PRIMARY KEY, 
date DATE,
dcoilwtico float(2));"""
  
# execute the statement
cursor.execute(tblOil)

<pyodbc.Cursor at 0x1934e289a30>

In [92]:
# Insert DataFrame records.
for i,row in oil.iterrows():
    sql = "INSERT INTO oil VALUES (?,?,?)"
    cursor.execute(sql, row.oil_id, row.date, row.dcoilwtico)

    # commit to save our changes
    conn.commit()

In [60]:
# Create holidays dimension table
tblHolidays = """CREATE TABLE holidays ( 
holidays_id VARCHAR(20) PRIMARY KEY,
date DATE,
type VARCHAR(30),
locale VARCHAR(30),
locale_name VARCHAR(50),
description VARCHAR(70),
transferred VARCHAR(6));"""
  
# execute the statement
cursor.execute(tblHolidays)

<pyodbc.Cursor at 0x1934e289a30>

In [91]:
# Insert DataFrame records.
for i,row in holidays.iterrows():
    sql = "INSERT INTO holidays VALUES (?,?,?,?,?,?,?)"
    cursor.execute(sql, row.holidays_id, row.date, row.type, row.locale, row.locale_name, row.description, row.transferred)

    # commit to save our changes
    conn.commit()

In [48]:
# Create stores dimension table
tblStores = """CREATE TABLE stores ( 
store_nbr VARCHAR(20) PRIMARY KEY, 
city VARCHAR(50),
state VARCHAR(50),
type CHAR(1),
cluster CHAR(4));"""
  
# execute the statement
cursor.execute(tblStores)

<pyodbc.Cursor at 0x1934e289a30>

In [94]:
# Insert DataFrame records.
for i,row in stores.iterrows():
    sql = "INSERT INTO stores VALUES (?,?,?,?,?)"
    cursor.execute(sql, row.store_nbr, row.city, row.state, row.type, row.cluster)

    # commit to save our changes
    conn.commit()

In [68]:
# Create sales fact table
tblSales = """CREATE TABLE fact_sales ( 
id VARCHAR(20) PRIMARY KEY,
date DATE,
sales float(2),
onpromotion INTEGER,
store_nbr VARCHAR(20) FOREIGN KEY REFERENCES stores(store_nbr),
prod_id VARCHAR(20) FOREIGN KEY REFERENCES products(prod_id));"""
  
# execute the statement
cursor.execute(tblSales)

<pyodbc.Cursor at 0x1934e289a30>

In [97]:
# Insert DataFrame records.
for i,row in sales.iterrows():
    sql = "INSERT INTO fact_sales VALUES (?,?,?,?,?,?)"
    cursor.execute(sql, row.id, row.date, row.sales, row.onpromotion, row.store_nbr, row.prod_id)

    # commit to save our changes
    conn.commit()

In [21]:
#Add oil primary key
predictions.drop_duplicates(keep='first', inplace=True)
predictions.reset_index()
predictions['predictions_id'] = predictions.index.astype(str)

def predictions_key(value):
    key = 'pred_' + value
    return key

predictions['predictions_id'] = predictions[['predictions_id']].apply(lambda x: [predictions_key(i) for i in x])
predictions = predictions[['predictions_id','id','sales']]
predictions.head(2)

Unnamed: 0,predictions_id,id,sales
0,pred_0,3000888,3.06543
1,pred_1,3000889,0.0


In [27]:
# Create sales fact table
tblPredictions = """CREATE TABLE SalesPredictions ( 
predictions_id VARCHAR(20) PRIMARY KEY,
id INTEGER,
sales float(5));"""
  
# execute the statement
cursor.execute(tblPredictions)

<pyodbc.Cursor at 0x27a49723530>

In [28]:
# Insert DataFrame records.
for i,row in predictions.iterrows():
    sql = "INSERT INTO SalesPredictions VALUES (?,?,?)"
    cursor.execute(sql, row.predictions_id, row.id, row.sales)

    # commit to save our changes
    conn.commit()

In [29]:
# close the connection
conn.close()