## Using Python to Perform Extract-Transform-Load (ETL Processing)
Modern Data Warehousing and Analytics solutions frequently use languages like Python or Scala to extract data from numerous sources, including relational database management systems, NoSQL database systems, real-time streaming endpoints and Data Lakes.  These languages can then be used to perform many types of transformation before then loading the data into a variety of destinations including file systems and data warehouses. This data can then be consumed by data scientists or business analysts.

### First, Import the Necessary Libraries

In [89]:
import os
import numpy
import pandas as pd
from sqlalchemy import create_engine

### Then. Assign Connection Variables for the MySQL Server & Databases with which You'll be Working 

In [90]:
host_name = "localhost"
host_ip = "127.0.0.1"
port = "3306"

user_id = "root"
pwd = "JCrivella1"
src_dbname = "northwind"
dst_dbname = "northwind_dw2"

### Define Functions for Getting Data From and Setting Data Into Databases

In [91]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

### 1.0. Populate the Dimension Tables
#### 1.1. Extract Data from the Source Database Tables

In [92]:
sql_customers = "SELECT * FROM northwind.customers;"
df_customers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customers)
df_customers.head(2)

Unnamed: 0,id,company,last_name,first_name,email_address,job_title,business_phone,home_phone,mobile_phone,fax_number,address,city,state_province,zip_postal_code,country_region,web_page,notes,attachments
0,1,Company A,Bedecs,Anna,,Owner,(123)555-0100,,,(123)555-0101,123 1st Street,Seattle,WA,99999,USA,,,b''
1,2,Company B,Gratacos Solsona,Antonio,,Owner,(123)555-0100,,,(123)555-0101,123 2nd Street,Boston,MA,99999,USA,,,b''


In [93]:
sql_employees = "SELECT * FROM northwind.employees;"
df_employees = get_dataframe(user_id, pwd, host_name, src_dbname, sql_employees)
df_employees.head(2)

Unnamed: 0,id,company,last_name,first_name,email_address,job_title,business_phone,home_phone,mobile_phone,fax_number,address,city,state_province,zip_postal_code,country_region,web_page,notes,attachments
0,1,Northwind Traders,Freehafer,Nancy,nancy@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,,(123)555-0103,123 1st Avenue,Seattle,WA,99999,USA,#http://northwindtraders.com#,,b''
1,2,Northwind Traders,Cencini,Andrew,andrew@northwindtraders.com,"Vice President, Sales",(123)555-0100,(123)555-0102,,(123)555-0103,123 2nd Avenue,Bellevue,WA,99999,USA,http://northwindtraders.com#http://northwindtr...,"Joined the company as a sales representative, ...",b''


In [69]:
sql_products = "SELECT * FROM northwind.products;"
df_products = get_dataframe(user_id, pwd, host_name, src_dbname, sql_products)
df_products.head(2)

Unnamed: 0,supplier_ids,id,product_code,product_name,description,standard_cost,list_price,reorder_level,target_level,quantity_per_unit,discontinued,minimum_reorder_quantity,category,attachments
0,4,1,NWTB-1,Northwind Traders Chai,,13.5,18.0,10,40,10 boxes x 20 bags,0,10.0,Beverages,b''
1,10,3,NWTCO-3,Northwind Traders Syrup,,7.5,10.0,25,100,12 - 550 ml bottles,0,25.0,Condiments,b''


In [94]:
sql_shippers = "SELECT * FROM northwind.shippers;"
df_shippers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_shippers)
df_shippers.head(2)

Unnamed: 0,id,company,last_name,first_name,email_address,job_title,business_phone,home_phone,mobile_phone,fax_number,address,city,state_province,zip_postal_code,country_region,web_page,notes,attachments
0,1,Shipping Company A,,,,,,,,,123 Any Street,Memphis,TN,99999,USA,,,b''
1,2,Shipping Company B,,,,,,,,,123 Any Street,Memphis,TN,99999,USA,,,b''


#### 1.2. Perform Any Necessary Transformations
Pandas DataFrames enable extensive data modification capabilities. Here we will start by simply dropping features (columns) that we don't believe provide any real value to our analytics solution. Examples include columns having a high percentage of NULL values, columns having large amounts of free-text, and columns having binary large object (BLOB) data such as images or other documents. Then, we will rename the primary key column (id) to conform with data warehouse design standards.

