## DS 2002: Project 1 
### Glory Gurrola - gex7ys

#### Import necessary libraries and ensure the correct version of SQL Alchemy and PyMongo is running 

In [1]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine

In [2]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 1.4.39
Running PyMongo Version: 4.6.2


##### In order to access the classicmodels database, the file named mysqlsampledatabase.sql should be run to produce the database

#### Declare & Assign Connection Variables for the MongoDB Server, the MySQL Server & Databases

In [3]:
mysql_uid = "root"
mysql_pwd = "Passw0rd123"
mysql_hostname = "localhost"


host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "Passw0rd123"

src_dbname = "classicmodels"
dst_dbname = "classicmodels_dw"


atlas_cluster_name = "cluster_name.xxxxx"
atlas_user_name = ""
atlas_password = "password"

conn_str = {"local" : f"mongodb://localhost:27017/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"
}


dst_dbname = "classicmodels_dw"

print(f"Local Connection String: {conn_str['local']}")
print(f"Atlas Connection String: {conn_str['atlas']}")

Local Connection String: mongodb://localhost:27017/
Atlas Connection String: mongodb+srv://:password@cluster_name.xxxxx.mongodb.net


#### Define Functions for Getting Data From and Setting Data Into Databases

In [4]:
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()
    
    
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe

In [5]:
client = pymongo.MongoClient(conn_str["local"])
db = client[src_dbname]

# Gets the path of the Current Working Directory for this Notebook, and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')


# CHANGE THIS TO BE FOR PROJECT DATA
json_files = {"employees" : 'classicmodels_employees.json',
              "orders": 'classicmodels_orders.json'
             }

for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        #print(f"{file} was successfully loaded.")

client.close()  

### Create & Populate the Dimension Tables
#### Extract Data from the Source Database Tables

In [6]:
# Extracting Data using a rlational database: MySQL
sql_customers = "SELECT * FROM classicmodels.customers;"
df_customers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customers)
df_customers.head(2)

Unnamed: 0,customerNumber,customerName,contactLastName,contactFirstName,phone,addressLine1,addressLine2,city,state,postalCode,country,salesRepEmployeeNumber,creditLimit
0,103,Atelier graphique,Schmitt,Carine,40.32.2555,"54, rue Royale",,Nantes,,44000,France,1370.0,21000.0
1,112,Signal Gift Stores,King,Jean,7025551838,8489 Strong St.,,Las Vegas,NV,83030,USA,1166.0,71800.0


In [7]:
# Extracting Data using a rlational database: MySQL
sql_products = "SELECT * FROM classicmodels.products;"
df_products = get_dataframe(user_id, pwd, host_name, src_dbname, sql_products)
df_products.head(2)

Unnamed: 0,productCode,productName,productLine,productScale,productVendor,productDescription,quantityInStock,buyPrice,MSRP
0,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,1:10,Min Lin Diecast,"This replica features working kickstand, front...",7933,48.81,95.7
1,S10_1949,1952 Alpine Renault 1300,Classic Cars,1:10,Classic Metal Creations,Turnable front wheels; steering function; deta...,7305,98.58,214.3


In [8]:
# Extracting Data using a file system: reading in database using CSV
data_file = os.path.join(data_dir,'classicmodels_payments.csv')
df_payments = pd.read_csv(data_file, header=0,index_col=0)
df_payments.head()

Unnamed: 0_level_0,checkNumber,paymentDate,amount
customerNumber,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
103,HQ336336,2004-10-19,6066.78
103,JM555205,2003-06-05,14571.44
103,OM314933,2004-12-18,1676.14
112,BO864823,2004-12-17,14191.12
112,HQ55022,2003-06-06,32641.98


In [9]:
# Extracting Data using a NoSQL database: MongoDB
query = {} # Select all elements (columns), and all documents (rows).
collection = "employees"

df_employees = get_mongo_dataframe(conn_str['local'], src_dbname, collection, query)
df_employees.head(2)

