Import Libraries

In [1]:
import os
import numpy
import pandas as pd
from sqlalchemy import create_engine

Declare and Assign Connection Variables

In [2]:
host_name = "localhost"
host_ip = "127.0.0.1"
port = "3306"
user_id = "root"
pwd = "AccessSQL"

src_dbname = "northwind"
dst_dbname = "northwind_dw2"

Functions for Getting and Setting Data into Databases

In [3]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

Create Switch Connection Text and new Data Warehouse

In [4]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f888a2dcd00>

# Create and Populate Dim Tables

Extract Data from Source

In [5]:
sql_customers = "SELECT * FROM northwind.customers;"
df_customers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customers)
df_customers.head(2)

Unnamed: 0,id,company,last_name,first_name,email_address,job_title,business_phone,home_phone,mobile_phone,fax_number,address,city,state_province,zip_postal_code,country_region,web_page,notes,attachments
0,1,Company A,Bedecs,Anna,,Owner,(123)555-0100,,,(123)555-0101,123 1st Street,Seattle,WA,99999,USA,,,b''
1,2,Company B,Gratacos Solsona,Antonio,,Owner,(123)555-0100,,,(123)555-0101,123 2nd Street,Boston,MA,99999,USA,,,b''


In [6]:
sql_employees = "SELECT * FROM northwind.employees;"
df_employees = get_dataframe(user_id, pwd, host_name, src_dbname, sql_employees)
df_employees.head(2)

Unnamed: 0,id,company,last_name,first_name,email_address,job_title,business_phone,home_phone,mobile_phone,fax_number,address,city,state_province,zip_postal_code,country_region,web_page,notes,attachments
0,1,Northwind Traders,Freehafer,Nancy,nancy@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,,(123)555-0103,123 1st Avenue,Seattle,WA,99999,USA,#http://northwindtraders.com#,,b''
1,2,Northwind Traders,Cencini,Andrew,andrew@northwindtraders.com,"Vice President, Sales",(123)555-0100,(123)555-0102,,(123)555-0103,123 2nd Avenue,Bellevue,WA,99999,USA,http://northwindtraders.com#http://northwindtr...,"Joined the company as a sales representative, ...",b''


In [7]:
sql_products = "SELECT * FROM northwind.products;"
df_products = get_dataframe(user_id, pwd, host_name, src_dbname, sql_products)
df_products.head(2)

Unnamed: 0,supplier_ids,id,product_code,product_name,description,standard_cost,list_price,reorder_level,target_level,quantity_per_unit,discontinued,minimum_reorder_quantity,category,attachments
0,4,1,NWTB-1,Northwind Traders Chai,,13.5,18.0,10,40,10 boxes x 20 bags,0,10.0,Beverages,b''
1,10,3,NWTCO-3,Northwind Traders Syrup,,7.5,10.0,25,100,12 - 550 ml bottles,0,25.0,Condiments,b''


In [8]:
sql_shippers = "SELECT * FROM northwind.shippers;"
df_shippers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_shippers)
df_shippers.head(2)

Unnamed: 0,id,company,last_name,first_name,email_address,job_title,business_phone,home_phone,mobile_phone,fax_number,address,city,state_province,zip_postal_code,country_region,web_page,notes,attachments
0,1,Shipping Company A,,,,,,,,,123 Any Street,Memphis,TN,99999,USA,,,b''
1,2,Shipping Company B,,,,,,,,,123 Any Street,Memphis,TN,99999,USA,,,b''


Drop Unnecessary Cols from Customers, Employees, Products, and Shippers

In [9]:
#Customers:
drop_cols = ['email_address','home_phone','mobile_phone','web_page','notes','attachments']
df_customers.drop(drop_cols, axis=1, inplace=True)
df_customers.rename(columns={"id":"customer_key"}, inplace=True)

df_customers.head(2)

