## Midterm Project

### Prerequisites
#### Import necessary libraries

In [1]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd
import urllib.parse

import pymongo
import sqlalchemy
from sqlalchemy import create_engine

Declare and assign connection variables.

In [2]:
# Connection settings for the two data stores used by this notebook.
# Credentials and the MySQL host can be overridden through environment
# variables so real secrets do not have to be saved with the notebook.
# NOTE(review): the literal fallbacks below are still stored in plain text --
# rotate them and remove the defaults once the environment variables are in use.
mysql_args = {
    "uid" : os.environ.get("CLASSICMODELS_MYSQL_UID", "root"),
    "pwd" : os.environ.get("CLASSICMODELS_MYSQL_PWD", "Passw0rd123"),
    "hostname" : os.environ.get("CLASSICMODELS_MYSQL_HOSTNAME", "localhost"),
    # Kept literal: later cells hard-code the classicmodels_dw schema name.
    "dbname" : "classicmodels_dw"
}

mongodb_args = {
    "user_name" : os.environ.get("CLASSICMODELS_MONGO_USER", ""),
    "password" : os.environ.get("CLASSICMODELS_MONGO_PWD", "password"),
    "cluster_name" : "cluster_name",
    "cluster_subnet" : "xxxxx",
    # get_mongo_client accepts 'atlas' or 'local' here.
    "cluster_location" : "local",
    "db_name" : "classicmodels"
}

Define Functions

In [3]:
def get_sql_dataframe(sql_query, **args):
    """Run a SQL query against MySQL and return the result as a DataFrame.

    Parameters
    ----------
    sql_query
        The SQL statement handed to ``pd.read_sql``.
    **args
        Connection settings; the keys ``uid``, ``pwd``, ``hostname`` and
        ``dbname`` are required.

    Returns
    -------
    pandas.DataFrame
        The rows produced by ``sql_query``.

    Raises
    ------
    KeyError
        If one of the required connection settings is missing.
    """
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    try:
        # The context manager closes the connection even when the query fails.
        with sqlEngine.connect() as connection:
            return pd.read_sql(sql_query, connection)
    finally:
        # The engine is created per call, so release its pool as well;
        # otherwise each call leaves a pooled connection behind.
        sqlEngine.dispose()
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    """Write a DataFrame to a MySQL table.

    Parameters
    ----------
    df
        The DataFrame to persist (its index is not written).
    table_name
        Name of the destination table.
    pk_column
        Column that becomes the primary key when ``db_operation`` is
        ``"insert"``; unused for ``"update"``.
    db_operation
        ``"insert"`` replaces the table with ``df`` and then adds the primary
        key; ``"update"`` appends ``df`` to the existing table.
    **args
        Connection settings; the keys ``uid``, ``pwd``, ``hostname`` and
        ``dbname`` are required.

    Raises
    ------
    ValueError
        If ``db_operation`` is neither ``"insert"`` nor ``"update"``.
    KeyError
        If one of the required connection settings is missing.
    """
    # Fail fast instead of silently doing nothing for an unknown operation.
    if db_operation not in ("insert", "update"):
        raise ValueError(
            f"Unsupported db_operation {db_operation!r}; expected 'insert' or 'update'."
        )

    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    try:
        # The context manager closes the connection even if a write fails.
        with sqlEngine.connect() as connection:
            if db_operation == "insert":
                df.to_sql(table_name, con=connection, index=False, if_exists='replace')
                # Identifiers cannot be bound as parameters, so table_name and
                # pk_column are interpolated verbatim -- pass trusted names only.
                connection.execute(
                    sqlalchemy.text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
                )
            else:
                df.to_sql(table_name, con=connection, index=False, if_exists='append')
    finally:
        # The engine is created per call, so release its pool as well.
        sqlEngine.dispose()


