## Using Python to Perform Extract-Transform-Load (ETL Processing)
Modern Data Warehousing and Analytics solutions frequently use languages like Python or Scala to extract data from numerous sources, including relational database management systems, NoSQL database systems, real-time streaming endpoints and Data Lakes.  These languages can then be used to perform many types of transformation before then loading the data into a variety of destinations including file systems and data warehouses. This data can then be consumed by data scientists or business analysts.

### First, Import the Necessary Libraries

In [1]:
import os
import numpy
import pandas as pd
from sqlalchemy import create_engine

### Then. Assign Connection Variables for the MySQL Server & Databases with which You'll be Working 

In [1]:
host_name = "localhost"
host_ip = "127.0.0.1"
port = "3306"

user_id = "root"
pwd = "Antvenom21!"
db_name = "northwind"
src_dbname = "northwind"
dst_dbname = "northwind_dw2"
f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"

'mysql+pymysql://root:Antvenom21!@localhost/northwind'

### Define Functions for Getting Data From and Setting Data Into Databases

In [3]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


    def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
        conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"

        sqlEngine = create_engine(conn_str, pool_recycle=3600)
        connection = sqlEngine.connect()

        if db_operation == "insert":
            df.to_sql(table_name, con=connection, index=False, if_exists='replace')
            sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")

        elif db_operation == "update":
            df.to_sql(table_name, con=connection, index=False, if_exists='append')

        connection.close()

### 1.0. Populate the Dimension Tables
#### 1.1. Extract Data from the Source Database Tables

In [4]:
sql_customers = "SELECT * FROM northwind.customers;"
df_customers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customers)
df_customers.head(2)

Unnamed: 0,id,company,last_name,first_name,email_address,job_title,business_phone,home_phone,mobile_phone,fax_number,address,city,state_province,zip_postal_code,country_region,web_page,notes,attachments
0,1,Company A,Bedecs,Anna,,Owner,(123)555-0100,,,(123)555-0101,123 1st Street,Seattle,WA,99999,USA,,,b''
1,2,Company B,Gratacos Solsona,Antonio,,Owner,(123)555-0100,,,(123)555-0101,123 2nd Street,Boston,MA,99999,USA,,,b''


In [5]:
sql_employees = "SELECT * FROM northwind.employees;"
df_employees = get_dataframe(user_id, pwd, host_name, src_dbname, sql_employees)
df_employees.head(2)

Unnamed: 0,id,company,last_name,first_name,email_address,job_title,business_phone,home_phone,mobile_phone,fax_number,address,city,state_province,zip_postal_code,country_region,web_page,notes,attachments
0,1,Northwind Traders,Freehafer,Nancy,nancy@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,,(123)555-0103,123 1st Avenue,Seattle,WA,99999,USA,#http://northwindtraders.com#,,b''
1,2,Northwind Traders,Cencini,Andrew,andrew@northwindtraders.com,"Vice President, Sales",(123)555-0100,(123)555-0102,,(123)555-0103,123 2nd Avenue,Bellevue,WA,99999,USA,http://northwindtraders.com#http://northwindtr...,"Joined the company as a sales representative, ...",b''


In [6]:
sql_products = "SELECT * FROM northwind.products;"
df_products = get_dataframe(user_id, pwd, host_name, src_dbname, sql_products)
df_products.head(2)

Unnamed: 0,supplier_ids,id,product_code,product_name,description,standard_cost,list_price,reorder_level,target_level,quantity_per_unit,discontinued,minimum_reorder_quantity,category,attachments
0,4,1,NWTB-1,Northwind Traders Chai,,13.5,18.0,10,40,10 boxes x 20 bags,0,10.0,Beverages,b''
1,10,3,NWTCO-3,Northwind Traders Syrup,,7.5,10.0,25,100,12 - 550 ml bottles,0,25.0,Condiments,b''


In [7]:
sql_shippers = "SELECT * FROM northwind.shippers;"
df_shippers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_shippers)
df_shippers.head(2)