In [95]:
drop_cols = ['email_address','home_phone','mobile_phone','web_page','notes','attachments']
df_customers.drop(drop_cols, axis=1, inplace=True)
df_customers.rename(columns={"id":"customer_key"}, inplace=True)

df_customers.head(2)

Unnamed: 0,customer_key,company,last_name,first_name,job_title,business_phone,fax_number,address,city,state_province,zip_postal_code,country_region
0,1,Company A,Bedecs,Anna,Owner,(123)555-0100,(123)555-0101,123 1st Street,Seattle,WA,99999,USA
1,2,Company B,Gratacos Solsona,Antonio,Owner,(123)555-0100,(123)555-0101,123 2nd Street,Boston,MA,99999,USA


In [96]:
drop_cols = ['mobile_phone','notes','attachments']
df_employees.drop(drop_cols, axis=1, inplace=True)
df_employees.rename(columns={"id":"employee_key"}, inplace=True)

df_employees.head(2)

Unnamed: 0,employee_key,company,last_name,first_name,email_address,job_title,business_phone,home_phone,fax_number,address,city,state_province,zip_postal_code,country_region,web_page
0,1,Northwind Traders,Freehafer,Nancy,nancy@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,(123)555-0103,123 1st Avenue,Seattle,WA,99999,USA,#http://northwindtraders.com#
1,2,Northwind Traders,Cencini,Andrew,andrew@northwindtraders.com,"Vice President, Sales",(123)555-0100,(123)555-0102,(123)555-0103,123 2nd Avenue,Bellevue,WA,99999,USA,http://northwindtraders.com#http://northwindtr...


In [98]:
drop_cols = ['supplier_ids','description','attachments']
df_products.drop(drop_cols, axis=1, inplace=True)
df_products.rename(columns={"id":"product_key"}, inplace=True)

df_products.head(2)

KeyError: "['supplier_ids' 'description' 'attachments'] not found in axis"

In [99]:
drop_cols = ['last_name','first_name','email_address','job_title','business_phone',
             'home_phone','mobile_phone','fax_number','web_page','notes','attachments']
df_shippers.drop(drop_cols, axis=1, inplace=True)
df_shippers.rename(columns={"id":"shipper_key"}, inplace=True)

df_shippers.head(2)

Unnamed: 0,shipper_key,company,address,city,state_province,zip_postal_code,country_region
0,1,Shipping Company A,123 Any Street,Memphis,TN,99999,USA
1,2,Shipping Company B,123 Any Street,Memphis,TN,99999,USA


#### 1.3. Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables

In [100]:
exec_sql = f"CREATE DATABASE `{dst_dbname}`;"

conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
sqlEngine.execute(exec_sql) #create db
sqlEngine.execute(f"USE {dst_dbname};") # select new db

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f8eb0c9c0d0>

In [101]:
db_operation = "insert"

tables = [('dim_customers', df_customers, 'customer_key'),
          ('dim_employees', df_employees, 'employee_key'),
          ('dim_products', df_products, 'product_key'),
          ('dim_shippers', df_shippers, 'shipper_key')]
#Table name, dataframe, what I want primary key to be 

In [102]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)
#Easiest way to do this, use SQL command on a loop 
#Added the data to the tables 

### Populate the Fact Table

In [103]:
#Two ways to do this: If you have done the first lab -- pass the SQL statement (insert into, select, from join statement)
#BUT, now we can just pass dataframe in python and push towards SQL
#OR, can use pandas .join methods from pandas documentation 
#Here's name, heres data frame crafted, heres what I want primary key -- and done! 

In [104]:
sql_orders = "SELECT * FROM northwind.orders;"
df_orders = get_dataframe(user_id, pwd, host_name, src_dbname, sql_orders)
df_orders.rename(columns = {"id":"order_id"}, inplace=True)
df_orders.head()

Unnamed: 0,order_id,employee_id,customer_id,order_date,shipped_date,shipper_id,ship_name,ship_address,ship_city,ship_state_province,ship_zip_postal_code,ship_country_region,shipping_fee,taxes,payment_type,paid_date,notes,tax_rate,tax_status_id,status_id
0,30,9,27,2006-01-15,2006-01-22,2.0,Karen Toh,789 27th Street,Las Vegas,NV,99999,USA,200.0,0.0,Check,2006-01-15,,0.0,,3
1,31,3,4,2006-01-20,2006-01-22,1.0,Christina Lee,123 4th Street,New York,NY,99999,USA,5.0,0.0,Credit Card,2006-01-20,,0.0,,3
2,32,4,12,2006-01-22,2006-01-22,2.0,John Edwards,123 12th Street,Las Vegas,NV,99999,USA,5.0,0.0,Credit Card,2006-01-22,,0.0,,3
3,33,6,8,2006-01-30,2006-01-31,3.0,Elizabeth Andersen,123 8th Street,Portland,OR,99999,USA,50.0,0.0,Credit Card,2006-01-30,,0.0,,3
4,34,9,4,2006-02-06,2006-02-07,3.0,Christina Lee,123 4th Street,New York,NY,99999,USA,4.0,0.0,Check,2006-02-06,,0.0,,3