def get_mongo_client(**args):
    """Create a MongoDB client for either a local server or an Atlas cluster.

    Parameters
    ----------
    **args
        ``cluster_location`` is required and must be ``'atlas'`` or ``'local'``.
        For ``'atlas'`` the keys ``user_name``, ``password``, ``cluster_name``
        and ``cluster_subnet`` are read as well.

    Returns
    -------
    pymongo.MongoClient

    Raises
    ------
    ValueError
        If ``cluster_location`` is neither ``'atlas'`` nor ``'local'``.
    KeyError
        If a required key is missing.
    """
    location = args["cluster_location"]
    if location not in ('atlas', 'local'):
        raise ValueError("You must specify either 'atlas' or 'local' for the cluster_location parameter.")

    if location == "local":
        return pymongo.MongoClient("mongodb://localhost:27017/")

    # Reserved characters (e.g. '@', ':', '/') in the credentials would corrupt
    # the URI, so percent-escape them before building the connection string.
    user_name = urllib.parse.quote_plus(args['user_name'])
    password = urllib.parse.quote_plus(args['password'])
    connect_str = (
        f"mongodb+srv://{user_name}:{password}@"
        f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
    )
    return pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    """Query a MongoDB collection and return the documents as a DataFrame.

    Parameters
    ----------
    mongo_client
        Client used for the query; it is closed before this function returns.
    db_name
        Name of the database to read from.
    collection
        Name of the collection to query.
    query
        Filter document passed to ``find``.

    Returns
    -------
    pandas.DataFrame
        One row per matching document, without the ``_id`` column. The frame
        is empty when nothing matches.
    """
    try:
        documents = list(mongo_client[db_name][collection].find(query))
    finally:
        # Close the client whether or not the query succeeded.
        mongo_client.close()

    dframe = pd.DataFrame(documents)
    # An empty result has no '_id' column, so ignore it when absent rather
    # than raising KeyError.
    return dframe.drop(columns=['_id'], errors='ignore')


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    """(Re)create MongoDB collections from JSON files.

    Parameters
    ----------
    mongo_client
        Client used for the writes; it is closed before this function returns.
    db_name
        Name of the database that receives the collections.
    data_directory
        Directory that contains the JSON files.
    json_files
        Mapping of collection name -> JSON file name inside ``data_directory``.

    Each collection is dropped and refilled with the documents loaded from its
    file. The file is read *before* the drop, so a missing or malformed file
    raises without destroying the existing collection.
    """
    db = mongo_client[db_name]

    try:
        for collection_name, file_name in json_files.items():
            json_path = os.path.join(data_directory, file_name)
            with open(json_path, 'r') as openfile:
                documents = json.load(openfile)

            db.drop_collection(collection_name)
            # Skip the insert for an empty file; the test double below models
            # insert_many rejecting an empty batch.
            if documents:
                db[collection_name].insert_many(documents)
    finally:
        # Close the client even if a file could not be loaded.
        mongo_client.close()

#### Create new classicmodels data warehouse in SQL

In [4]:
dst_dbname = "classicmodels_dw"

# Connect to the server without selecting a schema, since the target schema
# is about to be dropped and recreated.
conn_str = f"mysql+pymysql://{mysql_args['uid']}:{mysql_args['pwd']}@{mysql_args['hostname']}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

# NOTE(review): a later cell reads classicmodels_dw.dim_date, and no cell in
# this notebook recreates that table after this DROP -- confirm how dim_date is
# rebuilt before relying on Restart & Run All.
# Run the statements on one explicit connection (Engine.execute is a legacy
# API) and close it afterwards; USE only affects this connection.
with sqlEngine.connect() as connection:
    connection.execute(sqlalchemy.text(f"DROP DATABASE IF EXISTS `{dst_dbname}`;"))
    connection.execute(sqlalchemy.text(f"CREATE DATABASE `{dst_dbname}`;"))
    connection.execute(sqlalchemy.text(f"USE `{dst_dbname}`;"))

sqlEngine.dispose()

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x18783910550>

#### Populate MongoDB with classicmodels.customers

In [5]:
# Collection name -> JSON export that seeds it.
json_files = dict(customers='classicmodelscustomers.json')

