Prerequisites:

In [13]:
import os
from urllib.parse import quote

import numpy
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy import text

In [14]:
# Connection settings for the MySQL server that hosts both databases.
# Each value can be overridden through an environment variable, so the notebook
# can be pointed at another server (or given another password) without editing
# this cell; the defaults are the values the notebook has always used.
host_name = os.environ.get("NORTHWIND_DB_HOST", "localhost")
host_ip = os.environ.get("NORTHWIND_DB_HOST_IP", "127.0.0.1")
port = os.environ.get("NORTHWIND_DB_PORT", "3306")
user_id = os.environ.get("NORTHWIND_DB_USER", "root")
# NOTE(review): a real password should not be committed with the notebook.
# Set NORTHWIND_DB_PWD in the environment and remove this fallback before sharing.
pwd = os.environ.get("NORTHWIND_DB_PWD", "10@cornPlace")

# Source (OLTP) database and destination (data warehouse) database names.
src_dbname = "northwind"
dst_dbname = "northwind_dw"

In [15]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    """Run a SQL query against a MySQL database and return the result as a DataFrame.

    Parameters
    ----------
    user_id : str
        MySQL user name.
    pwd : str
        MySQL password. It may contain URL-reserved characters such as ``@``.
    host_name : str
        Host name of the MySQL server.
    db_name : str
        Database (schema) the connection is opened on.
    sql_query : str
        Query passed to ``pandas.read_sql``.

    Returns
    -------
    pandas.DataFrame
        The rows returned by ``sql_query``.
    """
    # Credentials are percent-encoded before being placed in the URL; a raw '@',
    # ':' or '/' in the password would otherwise be read as a URL delimiter and
    # the host would be parsed incorrectly.
    conn_str = (
        f"mysql+pymysql://{quote(user_id, safe='')}:{quote(pwd, safe='')}"
        f"@{host_name}/{db_name}"
    )
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    try:
        # The context manager closes the connection even if the query raises.
        with sqlEngine.connect() as connection:
            return pd.read_sql(sql_query, connection)
    finally:
        # The engine is created per call, so release its connection pool too.
        sqlEngine.dispose()


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    """Write a DataFrame to a MySQL table.

    Parameters
    ----------
    user_id : str
        MySQL user name.
    pwd : str
        MySQL password. It may contain URL-reserved characters such as ``@``.
    host_name : str
        Host name of the MySQL server.
    db_name : str
        Database (schema) the connection is opened on.
    df : pandas.DataFrame
        Data to write; the index is not written.
    table_name : str
        Target table.
    pk_column : str
        Column declared as primary key after an ``"insert"``.
    db_operation : str
        ``"insert"`` replaces the table with ``df`` and then adds the primary
        key; ``"update"`` appends the rows of ``df`` to the table.

    Raises
    ------
    ValueError
        If ``db_operation`` is neither ``"insert"`` nor ``"update"``.
    """
    # Fail loudly on a misspelled operation instead of silently writing nothing.
    if db_operation not in ("insert", "update"):
        raise ValueError(
            f"db_operation must be 'insert' or 'update', got {db_operation!r}"
        )

    # Credentials are percent-encoded before being placed in the URL; a raw '@',
    # ':' or '/' in the password would otherwise be read as a URL delimiter.
    conn_str = (
        f"mysql+pymysql://{quote(user_id, safe='')}:{quote(pwd, safe='')}"
        f"@{host_name}/{db_name}"
    )
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    try:
        # Engine.begin() commits on success, rolls back on error, and always
        # returns the connection to the pool. The ALTER TABLE runs on the same
        # connection as the load (Engine.execute() no longer exists in
        # SQLAlchemy 2.0, and raw SQL strings must be wrapped in text()).
        with sqlEngine.begin() as connection:
            if db_operation == "insert":
                df.to_sql(table_name, con=connection, index=False, if_exists='replace')
                connection.execute(
                    text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
                )
            else:  # "update"
                df.to_sql(table_name, con=connection, index=False, if_exists='append')
    finally:
        # The engine is created per call, so release its connection pool too.
        sqlEngine.dispose()