Unnamed: 0,employeeNumber,lastName,firstName,extension,email,officeCode,reportsTo,jobTitle
0,1002,Murphy,Diane,x5800,dmurphy@classicmodelcars.com,1,,President
1,1056,Patterson,Mary,x4611,mpatterso@classicmodelcars.com,1,1002.0,VP Sales


In [10]:
# Extracting Data using a NoSQL database: MongoDB
query = {} # Select all elements (columns), and all documents (rows).
collection = "orders"

df_fact_orders = get_mongo_dataframe(conn_str['local'], src_dbname, collection, query)
df_fact_orders.head(2)

Unnamed: 0,orderNumber,orderDate,requiredDate,shippedDate,status,comments,customerNumber
0,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,363
1,10101,2003-01-09,2003-01-18,2003-01-11,Shipped,Check on availability.,128


#### Get the Data from the Date Dimension Table

##### In order to populate the dim_date table, the file named Lab_02c_Create_Populate_Dim_Date.sql must be run before running the file below 

In [11]:
sql_dim_date = "SELECT date_key, full_date FROM classicmodels_dw.dim_date;"
df_dim_date = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


#### Transform Dataframes by dropping columns and inserting keys 

In [12]:
# 1. Create a List that enumerates the names of each column you wish to remove (drop) from the Pandas DataFrame
drop_cols = ['phone', 'addressLine2','state']
df_customers.drop(drop_cols, axis=1, inplace=True)

# 3. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_customers.insert(0, "customer_key", range(1, df_customers.shape[0]+1))

# 4. Display the first 2 rows of the dataframe to validate your work
df_customers.head(2)

Unnamed: 0,customer_key,customerNumber,customerName,contactLastName,contactFirstName,addressLine1,city,postalCode,country,salesRepEmployeeNumber,creditLimit
0,1,103,Atelier graphique,Schmitt,Carine,"54, rue Royale",Nantes,44000,France,1370.0,21000.0
1,2,112,Signal Gift Stores,King,Jean,8489 Strong St.,Las Vegas,83030,USA,1166.0,71800.0


In [13]:
# 1. Create a List that enumerates the names of each column you wish to remove (drop) from the Pandas DataFrame
drop_cols = ['reportsTo']
df_employees.drop(drop_cols, axis=1, inplace=True)

# 3. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_employees.insert(0, "employee_key", range(1, df_employees.shape[0]+1))

# 4. Display the first 2 rows of the dataframe to validate your work
df_employees.head(2)

Unnamed: 0,employee_key,employeeNumber,lastName,firstName,extension,email,officeCode,jobTitle
0,1,1002,Murphy,Diane,x5800,dmurphy@classicmodelcars.com,1,President
1,2,1056,Patterson,Mary,x4611,mpatterso@classicmodelcars.com,1,VP Sales


In [14]:
df_payments.insert(0, "payment_key", range(1, df_payments.shape[0]+1))
df_payments.head(2)

Unnamed: 0_level_0,payment_key,checkNumber,paymentDate,amount
customerNumber,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
103,1,HQ336336,2004-10-19,6066.78
103,2,JM555205,2003-06-05,14571.44


In [15]:
# Looking up payment date key that corresponds to the 'paymentDate' column
df_dim_payment_date = df_dim_date.rename(columns={"date_key" : "payment_date_key", "full_date" : "paymentDate"})
df_payments.paymentDate = df_payments.paymentDate.astype('datetime64[ns]').dt.date
df_payments = pd.merge(df_payments, df_dim_payment_date, on='paymentDate', how='left')
df_payments.drop(['paymentDate'], axis=1, inplace=True)
df_payments.head(2)

Unnamed: 0,payment_key,checkNumber,amount,payment_date_key
0,1,HQ336336,6066.78,20041019
1,2,JM555205,14571.44,20030605


In [16]:
df_fact_orders.insert(0, "order_key", range(1, df_fact_orders.shape[0]+1))
df_fact_orders.head(2)