Unnamed: 0,id,company,last_name,first_name,email_address,job_title,business_phone,home_phone,mobile_phone,fax_number,address,city,state_province,zip_postal_code,country_region,web_page,notes,attachments
0,1,Shipping Company A,,,,,,,,,123 Any Street,Memphis,TN,99999,USA,,,b''
1,2,Shipping Company B,,,,,,,,,123 Any Street,Memphis,TN,99999,USA,,,b''


#### 1.2. Perform Any Necessary Transformations
Pandas DataFrames enable extensive data modification capabilities. Here we will start by simply dropping features (columns) that we don't believe provide any real value to our analytics solution. Examples include columns having a high percentage of NULL values, columns having large amounts of free-text, and columns having binary large object (BLOB) data such as images or other documents. Then, we will rename the primary key column (id) to conform with data warehouse design standards.

In [8]:
drop_cols = ['email_address','home_phone','mobile_phone','web_page','notes','attachments']
df_customers.drop(drop_cols, axis=1, inplace=True)
df_customers.rename(columns={"id":"customer_key"}, inplace=True)

df_customers.head(2)

Unnamed: 0,customer_key,company,last_name,first_name,job_title,business_phone,fax_number,address,city,state_province,zip_postal_code,country_region
0,1,Company A,Bedecs,Anna,Owner,(123)555-0100,(123)555-0101,123 1st Street,Seattle,WA,99999,USA
1,2,Company B,Gratacos Solsona,Antonio,Owner,(123)555-0100,(123)555-0101,123 2nd Street,Boston,MA,99999,USA


In [9]:
drop_cols = ['mobile_phone','notes','attachments']
df_employees.drop(drop_cols, axis=1, inplace=True)
df_employees.rename(columns={"id":"employee_key"}, inplace=True)

df_employees.head(2)

Unnamed: 0,employee_key,company,last_name,first_name,email_address,job_title,business_phone,home_phone,fax_number,address,city,state_province,zip_postal_code,country_region,web_page
0,1,Northwind Traders,Freehafer,Nancy,nancy@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,(123)555-0103,123 1st Avenue,Seattle,WA,99999,USA,#http://northwindtraders.com#
1,2,Northwind Traders,Cencini,Andrew,andrew@northwindtraders.com,"Vice President, Sales",(123)555-0100,(123)555-0102,(123)555-0103,123 2nd Avenue,Bellevue,WA,99999,USA,http://northwindtraders.com#http://northwindtr...


In [10]:
drop_cols = ['supplier_ids','description','attachments']
df_products.drop(drop_cols, axis=1, inplace=True)
df_products.rename(columns={"id":"product_key"}, inplace=True)

df_products.head(2)

Unnamed: 0,product_key,product_code,product_name,standard_cost,list_price,reorder_level,target_level,quantity_per_unit,discontinued,minimum_reorder_quantity,category
0,1,NWTB-1,Northwind Traders Chai,13.5,18.0,10,40,10 boxes x 20 bags,0,10.0,Beverages
1,3,NWTCO-3,Northwind Traders Syrup,7.5,10.0,25,100,12 - 550 ml bottles,0,25.0,Condiments


In [11]:
drop_cols = ['last_name','first_name','email_address','job_title','business_phone',
             'home_phone','mobile_phone','fax_number','web_page','notes','attachments']
df_shippers.drop(drop_cols, axis=1, inplace=True)
df_shippers.rename(columns={"id":"shipper_key"}, inplace=True)

df_shippers.head(2)

Unnamed: 0,shipper_key,company,address,city,state_province,zip_postal_code,country_region
0,1,Shipping Company A,123 Any Street,Memphis,TN,99999,USA
1,2,Shipping Company B,123 Any Street,Memphis,TN,99999,USA


#### 1.3. Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables

In [16]:
#exec_sql = f"CREATE DATABASE `{dst_dbname}`;"

conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
#sqlEngine.execute(exec_sql) #create db
sqlEngine.execute(f"USE {dst_dbname};") # select new db

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x21b921b7730>

In [17]:
db_operation = "insert"