In [105]:
sql_orders_status = "SELECT * FROM northwind.orders_status;"
df_orders_status = get_dataframe(user_id, pwd, host_name, src_dbname, sql_orders_status)
df_orders_status.rename(columns = {"id":"status_id"}, inplace=True)
df_orders_status.head()

Unnamed: 0,status_id,status_name
0,0,New
1,1,Invoiced
2,2,Shipped
3,3,Closed


In [106]:
sql_order_details = "SELECT * FROM northwind.order_details;"
df_order_details = get_dataframe(user_id, pwd, host_name, src_dbname, sql_order_details)
df_order_details.head()

Unnamed: 0,id,order_id,product_id,quantity,unit_price,discount,status_id,date_allocated,purchase_order_id,inventory_id
0,27,30,34,100.0,14.0,0.0,2,,96.0,83.0
1,28,30,80,30.0,3.5,0.0,2,,,63.0
2,29,31,7,10.0,30.0,0.0,2,,,64.0
3,30,31,51,10.0,53.0,0.0,2,,,65.0
4,31,31,80,10.0,3.5,0.0,2,,,66.0


In [107]:
sql_order_details_status = "SELECT * FROM northwind.order_details_status;"
df_order_details_status = get_dataframe(user_id, pwd, host_name, src_dbname, sql_order_details_status)
df_order_details_status.rename(columns = {"id":"status_id"}, inplace=True)
df_order_details_status.head()

Unnamed: 0,status_id,status_name
0,0,
1,1,Allocated
2,2,Invoiced
3,3,Shipped
4,4,On Order


In [108]:
#First Join - Get the order_status column 
df_orders = pd.merge(df_orders, df_orders_status, on = 'status_id', how='inner')
df_orders.rename(columns={"status_name":"orders_status"}, inplace=True)
df_orders.drop(['status_id'], axis=1, inplace=True)
df_orders.head()

Unnamed: 0,order_id,employee_id,customer_id,order_date,shipped_date,shipper_id,ship_name,ship_address,ship_city,ship_state_province,ship_zip_postal_code,ship_country_region,shipping_fee,taxes,payment_type,paid_date,notes,tax_rate,tax_status_id,orders_status
0,30,9,27,2006-01-15,2006-01-22,2.0,Karen Toh,789 27th Street,Las Vegas,NV,99999,USA,200.0,0.0,Check,2006-01-15,,0.0,,Closed
1,31,3,4,2006-01-20,2006-01-22,1.0,Christina Lee,123 4th Street,New York,NY,99999,USA,5.0,0.0,Credit Card,2006-01-20,,0.0,,Closed
2,32,4,12,2006-01-22,2006-01-22,2.0,John Edwards,123 12th Street,Las Vegas,NV,99999,USA,5.0,0.0,Credit Card,2006-01-22,,0.0,,Closed
3,33,6,8,2006-01-30,2006-01-31,3.0,Elizabeth Andersen,123 8th Street,Portland,OR,99999,USA,50.0,0.0,Credit Card,2006-01-30,,0.0,,Closed
4,34,9,4,2006-02-06,2006-02-07,3.0,Christina Lee,123 4th Street,New York,NY,99999,USA,4.0,0.0,Check,2006-02-06,,0.0,,Closed


In [109]:
#Second Join - Get the order_details_status
df_order_details = pd.merge(df_order_details, df_order_details_status, on='status_id', how='inner')
df_order_details.rename(columns={"status_name":"order_details_status"}, inplace=True)
df_order_details.drop(['status_id'], axis=1, inplace=True)
df_order_details.head()

Unnamed: 0,id,order_id,product_id,quantity,unit_price,discount,date_allocated,purchase_order_id,inventory_id,order_details_status
0,27,30,34,100.0,14.0,0.0,,96.0,83.0,Invoiced
1,28,30,80,30.0,3.5,0.0,,,63.0,Invoiced
2,29,31,7,10.0,30.0,0.0,,,64.0,Invoiced
3,30,31,51,10.0,53.0,0.0,,,65.0,Invoiced
4,31,31,80,10.0,3.5,0.0,,,66.0,Invoiced


