# Midterm of DS2002

## Derek Sprincis


### Tables Included

- Fact Table -- Order Data
- Dimension Table #1 -- Payments from mySQL
- Dimension Table #2 -- Product data using MongoDB 
- Dimension Table #3 -- Using .csv file from my local Azure instance. I am retrieving a .csv table from my local filesystem.
- SELECT Tables, including on demo'ing AVG


## Steps used to complete this project
1. I created the classicmodels database as provided
2. I exported data from mySQL to be used as the local file (.csv) and the MongoDB data (.json)
3. I extracted, transformed, and loaded data into mySQL from MongoDB and my local file. I modified the tables (such as dropping columns) in each of 3 sources. I consolidated tables in mySQL.
4. I completed part 3, authoring SQL queries, particulary doing an average statement. My focus was to demonstrate SQL query capabilities by investigating the average buy price for different products sold. 

In [1]:
import os
import pandas as pd
from sqlalchemy import create_engine
import mysql.connector
import numpy as np

In [33]:
connection = mysql.connector.connect(
    host_name = "localhost",
    port = "3306",
    user_id = "root",
    pwd = "Passw0rd123!",
    database = "classicmodels"
)

db_name = database

#*************************************************************

def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

Fact Table #1 -- Order Data

In [4]:
df_orders = get_dataframe(user_id="root", pwd="Passw0rd123!", host_name="localhost", db_name="classicmodels", sql_query="SELECT * FROM classicmodels.orders;")
df_orders.head(2)

In [6]:
if 'comments' in df_orders.columns:
    df_orders.drop(columns=['comments'], inplace=True)

# rename id's to keys
df_orders.rename(columns={
    "requiredDate": "promiseDate", 
}, inplace=True)

# insert our fact table key column
if 'fact_payment_key' in df_orders.columns:
    df_orders.head(2)
else:
    df_orders.insert(0, 'fact_payment_key', range(1, df_orders.shape[0]+1))
    df_orders.head(2)

Unnamed: 0,fact_payment_key,orderNumber,orderDate,promiseDate,shippedDate,status,customerNumber
0,1,10100,2003-01-06,2003-01-13/2003,2003-01-10,Shipped,363
1,2,10101,2003-01-09,2003-01-18,2003-01-11,Shipped,128


Dimension Table #1 -- Payments from mySQL
Is a date dimension table via 'paymentDate.' 

In [57]:
df_payments = get_dataframe(
    user_id="root", 
    pwd="Passw0rd123!", 
    host_name="localhost", 
    db_name="classicmodels",
    sql_query="SELECT customerNumber, paymentDate FROM classicmodels.payments")
df_payments.head(2)

Unnamed: 0,customerNumber,paymentDate
0,103,2004-10-19
1,103,2003-06-05


In [58]:
df_op = pd.merge(df_payments, df_orders, on="customerNumber", how="left") #Performing a left join

df_op.drop(["orderDate", "shippedDate", "promiseDate"], axis=1, inplace=True)
df_op.head(2)

Unnamed: 0,customerNumber,paymentDate,fact_payment_key,orderNumber,status
0,103,2004-10-19,24,10123,Shipped
1,103,2004-10-19,199,10298,Shipped


In [60]:
# Remove duplicate entries in the DataFrame df_op
df_op.drop_duplicates(subset=['fact_payment_key'], keep='first', inplace=True)

# Insert/load the newly transformed data into MySQL
set_dataframe(
    user_id="root", 
    pwd="Passw0rd123!", 
    host_name="localhost", 
    db_name="classicmodels",
    df=df_op, 
    table_name="paymentproducts_status", 
    pk_column="fact_payment_key", 
    db_operation="insert"
)


Dimension Table #2 -- Product data using MongoDB 

Furthermore, I was getting various minor errors when setting up the MongoDB collection and uploading the data so I used Chat to help smoother out the errors

In [81]:
import os
import json
import numpy as np
import pandas as pd
import datetime
import certifi
import pymongo

# Establishing a connection to MongoDB
client = pymongo.MongoClient("mongodb://localhost:27017/") 
db = client["sempersupra"]
collection = db["Products"] 

# Open and read the JSON file
with open('p1data_products.json') as f:
    data = json.load(f)

# Inserting the data into the MongoDB collection
collection.insert_many(data)

cursor = collection.find({})
df_mongodb = pd.DataFrame(list(cursor))
client.close()

print("Data inserted successfully into MongoDB.")

# Define connection strings
atlas_cluster_name = "sempersupra"
atlas_user_name = "das3jk"
atlas_password = "Passw0rd123!" 

#MongoDB Atlas URI
conn_str = f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"