tables = [('dim_customers', df_customers, 'customer_key'),
          ('dim_employees', df_employees, 'employee_key'),
          ('dim_products', df_products, 'product_key'),
          ('dim_shippers', df_shippers, 'shipper_key')]

In [18]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

### Populate the Fact Table

In [113]:
query = "SELECT * FROM northwind.orders"

fact_table = get_dataframe(user_id, pwd, host_name, src_dbname, query)
fact_table.rename(columns={"id":"order_id"}, inplace=True)
fact_table.drop(['notes','tax_status_id'], axis = 1, inplace = True)

fact_table.head()

Unnamed: 0,order_id,employee_id,customer_id,order_date,shipped_date,shipper_id,ship_name,ship_address,ship_city,ship_state_province,ship_zip_postal_code,ship_country_region,shipping_fee,taxes,payment_type,paid_date,tax_rate,status_id
0,30,9,27,2006-01-15,2006-01-22,2.0,Karen Toh,789 27th Street,Las Vegas,NV,99999,USA,200.0,0.0,Check,2006-01-15,0.0,3
1,31,3,4,2006-01-20,2006-01-22,1.0,Christina Lee,123 4th Street,New York,NY,99999,USA,5.0,0.0,Credit Card,2006-01-20,0.0,3
2,32,4,12,2006-01-22,2006-01-22,2.0,John Edwards,123 12th Street,Las Vegas,NV,99999,USA,5.0,0.0,Credit Card,2006-01-22,0.0,3
3,33,6,8,2006-01-30,2006-01-31,3.0,Elizabeth Andersen,123 8th Street,Portland,OR,99999,USA,50.0,0.0,Credit Card,2006-01-30,0.0,3
4,34,9,4,2006-02-06,2006-02-07,3.0,Christina Lee,123 4th Street,New York,NY,99999,USA,4.0,0.0,Check,2006-02-06,0.0,3


In [114]:
query = "SELECT * FROM northwind.order_details"

df_order_details = get_dataframe(user_id, pwd, host_name, src_dbname, query)
df_order_details.rename(columns={"status_id":"details_status_id"}, inplace=True)

df_order_details.drop(['id', 'date_allocated','purchase_order_id','inventory_id'], axis = 1, inplace = True)

fact_table = pd.merge(fact_table, df_order_details, on='order_id',how = 'right')
fact_table.head()

Unnamed: 0,order_id,employee_id,customer_id,order_date,shipped_date,shipper_id,ship_name,ship_address,ship_city,ship_state_province,...,taxes,payment_type,paid_date,tax_rate,status_id,product_id,quantity,unit_price,discount,details_status_id
0,30,9,27,2006-01-15,2006-01-22,2.0,Karen Toh,789 27th Street,Las Vegas,NV,...,0.0,Check,2006-01-15,0.0,3,34,100.0,14.0,0.0,2
1,30,9,27,2006-01-15,2006-01-22,2.0,Karen Toh,789 27th Street,Las Vegas,NV,...,0.0,Check,2006-01-15,0.0,3,80,30.0,3.5,0.0,2
2,31,3,4,2006-01-20,2006-01-22,1.0,Christina Lee,123 4th Street,New York,NY,...,0.0,Credit Card,2006-01-20,0.0,3,7,10.0,30.0,0.0,2
3,31,3,4,2006-01-20,2006-01-22,1.0,Christina Lee,123 4th Street,New York,NY,...,0.0,Credit Card,2006-01-20,0.0,3,51,10.0,53.0,0.0,2
4,31,3,4,2006-01-20,2006-01-22,1.0,Christina Lee,123 4th Street,New York,NY,...,0.0,Credit Card,2006-01-20,0.0,3,80,10.0,3.5,0.0,2


In [115]:
query = "SELECT * FROM northwind.orders_status"

df_order_status = get_dataframe(user_id, pwd, host_name, src_dbname, query)
df_order_status.rename(columns={"id":"status_id","status_name":"order_details"}, inplace=True)

fact_table = pd.merge(fact_table, df_order_status, on='status_id', how = 'inner')
fact_table.drop(['status_id'], axis = 1, inplace = True)
fact_table.head()