In [110]:
#Join orders and orderdetails data frame
df_fact_orders = pd.merge(df_orders, df_order_details, on='order_id', how='right')
df_fact_orders.head()

Unnamed: 0,order_id,employee_id,customer_id,order_date,shipped_date,shipper_id,ship_name,ship_address,ship_city,ship_state_province,...,orders_status,id,product_id,quantity,unit_price,discount,date_allocated,purchase_order_id,inventory_id,order_details_status
0,30,9,27,2006-01-15,2006-01-22,2.0,Karen Toh,789 27th Street,Las Vegas,NV,...,Closed,27,34,100.0,14.0,0.0,,96.0,83.0,Invoiced
1,30,9,27,2006-01-15,2006-01-22,2.0,Karen Toh,789 27th Street,Las Vegas,NV,...,Closed,28,80,30.0,3.5,0.0,,,63.0,Invoiced
2,31,3,4,2006-01-20,2006-01-22,1.0,Christina Lee,123 4th Street,New York,NY,...,Closed,29,7,10.0,30.0,0.0,,,64.0,Invoiced
3,31,3,4,2006-01-20,2006-01-22,1.0,Christina Lee,123 4th Street,New York,NY,...,Closed,30,51,10.0,53.0,0.0,,,65.0,Invoiced
4,31,3,4,2006-01-20,2006-01-22,1.0,Christina Lee,123 4th Street,New York,NY,...,Closed,31,80,10.0,3.5,0.0,,,66.0,Invoiced


In [111]:
#additional transformations
drop_columns = ['id','notes','purchase_order_id', 'inventory_id']
df_fact_orders.drop(drop_columns, axis=1, inplace=True)
df_fact_orders.insert(0, "order_key", range(0, df_fact_orders.shape[0]))
df_fact_orders.head()

Unnamed: 0,order_key,order_id,employee_id,customer_id,order_date,shipped_date,shipper_id,ship_name,ship_address,ship_city,...,paid_date,tax_rate,tax_status_id,orders_status,product_id,quantity,unit_price,discount,date_allocated,order_details_status
0,0,30,9,27,2006-01-15,2006-01-22,2.0,Karen Toh,789 27th Street,Las Vegas,...,2006-01-15,0.0,,Closed,34,100.0,14.0,0.0,,Invoiced
1,1,30,9,27,2006-01-15,2006-01-22,2.0,Karen Toh,789 27th Street,Las Vegas,...,2006-01-15,0.0,,Closed,80,30.0,3.5,0.0,,Invoiced
2,2,31,3,4,2006-01-20,2006-01-22,1.0,Christina Lee,123 4th Street,New York,...,2006-01-20,0.0,,Closed,7,10.0,30.0,0.0,,Invoiced
3,3,31,3,4,2006-01-20,2006-01-22,1.0,Christina Lee,123 4th Street,New York,...,2006-01-20,0.0,,Closed,51,10.0,53.0,0.0,,Invoiced
4,4,31,3,4,2006-01-20,2006-01-22,1.0,Christina Lee,123 4th Street,New York,...,2006-01-20,0.0,,Closed,80,10.0,3.5,0.0,,Invoiced


In [112]:
#write the dataframe back to the database
table_name = "fact_orders"
primary_key = "order_key"
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_orders, table_name, primary_key, db_operation)

In [115]:
sql_test = """
    SELECT customers.`last_name` AS `customer_name`,
        SUM(orders.`quantity`) AS `total_quality`,
        SUM(orders.`unit_price`) AS `total_unit_price`
    FROM `Northwind_DW`.`Fact_Orders` AS orders
    INNER JOIN `Northwind_DW`.`dim_customers` AS customers
    ON orders.customer_id = customers.customer_key
    GROUP BY customers.`last_name`
    ORDER BY total_unit_price DESC;
"""

df_test= get_dataframe(user_id, pwd, host_name,src_dbname, sql_test)

In [116]:
df_test.head()

Unnamed: 0,customer_name,total_quality,total_unit_price
0,Lee,277.0,287.45
1,Pérez-Olaeta,427.0,162.5
2,Raghav,405.0,120.049999
3,Andersen,260.0,118.699999
4,Axen,253.0,100.64