Unnamed: 0,order_key,orderNumber,orderDate,requiredDate,shippedDate,status,comments,customerNumber
0,1,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,363
1,2,10101,2003-01-09,2003-01-18,2003-01-11,Shipped,Check on availability.,128


#### Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables

In [17]:
db_operation = "insert"

tables = [('dim_customers', df_customers, 'customer_key'),
          ('dim_employees', df_employees, 'employee_key'),
          ('dim_payments', df_payments, 'payment_key')
          ]

for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

#### Validate that the New Dimension Tables were Created.

In [18]:
sql_customers = "SELECT * FROM classicmodels_dw.dim_customers;"
df_dim_customers = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_customers)
df_dim_customers.head(2)

Unnamed: 0,customer_key,customerNumber,customerName,contactLastName,contactFirstName,addressLine1,city,postalCode,country,salesRepEmployeeNumber,creditLimit
0,1,103,Atelier graphique,Schmitt,Carine,"54, rue Royale",Nantes,44000,France,1370.0,21000.0
1,2,112,Signal Gift Stores,King,Jean,8489 Strong St.,Las Vegas,83030,USA,1166.0,71800.0


In [19]:
sql_employees = "SELECT * FROM classicmodels_dw.dim_employees;"
df_dim_employees = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_employees)
df_dim_employees.head(2)

Unnamed: 0,employee_key,employeeNumber,lastName,firstName,extension,email,officeCode,jobTitle
0,1,1002,Murphy,Diane,x5800,dmurphy@classicmodelcars.com,1,President
1,2,1056,Patterson,Mary,x4611,mpatterso@classicmodelcars.com,1,VP Sales


In [20]:
sql_payments = "SELECT * FROM classicmodels_dw.dim_payments;"
df_dim_payments = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_payments)
df_dim_payments.head(2)

Unnamed: 0,payment_key,checkNumber,amount,payment_date_key
0,1,HQ336336,6066.78,20041019
1,2,JM555205,14571.44,20030605


#### Lookup the Primary Keys from the Dimension Tables

In [21]:
sql_dim_customers = "SELECT customer_key, customerNumber FROM classicmodels_dw.dim_customers;"
df_dim_customers = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_dim_customers)
df_dim_customers.head(2)

Unnamed: 0,customer_key,customerNumber
0,1,103
1,2,112


In [22]:
sql_dim_employees = "SELECT employee_key, employeeNumber FROM classicmodels_dw.dim_employees;"
df_dim_employees = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_dim_employees)
df_dim_employees.head(2)

Unnamed: 0,employee_key,employeeNumber
0,1,1002
1,2,1056


In [23]:
df_fact_orders.head()

Unnamed: 0,order_key,orderNumber,orderDate,requiredDate,shippedDate,status,comments,customerNumber
0,1,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,363
1,2,10101,2003-01-09,2003-01-18,2003-01-11,Shipped,Check on availability.,128
2,3,10102,2003-01-10,2003-01-18,2003-01-14,Shipped,,181
3,4,10103,2003-01-29,2003-02-07,2003-02-02,Shipped,,121
4,5,10104,2003-01-31,2003-02-09,2003-02-01,Shipped,,141


### Creating Fact Table

#### Lookup the Surrogate Primary Key (date_key) that Corresponds to the orderDate, requiredDate, shippedDate Columns

