## MIDTERM Project:
In this midterm I am using the **classicmodels** database to create a data mart and showcase ETL processing. I am attaching the SQL file for the classicmodels database to my submission. I will fetch data into Pandas DataFrames, perform all the necessary transformations in-memory on the client, and then push the newly transformed DataFrame back to the RDBMS using a Pandas function that creates the table and fills it with data in a single operation.

#### My Project Description
I will have 4 dimension tables (dim_customers, dim_employees, dim_date, and dim_products), and one fact table (fact_orders). The data mart represents the order management business process and supports summarization and analysis across sales employees, customers, products, and dates. I will start with the MySQL steps, then proceed to MongoDB and the local file system.

### Python MySQL Related Section
#### Prerequisites:
We have to make sure sqlalchemy is installed first.

- `python -m pip install sqlalchemy`

#### Import the Necessary Libraries

In [1]:
import os
import numpy
import pandas as pd
import sqlalchemy
from sqlalchemy import create_engine
import json
import datetime
import certifi
import pymongo

#### Declare & Assign Connection Variables for the MySQL Server & Databases with which I'll be Working 

In [2]:
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "Passw0rd123"

src_dbname = "classicmodels"
dst_dbname = "classic_data_mart"
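
The `port` variable above is declared, but the connection strings used later in this notebook leave it out, and a password containing characters such as `@` or `/` would break the URL. A minimal sketch of one way to assemble the URL, assuming a hypothetical helper named `build_mysql_url` (it is not part of this notebook) and a made-up password chosen only to show the escaping:

```python
from urllib.parse import quote_plus


def build_mysql_url(user_id, pwd, host_name, port, db_name):
    """Hypothetical helper: build a mysql+pymysql URL with escaped credentials."""
    return (
        f"mysql+pymysql://{quote_plus(user_id)}:{quote_plus(pwd)}"
        f"@{host_name}:{port}/{db_name}"
    )


# Illustrative values only; this password is invented for the example.
demo_url = build_mysql_url("root", "p@ss/w0rd", "localhost", "3306", "classicmodels")
print(demo_url)
```

The resulting string could be passed to `create_engine()` in place of the hand-built `conn_str`.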

#### Define Functions for Getting Data From and Setting Data Into Databases

In [3]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection)
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()
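
The difference between the two `db_operation` values can be seen without a MySQL server. The sketch below uses an in-memory SQLite database as a stand-in (an assumption made only for this illustration) and a hypothetical `load_table` function that mirrors the branching in `set_dataframe()`. The `ALTER TABLE ... ADD PRIMARY KEY` step is left out because SQLite does not support it.

```python
import sqlite3

import pandas as pd


def load_table(conn, df, table_name, db_operation):
    """Hypothetical stand-in for set_dataframe(): 'insert' replaces, 'update' appends."""
    if db_operation == "insert":
        df.to_sql(table_name, con=conn, index=False, if_exists="replace")
    elif db_operation == "update":
        df.to_sql(table_name, con=conn, index=False, if_exists="append")
    else:
        raise ValueError("db_operation must be 'insert' or 'update'")


def count_rows(conn, table_name):
    return int(pd.read_sql(f"SELECT COUNT(*) AS n FROM {table_name}", conn)["n"].iloc[0])


conn = sqlite3.connect(":memory:")
df_demo = pd.DataFrame({"customer_key": [1, 2], "customer_id": [103, 112]})

load_table(conn, df_demo, "dim_demo", "insert")   # creates the table with 2 rows
load_table(conn, df_demo, "dim_demo", "update")   # appends 2 more rows
rows_after_append = count_rows(conn, "dim_demo")

load_table(conn, df_demo, "dim_demo", "insert")   # drops and recreates the table
rows_after_replace = count_rows(conn, "dim_demo")
conn.close()
```

Because "insert" maps to `if_exists='replace'`, rerunning a load cell rebuilds the table rather than duplicating rows.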

#### Create the New Data Warehouse Database and Switch the Connection Context to It
Code to *drop* a database if it already exists, and then *create* the new **classic_data_mart** database and *use* it as the target of all subsequent operations.

In [4]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
connection = sqlEngine.connect()

connection.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
connection.execute(f"CREATE DATABASE `{dst_dbname}`;")
connection.execute(f"USE {dst_dbname};")

connection.close()

### 1.0. Create & Populate the Dimension Tables
We need to create and populate the dimension tables before the fact table. I am using 4 dimension tables: dim_customers, dim_employees, dim_date, and dim_products.

#### 1.1. Extract Data from the Source Database Tables
Fetch data for each dimension table (e.g., customers, employees) from the **classicmodels** database using the **get_dataframe()** function.

In [5]:
sql_customers = "SELECT * FROM classicmodels.customers;"
df_customers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customers)
df_customers.head(2)

Unnamed: 0,customerNumber,customerName,contactLastName,contactFirstName,phone,addressLine1,addressLine2,city,state,postalCode,country,salesRepEmployeeNumber,creditLimit
0,103,Atelier graphique,Schmitt,Carine,40.32.2555,"54, rue Royale",,Nantes,,44000,France,1370.0,21000.0
1,112,Signal Gift Stores,King,Jean,7025551838,8489 Strong St.,,Las Vegas,NV,83030,USA,1166.0,71800.0


#### 1.2. Create the Date Dimension Table
I ran the date dimension script from lab 2c, which I have submitted with my midterm files, to create and populate the date dimension table. I targeted this script at my new data warehouse **(classic_data_mart)**. Later in this notebook we will integrate the **dim_date** table with the fact table by performing **lookup operations** to retrieve the surrogate primary keys from the **dim_date** table that correspond with each **date** typed column in the fact table (e.g., order_date).

To find the date range I used in the CALL procedure, I executed the following statements in MySQL against the classicmodels database.

_USE classicmodels; </br>
SELECT min(orderDate), max(orderDate) </br>
FROM orders;</br>
SELECT min(requiredDate), max(requiredDate)</br> 
FROM orders;</br>
SELECT min(shippedDate), max(shippedDate) </br>
FROM orders;_</br>

Every date returned by these SELECT statements fell between the start of 2003 and the end of 2005, so that is the range I used.
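
The same reasoning, widening the observed minimum and maximum dates to whole calendar years, can be sketched in Python. The helper name `covering_year_range` is hypothetical, and the sample dates are a few values that appear in this notebook's outputs plus a `None` standing in for a missing date.

```python
from datetime import date


def covering_year_range(dates):
    """Hypothetical helper: first day of the earliest year to last day of the latest year."""
    known = [d for d in dates if d is not None]  # ignore missing dates
    return date(min(known).year, 1, 1), date(max(known).year, 12, 31)


sample_dates = [date(2003, 1, 6), date(2004, 11, 5), None, date(2005, 2, 10)]
start, end = covering_year_range(sample_dates)
print(start, end)
```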

#### 1.3. Perform Any Necessary Transformations
I drop columns that I don't believe provide any real value to my analytics solution. Examples include columns having a high percentage of NULL values, columns having large amounts of free text, and columns having binary large object (BLOB) data such as images or other documents. Then I rename the primary key column from the source (here, customerNumber) to serve as the business key for future lookup operations. Finally, I *insert* a new primary key column that contains an ever-increasing numeric value. It is named after the entity (e.g., customer, product) followed by "**_key**" to conform with data warehouse design standards.

In [6]:
# 1. Create a list that enumerates the names of each column to remove (drop) from the Pandas DataFrame.
# I removed addressLine1 and addressLine2 because I don't need the street address in my analysis, and
# addressLine2 had a lot of null values. I also removed state because it had a lot of null values.
drop_cols = ['addressLine1', 'addressLine2', 'state']
df_customers.drop(drop_cols, axis=1, inplace=True)

# 2. Rename the "id" column to reflect the entity as it will serve as the business key for lookup operations
df_customers.rename(columns={"customerNumber":"customer_id"}, inplace=True)

# 3. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_customers.insert(0, "customer_key", range(1, df_customers.shape[0]+1))

# 4. Display the first 2 rows of the dataframe to validate my work
df_customers.head(2)

Unnamed: 0,customer_key,customer_id,customerName,contactLastName,contactFirstName,phone,city,postalCode,country,salesRepEmployeeNumber,creditLimit
0,1,103,Atelier graphique,Schmitt,Carine,40.32.2555,Nantes,44000,France,1370.0,21000.0
1,2,112,Signal Gift Stores,King,Jean,7025551838,Las Vegas,83030,USA,1166.0,71800.0
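
Before a surrogate key is assigned, it can help to confirm that the business key is unique, since later lookups rely on it. A minimal sketch, assuming a hypothetical helper `add_surrogate_key` and a two-row sample taken from the output above:

```python
import pandas as pd


def add_surrogate_key(df, business_key, key_name):
    """Hypothetical helper: check the business key, then prepend a 1..N surrogate key."""
    if df[business_key].duplicated().any():
        raise ValueError(f"{business_key} contains duplicate values")
    out = df.copy()
    out.insert(0, key_name, range(1, len(out) + 1))
    return out


df_sample = pd.DataFrame({
    "customer_id": [103, 112],
    "customerName": ["Atelier graphique", "Signal Gift Stores"],
})
df_keyed = add_surrogate_key(df_sample, "customer_id", "customer_key")
print(df_keyed)
```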


#### 1.4. Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables
Here I'm loading the transformed customers DataFrame into my **classic_data_mart** data warehouse as the dim_customers table.

In [7]:
db_operation = "insert"

tables = [('dim_customers', df_customers, 'customer_key')]

In [8]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

### 2.0. Create & Populate the Fact Table

### Implement the Solution Using Pandas DataFrames to Craft the Table
First, I'll query the source **classicmodels** database to fill a *dataframe* for each of the source tables I need to create my *fact_orders* fact table; orders and orderdetails. Then I'll *join* those *dataframes* using the <a href="https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.merge.html">**merge( )**</a> method of the Pandas DataFrame.  I'll make any additional changes that I expect to see reflected in the *fact* table in my new MySQL database, including the addition of **foreign key references** to the dimension tables, and then I'll push the *dataframe* back to the MySQL server to create and populate the new *fact* table.

#### 2.1. Get all the data from the 2 tables involved

In [9]:
sql_orders = "SELECT * FROM classicmodels.orders;"
df_orders = get_dataframe(user_id, pwd, host_name, src_dbname, sql_orders)
df_orders.rename(columns={"orderNumber":"order_id"}, inplace=True)
df_orders.head(2)

Unnamed: 0,order_id,orderDate,requiredDate,shippedDate,status,comments,customerNumber
0,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,363
1,10101,2003-01-09,2003-01-18,2003-01-11,Shipped,Check on availability.,128


In [10]:
sql_order_details = "SELECT * FROM classicmodels.orderdetails;"
df_order_details = get_dataframe(user_id, pwd, host_name, src_dbname, sql_order_details)
# I inserted a new order_details_id column because the primary key was order number and product code together
# so I thought it would be easier to have a primary key that is one column
# I also want to merge with orders on orderNumber, so renamed that the same 'order_id'
df_order_details.insert(0, "order_details_id", range(1, df_order_details.shape[0]+1))
df_order_details.rename(columns={"orderNumber":"order_id"}, inplace=True)
df_order_details.head(2)

Unnamed: 0,order_details_id,order_id,productCode,quantityOrdered,priceEach,orderLineNumber
0,1,10100,S18_1749,30,136.0,3
1,2,10100,S18_2248,50,55.09,2


#### 2.2. Join the Orders and OrderDetails DataFrames
I'm joining the *orders* and *order_details* dataframes. Since each **order** (the *left* dataframe) can have many **order details** (the *right* dataframe), we'll need to implement a **right** *outer join* **on** the *order_id* column.

In [11]:
df_fact_orders = pd.merge(df_orders, df_order_details, on='order_id', how='right')
df_fact_orders.head(2)

Unnamed: 0,order_id,orderDate,requiredDate,shippedDate,status,comments,customerNumber,order_details_id,productCode,quantityOrdered,priceEach,orderLineNumber
0,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,363,1,S18_1749,30,136.0,3
1,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,363,2,S18_2248,50,55.09,2


In [12]:
df_fact_orders.shape

(2996, 12)
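
One way to double-check a join like this is to let pandas validate the relationship and flag unmatched rows. The sketch below uses the `validate` and `indicator` parameters of `pd.merge` on a tiny sample; the third detail row is illustrative rather than copied from the outputs above.

```python
import pandas as pd

orders = pd.DataFrame({"order_id": [10100, 10101], "status": ["Shipped", "Shipped"]})
details = pd.DataFrame({
    "order_id": [10100, 10100, 10101],
    "productCode": ["S18_1749", "S18_2248", "S18_2325"],  # last row is illustrative
})

# validate="one_to_many" raises MergeError if order_id repeats in the left frame.
# indicator=True adds a "_merge" column naming the side each row came from.
merged = pd.merge(orders, details, on="order_id", how="right",
                  validate="one_to_many", indicator=True)

# Detail rows without a parent order would be tagged "right_only".
orphans = merged[merged["_merge"] == "right_only"]
print(len(merged), len(orphans))
```

If `orphans` is empty, the right outer join returned the same rows an inner join would have.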

#### 2.5. Lookup the Primary Keys from the Dimension Tables
I am establishing **foreign key relationships** between the newly-crafted **Fact table** and each of the **Dimension tables**.

##### 2.5.1. First, fetch the Surrogate Primary Key and the Business Key from each of the Dimension tables using the **get_dataframe()** function.

#### Important Notes:

When I merge df_dim_customers with the fact orders table, I need to bring employee_id along. One of the dimension tables I want to use is the employees table, but neither of the original order tables in the **classicmodels** database has a key that connects to it.
However, the employee number is in the customers table (salesRepEmployeeNumber), so when I join the customers table to the orders table I also keep that column and rename it employee_id.
I double-checked that every order belongs to a customer with a non-null salesRepEmployeeNumber. I take this to mean that the sales rep assigned to a customer handles all of that customer's orders, so the employee is also connected to each order.
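
The check described above could be sketched as follows. The frames are tiny illustrative samples (not the full tables); the customer with no sales rep and the second order are invented to show that a customer without a rep only matters if it has orders.

```python
import pandas as pd

# Illustrative sample: the last customer has no sales rep and no orders.
df_cust = pd.DataFrame({
    "customerNumber": [103, 112, 363, 125],
    "salesRepEmployeeNumber": [1370.0, 1166.0, 1216.0, None],
})
df_ord = pd.DataFrame({
    "orderNumber": [10100, 10123],
    "customerNumber": [363, 103],
})

# Attach each order's customer, then look for orders whose customer has no rep.
checked = df_ord.merge(df_cust, on="customerNumber", how="left")
missing_rep = checked[checked["salesRepEmployeeNumber"].isna()]
print(f"{len(missing_rep)} of {len(checked)} orders lack a sales rep")
```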

In [13]:
# Select 'customer_key' and 'customer_id', and salesRepEmployeeNumber from classic_data_mart.dim_customers
dim_customers_sql = "SELECT customer_key, customer_id, salesRepEmployeeNumber AS employee_id FROM classic_data_mart.dim_customers"
df_dim_customers = get_dataframe(user_id, pwd, host_name, src_dbname, dim_customers_sql)
df_dim_customers.head(2)

Unnamed: 0,customer_key,customer_id,employee_id
0,1,103,1370.0
1,2,112,1166.0


##### 2.5.2. Next, using the Business Keys, lookup the corresponding Surrogate Primary Key values in the Dimension tables

In [14]:
# 1. Rename the column in 'df_fact_orders' that corresponds to 'customer_id' in 'df_dim_customers' so the merge works
# 2. Modify 'df_fact_orders' by merging it with 'df_dim_customers' on the 'customer_id' column
# 3. Drop the 'customer_id' column
# 4. Display the first 2 rows of the dataframe to validate my work
# Note that the merge also brings in 'employee_id'

df_fact_orders.rename(columns={"customerNumber":"customer_id"}, inplace=True)
df_fact_orders = pd.merge(df_fact_orders, df_dim_customers, on='customer_id', how='inner')
df_fact_orders.drop(['customer_id'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,order_id,orderDate,requiredDate,shippedDate,status,comments,order_details_id,productCode,quantityOrdered,priceEach,orderLineNumber,customer_key,employee_id
0,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,1,S18_1749,30,136.0,3,86,1216.0
1,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,2,S18_2248,50,55.09,2,86,1216.0


In [15]:
# Dropping the comments column because it was mostly NULL
df_fact_orders.drop(['comments'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,order_id,orderDate,requiredDate,shippedDate,status,order_details_id,productCode,quantityOrdered,priceEach,orderLineNumber,customer_key,employee_id
0,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,1,S18_1749,30,136.0,3,86,1216.0
1,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,2,S18_2248,50,55.09,2,86,1216.0


##### 2.5.3. Lookup the DateKeys from the Date Dimension Table.
First, fetch the Surrogate Primary Key (date_key) and the Business Key (full_date) from the Date Dimension table using the **get_dataframe()** function. Be certain to cast the **full_date** column to the **datetime64[ns]** data type using the **.astype()** function that is native to Pandas DataFrame columns. Also, extract the **date** portion using the **.dt.date** attribute of the **datetime64[ns]** datatype.

In [16]:
sql_dim_date = "SELECT date_key, full_date FROM classic_data_mart.dim_date;"
df_dim_date = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20030101,2003-01-01
1,20030102,2003-01-02


Next, for each **date** typed column in the Fact table, lookup the corresponding Primary Key column. Be certain to cast each **date** column to the **datetime64[ns]** data type using the **.astype()** function that's native to Pandas DataFrame columns. Also, extract the **date** portion using the **.dt.date** attribute.
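
Since the same lookup is repeated for every date column, it can also be written once as a function. This is only a sketch: `lookup_date_key` is a hypothetical helper, `pd.to_datetime` is swapped in for `.astype('datetime64[ns]')`, and the sample frames are small stand-ins (the second order is invented and has no ship date).

```python
import datetime

import pandas as pd


def lookup_date_key(df_fact, df_dim_date, date_col, key_col):
    """Hypothetical helper: replace one date column with its date_key from dim_date."""
    dim = df_dim_date.rename(columns={"date_key": key_col, "full_date": date_col})
    out = df_fact.copy()
    out[date_col] = pd.to_datetime(out[date_col]).dt.date  # keep only the date part
    out = pd.merge(out, dim, on=date_col, how="left")      # unmatched dates give NaN keys
    return out.drop(columns=[date_col])


df_dim_date_demo = pd.DataFrame({
    "date_key": [20030106, 20030110],
    "full_date": [datetime.date(2003, 1, 6), datetime.date(2003, 1, 10)],
})
df_fact_demo = pd.DataFrame({
    "order_id": [10100, 99999],                # 99999 is an invented, unshipped order
    "orderDate": ["2003-01-06", "2003-01-10"],
    "shippedDate": ["2003-01-10", None],
})

for date_col, key_col in [("orderDate", "order_date_key"),
                          ("shippedDate", "shipped_date_key")]:
    df_fact_demo = lookup_date_key(df_fact_demo, df_dim_date_demo, date_col, key_col)

print(df_fact_demo)
```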

In [17]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "orderDate" Column.
df_dim_order_date = df_dim_date.rename(columns={"date_key" : "order_date_key", "full_date" : "orderDate"})
df_fact_orders.orderDate = df_fact_orders.orderDate.astype('datetime64[ns]').dt.date

df_fact_orders = pd.merge(df_fact_orders, df_dim_order_date, on='orderDate', how='left')
df_fact_orders.drop(['orderDate'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,order_id,requiredDate,shippedDate,status,order_details_id,productCode,quantityOrdered,priceEach,orderLineNumber,customer_key,employee_id,order_date_key
0,10100,2003-01-13,2003-01-10,Shipped,1,S18_1749,30,136.0,3,86,1216.0,20030106
1,10100,2003-01-13,2003-01-10,Shipped,2,S18_2248,50,55.09,2,86,1216.0,20030106


In [18]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "requiredDate" Column.
df_dim_required_date = df_dim_date.rename(columns={"date_key" : "required_date_key", "full_date" : "requiredDate"})
df_fact_orders.requiredDate = df_fact_orders.requiredDate.astype('datetime64[ns]').dt.date

df_fact_orders = pd.merge(df_fact_orders, df_dim_required_date, on='requiredDate', how='left')
df_fact_orders.drop(['requiredDate'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,order_id,shippedDate,status,order_details_id,productCode,quantityOrdered,priceEach,orderLineNumber,customer_key,employee_id,order_date_key,required_date_key
0,10100,2003-01-10,Shipped,1,S18_1749,30,136.0,3,86,1216.0,20030106,20030113
1,10100,2003-01-10,Shipped,2,S18_2248,50,55.09,2,86,1216.0,20030106,20030113


In [19]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "shippedDate" Column.
df_dim_shipped_date = df_dim_date.rename(columns={"date_key" : "shipped_date_key", "full_date" : "shippedDate"})
df_fact_orders.shippedDate = df_fact_orders.shippedDate.astype('datetime64[ns]').dt.date

df_fact_orders = pd.merge(df_fact_orders, df_dim_shipped_date, on='shippedDate', how='left')
df_fact_orders.drop(['shippedDate'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,order_id,status,order_details_id,productCode,quantityOrdered,priceEach,orderLineNumber,customer_key,employee_id,order_date_key,required_date_key,shipped_date_key
0,10100,Shipped,1,S18_1749,30,136.0,3,86,1216.0,20030106,20030113,20030110.0
1,10100,Shipped,2,S18_2248,50,55.09,2,86,1216.0,20030106,20030113,20030110.0
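
The trailing `.0` on shipped_date_key suggests the column became a float, which is what a left merge produces when some rows have no matching date (for example, orders that have not shipped). If integer keys are preferred, one option is the pandas nullable integer type. A minimal sketch on a two-value sample; whether the column then loads as an integer type in MySQL is an assumption worth checking:

```python
import pandas as pd

# Sample key column: one matched date key and one missing value.
shipped = pd.Series([20030110.0, None], name="shipped_date_key")

# "Int64" (capital I) is the pandas nullable integer dtype; missing values become <NA>.
shipped_nullable = shipped.astype("Int64")
print(shipped_nullable)
```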


#### 2.6. Perform any Additional Transformations
In this step we can prepare the DataFrame so that it defines exactly what we want to see created in the database.  Issues may include dropping unwanted columns, reordering the columns, and in our case, creating a new column to serve as the primary key.

In [20]:
# 1. Drop the columns of no particular interest
# 2. Reorder the remaining columns
# 3. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
# 4. Display the first 2 rows of the dataframe to validate your work
drop_cols = ['orderLineNumber']
df_fact_orders.drop(drop_cols, axis=1, inplace=True)

ordered_cols = ['order_id', 'order_details_id', 'customer_key', 'employee_id', 'productCode',
               'order_date_key', 'required_date_key', 'shipped_date_key', 'quantityOrdered', 'priceEach', 
                'status']
df_fact_orders = df_fact_orders[ordered_cols]

df_fact_orders.insert(0, "fact_order_key", range(1, df_fact_orders.shape[0]+1))
df_fact_orders.head(2)

Unnamed: 0,fact_order_key,order_id,order_details_id,customer_key,employee_id,productCode,order_date_key,required_date_key,shipped_date_key,quantityOrdered,priceEach,status
0,1,10100,1,86,1216.0,S18_1749,20030106,20030113,20030110.0,30,136.0,Shipped
1,2,10100,2,86,1216.0,S18_2248,20030106,20030113,20030110.0,50,55.09,Shipped


#### 2.7. Write the DataFrame Back to the Database

In [21]:
table_name = "fact_orders"
primary_key = "fact_order_key"
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_orders, table_name, primary_key, db_operation)

## MongoDB Section of Code

In this section I will build upon the **classicmodels** dimensional data mart by integrating new data sourced from an instance of MongoDB. The new data describes the products. I will continue to interact with both the source systems (MongoDB and MySQL) and the destination system (the classic_data_mart data warehouse) from a remote client running Python (Jupyter Notebooks).

I'm fetching data into Pandas DataFrames, performing all the necessary transformations in-memory on the client, and then pushing the newly transformed DataFrame to the RDBMS data warehouse using a Pandas function that will create the table and fill it with data with a single operation.

### Prerequisites:
This notebook uses the PyMongo database connectivity library to connect to MongoDB databases; therefore, you must first install that library into your Python environment by executing the following command in a terminal window.

- `python -m pip install pymongo[srv]`

#### Import the Necessary Libraries
Import statements are at the top

In [22]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 1.4.39
Running PyMongo Version: 4.6.3


In [23]:
# Declare & Assign Connection Variables for the MongoDB Server, 
# the MySQL Server & Databases with which You'll be Working 
mysql_args = {
    "uid" : "root",
    "pwd" : "Passw0rd123",
    "hostname" : "localhost",
    "dbname" : "classic_data_mart"
}

# The 'cluster_location' must either be "atlas" or "local".
mongodb_args = {
    "user_name" : "",
    "password" : "password",
    "cluster_name" : "cluster_name",
    "cluster_subnet" : "xxxxx",
    "cluster_location" : "local", # "atlas"
    "db_name" : "classicmodels_products"
}

In [24]:
# Define Functions for Getting Data From and Setting Data Into Databases
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    # Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.
    dframe = pd.read_sql(sql_query, connection)
    connection.close()
    
    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()
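
One edge case in `get_mongo_dataframe()`: if a query returns no documents, the DataFrame has no `_id` column and the `drop` call raises a `KeyError`. A minimal sketch of a more tolerant conversion, using a plain list of dictionaries in place of a live MongoDB cursor. The helper name `documents_to_dataframe` and the `_id` values are made up for the example.

```python
import pandas as pd


def documents_to_dataframe(documents):
    """Hypothetical helper: build a DataFrame from documents, dropping '_id' if present."""
    dframe = pd.DataFrame(list(documents))
    return dframe.drop(columns=["_id"], errors="ignore")


sample_docs = [
    {"_id": "a1", "productCode": "S10_1678", "MSRP": 95.7},
    {"_id": "a2", "productCode": "S10_1949", "MSRP": 214.3},
]
df_sample = documents_to_dataframe(sample_docs)
df_empty = documents_to_dataframe([])  # no KeyError on an empty result

print(df_sample)
```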

In [25]:
# Populate MongoDB with Source Data
client = get_mongo_client(**mongodb_args)

# Gets the path of the Current Working Directory for this Notebook,
# and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"products" : 'classicmodels_products.json',
             }

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)         

In [26]:
# Extract Data from the Source MongoDB Collections Into DataFrames
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "products"

df_products = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_products.head(2)

Unnamed: 0,productCode,productName,productLine,productScale,productVendor,productDescription,quantityInStock,buyPrice,MSRP
0,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,1:10,Min Lin Diecast,"This replica features working kickstand, front...",7933,48.81,95.7
1,S10_1949,1952 Alpine Renault 1300,Classic Cars,1:10,Classic Metal Creations,Turnable front wheels; steering function; deta...,7305,98.58,214.3


In [27]:
# Data Transformations to products table
# 1. Rename the "id" column to reflect the entity as it will serve as the business key for lookup operations
df_products.rename(columns={"productCode":"product_id"}, inplace=True)

# 2. Drop the 'productDescription' column because it holds large amounts of free text.
df_products.drop(['productDescription'], axis=1, inplace=True)

# 3. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_products.insert(0, "product_key", range(1, df_products.shape[0]+1))
df_products.head(2)

Unnamed: 0,product_key,product_id,productName,productLine,productScale,productVendor,quantityInStock,buyPrice,MSRP
0,1,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,1:10,Min Lin Diecast,7933,48.81,95.7
1,2,S10_1949,1952 Alpine Renault 1300,Classic Cars,1:10,Classic Metal Creations,7305,98.58,214.3


In [28]:
# Load the Transformed DataFrames into the New Data Warehouse by Creating New Table
dataframe = df_products
table_name = 'dim_products'
primary_key = 'product_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [29]:
# Validate that the New Dimension Tables were Created.
sql_products = "SELECT * FROM classic_data_mart.dim_products;"
df_dim_products = get_sql_dataframe(sql_products, **mysql_args)
df_dim_products.head(2)

Unnamed: 0,product_key,product_id,productName,productLine,productScale,productVendor,quantityInStock,buyPrice,MSRP
0,1,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,1:10,Min Lin Diecast,7933,48.81,95.7
1,2,S10_1949,1952 Alpine Renault 1300,Classic Cars,1:10,Classic Metal Creations,7305,98.58,214.3


In [30]:
# First, fetch the Surrogate Primary Key and the Business Key from each Dimension table
sql_dim_products = "SELECT product_key, product_id FROM classic_data_mart.dim_products;"
df_dim_products = get_sql_dataframe(sql_dim_products, **mysql_args)
df_dim_products.head(2)

Unnamed: 0,product_key,product_id
0,1,S10_1678
1,2,S10_1949


In [31]:
#Next, use Business Keys to lookup corresponding Surrogate Primary Keys in the Dimension tables

# Merge the "fact_orders" and "dim_products" dataframes on the 'product_id' to
# get the 'product_key'. Then drop the 'product_id' column and display the results.
# Also need to get fact_orders table from MySQL
sql_fact_orders = "SELECT * FROM classic_data_mart.fact_orders;"
df_fact_orders = get_sql_dataframe(sql_fact_orders, **mysql_args)
df_fact_orders.rename(columns={"productCode":"product_id"}, inplace=True)
df_fact_orders = pd.merge(df_fact_orders, df_dim_products, on='product_id', how='inner')
df_fact_orders.drop(['product_id'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,fact_order_key,order_id,order_details_id,customer_key,employee_id,order_date_key,required_date_key,shipped_date_key,quantityOrdered,priceEach,status,product_key
0,1,10100,1,86,1216.0,20030106,20030113,20030110.0,30,136.0,Shipped,23
1,294,10379,2611,11,1370.0,20050210,20050218,20050211.0,39,156.4,Shipped,23


In [32]:
# This is inserting the updated table into the mySQL data warehouse
dataframe = df_fact_orders
table_name = 'fact_orders'
primary_key = 'fact_order_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [33]:
# Validate the new "Fact Orders" fact table by reading it back from the data warehouse.
sql_fact_orders = "SELECT * FROM classic_data_mart.fact_orders;"
df_fact_orders = get_sql_dataframe(sql_fact_orders, **mysql_args)
df_fact_orders.head(2)

Unnamed: 0,fact_order_key,order_id,order_details_id,customer_key,employee_id,order_date_key,required_date_key,shipped_date_key,quantityOrdered,priceEach,status,product_key
0,1,10100,1,86,1216.0,20030106,20030113,20030110.0,30,136.0,Shipped,23
1,2,10100,2,86,1216.0,20030106,20030113,20030110.0,50,55.09,Shipped,27


## Dimension Table from Local File System Section

In [47]:
# Import Statements are above
#Loading Data from CSV File
data_dir = os.path.join(os.getcwd(), 'data')
data_file = os.path.join(data_dir, 'classicmodels_employees.csv')

df_employees = pd.read_csv(data_file, header=0, index_col=0)
df_employees.head(2)

Unnamed: 0,employeeNumber,lastName,firstName,extension,email,officeCode,reportsTo,jobTitle
1,1002,Murphy,Diane,x5800,dmurphy@classicmodelcars.com,1,,President
2,1056,Patterson,Mary,x4611,mpatterso@classicmodelcars.com,1,1002.0,VP Sales


In [49]:
# Employees

# 1. I didn't drop any columns because there were no null columns and all the columns seemed relevant/useful.
# 2. Rename the "id" column to reflect the entity as it will serve as the business key for lookup operations
df_employees.rename(columns={"employeeNumber":"employee_id"}, inplace=True)

# 3. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_employees.insert(0, "employee_key", range(1, df_employees.shape[0]+1))

# 4. Display the first 2 rows of the dataframe to validate your work
df_employees.head(2)

Unnamed: 0,employee_key,employee_id,lastName,firstName,extension,email,officeCode,reportsTo,jobTitle
1,1,1002,Murphy,Diane,x5800,dmurphy@classicmodelcars.com,1,,President
2,2,1056,Patterson,Mary,x4611,mpatterso@classicmodelcars.com,1,1002.0,VP Sales


In [52]:
def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()
    
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection)
    connection.close()
    
    return dframe


db_operation = "insert"

tables = [('dim_employees', df_employees, 'employee_key')]
for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [53]:
# Employees: getting dimension table data
dim_employees_sql = "SELECT employee_key, employee_id FROM classic_data_mart.dim_employees"
df_dim_employees = get_dataframe(user_id, pwd, host_name, src_dbname, dim_employees_sql)
df_dim_employees.head(2)

Unnamed: 0,employee_key,employee_id
0,1,1002
1,2,1056


In [54]:
# Repeat for the Employees dimension
# Since I have the employee id from the customers table I can merge on employee_id
sql_fact_orders = "SELECT * FROM classic_data_mart.fact_orders;"
df_fact_orders = get_sql_dataframe(sql_fact_orders, **mysql_args)
df_fact_orders = pd.merge(df_fact_orders, df_dim_employees, on='employee_id', how='inner')
df_fact_orders.drop(['employee_id'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,fact_order_key,order_id,order_details_id,customer_key,order_date_key,required_date_key,shipped_date_key,quantityOrdered,priceEach,status,product_key,employee_key
0,1,10100,1,86,20030106,20030113,20030110.0,30,136.0,Shipped,23,10
1,2,10100,2,86,20030106,20030113,20030110.0,50,55.09,Shipped,27,10


In [57]:
# Inserting into MySQL data warehouse
dataframe = df_fact_orders
table_name = 'fact_orders'
primary_key = 'fact_order_key'
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [58]:
# Validate the new "Fact Orders" fact table by reading it back from the data warehouse.
sql_fact_orders = "SELECT * FROM classic_data_mart.fact_orders;"
df_fact_orders = get_sql_dataframe(sql_fact_orders, **mysql_args)
df_fact_orders.head(2)

Unnamed: 0,fact_order_key,order_id,order_details_id,customer_key,order_date_key,required_date_key,shipped_date_key,quantityOrdered,priceEach,status,product_key,employee_key
0,1,10100,1,86,20030106,20030113,20030110.0,30,136.0,Shipped,23,10
1,2,10100,2,86,20030106,20030113,20030110.0,50,55.09,Shipped,27,10


### SQL Queries
Select SQL statement that returns, grouped by customer contact last name:
- Each customer's last name
- The total quantity ordered
- The sum of the unit prices (priceEach) across the customer's order lines

In [63]:
sql_test = """
    SELECT customers.`contactLastName` AS `Customer Last Name`,
        SUM(orders.`quantityOrdered`) AS `Quantity Ordered`,
        SUM(orders.`priceEach`) AS `Price`
    FROM `fact_orders` AS orders
    INNER JOIN `dim_customers` AS customers
    ON orders.customer_key = customers.customer_key
    GROUP BY customers.`contactLastName`
    ORDER BY `Price` DESC;
"""

src_dbname='classic_data_mart'
df_test = get_dataframe(user_id, pwd, host_name, src_dbname, sql_test)
df_test.head()

Unnamed: 0,Customer Last Name,Quantity Ordered,Price
0,Freyre,9327.0,22680.0
1,Nelson,7161.0,18866.93
2,Young,4185.0,10845.51
3,Frick,3372.0,7884.86
4,Brown,3372.0,7795.13


In [66]:
# This counts the fact rows (order lines) for each employee on each order date; employee/date pairs with no orders have no row.
sql_test = """
    SELECT COUNT(order_date_key) AS `Num of Orders` , order_date_key AS `order date`, employees.lastName AS `Employee Last Name`
    FROM `fact_orders` AS orders
    INNER JOIN `dim_employees` AS employees
    ON orders.employee_key = employees.employee_key
    GROUP BY `order date`, employees.lastName
"""

src_dbname='classic_data_mart'
df_test = get_dataframe(user_id, pwd, host_name, src_dbname, sql_test)
df_test.head(10)

Unnamed: 0,Num of Orders,order date,Employee Last Name
0,4,20030106,Patterson
1,16,20031120,Patterson
2,29,20041104,Patterson
3,4,20030109,Jones
4,8,20040315,Jones
5,8,20031004,Jones
6,11,20041105,Jones
7,2,20030110,Tseng
8,9,20040405,Tseng
9,14,20041105,Tseng
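
Because each row of fact_orders is one order line, `COUNT(order_date_key)` counts lines rather than orders. The difference between the two counts can be sketched in pandas on a small sample (in SQL the equivalent would be `COUNT(DISTINCT order_id)`). The third row below, including its employee_key, is illustrative.

```python
import pandas as pd

df_lines = pd.DataFrame({
    "order_id": [10100, 10100, 10101],
    "order_date_key": [20030106, 20030106, 20030109],
    "employee_key": [10, 10, 12],  # employee_key 12 is an invented value
})

# Count order lines and distinct orders per date and employee.
summary = (
    df_lines.groupby(["order_date_key", "employee_key"])["order_id"]
    .agg(order_lines="count", distinct_orders="nunique")
    .reset_index()
)
print(summary)
```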