In [16]:
# Connect to the server without selecting a schema so the warehouse database
# can be dropped and recreated from scratch.
# Credentials are percent-encoded: a raw '@' in the password would otherwise be
# read as the user/host separator of the URL.
conn_str = f"mysql+pymysql://{quote(user_id, safe='')}:{quote(pwd, safe='')}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

# Engine.execute() was removed in SQLAlchemy 2.0; statements run on an explicit
# connection and raw SQL is wrapped in text(). Running all three statements on
# one connection also guarantees that USE applies to the session that created
# the database.
with sqlEngine.begin() as connection:
    connection.execute(text(f"DROP DATABASE IF EXISTS `{dst_dbname}`;"))
    connection.execute(text(f"CREATE DATABASE `{dst_dbname}`;"))
    connection.execute(text(f"USE {dst_dbname};"))

ModuleNotFoundError: No module named 'pymysql'

1.1. Extract Data from the Source Database Tables

In [2]:
# Read every column of the source customers table. The schema name is taken
# from src_dbname so the query always targets the database being connected to.
sql_customers = f"SELECT * FROM `{src_dbname}`.customers;"
df_customers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customers)
df_customers.head(2)

ModuleNotFoundError: No module named 'pymysql'

In [None]:
# Read every column of the source employees table. The schema name is taken
# from src_dbname so the query always targets the database being connected to.
sql_employees = f"SELECT * FROM `{src_dbname}`.employees;"
df_employees = get_dataframe(user_id, pwd, host_name, src_dbname, sql_employees)
df_employees.head(2)

In [None]:
# Read every column of the source products table. The schema name is taken
# from src_dbname so the query always targets the database being connected to.
sql_products = f"SELECT * FROM `{src_dbname}`.products;"
df_products = get_dataframe(user_id, pwd, host_name, src_dbname, sql_products)
df_products.head(2)

In [None]:
# Read every column of the source shippers table. The schema name is taken
# from src_dbname so the query always targets the database being connected to.
sql_shippers = f"SELECT * FROM `{src_dbname}`.shippers;"
df_shippers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_shippers)
df_shippers.head(2)

1.2. Perform Any Necessary Transformations

In [3]:
# Remove the columns the customer dimension does not keep and rename the source
# key to the warehouse naming convention.
# The result is rebound instead of mutated in place, and errors="ignore" skips
# columns that are already gone, so re-running this cell no longer raises
# KeyError.
drop_cols = ['email_address','home_phone','mobile_phone','web_page','notes','attachments']
df_customers = (
    df_customers
    .drop(columns=drop_cols, errors="ignore")
    .rename(columns={"id": "customer_key"})
)

df_customers.head(2)

NameError: name 'df_customers' is not defined

In [None]:
# Remove the columns the employee dimension does not keep and rename the source
# key to the warehouse naming convention.
# The result is rebound instead of mutated in place, and errors="ignore" skips
# columns that are already gone, so re-running this cell no longer raises
# KeyError.
drop_cols = ['mobile_phone','notes','attachments']
df_employees = (
    df_employees
    .drop(columns=drop_cols, errors="ignore")
    .rename(columns={"id": "employee_key"})
)

df_employees.head(2)

In [None]:
# Remove the columns the product dimension does not keep and rename the source
# key to the warehouse naming convention.
# The result is rebound instead of mutated in place, and errors="ignore" skips
# columns that are already gone, so re-running this cell no longer raises
# KeyError.
drop_cols = ['supplier_ids','description','attachments']
df_products = (
    df_products
    .drop(columns=drop_cols, errors="ignore")
    .rename(columns={"id": "product_key"})
)

df_products.head(2)

In [None]:
# Remove the columns the shipper dimension does not keep and rename the source
# key to the warehouse naming convention.
# The result is rebound instead of mutated in place, and errors="ignore" skips
# columns that are already gone, so re-running this cell no longer raises
# KeyError.
drop_cols = ['last_name','first_name','email_address','job_title','business_phone',
             'home_phone','mobile_phone','fax_number','web_page','notes','attachments']