# The JSON exports live in the 'Scripts' folder below the notebook's
# current working directory.
data_dir = os.path.join(os.getcwd(), 'Scripts')

client = get_mongo_client(**mongodb_args)
set_mongo_collections(
    mongo_client=client,
    db_name=mongodb_args["db_name"],
    data_directory=data_dir,
    json_files=json_files,
)

#### Create and populate the new dimension tables. First, extract data from the source MongoDB collections into DataFrames.

In [6]:
# An empty filter document selects every document in the collection.
query, collection = {}, "customers"

client = get_mongo_client(**mongodb_args)
df_customers = get_mongo_dataframe(
    mongo_client=client,
    db_name=mongodb_args["db_name"],
    collection=collection,
    query=query,
)
df_customers.head(n=2)

Unnamed: 0,customerNumber,customerName,contactLastName,contactFirstName,phone,addressLine1,addressLine2,city,state,postalCode,country,salesRepEmployeeNumber,creditLimit
0,103,Atelier graphique,Schmitt,Carine,40.32.2555,"54, rue Royale",,Nantes,,44000,France,1370.0,21000.0
1,112,Signal Gift Stores,King,Jean,7025551838,8489 Strong St.,,Las Vegas,NV,83030,USA,1166.0,71800.0


Load data from CSV file containing employee info from classicmodels DB

In [7]:
# The employee export sits next to the other source files in 'Scripts'.
data_dir = os.path.join(os.getcwd(), 'Scripts')
data_file = os.path.join(data_dir, 'classicmodelsemployees.csv')

# The first line of the CSV holds the column names.
df_employees = pd.read_csv(filepath_or_buffer=data_file, header=0)
df_employees.head(n=2)

Unnamed: 0,employeeNumber,lastName,firstName,extension,email,officeCode,reportsTo,jobTitle
0,1002,Murphy,Diane,x5800,dmurphy@classicmodelcars.com,1,,President
1,1056,Patterson,Mary,x4611,mpatterso@classicmodelcars.com,1,1002.0,VP Sales


Load data from the SQL database classicmodels pertaining to payments and products (orders and order details are loaded further below, when the fact table is built)

In [8]:
# Read every payment row from the classicmodels source schema.
sql_payments = "SELECT * FROM classicmodels.payments;"
df_payments = get_sql_dataframe(sql_query=sql_payments, **mysql_args)
df_payments.head(n=2)

Unnamed: 0,customerNumber,checkNumber,paymentDate,amount
0,103,HQ336336,2004-10-19,6066.78
1,103,JM555205,2003-06-05,14571.44


In [9]:
# Read every product row from the classicmodels source schema.
sql_products = "SELECT * FROM classicmodels.products;"
df_products = get_sql_dataframe(sql_query=sql_products, **mysql_args)
df_products.head(n=2)

Unnamed: 0,productCode,productName,productLine,productScale,productVendor,productDescription,quantityInStock,buyPrice,MSRP
0,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,1:10,Min Lin Diecast,"This replica features working kickstand, front...",7933,48.81,95.7
1,S10_1949,1952 Alpine Renault 1300,Classic Cars,1:10,Classic Metal Creations,Turnable front wheels; steering function; deta...,7305,98.58,214.3


##### Get the Data from the Date Dimension Table.

In [11]:
# NOTE(review): dim_date must already exist in classicmodels_dw; an earlier
# cell drops and recreates that schema and no cell shown here rebuilds the
# table -- confirm where it comes from.
sql_dim_date = "SELECT date_key, full_date FROM classicmodels_dw.dim_date;"
df_dim_date = get_sql_dataframe(sql_query=sql_dim_date, **mysql_args)