Unnamed: 0,order_id,employee_id,customer_id,order_date,shipped_date,shipper_id,ship_name,ship_address,ship_city,ship_state_province,...,taxes,payment_type,paid_date,tax_rate,product_id,quantity,unit_price,discount,details_status_id,order_details
0,30,9,27,2006-01-15,2006-01-22,2.0,Karen Toh,789 27th Street,Las Vegas,NV,...,0.0,Check,2006-01-15,0.0,34,100.0,14.0,0.0,2,Closed
1,30,9,27,2006-01-15,2006-01-22,2.0,Karen Toh,789 27th Street,Las Vegas,NV,...,0.0,Check,2006-01-15,0.0,80,30.0,3.5,0.0,2,Closed
2,31,3,4,2006-01-20,2006-01-22,1.0,Christina Lee,123 4th Street,New York,NY,...,0.0,Credit Card,2006-01-20,0.0,7,10.0,30.0,0.0,2,Closed
3,31,3,4,2006-01-20,2006-01-22,1.0,Christina Lee,123 4th Street,New York,NY,...,0.0,Credit Card,2006-01-20,0.0,51,10.0,53.0,0.0,2,Closed
4,31,3,4,2006-01-20,2006-01-22,1.0,Christina Lee,123 4th Street,New York,NY,...,0.0,Credit Card,2006-01-20,0.0,80,10.0,3.5,0.0,2,Closed


In [116]:
query = "SELECT * FROM northwind.order_details_status"

df_order_status_details = get_dataframe(user_id, pwd, host_name, src_dbname, query)
df_order_status_details.rename(columns={"id":"details_status_id","status_name":"order_details_status"}, inplace=True)

fact_table = pd.merge(fact_table, df_order_status_details, on='details_status_id', how = 'inner')
fact_table.drop(['details_status_id'], axis = 1, inplace = True)
fact_table.shape

(58, 23)

In [117]:
for col in fact_table.columns:
    print(col)

order_id
employee_id
customer_id
order_date
shipped_date
shipper_id
ship_name
ship_address
ship_city
ship_state_province
ship_zip_postal_code
ship_country_region
shipping_fee
taxes
payment_type
paid_date
tax_rate
product_id
quantity
unit_price
discount
order_details
order_details_status


In [118]:
list_key =  list(map(lambda x:x+1, range(fact_table.shape[0])))
fact_table.insert(0, "orders_key", list_key)
fact_table.head()


Unnamed: 0,orders_key,order_id,employee_id,customer_id,order_date,shipped_date,shipper_id,ship_name,ship_address,ship_city,...,taxes,payment_type,paid_date,tax_rate,product_id,quantity,unit_price,discount,order_details,order_details_status
0,1,30,9,27,2006-01-15,2006-01-22,2.0,Karen Toh,789 27th Street,Las Vegas,...,0.0,Check,2006-01-15,0.0,34,100.0,14.0,0.0,Closed,Invoiced
1,2,30,9,27,2006-01-15,2006-01-22,2.0,Karen Toh,789 27th Street,Las Vegas,...,0.0,Check,2006-01-15,0.0,80,30.0,3.5,0.0,Closed,Invoiced
2,3,31,3,4,2006-01-20,2006-01-22,1.0,Christina Lee,123 4th Street,New York,...,0.0,Credit Card,2006-01-20,0.0,7,10.0,30.0,0.0,Closed,Invoiced
3,4,31,3,4,2006-01-20,2006-01-22,1.0,Christina Lee,123 4th Street,New York,...,0.0,Credit Card,2006-01-20,0.0,51,10.0,53.0,0.0,Closed,Invoiced
4,5,31,3,4,2006-01-20,2006-01-22,1.0,Christina Lee,123 4th Street,New York,...,0.0,Credit Card,2006-01-20,0.0,80,10.0,3.5,0.0,Closed,Invoiced


In [119]:
table = "fact_orders"
primary_key = 'orders_key'
op = 'insert'
set_dataframe(user_id, pwd, host_name, dst_dbname, fact_table, table, primary_key, op)