Unnamed: 0,customer_key,company,last_name,first_name,job_title,business_phone,fax_number,address,city,state_province,zip_postal_code,country_region
0,1,Company A,Bedecs,Anna,Owner,(123)555-0100,(123)555-0101,123 1st Street,Seattle,WA,99999,USA
1,2,Company B,Gratacos Solsona,Antonio,Owner,(123)555-0100,(123)555-0101,123 2nd Street,Boston,MA,99999,USA


In [10]:
#Employees:
drop_cols = ['mobile_phone','notes','attachments']
df_employees.drop(drop_cols, axis=1, inplace=True)
df_employees.rename(columns={"id":"employee_key"}, inplace=True)

df_employees.head(2)

Unnamed: 0,employee_key,company,last_name,first_name,email_address,job_title,business_phone,home_phone,fax_number,address,city,state_province,zip_postal_code,country_region,web_page
0,1,Northwind Traders,Freehafer,Nancy,nancy@northwindtraders.com,Sales Representative,(123)555-0100,(123)555-0102,(123)555-0103,123 1st Avenue,Seattle,WA,99999,USA,#http://northwindtraders.com#
1,2,Northwind Traders,Cencini,Andrew,andrew@northwindtraders.com,"Vice President, Sales",(123)555-0100,(123)555-0102,(123)555-0103,123 2nd Avenue,Bellevue,WA,99999,USA,http://northwindtraders.com#http://northwindtr...


In [11]:
#Products:
drop_cols = ['supplier_ids','description','attachments']
df_products.drop(drop_cols, axis=1, inplace=True)
df_products.rename(columns={"id":"product_key"}, inplace=True)

df_products.head(2)


Unnamed: 0,product_key,product_code,product_name,standard_cost,list_price,reorder_level,target_level,quantity_per_unit,discontinued,minimum_reorder_quantity,category
0,1,NWTB-1,Northwind Traders Chai,13.5,18.0,10,40,10 boxes x 20 bags,0,10.0,Beverages
1,3,NWTCO-3,Northwind Traders Syrup,7.5,10.0,25,100,12 - 550 ml bottles,0,25.0,Condiments


In [12]:
#Shippers:
drop_cols = ['last_name','first_name','email_address','job_title','business_phone',
             'home_phone','mobile_phone','fax_number','web_page','notes','attachments']
df_shippers.drop(drop_cols, axis=1, inplace=True)
df_shippers.rename(columns={"id":"shipper_key"}, inplace=True)

df_shippers.head(2)

Unnamed: 0,shipper_key,company,address,city,state_province,zip_postal_code,country_region
0,1,Shipping Company A,123 Any Street,Memphis,TN,99999,USA
1,2,Shipping Company B,123 Any Street,Memphis,TN,99999,USA


Create and Populate Dim Tables

In [13]:
db_operation = "insert"

tables = [('dim_customers', df_customers, 'customer_key'),
          ('dim_employees', df_employees, 'employee_key'),
          ('dim_products', df_products, 'product_key'),
          ('dim_shippers', df_shippers, 'shipper_key')]

for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

# Create and Populate Fact Table

Method 1: Using SQL Query Statement

In [14]:
sql_fact_orders = """SELECT o.id AS fact_order_key,
    o.employee_id,
    o.customer_id,
    od.product_id,
    o.shipper_id,
    o.ship_name,
    o.ship_address,
    o.ship_city,
    o.ship_state_province,
    o.ship_zip_postal_code,
    o.ship_country_region,
    od.quantity,
    o.order_date,
    o.shipped_date,
    od.unit_price,
    od.discount,
    o.shipping_fee,
    o.taxes,
    o.payment_type,
    o.paid_date,
    o.tax_rate,
    os.status_name as order_status,
    ods.status_name as order_details_status
FROM northwind.orders AS o
INNER JOIN northwind.orders_status AS os
ON o.status_id = os.id
RIGHT OUTER JOIN northwind.order_details AS od
ON o.id = od.order_id
INNER JOIN northwind.order_details_status AS ods
ON o.status_id = ods.id;"""