df_shippers = (
    df_shippers
    .drop(columns=drop_cols, errors="ignore")
    .rename(columns={"id": "shipper_key"})
)

df_shippers.head(2)

1.4. Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables

In [None]:
# All dimension tables are written with the same operation.
db_operation = "insert"

# One (target table name, source DataFrame, primary-key column) triple per
# dimension, built by pairing up the three parallel sequences.
tables = list(zip(
    ('dim_customers', 'dim_employees', 'dim_products', 'dim_shippers'),
    (df_customers, df_employees, df_products, df_shippers),
    ('customer_key', 'employee_key', 'product_key', 'shipper_key'),
))

In [None]:
# Write each dimension DataFrame to the warehouse database, passing along the
# table name and the column to use as its primary key.
for table_name, dataframe, primary_key in tables:
    set_dataframe(
        user_id, pwd, host_name, dst_dbname,
        df=dataframe,
        table_name=table_name,
        pk_column=primary_key,
        db_operation=db_operation,
    )

2.1. First, you could simply use the SQL SELECT statement you authored in Lab 2

In [None]:
# Build the fact rows in SQL: one row per order-detail line, with its order
# header and the two human-readable status names.
# - orders and order_details are joined on the order id; the previous comma
#   join had no condition between them and produced a Cartesian product.
# - Both status_name columns are aliased so the result has no duplicate column
#   names, and orders.id is exposed as order_id, the name the pandas-built
#   version of this table uses.
sql_fact_orders = """
    SELECT `orders`.`id` AS `order_id`,
    `orders`.`employee_id`,
    `orders`.`customer_id`,
    `order_details`.`product_id`,
    `orders`.`shipper_id`,
    `orders`.`ship_name`,
    `orders`.`ship_address`,
    `orders`.`ship_city`,
    `orders`.`ship_state_province`,
    `orders`.`ship_zip_postal_code`,
    `orders`.`ship_country_region`,
    `order_details`.`quantity`,
    `orders`.`order_date`,
    `orders`.`shipped_date`,
    `order_details`.`unit_price`,
    `order_details`.`discount`,
    `orders`.`shipping_fee`,
    `orders`.`taxes`,
    `orders`.`payment_type`,
    `orders`.`paid_date`,
    `orders`.`tax_rate`,
    `orders_status`.`status_name` AS `order_status`,
    `order_details_status`.`status_name` AS `order_details_status`
FROM `northwind`.`orders`
INNER JOIN `northwind`.`order_details`
    ON `orders`.`id` = `order_details`.`order_id`
LEFT OUTER JOIN `northwind`.`orders_status`
    ON `orders`.`status_id` = `orders_status`.`id`
LEFT OUTER JOIN `northwind`.`order_details_status`
    ON `order_details`.`status_id` = `order_details_status`.`id`;
    """

df_fact_orders = get_dataframe(user_id, pwd, host_name, src_dbname, sql_fact_orders)
df_fact_orders.head(2)

2.2.1. Get all the data from each of the four tables involved

In [None]:
# Load the four source tables the fact table is built from. The following cells
# merge on 'status_id' and 'order_id', so the id columns of the orders table and
# of the two status lookup tables are renamed to those keys here.
sql_orders = f"SELECT * FROM `{src_dbname}`.orders;"
df_orders = get_dataframe(user_id, pwd, host_name, src_dbname, sql_orders)
df_orders = df_orders.rename(columns={'id': 'order_id'})

sql_orders_status = f"SELECT * FROM `{src_dbname}`.orders_status;"
df_orders_status = get_dataframe(user_id, pwd, host_name, src_dbname, sql_orders_status)
df_orders_status = df_orders_status.rename(columns={'id': 'status_id'})

# order_details keeps its own 'id'; it is dropped later with the other
# columns of no interest.
sql_order_details = f"SELECT * FROM `{src_dbname}`.order_details;"
df_order_details = get_dataframe(user_id, pwd, host_name, src_dbname, sql_order_details)

sql_order_details_status = f"SELECT * FROM `{src_dbname}`.order_details_status;"
df_order_details_status = get_dataframe(user_id, pwd, host_name, src_dbname, sql_order_details_status)
df_order_details_status = df_order_details_status.rename(columns={'id': 'status_id'})

