## DS 2002 Project 1
This project was completed according to the requirements sent out via email and Canvas course announcement on March 3rd. It also follows any requirements in the original project document that don't conflict with that version: https://github.com/JTupitza-UVA/DS-2002/blob/main/Projects/DS-2002-Data-Project-1.pdf

### Project Info:
**Author Name**: Calvin Pan  
**Author Computing ID**: nqc8gh@virginia.edu  
**Instructor Name**: Jon Tupitza  
**Course**: DS 2002 Data Science Systems - Spring 2025 Section 001  
**Due Date**: 3/17/2025  

## Step 0: Setup
### Step 0.1: Import the Necessary Libraries

In [45]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

In [46]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 2.0.34
Running PyMongo Version: 4.11.2


### Step 0.2: Create connections to MySQL and MongoDB

In [48]:
mysql_args = {
    "uid" : "root",
    "pwd" : "#Pelican1",
    "hostname" : "localhost",
    "dbname" : "adventureworks"
}

# The 'cluster_location' must either be "atlas" or "local".
mongodb_args = {
    "user_name" : "calvinpan1",
    "password" : "Pelican1",
    "cluster_name" : "ds2002",
    "cluster_subnet" : "xxxxx",
    "cluster_location" : "atlas", # "local"
    "db_name" : "project1"
}
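
The cell above stores the MySQL and MongoDB passwords in plain text, so they end up in the notebook file and in any repository it is pushed to. Here is a minimal sketch of reading them from environment variables instead. It assumes hypothetical variable names (`DS2002_MYSQL_USER`, `DS2002_MYSQL_PWD`, and so on) and a hypothetical helper `load_mysql_args()`. The same pattern would apply to `mongodb_args`.

```python
import os


def load_mysql_args(env=os.environ):
    """Build the mysql_args dict from environment variables instead of literals.

    The DS2002_MYSQL_* names are hypothetical. Only the password is mandatory;
    the other values fall back to the settings this notebook already uses.
    """
    return {
        "uid": env.get("DS2002_MYSQL_USER", "root"),
        "pwd": env["DS2002_MYSQL_PWD"],  # KeyError if the password was never exported
        "hostname": env.get("DS2002_MYSQL_HOST", "localhost"),
        "dbname": env.get("DS2002_MYSQL_DB", "adventureworks"),
    }


# Demo with an explicit mapping. In the notebook you would call load_mysql_args()
# after exporting DS2002_MYSQL_PWD in the shell that starts Jupyter.
demo_args = load_mysql_args({"DS2002_MYSQL_PWD": "example-only"})
```

Calling `mysql_args = load_mysql_args()` would then replace the literal dictionary without changing any of the helper functions.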

In [49]:
def get_sql_dataframe(sql_query, **args):
    '''Run sql_query against the MySQL database and return the result as a Pandas DataFrame.'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    # The context manager closes the connection even if the query raises an error.
    with sqlEngine.connect() as connection:
        dframe = pd.read_sql(text(sql_query), connection)
    sqlEngine.dispose()

    return dframe

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Write df to table_name: "insert" replaces the table and adds a primary key; "update" appends rows.'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    # engine.begin() opens a transaction that is committed when the block exits
    # (SQLAlchemy 2.0 does not autocommit statements run on a plain connection).
    with sqlEngine.begin() as connection:
        if db_operation == "insert":
            df.to_sql(table_name, con=connection, index=False, if_exists='replace')
            connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
        elif db_operation == "update":
            df.to_sql(table_name, con=connection, index=False, if_exists='append')
        else:
            raise ValueError("db_operation must be either 'insert' or 'update'.")
    sqlEngine.dispose()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = "mongodb+srv://calvinpan1:Pelican1@ds2002.hc2ph.mongodb.net/?retryWrites=true&w=majority&appName=DS2002&authSource=admin"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


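def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    '''(Re)create one MongoDB collection per entry in json_files (collection name -> JSON file name).'''
    db = mongo_client[db_name]

    for collection_name, file_name in json_files.items():
        # Drop any existing copy so re-running the notebook doesn't duplicate documents.
        db.drop_collection(collection_name)
        json_file = os.path.join(data_directory, file_name)
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
        db[collection_name].insert_many(json_object)

    mongo_client.close()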
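
Both connection strings above embed the credentials directly in a URL. The SQLAlchemy and PyMongo documentation both recommend percent-escaping user names and passwords with `urllib.parse.quote_plus()`. Characters such as `#`, `@`, `:` or `/` can otherwise be misread as URL delimiters, and the MySQL password here starts with `#`. This is a minimal sketch with two hypothetical helpers. `build_atlas_uri()` also assumes the Atlas host follows the `<cluster_name>.<cluster_subnet>.mongodb.net` pattern seen in `get_mongo_client()`, so `cluster_subnet` would need the real value rather than `"xxxxx"`.

```python
from urllib.parse import quote_plus


def build_mysql_url(**args):
    """SQLAlchemy URL for MySQL with the user name and password percent-escaped."""
    user = quote_plus(args["uid"])
    pwd = quote_plus(args["pwd"])
    return f"mysql+pymysql://{user}:{pwd}@{args['hostname']}/{args['dbname']}"


def build_atlas_uri(**args):
    """mongodb+srv:// URI assembled from mongodb_args instead of a hard-coded string.

    Assumes the host is <cluster_name>.<cluster_subnet>.mongodb.net.
    """
    user = quote_plus(args["user_name"])
    pwd = quote_plus(args["password"])
    host = f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
    return f"mongodb+srv://{user}:{pwd}@{host}/?retryWrites=true&w=majority&authSource=admin"


# Example values only (not real credentials).
example_mysql_url = build_mysql_url(uid="root", pwd="#secret@1",
                                    hostname="localhost", dbname="adventureworks")
example_atlas_uri = build_atlas_uri(user_name="student", password="p@ss",
                                    cluster_name="ds2002", cluster_subnet="abc12")
```

Inside `get_sql_dataframe()` and `set_dataframe()`, `conn_str = build_mysql_url(**args)` would be a drop-in replacement for the f-string. SQLAlchemy URL-decodes the user name and password when it parses the string.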

## Step 1: Exporting adventureworks to every source system
### Step 1.1: Downloading adventureworks and loading it into MySQL

Done by running this script in my local MySQL instance: https://github.com/JTupitza-UVA/DS-2002/blob/main/Projects/Scripts/AdventureWorks_MySQL.sql

### Step 1.2: Creating dim_date in the adventureworks schema in MySQL

Done by running a slightly edited version of the Lab 2b code for dim_date in my local MySQL instance. The only change was replacing `USE northwind_dw` with `USE adventureworks`. Source script: https://github.com/JTupitza-UVA/DS-2002/blob/main/01-SQL/Labs/Lab_02c_Create_Populate_Dim_Date.sql
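
The lab script itself isn't reproduced here. As a rough illustration of what a date dimension contains, here is a minimal pandas sketch using a hypothetical helper `build_dim_date()`. It produces one row per calendar day keyed by a `YYYYMMDD` integer. The column names are illustrative and are not the ones the lab script creates.

```python
import pandas as pd


def build_dim_date(start, end):
    """Tiny calendar dimension: one row per day with an integer YYYYMMDD key.

    Column names are illustrative only, not those of the lab's dim_date table.
    """
    dates = pd.date_range(start=start, end=end, freq="D")
    return pd.DataFrame({
        "date_key": dates.year * 10000 + dates.month * 100 + dates.day,
        "full_date": dates,
        "calendar_year": dates.year,
        "calendar_quarter": dates.quarter,
        "month_name": dates.month_name(),
        "day_name": dates.day_name(),
        "is_weekend": dates.dayofweek >= 5,  # Monday=0 ... Sunday=6
    })


dim_date_demo = build_dim_date("2001-01-01", "2001-12-31")
```

A fact row can then reference a day by converting its own date the same way, for example an `OrderDate` of 2001-05-17 becomes `20010517`.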

### Step 1.3: Exporting purchaseorderheader from the adventureworks database into MongoDB

In [55]:
# Export the SQL purchaseorderheader data to a dataframe:
sql_get_adventureworks_poh = 'SELECT * FROM adventureworks.purchaseorderheader'
df_dim_purchaseorderheader = get_sql_dataframe(sql_get_adventureworks_poh, **mysql_args)
df_dim_purchaseorderheader.head(2)

|  | PurchaseOrderID | RevisionNumber | Status | EmployeeID | VendorID | ShipMethodID | OrderDate | ShipDate | SubTotal | TaxAmt | Freight | TotalDue | ModifiedDate |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 0 | 4 | 244 | 83 | 3 | 2001-05-17 | 2001-05-26 | 201.04 | 16.0832 | 5.026 | 222.1492 | 2001-05-26 |
| 1 | 2 | 0 | 1 | 231 | 32 | 5 | 2001-05-17 | 2001-05-26 | 272.1015 | 21.7681 | 6.8025 | 300.6721 | 2001-05-26 |


In [56]:
# Convert the DataFrame to a list of JSON-ready dicts. A plain to_dict(orient='records')
# would keep the date columns as date/time objects, which json.dump() cannot serialize,
# so round-trip through to_json() with ISO-8601 dates instead:
dict_dim_purchaseorderheader = json.loads(
    df_dim_purchaseorderheader.to_json(date_format='iso', orient='records')
)

# Write the dict to a JSON file:
with open(os.path.join(os.getcwd(), 'purchaseorderheader.json'), "w") as o:
    json.dump(dict_dim_purchaseorderheader, o, indent=4)
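
This cell shows why the ISO round trip is needed, using a two-row stand-in DataFrame with hypothetical values. `to_dict()` keeps pandas `Timestamp` objects, which the standard `json` module refuses to encode, while `to_json(date_format='iso')` turns them into ISO-8601 strings. One consequence is that MongoDB stores these dates as strings rather than BSON dates, which is why they come back as text in Step 2.2.

```python
import json

import pandas as pd

# Hypothetical stand-in for df_dim_purchaseorderheader
demo = pd.DataFrame({
    "PurchaseOrderID": [1, 2],
    "OrderDate": pd.to_datetime(["2001-05-17", "2001-05-17"]),
})

# to_dict() keeps pandas Timestamp objects, which json cannot encode
try:
    json.dumps(demo.to_dict(orient="records"))
    plain_dict_ok = True
except TypeError:
    plain_dict_ok = False

# to_json(date_format="iso") writes ISO-8601 strings, so the result is plain JSON
records = json.loads(demo.to_json(date_format="iso", orient="records"))
```

Passing `default=str` to `json.dump()` would also avoid the error, but it produces strings such as `'2001-05-17 00:00:00'` instead of ISO-8601.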

In [57]:
# Load the JSON file into MongoDB:
client = get_mongo_client(**mongodb_args)

# The JSON file was written to this notebook's current working directory.
data_dir = os.getcwd()

json_files = {'purchaseorderheader': 'purchaseorderheader.json'}

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)

### Step 1.4: Exporting the product table from the adventureworks MySQL database into a CSV file via a SQL query

In [59]:
# Export the SQL products data to a dataframe:
sql_get_adventureworks_products = 'SELECT * FROM adventureworks.product'
df_dim_products = get_sql_dataframe(sql_get_adventureworks_products, **mysql_args)
df_dim_products.head(2)

|  | ProductID | Name | ProductNumber | MakeFlag | FinishedGoodsFlag | Color | SafetyStockLevel | ReorderPoint | StandardCost | ListPrice | ... | ProductLine | Class | Style | ProductSubcategoryID | ProductModelID | SellStartDate | SellEndDate | DiscontinuedDate | rowguid | ModifiedDate |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | Adjustable Race | AR-5381 | `b'\x00'` | `b'\x00'` |  | 1000 | 750 | 0.0 | 0.0 | ... |  |  |  |  |  | 1998-06-01 | NaT |  | `b'\xb7\x15Bi\xf7\x08\rL\xac\xb1\xd74\xbaD\xc0\...` | 2004-03-11 10:01:36 |
| 1 | 2 | Bearing Ball | BA-8327 | `b'\x00'` | `b'\x00'` |  | 1000 | 750 | 0.0 | 0.0 | ... |  |  |  |  |  | 1998-06-01 | NaT |  | `b' <\xaeX:OIG\xa7\xd4\xd5h\x80l\xc57'` | 2004-03-11 10:01:36 |


In [60]:
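# Export the dataframe to a CSV file. to_csv() also writes the DataFrame index,
# which is why an extra 'Unnamed: 0' column appears when the file is read back in Step 2.3.
dest_file = os.path.join(os.getcwd(), 'products.csv')
df_dim_products.to_csv(dest_file)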
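
The extra `Unnamed: 0` column that shows up in Steps 2.3 and 2.3.1 is the DataFrame index written by `to_csv()`. This small sketch uses a two-row stand-in DataFrame to show the round trip and two ways to avoid it: `index=False` when writing, or `index_col=0` when reading.

```python
import io

import pandas as pd

products = pd.DataFrame({"ProductID": [1, 2], "Name": ["Adjustable Race", "Bearing Ball"]})

# Default: the index is written as an unnamed first column and
# read back as a regular column called "Unnamed: 0".
round_trip = pd.read_csv(io.StringIO(products.to_csv()))

# Option 1: don't write the index at all.
no_index = pd.read_csv(io.StringIO(products.to_csv(index=False)))

# Option 2: tell read_csv() that the first column is the index.
index_restored = pd.read_csv(io.StringIO(products.to_csv()), index_col=0)
```

In this notebook, that would mean `df_dim_products.to_csv(dest_file, index=False)` in Step 1.4, or `pd.read_csv(..., index_col=0)` in Step 2.3.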

## Step 2: Extracting the data from every source (including the MongoDB collection and CSV file created in Step 1)
### Step 2.1: Extracting employees from the MySQL version of adventureworks

In [62]:
sql_get_employees = 'SELECT * FROM adventureworks.employee'
df_dim_employees = get_sql_dataframe(sql_get_employees, **mysql_args)
# Drop rowguid: a binary GUID column (returned as raw bytes) that causes problems later and isn't needed:
df_dim_employees.drop('rowguid', axis=1, inplace=True)
df_dim_employees.head(2)

|  | EmployeeID | NationalIDNumber | ContactID | LoginID | ManagerID | Title | BirthDate | MaritalStatus | Gender | HireDate | SalariedFlag | VacationHours | SickLeaveHours | CurrentFlag | ModifiedDate |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 14417807 | 1209 | adventure-works\guy1 | 16.0 | Production Technician - WC60 | 1972-05-15 | M | M | 1996-07-31 | `b'\x00'` | 21 | 30 | `b'\x01'` | 2004-07-31 |
| 1 | 2 | 253022876 | 1030 | adventure-works\kevin0 | 6.0 | Marketing Assistant | 1977-06-03 | S | M | 1997-02-26 | `b'\x00'` | 42 | 41 | `b'\x01'` | 2004-07-31 |
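
`SalariedFlag` and `CurrentFlag` (like `MakeFlag` and `FinishedGoodsFlag` in the product table) appear to be MySQL `BIT` columns, which PyMySQL returns as raw bytes such as `b'\x01'`. Rather than dropping such columns, they could be decoded into 0/1 integers. Here is a minimal sketch with a hypothetical helper `decode_bit_flags()` and hypothetical stand-in rows:

```python
import pandas as pd


def decode_bit_flags(df, columns):
    """Return a copy of df with the given bytes-valued flag columns converted to ints."""
    out = df.copy()
    for col in columns:
        # b'\x00' -> 0, b'\x01' -> 1; non-bytes values (e.g. NaN) are left untouched
        out[col] = out[col].map(
            lambda v: int.from_bytes(v, "big") if isinstance(v, (bytes, bytearray)) else v
        )
    return out


# Hypothetical stand-in rows shaped like the employee output above
employees = pd.DataFrame({
    "EmployeeID": [1, 2],
    "SalariedFlag": [b"\x00", b"\x01"],
    "CurrentFlag": [b"\x01", b"\x01"],
})
decoded = decode_bit_flags(employees, ["SalariedFlag", "CurrentFlag"])
```

Working on a copy keeps the raw extract intact, so the decoded frame can be compared against it before loading.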


### Step 2.2: Extracting purchaseorderheader from the MongoDB version of adventureworks

In [64]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "purchaseorderheader"

df_dim_poh = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_dim_poh.head(2)

|  | PurchaseOrderID | RevisionNumber | Status | EmployeeID | VendorID | ShipMethodID | OrderDate | ShipDate | SubTotal | TaxAmt | Freight | TotalDue | ModifiedDate |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 0 | 4 | 244 | 83 | 3 | 2001-05-17T00:00:00.000 | 2001-05-26T00:00:00.000 | 201.04 | 16.0832 | 5.026 | 222.1492 | 2001-05-26T00:00:00.000 |
| 1 | 2 | 0 | 1 | 231 | 32 | 5 | 2001-05-17T00:00:00.000 | 2001-05-26T00:00:00.000 | 272.1015 | 21.7681 | 6.8025 | 300.6721 | 2001-05-26T00:00:00.000 |
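
Because the dates were stored in MongoDB as ISO-8601 strings, `OrderDate`, `ShipDate` and `ModifiedDate` arrive here as text rather than datetimes. This short sketch uses stand-in rows in the same format to convert them back with `pd.to_datetime()` so that date arithmetic works again:

```python
import pandas as pd

# Stand-in rows using the same string format as the MongoDB output above
poh = pd.DataFrame({
    "PurchaseOrderID": [1, 2],
    "OrderDate": ["2001-05-17T00:00:00.000", "2001-05-17T00:00:00.000"],
    "ShipDate": ["2001-05-26T00:00:00.000", "2001-05-26T00:00:00.000"],
})

for col in ["OrderDate", "ShipDate"]:
    poh[col] = pd.to_datetime(poh[col])

# With real datetimes, subtracting two columns yields Timedeltas
days_to_ship = (poh["ShipDate"] - poh["OrderDate"]).dt.days
```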


### Step 2.3: Extracting products from our CSV

In [66]:
src_file = os.path.join(os.getcwd(), 'products.csv')

df_dim_products = pd.read_csv(src_file)
df_dim_products.head(2)

|  | Unnamed: 0 | ProductID | Name | ProductNumber | MakeFlag | FinishedGoodsFlag | Color | SafetyStockLevel | ReorderPoint | StandardCost | ... | ProductLine | Class | Style | ProductSubcategoryID | ProductModelID | SellStartDate | SellEndDate | DiscontinuedDate | rowguid | ModifiedDate |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 1 | Adjustable Race | AR-5381 | `b'\x00'` | `b'\x00'` |  | 1000 | 750 | 0.0 | ... |  |  |  |  |  | 1998-06-01 |  |  | `b'\xb7\x15Bi\xf7\x08\rL\xac\xb1\xd74\xbaD\xc0\...` | 2004-03-11 10:01:36 |
| 1 | 1 | 2 | Bearing Ball | BA-8327 | `b'\x00'` | `b'\x00'` |  | 1000 | 750 | 0.0 | ... |  |  |  |  |  | 1998-06-01 |  |  | `b' <\xaeX:OIG\xa7\xd4\xd5h\x80l\xc57'` | 2004-03-11 10:01:36 |


#### Step 2.3.1: Clean dim_products

In [68]:
# Drop the columns returned as raw bytes (the BIT flags and the binary rowguid), then any column that is entirely empty:
drop_cols = ['MakeFlag','FinishedGoodsFlag', 'rowguid']
df_dim_products.drop(drop_cols, axis=1, inplace=True)
df_dim_products.dropna(how='all', axis=1, inplace=True)
df_dim_products.head(2)

|  | Unnamed: 0 | ProductID | Name | ProductNumber | Color | SafetyStockLevel | ReorderPoint | StandardCost | ListPrice | Size | ... | Weight | DaysToManufacture | ProductLine | Class | Style | ProductSubcategoryID | ProductModelID | SellStartDate | SellEndDate | ModifiedDate |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 1 | Adjustable Race | AR-5381 |  | 1000 | 750 | 0.0 | 0.0 |  | ... |  | 0 |  |  |  |  |  | 1998-06-01 |  | 2004-03-11 10:01:36 |
| 1 | 1 | 2 | Bearing Ball | BA-8327 |  | 1000 | 750 | 0.0 | 0.0 |  | ... |  | 0 |  |  |  |  |  | 1998-06-01 |  | 2004-03-11 10:01:36 |
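
`dropna(how='all', axis=1)` removes a column only when every value in it is missing. That is why `DiscontinuedDate` disappears from the output above while partly filled columns such as `Color` survive. A tiny illustration with hypothetical values:

```python
import numpy as np
import pandas as pd

products = pd.DataFrame({
    "ProductID": [1, 2, 3],
    "Color": [np.nan, np.nan, "Black"],            # partly empty: kept
    "DiscontinuedDate": [np.nan, np.nan, np.nan],  # entirely empty: dropped
})

cleaned = products.dropna(how="all", axis=1)
```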


### Step 2.4: Extracting purchaseorderdetail from MySQL and transforming it into the fact table
Required transformations include looking up the primary keys from each dimension table to establish a foreign key relationship between each dimension and the fact table.  Other transformations may include renaming columns, dropping columns, and reordering columns.
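
The surrogate-key lookup described above can be sketched on toy data (all names and values hypothetical). Give the dimension a 1..n key, merge it onto the fact rows through the natural key, then keep only the surrogate key as the foreign key. This variant keeps just the key column rather than every dimension column, which is the usual star-schema layout; the steps below merge the full dimensions instead.

```python
import pandas as pd

# Hypothetical dimension and fact rows
dim_product = pd.DataFrame({"ProductID": [1, 4, 359], "ProductName": ["A", "B", "C"]})
fact_lines = pd.DataFrame({
    "PurchaseOrderDetailID": [1, 2, 3],
    "ProductID": [1, 359, 1],
    "OrderQty": [4, 3, 2],
})

# 1. Ascending surrogate key on the dimension
dim_product["Product_Key"] = range(1, len(dim_product) + 1)

# 2. Look up the key for each fact row via the natural key (ProductID);
#    validate= raises if ProductID were duplicated in the dimension
fact = fact_lines.merge(
    dim_product[["ProductID", "Product_Key"]],
    on="ProductID", how="left", validate="many_to_one",
)

# 3. Keep the surrogate key instead of the natural key, and reorder the columns
fact = fact[["PurchaseOrderDetailID", "Product_Key", "OrderQty"]]
```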

#### Step 2.4.1: Extract purchaseorderdetail from the SQL adventureworks db

In [71]:
sql_get_pod = 'SELECT * FROM adventureworks.purchaseorderdetail'
df_pod = get_sql_dataframe(sql_get_pod, **mysql_args)
df_pod.head(2)

|  | PurchaseOrderID | PurchaseOrderDetailID | DueDate | OrderQty | ProductID | UnitPrice | LineTotal | ReceivedQty | RejectedQty | StockedQty | ModifiedDate |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | 2001-05-31 | 4 | 1 | 50.26 | 201.04 | 3.0 | 0.0 | 3.0 | 2001-05-24 |
| 1 | 2 | 2 | 2001-05-31 | 3 | 359 | 45.12 | 135.36 | 3.0 | 0.0 | 3.0 | 2001-05-24 |


#### Step 2.4.2: Merge dim_employees and dim_poh

In [73]:
# Create an ascending surrogate key (1, 2, 3, ...) for employees
df_dim_employees['Employee_Key'] = range(1, len(df_dim_employees) + 1)

# Rename ModifiedDate in both DataFrames so the columns stay distinct after the merge
df_dim_employees.rename(columns={'ModifiedDate': 'EmpModifiedDate'}, inplace=True)
df_dim_poh.rename(columns={'ModifiedDate': 'POHModifiedDate'}, inplace=True)

# Merge on EmployeeID
df_dim_poh = pd.merge(df_dim_poh, df_dim_employees, on='EmployeeID', how='inner')
df_dim_poh.head(2)

|  | PurchaseOrderID | RevisionNumber | Status | EmployeeID | VendorID | ShipMethodID | OrderDate | ShipDate | SubTotal | TaxAmt | ... | BirthDate | MaritalStatus | Gender | HireDate | SalariedFlag | VacationHours | SickLeaveHours | CurrentFlag | EmpModifiedDate | Employee_Key |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 0 | 4 | 244 | 83 | 3 | 2001-05-17T00:00:00.000 | 2001-05-26T00:00:00.000 | 201.04 | 16.0832 | ... | 1961-02-04 | S | F | 2000-03-03 | `b'\x00'` | 53 | 46 | `b'\x01'` | 2004-07-31 | 244 |
| 1 | 2 | 0 | 1 | 231 | 32 | 5 | 2001-05-17T00:00:00.000 | 2001-05-26T00:00:00.000 | 272.1015 | 21.7681 | ... | 1960-12-25 | M | M | 2000-02-05 | `b'\x00'` | 57 | 48 | `b'\x01'` | 2004-07-31 | 231 |
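
An inner merge silently drops purchase orders whose `EmployeeID` has no match in `df_dim_employees`, and duplicated employee rows would silently multiply purchase orders. This sketch makes both problems visible with `merge()`'s `indicator` and `validate` parameters. It uses hypothetical rows in which EmployeeID 999 deliberately has no match:

```python
import pandas as pd

# Hypothetical rows: EmployeeID 999 has no matching employee
employees = pd.DataFrame({"EmployeeID": [231, 244], "Employee_Key": [1, 2]})
poh = pd.DataFrame({"PurchaseOrderID": [1, 2, 3], "EmployeeID": [244, 231, 999]})

# how="left" + indicator=True keeps unmatched rows and labels where each row came from;
# validate="many_to_one" raises MergeError if EmployeeID repeats in `employees`
checked = pd.merge(poh, employees, on="EmployeeID", how="left",
                   indicator=True, validate="many_to_one")
unmatched_orders = checked.loc[checked["_merge"] == "left_only", "PurchaseOrderID"].tolist()

# A duplicated employee row is caught instead of silently doubling purchase orders
duplicated = pd.concat([employees, employees.iloc[[0]]], ignore_index=True)
try:
    pd.merge(poh, duplicated, on="EmployeeID", validate="many_to_one")
    duplicate_detected = False
except pd.errors.MergeError:
    duplicate_detected = True
```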


#### Step 2.4.3: Merge purchaseorderdetail with dim_products

In [75]:
# Create an ascending surrogate key (1, 2, 3, ...) for products
df_dim_products['Product_Key'] = range(1, len(df_dim_products) + 1)

# Rename ModifiedDate in both DataFrames so the columns stay distinct after the merge
df_dim_products.rename(columns={'ModifiedDate': 'ProductsModifiedDate'}, inplace=True)
df_pod.rename(columns={'ModifiedDate': 'PODModifiedDate'}, inplace=True)

# Merge on ProductID
df_pod = pd.merge(df_pod, df_dim_products, on='ProductID', how='inner')
df_pod.head(2)

|  | PurchaseOrderID | PurchaseOrderDetailID | DueDate | OrderQty | ProductID | UnitPrice | LineTotal | ReceivedQty | RejectedQty | StockedQty | ... | DaysToManufacture | ProductLine | Class | Style | ProductSubcategoryID | ProductModelID | SellStartDate | SellEndDate | ProductsModifiedDate | Product_Key |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | 2001-05-31 | 4 | 1 | 50.26 | 201.04 | 3.0 | 0.0 | 3.0 | ... | 0 |  |  |  |  |  | 1998-06-01 |  | 2004-03-11 10:01:36 | 1 |
| 1 | 2 | 2 | 2001-05-31 | 3 | 359 | 45.12 | 135.36 | 3.0 | 0.0 | 3.0 | ... | 0 |  |  |  |  |  | 1998-06-01 |  | 2004-03-11 10:01:36 | 38 |


#### Step 2.4.4: Merge purchaseorderdetail (final fact table) with dim_poh

In [77]:
# Create an ascending surrogate key for the fact table (purchase order detail)
df_pod['PurchaseOrderDetail_Key'] = range(1, len(df_pod) + 1)

# Create an ascending surrogate key for the purchase order header dimension
df_dim_poh['PurchaseOrderHistory_Key'] = range(1, len(df_dim_poh) + 1)

# Merge on PurchaseOrderID
df_pod = pd.merge(df_pod, df_dim_poh, on='PurchaseOrderID', how='inner')
df_pod.head(2)

|  | PurchaseOrderID | PurchaseOrderDetailID | DueDate | OrderQty | ProductID | UnitPrice | LineTotal | ReceivedQty | RejectedQty | StockedQty | ... | MaritalStatus | Gender | HireDate | SalariedFlag | VacationHours | SickLeaveHours | CurrentFlag | EmpModifiedDate | Employee_Key | PurchaseOrderHistory_Key |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | 2001-05-31 | 4 | 1 | 50.26 | 201.04 | 3.0 | 0.0 | 3.0 | ... | S | F | 2000-03-03 | `b'\x00'` | 53 | 46 | `b'\x01'` | 2004-07-31 | 244 | 1 |
| 1 | 2 | 2 | 2001-05-31 | 3 | 359 | 45.12 | 135.36 | 3.0 | 0.0 | 3.0 | ... | M | M | 2000-02-05 | `b'\x00'` | 57 | 48 | `b'\x01'` | 2004-07-31 | 231 | 2 |


### Step 2.5: Load the Newly Transformed Data into the adventureworks Data Warehouse

In [79]:
dataframe = df_dim_products
table_name = 'dim_products'
primary_key = 'Product_Key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [80]:
dataframe = df_dim_employees
table_name = 'dim_employees'
primary_key = 'Employee_Key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [81]:
dataframe = df_pod
table_name = 'purchaseorderdetail'
primary_key = 'PurchaseOrderDetail_Key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [82]:
dataframe = df_dim_poh
table_name = 'dim_purchaseorderhistory'
primary_key = 'PurchaseOrderHistory_Key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)
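
`set_dataframe()` maps `"insert"` to `if_exists='replace'` and `"update"` to `if_exists='append'`. This self-contained sketch shows the difference using an in-memory SQLite database instead of MySQL. SQLite can't `ALTER TABLE ... ADD PRIMARY KEY`, so that step is left out. Replacing twice leaves one copy of the rows, while appending adds a second copy. In other words, `"update"` appends rows rather than updating existing ones, so re-appending the same rows would either duplicate them or, once the MySQL primary key exists, fail with a duplicate-key error.

```python
import sqlite3

import pandas as pd

conn = sqlite3.connect(":memory:")
batch = pd.DataFrame({"Product_Key": [1, 2], "Name": ["A", "B"]})  # hypothetical rows


def row_count(table):
    return int(pd.read_sql(f"SELECT COUNT(*) AS n FROM {table}", conn)["n"].iloc[0])


# "insert": drop and recreate the table each time
batch.to_sql("dim_products", conn, index=False, if_exists="replace")
batch.to_sql("dim_products", conn, index=False, if_exists="replace")
rows_after_replace = row_count("dim_products")

# "update": add the rows to whatever is already there
batch.to_sql("dim_products", conn, index=False, if_exists="append")
rows_after_append = row_count("dim_products")

conn.close()
```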

## Step 3: Writing SQL Queries
### Step 3.1: Query 1
SQL SELECT statement that returns:

Each Employee's ID  
Each Product's ID  
The total purchase order detail quantity (OrderQty) associated with each employee and product  
The total purchase order detail unit cost (UnitPrice) associated with each employee and product  

In [151]:
sql_purchase_orders = """
    SELECT 
    e.EmployeeID,
    p.ProductID,
    SUM(pod.OrderQty) AS total_quantity,
    SUM(pod.UnitPrice) AS total_price
FROM 
    adventureworks.purchaseorderdetail AS pod
JOIN 
    dim_employees AS e ON pod.EmployeeID = e.EmployeeID
JOIN 
    dim_products AS p ON pod.ProductID = p.ProductID
GROUP BY 
    e.EmployeeID,
    p.ProductID;

"""

In [153]:
df_fact_purchase_orders = get_sql_dataframe(sql_purchase_orders, **mysql_args)
df_fact_purchase_orders

|  | EmployeeID | ProductID | total_quantity | total_price |
|---|---|---|---|---|
| 0 | 244 | 1 | 19.0 | 301.5775 |
| 1 | 266 | 4 | 18.0 | 342.1530 |
| 2 | 233 | 317 | 4400.0 | 224.0280 |
| 3 | 233 | 318 | 4400.0 | 276.1920 |
| 4 | 233 | 319 | 7150.0 | 610.3335 |
| ... | ... | ... | ... | ... |
| 1980 | 223 | 447 | 3.0 | 41.3805 |
| 1981 | 233 | 476 | 1650.0 | 47.6595 |
| 1982 | 198 | 914 | 1100.0 | 42.1890 |
| 1983 | 266 | 415 | 15.0 | 227.3880 |
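
A pandas equivalent of Query 1 can be handy for cross-checking the SQL result against `df_pod` before it is loaded. This sketch uses hypothetical rows. Note that summing `UnitPrice` adds up per-line unit prices, as the requirement asks. The amount actually spent per line is `LineTotal` (`OrderQty` × `UnitPrice`, e.g. 4 × 50.26 = 201.04 in the first row of Step 2.4.1).

```python
import pandas as pd

# Hypothetical purchase order detail lines
pod = pd.DataFrame({
    "EmployeeID": [1, 1, 2, 2],
    "ProductID": [10, 10, 20, 30],
    "OrderQty": [4, 15, 550, 550],
    "UnitPrice": [50.0, 52.5, 28.0, 34.5],
})

# Same grouping and aggregates as the SQL: one row per (EmployeeID, ProductID)
query1 = (
    pod.groupby(["EmployeeID", "ProductID"], as_index=False)
       .agg(total_quantity=("OrderQty", "sum"), total_price=("UnitPrice", "sum"))
)
```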


### Step 3.2: Query 2
SQL SELECT statement that returns:

Each Product's ID  
Each Employee's ID  
The count of purchase order detail unit costs (i.e., purchase order detail rows) associated with each product and employee  


In [171]:
sql_purchase_orders = """
    SELECT 
        p.ProductID,
        e.EmployeeID,
        COUNT(*) AS total_count
    FROM 
        adventureworks.purchaseorderdetail AS pod
    JOIN 
        dim_employees AS e ON pod.EmployeeID = e.EmployeeID
    JOIN 
        dim_products AS p ON pod.ProductID = p.ProductID
    GROUP BY
        e.EmployeeID,
        p.ProductID;
"""

In [173]:
df_fact_purchase_orders = get_sql_dataframe(sql_purchase_orders, **mysql_args)
df_fact_purchase_orders

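|  | ProductID | EmployeeID | total_count |
|---|---|---|---|
| 0 | 1 | 244 | 6 |
| 1 | 4 | 266 | 6 |
| 2 | 317 | 233 | 8 |
| 3 | 318 | 233 | 8 |
| 4 | 319 | 233 | 13 |
| ... | ... | ... | ... |
| 1980 | 447 | 223 | 1 |
| 1981 | 476 | 233 | 3 |
| 1982 | 914 | 198 | 2 |
| 1983 | 415 | 266 | 5 |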
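
Query 2 uses `COUNT(*)`, which counts every purchase order detail row. `COUNT(pod.UnitPrice)` would skip rows where `UnitPrice` is NULL, so the two agree unless `UnitPrice` has NULLs. pandas draws the same line with `size()` versus `count()`. This sketch uses hypothetical rows with one missing price:

```python
import numpy as np
import pandas as pd

# Hypothetical rows; the second line has no unit price
pod = pd.DataFrame({
    "ProductID": [10, 10, 20],
    "EmployeeID": [1, 1, 2],
    "UnitPrice": [5.0, np.nan, 7.5],
})

grouped = pod.groupby(["ProductID", "EmployeeID"])
count_all_rows = grouped.size()                    # like COUNT(*)
count_unit_prices = grouped["UnitPrice"].count()   # like COUNT(pod.UnitPrice): NaN skipped
```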