In [24]:
df_dim_order_date = df_dim_date.rename(columns={"date_key" : "order_date_key", "full_date" : "orderDate"})
df_fact_orders.orderDate = df_fact_orders.orderDate.astype('datetime64[ns]').dt.date
df_fact_orders = pd.merge(df_fact_orders, df_dim_order_date, on='orderDate', how='left')
df_fact_orders.drop(['orderDate'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,order_key,orderNumber,requiredDate,shippedDate,status,comments,customerNumber,order_date_key
0,1,10100,2003-01-13,2003-01-10,Shipped,,363,20030106
1,2,10101,2003-01-18,2003-01-11,Shipped,Check on availability.,128,20030109


In [25]:
df_dim_required_date = df_dim_date.rename(columns={"date_key" : "required_date_key", "full_date" : "requiredDate"})
df_fact_orders.requiredDate = df_fact_orders.requiredDate.astype('datetime64[ns]').dt.date
df_fact_orders = pd.merge(df_fact_orders, df_dim_required_date, on='requiredDate', how='left')
df_fact_orders.drop(['requiredDate'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,order_key,orderNumber,shippedDate,status,comments,customerNumber,order_date_key,required_date_key
0,1,10100,2003-01-10,Shipped,,363,20030106,20030113
1,2,10101,2003-01-11,Shipped,Check on availability.,128,20030109,20030118


In [26]:
df_dim_shipped_date = df_dim_date.rename(columns={"date_key" : "shipped_date_key", "full_date" : "shippedDate"})
df_fact_orders.shippedDate = df_fact_orders.shippedDate.astype('datetime64[ns]').dt.date
df_fact_orders = pd.merge(df_fact_orders, df_dim_shipped_date, on='shippedDate', how='left')
df_fact_orders.drop(['shippedDate'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,order_key,orderNumber,status,comments,customerNumber,order_date_key,required_date_key,shipped_date_key
0,1,10100,Shipped,,363,20030106,20030113,20030110.0
1,2,10101,Shipped,Check on availability.,128,20030109,20030118,20030111.0


In [27]:
# Merging the "fact_orders" and "dim_customers" dataframes on the 'customerNumber' to
# get the 'customer_key'. Then drop the 'customerNumber' column
df_fact_orders = pd.merge(df_fact_orders, df_dim_customers, on= 'customerNumber', how = 'inner')
df_fact_orders.drop(['customerNumber'], axis = 1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,order_key,orderNumber,status,comments,order_date_key,required_date_key,shipped_date_key,customer_key
0,1,10100,Shipped,,20030106,20030113,20030110.0,86
1,93,10192,Shipped,,20031120,20031129,20031125.0,86


In [28]:
# Insert a new 'fact_order_key' column, with an ever-incrementing numeric value,
# to serve as the surrogate primary key.
df_fact_orders.insert(0, 'fact_order_key', range(1, df_fact_orders.shape[0]+1))
df_fact_orders.head(2)

Unnamed: 0,fact_order_key,order_key,orderNumber,status,comments,order_date_key,required_date_key,shipped_date_key,customer_key
0,1,1,10100,Shipped,,20030106,20030113,20030110.0,86
1,2,93,10192,Shipped,,20031120,20031129,20031125.0,86


#### Transforming table by dropping unneccessary columns 

In [29]:
df_fact_orders.drop(['comments'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,fact_order_key,order_key,orderNumber,status,order_date_key,required_date_key,shipped_date_key,customer_key
0,1,1,10100,Shipped,20030106,20030113,20030110.0,86
1,2,93,10192,Shipped,20031120,20031129,20031125.0,86


#### Load Newly Transformed Fact Table into the classicmodels_dw Data Warehouse

In [30]:
dataframe = df_fact_orders
table_name = 'fact_orders'
primary_key = 'fact_order_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

#### Validating that the fact order table was added to the Data Warehouse

In [31]:
sql_fact_orders = "SELECT * FROM classicmodels_dw.fact_orders;"
df_fact_order_check = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_fact_orders)
df_fact_order_check.head(2)

Unnamed: 0,fact_order_key,order_key,orderNumber,status,order_date_key,required_date_key,shipped_date_key,customer_key
0,1,1,10100,Shipped,20030106,20030113,20030110.0,86
1,2,93,10192,Shipped,20031120,20031129,20031125.0,86


### Demonstrate that the New Data Warehouse Exists and Contains the Correct Data

In [32]:
sql_orders = """
    SELECT 
        dc.customerName, 
        dc.contactLastName, 
        dc.contactFirstName, 
        de.lastName AS salesRepLastName, 
        de.firstName AS salesRepFirstName, 
        COUNT(fo.orderNumber) AS totalOrders
    FROM 
        fact_orders fo
    JOIN 
        dim_customers dc ON fo.customer_key = dc.customer_key
    JOIN 
        dim_employees de ON dc.salesRepEmployeeNumber = de.employeeNumber
    GROUP BY 
        dc.customerName, dc.contactLastName, dc.contactFirstName, de.lastName, de.firstName
    ORDER BY 
        totalOrders DESC;
    
"""

In [33]:
df_fact_order_customers = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_orders)
df_fact_order_customers

Unnamed: 0,customerName,contactLastName,contactFirstName,salesRepLastName,salesRepFirstName,totalOrders
0,Euro+ Shopping Channel,Freyre,Diego,Hernandez,Gerard,26
1,Mini Gifts Distributors Ltd.,Nelson,Susan,Jennings,Leslie,17
2,Danish Wholesale Imports,Petersen,Jytte,Castillo,Pamela,5
3,"Down Under Souveniers, Inc",Graham,Mike,Marsh,Peter,5
4,"Dragon Souveniers, Ltd.",Natividad,Eric,Nishi,Mami,5
...,...,...,...,...,...,...
93,Online Mini Collectables,Barajas,Miguel,Firrelli,Julie,2
94,Amica Models & Co.,Accorti,Paolo,Castillo,Pamela,2
95,"Norway Gifts By Mail, Co.",Klaeboe,Jan,Jones,Barry,2
96,Marta's Replicas Co.,Hernandez,Marta,Patterson,Steve,2


In [34]:
sales_rep_orders = """
    SELECT 
        de.lastName AS salesRepLastName, 
        de.firstName AS salesRepFirstName, 
        de.jobTitle, 
        COUNT(fo.orderNumber) AS totalOrdersHandled
    FROM 
        fact_orders fo
    JOIN 
        dim_customers dc ON fo.customer_key = dc.customer_key
    JOIN 
        dim_employees de ON dc.salesRepEmployeeNumber = de.employeeNumber
    WHERE 
        de.jobTitle = 'Sales Rep'
    GROUP BY 
        de.lastName, de.firstName, de.jobTitle
    ORDER BY 
        totalOrdersHandled DESC;
    
"""

In [35]:
df_sales_rep_order_handled = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sales_rep_orders)
df_sales_rep_order_handled

Unnamed: 0,salesRepLastName,salesRepFirstName,jobTitle,totalOrdersHandled
0,Hernandez,Gerard,Sales Rep,43
1,Jennings,Leslie,Sales Rep,34
2,Castillo,Pamela,Sales Rep,31
3,Jones,Barry,Sales Rep,25
4,Vanauf,George,Sales Rep,22
5,Bott,Larry,Sales Rep,22
6,Bondur,Loui,Sales Rep,20
7,Fixter,Andy,Sales Rep,19
8,Marsh,Peter,Sales Rep,19
9,Patterson,Steve,Sales Rep,18


In [36]:
customer_info = """
SELECT 
    country, 
    COUNT(customerNumber) AS numberOfCustomers, 
    AVG(creditLimit) AS averageCreditLimit
FROM 
    dim_customers
GROUP BY 
    country
ORDER BY 
    numberOfCustomers DESC, averageCreditLimit DESC;
    
"""

In [37]:
df_customer_info = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, customer_info)
df_customer_info

Unnamed: 0,country,numberOfCustomers,averageCreditLimit
0,USA,36,78102.777778
1,Germany,13,19776.923077
2,France,12,77691.666667
3,Spain,7,73971.428571
4,UK,5,88740.0
5,Australia,5,86060.0
6,Italy,4,97200.0
7,New Zealand,4,90625.0
8,Finland,3,95266.666667
9,Canada,3,76200.0