df_orders.head(2)

2.2.2. Get the order_status column.

In [None]:
# Attach the readable status name to each order. pd.merge takes lowercase
# keyword arguments (on=, how=); the uppercase spellings raised TypeError.
# status_id is dropped once the name is attached, mirroring the way the
# order-details status is handled.
df_orders = (
    pd.merge(df_orders, df_orders_status, on="status_id", how="inner")
    .rename(columns={"status_name": "order_status"})
    .drop(columns=["status_id"])
)
df_orders.head(2)

2.2.3. Get the order_details_status column.

In [None]:
# Attach the readable status name to each order-detail row, then drop the
# numeric status_id that is no longer needed.
# (The left operand was misspelled 'df_order_detals', which raised NameError.)
df_order_details = (
    pd.merge(df_order_details, df_order_details_status, on='status_id', how='inner')
    .rename(columns={"status_name": "order_details_status"})
    .drop(columns=['status_id'])
)
df_order_details.head(2)

2.2.4. Join the Orders and OrderDetails DataFrames

In [None]:
# Combine order headers with their detail lines on order_id. The right join
# keeps every row of df_order_details, matched to its order where one exists.
df_fact_orders = df_orders.merge(df_order_details, how="right", on='order_id')
df_fact_orders.head(2)

2.2.5. Perform any Additional Transformations

In [None]:
# Drop columns of no particular interest.
# errors="ignore" lets the cell be re-run after the columns are already gone.
# NOTE(review): the list previously contained 'data_allocated', which looks like
# a typo for the order_details column 'date_allocated' - confirm against the schema.
drop_columns = ['id', 'notes','tax_status_id','purchase_order_id','inventory_id','date_allocated']
df_fact_orders = df_fact_orders.drop(columns=drop_columns, errors="ignore")

# Reorder the columns. The list was previously defined but never applied;
# selecting by it also discards any column that is not listed.
ordered_columns = ['order_id','employee_id','customer_id','product_id','shipper_id','ship_name',
                   'ship_address','ship_city','ship_state_province','ship_zip_postal_code','ship_country_region','quantity',
                   'order_date','shipped_date','unit_price','discount','shipping_fee','taxes','tax_rate',
                   'payment_type','paid_date','order_status','order_details_status']
df_fact_orders = df_fact_orders[ordered_columns]

# Add a sequential surrogate key (1..n) as the first column; it becomes the
# primary key of the fact table.
df_fact_orders.insert(0, "order_key", range(1, df_fact_orders.shape[0]+1))
df_fact_orders.head(2)

2.2.6. Write the DataFrame Back to the Database

In [None]:
# Write the fact table to the warehouse; "insert" replaces any existing table
# and then declares order_key as its primary key.
table_name = "fact_orders"
primary_key = "order_key"
db_operation = "insert"

# (The table-name argument was misspelled 'table_nme', which raised NameError.)
set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_orders, table_name, primary_key, db_operation)

3.0. Demonstrate that the New Data Warehouse Exists and Contains the Correct Data

In [None]:
# Validation query: total quantity and total unit price per customer last name,
# joining the new fact table to the customer dimension.
# Identifiers are quoted with backticks (single quotes make MySQL string
# literals), the misspelled AS and the trailing comma before FROM are fixed,
# and the {0} placeholder is filled with the warehouse database name.
sql_test = """
    SELECT customers.`last_name` AS `customer_name`,
        SUM(orders.`quantity`) AS `total_quantity`,
        SUM(orders.`unit_price`) AS `total_unit_price`
    FROM `{0}`.`fact_orders` AS orders
    INNER JOIN `{0}`.`dim_customers` AS customers
    ON orders.customer_id = customers.customer_key
    GROUP BY customers.`last_name`
    ORDER BY total_unit_price DESC;
""".format(dst_dbname)

# Run the query against the warehouse and show the top rows.
df_test = get_dataframe(user_id, pwd, host_name, dst_dbname, sql_test)
df_test.head()