df_fact_orders = get_dataframe(user_id, pwd, host_name, src_dbname, sql_fact_orders)
df_fact_orders.head(2)

Unnamed: 0,fact_order_key,employee_id,customer_id,product_id,shipper_id,ship_name,ship_address,ship_city,ship_state_province,ship_zip_postal_code,...,shipped_date,unit_price,discount,shipping_fee,taxes,payment_type,paid_date,tax_rate,order_status,order_details_status
0,30,9,27,34,2.0,Karen Toh,789 27th Street,Las Vegas,NV,99999,...,2006-01-22,14.0,0.0,200.0,0.0,Check,2006-01-15,0.0,Closed,Shipped
1,30,9,27,80,2.0,Karen Toh,789 27th Street,Las Vegas,NV,99999,...,2006-01-22,3.5,0.0,200.0,0.0,Check,2006-01-15,0.0,Closed,Shipped


Method 2: Using Pandas

Get Data from orders, orders_status, order_details and order_details_status

In [23]:
sql_o = "SELECT * FROM northwind.orders;"
df_o = get_dataframe(user_id, pwd, host_name, src_dbname, sql_o)
df_o.head(2)

Unnamed: 0,id,employee_id,customer_id,order_date,shipped_date,shipper_id,ship_name,ship_address,ship_city,ship_state_province,ship_zip_postal_code,ship_country_region,shipping_fee,taxes,payment_type,paid_date,notes,tax_rate,tax_status_id,status_id
0,30,9,27,2006-01-15,2006-01-22,2.0,Karen Toh,789 27th Street,Las Vegas,NV,99999,USA,200.0,0.0,Check,2006-01-15,,0.0,,3
1,31,3,4,2006-01-20,2006-01-22,1.0,Christina Lee,123 4th Street,New York,NY,99999,USA,5.0,0.0,Credit Card,2006-01-20,,0.0,,3


In [24]:
sql_os = "SELECT * FROM northwind.orders_status;"
df_os = get_dataframe(user_id, pwd, host_name, src_dbname, sql_os)
df_os.head(2)

Unnamed: 0,id,status_name
0,0,New
1,1,Invoiced


In [25]:
sql_od = "SELECT * FROM northwind.order_details;"
df_od = get_dataframe(user_id, pwd, host_name, src_dbname, sql_od)
df_od.head(2)

Unnamed: 0,id,order_id,product_id,quantity,unit_price,discount,status_id,date_allocated,purchase_order_id,inventory_id
0,27,30,34,100.0,14.0,0.0,2,,96.0,83.0
1,28,30,80,30.0,3.5,0.0,2,,,63.0


In [26]:
sql_ods = "SELECT * FROM northwind.order_details_status;"
df_ods = get_dataframe(user_id, pwd, host_name, src_dbname, sql_ods)
df_ods.head(3)

Unnamed: 0,id,status_name
0,0,
1,1,Allocated
2,2,Invoiced


Merge orders with orders_status

In [27]:
df_o = df_o.merge(df_os, how='inner', left_on='status_id', right_on='id')
df_o = df_o.drop(columns={"status_id", "id_y"})
df_o = df_o.rename(columns={"id_x":"order_key", "status_name":"order_status"})
df_o.head(2)

Unnamed: 0,order_key,employee_id,customer_id,order_date,shipped_date,shipper_id,ship_name,ship_address,ship_city,ship_state_province,ship_zip_postal_code,ship_country_region,shipping_fee,taxes,payment_type,paid_date,notes,tax_rate,tax_status_id,order_status
0,30,9,27,2006-01-15,2006-01-22,2.0,Karen Toh,789 27th Street,Las Vegas,NV,99999,USA,200.0,0.0,Check,2006-01-15,,0.0,,Closed
1,31,3,4,2006-01-20,2006-01-22,1.0,Christina Lee,123 4th Street,New York,NY,99999,USA,5.0,0.0,Credit Card,2006-01-20,,0.0,,Closed