def get_mongo_dataframe(conn_str, db, collection_name, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(conn_str)
    db = client[db]
    collection = db[collection_name]
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    dframe = pd.DataFrame(list(collection.find(query)))
    client.close()
    return dframe

def transform_data(item):
    transformed_item = {
        "productName": item["productName"],
        "productLine": item["productLine"],
        "productScale": item["productScale"],
        "productVendor": item["productVendor"],
        "productDescription": item["productDescription"],
        "quantityInStock": item.get("quantityInStock", 0),
        "buyPrice": item.get("buyPrice", 0.0),
        "MSRP": item.get("MSRP", 0.0),
    }
    return transformed_item

print(f"Atlas Connection String: {conn_str}")

Data inserted successfully into MongoDB.
Atlas Connection String: mongodb+srv://das3jk:Passw0rd123!@sempersupra.mongodb.net


I just placed the .json file into MongoDB. I will now extract the data from MongoDB

In [109]:
query = {} # Defining query dictionary

# Convert dictionary query to JSON string
query_str = json.dumps(query)

# Extracting customer data frame from MongoDB data source
collection_name = "Products"
df_products = get_mongo_dataframe(
    conn_str=conn_str, 
    db="sempersupra", 
    collection_name=collection_name, 
    query=query
)
df_products.drop(['_id'], axis=1, inplace=True)

df_products.head(2)

Unnamed: 0,_id,productCode,productName,productLine,productScale,productVendor,productDescription,quantityInStock,buyPrice,MSRP
0,6604c362c29c40f1ee71b915,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,1:10,Min Lin Diecast,"This replica features working kickstand, front...",7933,48.81,95.7
1,6604c362c29c40f1ee71b916,S10_1949,1952 Alpine Renault 1300,Classic Cars,1:10,Classic Metal Creations,Turnable front wheels; steering function; deta...,7305,98.58,214.3


In [110]:
#Dropping unneccessary categories
df_products.drop(['_id', 'productScale', 'productName', 'productVendor', 'productDescription', 'quantityInStock', 'MSRP'], axis=1, inplace=True)
df_products.head(2)

Unnamed: 0,productCode,productLine,buyPrice
0,S10_1678,Motorcycles,48.81
1,S10_1949,Classic Cars,98.58


Dimension Table #3 -- Using .csv file from my local Azure instance. I am retrieving a .csv table from my local filesystem.

In [111]:
df_csv = pd.read_csv('proj1csv.csv') #reading the file

df_csv.drop(columns=['priceEach'], inplace=True) #drops some categories
# Above I modify the number of columns from each source to the destination; 2.c

df_csv.insert(0, "fact_key", range(1, df_csv.shape[0]+1)) # Defining query dictionary

df_csv.head(2)

Unnamed: 0,fact_key,orderNumber,productCode,quantityOrdered,orderLineNumber
0,1,10100,S18_1749,30,3
1,2,10100,S18_2248,50,2


In [135]:
#Merge the orderdetails with the products table using mySQL

engine = create_engine('sqlite:///:memory:')

# Converting DataFrames to SQL tables
a.to_sql('products_2', engine)
df_csv.to_sql('orderdetails_2', engine)

# Perform the join using a SQL query
query = """
    SELECT *
    FROM products_2
    INNER JOIN orderdetails_2 ON products_2.productCode = orderdetails_2.productCode
"""
result = pd.read_sql_query(query, engine)

result = result.drop('index', axis=1)

result.head(2)

Unnamed: 0,productCode,productLine,buyPrice,fact_key,orderNumber,productCode.1,quantityOrdered,orderLineNumber
0,S10_1678,Motorcycles,48.81,73,10107,S10_1678,30,2
1,S10_1678,Motorcycles,48.81,189,10121,S10_1678,34,5


In [136]:
# load the transformed result table into mySQL as productorderdetails
set_dataframe(
    user_id="root", 
    pwd="Passw0rd123!", 
    host_name="localhost", 
    db_name="classicmodels",
    df=result, 
    table_name="productorderdetails", 
    pk_column="fact_key", 
    db_operation="insert"
)

## Demonstrating Proper Functionality

This SQL query finds the number of times that each customer rented on each day of the week

In [137]:

type1 = """
SELECT
    p.productLine AS 'Product Line',
    o.buyPrice AS 'Buy Price'
FROM
    INNER JOIN classicmodels.orders AS o ON c.customer_id = o.customer_id
    INNER JOIN classicmodels.orderdetails AS od ON o.order_id = od.order_id
    INNER JOIN classicmodels.products AS p ON od.product_id = p.product_id
WHERE
    p.productLine
GROUP BY
    p.productLine,
    o.buyPrice
"""

#type1 = get_dataframe(db_name=src_mysql_dbname, sql_query=sql_customer_weekdays)
type1 = get_dataframe(user_id="root", pwd="Passw0rd123!", host_name="localhost", db_name="classicmodels", sql_query="SELECT * FROM classicmodels.productorderdetails;")
type1.head(10)

Unnamed: 0,index,productLine,buyPrice,fact_key,orderNumber,quantityOrdered,orderLineNumber
0,72,Motorcycles,48.81,73,10107,30,2
1,188,Motorcycles,48.81,189,10121,34,5
2,293,Motorcycles,48.81,294,10134,41,2
3,398,Motorcycles,48.81,399,10145,45,6
4,514,Motorcycles,48.81,515,10159,49,14
5,611,Motorcycles,48.81,612,10168,36,1
6,733,Motorcycles,48.81,734,10180,29,9
7,841,Motorcycles,48.81,842,10188,48,1
8,953,Motorcycles,48.81,954,10201,22,2
9,10,Classic Cars,98.58,11,10103,26,11


In [137]:
#Satisfying req 3B -- Perform some type of aggregation (e.g., SUM, COUNT, AVERAGE).
# I am averaging

type2 = """
SELECT
    p.productLine AS 'productLine',
    AVG(o.buyPrice) AS 'Average buyPrice'
FROM
    INNER JOIN classicmodels.orders AS o ON c.customer_id = o.customer_id
    INNER JOIN classicmodels.orderdetails AS od ON o.order_id = od.order_id
    INNER JOIN classicmodels.products AS p ON od.product_id = p.product_id
GROUP BY
    p.productLine
ORDER BY
    `index`;

"""
type2 = get_dataframe(user_id="root", pwd="Passw0rd123!", host_name="localhost", db_name="classicmodels", sql_query="SELECT * FROM classicmodels.productorderdetails;")
type2.head(7)

Unnamed: 0,productLine,Average buyPrice
0,Classic Cars,64.99
1,Trucks and Buses,56.33
2,Motorcycles,50.69
3,Planes,49.63
4,Ships,48.72
5,Vintage Cars,45.99
6,Trains,43.92