# Reduce full_date to plain date objects so it can be matched against the
# date columns that are converted the same way further down.
df_dim_date["full_date"] = df_dim_date["full_date"].astype('datetime64[ns]').dt.date
df_dim_date.head(n=2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


Lookup the Surrogate Primary Key (date_key) that Corresponds to the paymentDate Column in the payments table

In [12]:
# Rename the date dimension so that it joins on 'paymentDate' and delivers
# its key as 'payment_date_key'.
df_dim_payment_date = df_dim_date.rename(
    columns={"date_key" : "payment_date_key", "full_date" : "paymentDate"}
)
df_payments["paymentDate"] = df_payments["paymentDate"].astype('datetime64[ns]').dt.date

# Left join keeps every payment; the business date is replaced by its key.
df_payments = (
    df_payments
    .merge(df_dim_payment_date, on='paymentDate', how='left')
    .drop(columns=['paymentDate'])
)
df_payments.head(n=2)

Unnamed: 0,customerNumber,checkNumber,amount,payment_date_key
0,103,HQ336336,6066.78,20041019
1,103,JM555205,14571.44,20030605


#### Perform Any Necessary Transformations to the DataFrames
Add ever-incrementing number to be used as the primary key.

In [13]:
# Remove the columns that are not carried into the dimension, then add a
# 1-based running number as the surrogate key in the first position.
df_employees = df_employees.drop(columns=['officeCode', 'extension'])
df_employees.insert(loc=0, column="employee_key", value=range(1, len(df_employees) + 1))
df_employees.head(n=2)

Unnamed: 0,employee_key,employeeNumber,lastName,firstName,email,reportsTo,jobTitle
0,1,1002,Murphy,Diane,dmurphy@classicmodelcars.com,,President
1,2,1056,Patterson,Mary,mpatterso@classicmodelcars.com,1002.0,VP Sales


In [15]:
# Drop the second address line, keep the first one under a shorter name,
# then add a 1-based running number as the surrogate key.
df_customers = (
    df_customers
    .drop(columns=['addressLine2'])
    .rename(columns={"addressLine1": "addressLine"})
)
df_customers.insert(loc=0, column="customer_key", value=range(1, len(df_customers) + 1))
df_customers.head(n=2)

Unnamed: 0,customer_key,customerNumber,customerName,contactLastName,contactFirstName,phone,addressLine,city,state,postalCode,country,salesRepEmployeeNumber,creditLimit
0,1,103,Atelier graphique,Schmitt,Carine,40.32.2555,"54, rue Royale",Nantes,,44000,France,1370.0,21000.0
1,2,112,Signal Gift Stores,King,Jean,7025551838,8489 Strong St.,Las Vegas,NV,83030,USA,1166.0,71800.0


In [16]:
# 1-based running number as the surrogate key, placed in the first column.
df_payments.insert(loc=0, column="payment_key", value=range(1, len(df_payments) + 1))
df_payments.head(n=2)

Unnamed: 0,payment_key,customerNumber,checkNumber,amount,payment_date_key
0,1,103,HQ336336,6066.78,20041019
1,2,103,JM555205,14571.44,20030605


In [17]:
# Remove the product scale, then add a 1-based running number as the
# surrogate key in the first position.
df_products = df_products.drop(columns=['productScale'])
df_products.insert(loc=0, column="product_key", value=range(1, len(df_products) + 1))
df_products.head(n=2)

Unnamed: 0,product_key,productCode,productName,productLine,productVendor,productDescription,quantityInStock,buyPrice,MSRP
0,1,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,Min Lin Diecast,"This replica features working kickstand, front...",7933,48.81,95.7
1,2,S10_1949,1952 Alpine Renault 1300,Classic Cars,Classic Metal Creations,Turnable front wheels; steering function; deta...,7305,98.58,214.3


#### Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables

In [18]:
# "insert" makes set_dataframe replace the table and add the primary key.
table_name, primary_key = 'dim_employees', 'employee_key'
dataframe, db_operation = df_employees, "insert"

set_dataframe(
    df=dataframe,
    table_name=table_name,
    pk_column=primary_key,
    db_operation=db_operation,
    **mysql_args,
)

In [19]:
# "insert" makes set_dataframe replace the table and add the primary key.
table_name, primary_key = 'dim_customers', 'customer_key'
dataframe, db_operation = df_customers, "insert"

set_dataframe(
    df=dataframe,
    table_name=table_name,
    pk_column=primary_key,
    db_operation=db_operation,
    **mysql_args,
)

In [20]:
# "insert" makes set_dataframe replace the table and add the primary key.
table_name, primary_key = 'dim_payments', 'payment_key'
dataframe, db_operation = df_payments, "insert"

set_dataframe(
    df=dataframe,
    table_name=table_name,
    pk_column=primary_key,
    db_operation=db_operation,
    **mysql_args,
)

In [21]:
# "insert" makes set_dataframe replace the table and add the primary key.
table_name, primary_key = 'dim_products', 'product_key'
dataframe, db_operation = df_products, "insert"

set_dataframe(
    df=dataframe,
    table_name=table_name,
    pk_column=primary_key,
    db_operation=db_operation,
    **mysql_args,
)

Validate that the New Dimension Tables were Created.

In [22]:
# Read the table back from the warehouse; this rebinds df_employees to the
# stored rows.
sql_employees = "SELECT * FROM classicmodels_dw.dim_employees;"
df_employees = get_sql_dataframe(sql_query=sql_employees, **mysql_args)
df_employees.head(n=2)

Unnamed: 0,employee_key,employeeNumber,lastName,firstName,email,reportsTo,jobTitle
0,1,1002,Murphy,Diane,dmurphy@classicmodelcars.com,,President
1,2,1056,Patterson,Mary,mpatterso@classicmodelcars.com,1002.0,VP Sales


In [23]:
# Read the table back from the warehouse; this rebinds df_customers to the
# stored rows.
sql_customers = "SELECT * FROM classicmodels_dw.dim_customers;"
df_customers = get_sql_dataframe(sql_query=sql_customers, **mysql_args)
df_customers.head(n=2)

Unnamed: 0,customer_key,customerNumber,customerName,contactLastName,contactFirstName,phone,addressLine,city,state,postalCode,country,salesRepEmployeeNumber,creditLimit
0,1,103,Atelier graphique,Schmitt,Carine,40.32.2555,"54, rue Royale",Nantes,,44000,France,1370.0,21000.0
1,2,112,Signal Gift Stores,King,Jean,7025551838,8489 Strong St.,Las Vegas,NV,83030,USA,1166.0,71800.0


In [24]:
# Read the table back from the warehouse; this rebinds df_payments to the
# stored rows.
sql_payments = "SELECT * FROM classicmodels_dw.dim_payments;"
df_payments = get_sql_dataframe(sql_query=sql_payments, **mysql_args)
df_payments.head(n=2)

Unnamed: 0,payment_key,customerNumber,checkNumber,amount,payment_date_key
0,1,103,HQ336336,6066.78,20041019
1,2,103,JM555205,14571.44,20030605


In [25]:
# Read the table back from the warehouse; this rebinds df_products to the
# stored rows.
sql_products = "SELECT * FROM classicmodels_dw.dim_products;"
df_products = get_sql_dataframe(sql_query=sql_products, **mysql_args)
df_products.head(n=2)

Unnamed: 0,product_key,productCode,productName,productLine,productVendor,productDescription,quantityInStock,buyPrice,MSRP
0,1,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,Min Lin Diecast,"This replica features working kickstand, front...",7933,48.81,95.7
1,2,S10_1949,1952 Alpine Renault 1300,Classic Cars,Classic Metal Creations,Turnable front wheels; steering function; deta...,7305,98.58,214.3


## Create Fact Order Table
#### Get all the data from each of the two tables involved, orders and orderdetails

In [26]:
# Order headers from the classicmodels source schema.
sql_orders = "SELECT * FROM classicmodels.orders;"
df_orders = get_sql_dataframe(sql_query=sql_orders, **mysql_args)
df_orders.head(n=2)

Unnamed: 0,orderNumber,orderDate,requiredDate,shippedDate,status,comments,customerNumber
0,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,363
1,10101,2003-01-09,2003-01-18,2003-01-11,Shipped,Check on availability.,128


In [27]:
# Order line items from the classicmodels source schema.
sql_order_details = "SELECT * FROM classicmodels.orderdetails;"
df_order_details = get_sql_dataframe(sql_query=sql_order_details, **mysql_args)
df_order_details.head(n=2)

Unnamed: 0,orderNumber,productCode,quantityOrdered,priceEach,orderLineNumber
0,10100,S18_1749,30,136.0,3
1,10100,S18_2248,50,55.09,2


#### Join the Orders and OrderDetails DataFrames to create orders Fact Table

In [28]:
# Right join: one fact row per order-detail line, enriched with the columns
# of its order header.
df_fact_orders = df_orders.merge(df_order_details, on='orderNumber', how='right')
df_fact_orders.head(n=2)

Unnamed: 0,orderNumber,orderDate,requiredDate,shippedDate,status,comments,customerNumber,productCode,quantityOrdered,priceEach,orderLineNumber
0,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,363,S18_1749,30,136.0,3
1,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,363,S18_2248,50,55.09,2


#### Lookup the Primary Keys from the Dimension Tables in order to establish foreign key relationships between the fact table and the dimension tables

In [29]:
# Only the surrogate key and the business key are needed for the lookup.
sql_dim_customers = "SELECT customer_key, customerNumber FROM classicmodels_dw.dim_customers;"
df_dim_customers = get_sql_dataframe(sql_query=sql_dim_customers, **mysql_args)
df_dim_customers.head(n=2)

Unnamed: 0,customer_key,customerNumber
0,1,103
1,2,112


In [30]:
# Only the surrogate key and the business key are needed for the lookup.
sql_dim_products = "SELECT product_key, productCode FROM classicmodels_dw.dim_products;"
df_dim_products = get_sql_dataframe(sql_query=sql_dim_products, **mysql_args)
df_dim_products.head(n=2)

Unnamed: 0,product_key,productCode
0,1,S10_1678
1,2,S10_1949


##### Using the Business Keys, lookup the corresponding Surrogate Primary Key values in the Dimension tables

In [31]:
# Join the fact rows to "dim_customers" on the business key 'customerNumber'
# to pick up 'customer_key', then discard the business key.
# The inner join keeps only fact rows whose customer exists in the dimension.
df_fact_orders = (
    df_fact_orders
    .merge(df_dim_customers, on='customerNumber', how='inner')
    .drop(columns=['customerNumber'])
)
df_fact_orders.head(n=2)

Unnamed: 0,orderNumber,orderDate,requiredDate,shippedDate,status,comments,productCode,quantityOrdered,priceEach,orderLineNumber,customer_key
0,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,S18_1749,30,136.0,3,86
1,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,S18_2248,50,55.09,2,86


In [32]:
# Join the fact rows to "dim_products" on the business key 'productCode'
# to pick up 'product_key', then discard the business key.
# The inner join keeps only fact rows whose product exists in the dimension.
df_fact_orders = (
    df_fact_orders
    .merge(df_dim_products, on='productCode', how='inner')
    .drop(columns=['productCode'])
)
df_fact_orders.head(n=2)

Unnamed: 0,orderNumber,orderDate,requiredDate,shippedDate,status,comments,quantityOrdered,priceEach,orderLineNumber,customer_key,product_key
0,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,30,136.0,3,86,23
1,10379,2005-02-10,2005-02-18,2005-02-11,Shipped,,39,156.4,2,11,23


##### Lookup the DateKeys from the Date Dimension Table.

In [33]:
# NOTE(review): dim_date must already exist in classicmodels_dw; no cell shown
# in this notebook creates it -- confirm where it comes from.
sql_dim_date = "SELECT date_key, full_date FROM classicmodels_dw.dim_date;"
df_dim_date = get_sql_dataframe(sql_query=sql_dim_date, **mysql_args)

# Reduce full_date to plain date objects so it can be matched against the
# order date columns, which are converted the same way below.
df_dim_date["full_date"] = df_dim_date["full_date"].astype('datetime64[ns]').dt.date
df_dim_date.head(n=2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


In [34]:
# Look up the surrogate key (date_key) that corresponds to the "orderDate" column.
df_dim_order_date = df_dim_date.rename(
    columns={"date_key" : "order_date_key", "full_date" : "orderDate"}
)
df_fact_orders["orderDate"] = df_fact_orders["orderDate"].astype('datetime64[ns]').dt.date

# Left join keeps every fact row; the business date is replaced by its key.
df_fact_orders = (
    df_fact_orders
    .merge(df_dim_order_date, on='orderDate', how='left')
    .drop(columns=['orderDate'])
)
df_fact_orders.head(n=2)

Unnamed: 0,orderNumber,requiredDate,shippedDate,status,comments,quantityOrdered,priceEach,orderLineNumber,customer_key,product_key,order_date_key
0,10100,2003-01-13,2003-01-10,Shipped,,30,136.0,3,86,23,20030106
1,10379,2005-02-18,2005-02-11,Shipped,,39,156.4,2,11,23,20050210


In [35]:
# Look up the surrogate key (date_key) that corresponds to the "requiredDate" column.
df_dim_required_date = df_dim_date.rename(
    columns={"date_key" : "required_date_key", "full_date" : "requiredDate"}
)
df_fact_orders["requiredDate"] = df_fact_orders["requiredDate"].astype('datetime64[ns]').dt.date

# Left join keeps every fact row; the business date is replaced by its key.
df_fact_orders = (
    df_fact_orders
    .merge(df_dim_required_date, on='requiredDate', how='left')
    .drop(columns=['requiredDate'])
)
df_fact_orders.head(n=2)

Unnamed: 0,orderNumber,shippedDate,status,comments,quantityOrdered,priceEach,orderLineNumber,customer_key,product_key,order_date_key,required_date_key
0,10100,2003-01-10,Shipped,,30,136.0,3,86,23,20030106,20030113
1,10379,2005-02-11,Shipped,,39,156.4,2,11,23,20050210,20050218


In [36]:
# Look up the surrogate key (date_key) that corresponds to the "shippedDate" column.
df_dim_shipped_date = df_dim_date.rename(
    columns={"date_key" : "shipped_date_key", "full_date" : "shippedDate"}
)
df_fact_orders["shippedDate"] = df_fact_orders["shippedDate"].astype('datetime64[ns]').dt.date

# Left join keeps every fact row; rows without a matching date get no key,
# which is why shipped_date_key is displayed as a float below.
df_fact_orders = (
    df_fact_orders
    .merge(df_dim_shipped_date, on='shippedDate', how='left')
    .drop(columns=['shippedDate'])
)
df_fact_orders.head(n=2)

Unnamed: 0,orderNumber,status,comments,quantityOrdered,priceEach,orderLineNumber,customer_key,product_key,order_date_key,required_date_key,shipped_date_key
0,10100,Shipped,,30,136.0,3,86,23,20030106,20030113,20030110.0
1,10379,Shipped,,39,156.4,2,11,23,20050210,20050218,20050211.0


In [37]:
# Columns that are not carried into the fact table.
drop_columns = ['comments', 'orderLineNumber']

# Final column order: order number, dimension keys, then measures and status.
ordered_columns = [
    'orderNumber',
    'customer_key', 'product_key',
    'order_date_key', 'required_date_key', 'shipped_date_key',
    'quantityOrdered', 'priceEach', 'status',
]
df_fact_orders = df_fact_orders.drop(columns=drop_columns)[ordered_columns]

# 1-based running number as the surrogate key, placed in the first column.
df_fact_orders.insert(loc=0, column="fact_order_key", value=range(1, len(df_fact_orders) + 1))
df_fact_orders.head(n=2)

Unnamed: 0,fact_order_key,orderNumber,customer_key,product_key,order_date_key,required_date_key,shipped_date_key,quantityOrdered,priceEach,status
0,1,10100,86,23,20030106,20030113,20030110.0,30,136.0,Shipped
1,2,10379,11,23,20050210,20050218,20050211.0,39,156.4,Shipped


#### Write the DataFrame for Order Fact Table back to the database

In [38]:
# "insert" makes set_dataframe replace the table and add the primary key.
table_name, primary_key = 'fact_orders', 'fact_order_key'
dataframe, db_operation = df_fact_orders, "insert"

set_dataframe(
    df=dataframe,
    table_name=table_name,
    pk_column=primary_key,
    db_operation=db_operation,
    **mysql_args,
)

#### Validate that the Order Fact Table was created

In [39]:
# Read the fact table back from the warehouse; this rebinds df_fact_orders
# to the stored rows.
sql_fact_orders = "SELECT * FROM classicmodels_dw.fact_orders;"
df_fact_orders = get_sql_dataframe(sql_query=sql_fact_orders, **mysql_args)
df_fact_orders.head(n=2)

Unnamed: 0,fact_order_key,orderNumber,customer_key,product_key,order_date_key,required_date_key,shipped_date_key,quantityOrdered,priceEach,status
0,1,10100,86,23,20030106,20030113,20030110.0,30,136.0,Shipped
1,2,10379,11,23,20050210,20050218,20050211.0,39,156.4,Shipped


### Demonstrate that the data warehouse exists and contains the correct data

SQL statement that shows each customer's name and the total price associated with said customer.

In [40]:
# Per-customer aggregate: joins fact_orders to dim_customers on customer_key
# and sums priceEach for each customer name, largest sum first.
# NOTE(review): SUM(priceEach) adds up unit prices without multiplying by
# quantityOrdered -- confirm this is the intended meaning of 'Total Price'.
sql_orders = """
    SELECT c.customerName AS 'Customer'
        , SUM(o.priceEach) AS 'Total Price'
    FROM classicmodels_dw.fact_orders AS o
    INNER JOIN classicmodels_dw.dim_customers AS c
    ON o.customer_key = c.customer_key
    GROUP BY c.customerName
    ORDER BY SUM(o.priceEach) DESC;
    
"""

In [41]:
# Run the aggregate query; this rebinds df_fact_orders to the query result.
df_fact_orders = get_sql_dataframe(sql_query=sql_orders, **mysql_args)
df_fact_orders

Unnamed: 0,Customer,Total Price
0,Euro+ Shopping Channel,22680.00
1,Mini Gifts Distributors Ltd.,16746.58
2,"Australian Collectors, Co.",5159.19
3,Muscle Machine Inc,4800.74
4,Land of Toys Inc.,4575.97
...,...,...
93,Microscale Inc.,778.89
94,Frau da Collezione,737.99
95,Auto-Moto Classics Inc.,603.71
96,Atelier graphique,601.49


SQL statement that shows each Product Line and the total quantity ordered for each status category associated with said Product Line

In [42]:
# Per product line and order status: joins fact_orders to dim_products on
# product_key and sums quantityOrdered for each (productLine, status) pair.
# The statement has no ORDER BY clause.
sql_orders = """
    SELECT p.productLine AS 'Product Line'
        , o.status AS 'Status'
        , SUM(o.quantityOrdered) AS 'Total Quantity'
    FROM classicmodels_dw.fact_orders AS o
    INNER JOIN classicmodels_dw.dim_products AS p
    ON o.product_key = p.product_key
    GROUP BY p.productLine, o.status;
    
"""

In [43]:
# Run the aggregate query; this rebinds df_fact_orders to the query result.
df_fact_orders = get_sql_dataframe(sql_query=sql_orders, **mysql_args)
df_fact_orders

Unnamed: 0,Product Line,Status,Total Quantity
0,Vintage Cars,Shipped,21015.0
1,Vintage Cars,In Process,532.0
2,Vintage Cars,Resolved,317.0
3,Vintage Cars,On Hold,450.0
4,Classic Cars,Shipped,33349.0
5,Classic Cars,Cancelled,659.0
6,Classic Cars,Disputed,174.0
7,Classic Cars,Resolved,294.0
8,Classic Cars,On Hold,542.0
9,Classic Cars,In Process,564.0