Merge order_details with order_details_status

In [28]:
df_od = df_od.merge(df_ods, how='inner', left_on='status_id', right_on='id')
df_od = df_od.drop(columns={"status_id", "id_y"})
df_od = df_od.rename(columns={"id_x":"detail_key", "status_name":"detail_status"})
df_od.head(2)

Unnamed: 0,detail_key,order_id,product_id,quantity,unit_price,discount,date_allocated,purchase_order_id,inventory_id,detail_status
0,27,30,34,100.0,14.0,0.0,,96.0,83.0,Invoiced
1,28,30,80,30.0,3.5,0.0,,,63.0,Invoiced


Merge order and order_details

In [29]:
df_o = df_o.merge(df_od, how='outer', left_on="order_key", right_on="order_id")
df_o = df_o.drop(columns={"order_id"})
df_o.head(2)

Unnamed: 0,order_key,employee_id,customer_id,order_date,shipped_date,shipper_id,ship_name,ship_address,ship_city,ship_state_province,...,order_status,detail_key,product_id,quantity,unit_price,discount,date_allocated,purchase_order_id,inventory_id,detail_status
0,30,9,27,2006-01-15,2006-01-22,2.0,Karen Toh,789 27th Street,Las Vegas,NV,...,Closed,27.0,34.0,100.0,14.0,0.0,,96.0,83.0,Invoiced
1,30,9,27,2006-01-15,2006-01-22,2.0,Karen Toh,789 27th Street,Las Vegas,NV,...,Closed,28.0,80.0,30.0,3.5,0.0,,,63.0,Invoiced


Additional Transformations and Creating New Primary Key

In [30]:
#Any Transformations Here
#


#Create primary key called "fact_order_key"
x = df_o.count(0)
#print(x)
numRows = x.order_key
countArr = []
for i in range(1, numRows+1):
    countArr.append(i)

df_o = df_o.insert(0, "fact_order_key", countArr, allow_duplicates=False)


#df_o.head(5)

order_key               66
employee_id             66
customer_id             66
order_date              66
shipped_date            53
shipper_id              58
ship_name               66
ship_address            66
ship_city               66
ship_state_province     66
ship_zip_postal_code    66
ship_country_region     66
shipping_fee            66
taxes                   66
payment_type            50
paid_date               50
notes                    0
tax_rate                66
tax_status_id            0
order_status            66
detail_key              58
product_id              58
quantity                58
unit_price              58
discount                58
date_allocated           0
purchase_order_id       15
inventory_id            56
detail_status           58
dtype: int64


Write Fact Order Dataframe back into Database

In [33]:
db_operation = "insert"
table_name = "fact_order_table"
dataframe = df_o
primary_key = "fact_order_key"

set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

AttributeError: 'NoneType' object has no attribute 'to_sql'

# Check New Database

In [34]:
query = f"""SELECT customer.last_name AS customer_name,
	SUM(orders.quantity) AS total_quantity,
    SUM(orders.unit_price) AS total_unit_price
FROM {dst_dbname}.fact_orders AS orders
INNER JOIN {dst_dbname}.dim_customers AS customer
ON orders.customer_key = customer.customer_key
GROUP BY customer.last_name
ORDER BY total_unit_price DESC;"""

sqlEngine.execute(query)

ProgrammingError: (pymysql.err.ProgrammingError) (1146, "Table 'northwind_dw2.fact_orders' doesn't exist")
[SQL: SELECT customer.last_name AS customer_name,
	SUM(orders.quantity) AS total_quantity,
    SUM(orders.unit_price) AS total_unit_price
FROM northwind_dw2.fact_orders AS orders
INNER JOIN northwind_dw2.dim_customers AS customer
ON orders.customer_key = customer.customer_key
GROUP BY customer.last_name
ORDER BY total_unit_price DESC;]
(Background on this error at: https://sqlalche.me/e/14/f405)