# Small Database Description

The "classicmodels" database is a vendor of scale vintage cars. This data includes customers, products, orders, order details, payments, employees, offices, and many more. 

# Data Retrieval and Population

Importing the Neccessary Libraries: 

In [1]:
import os 
import json
import numpy
import datetime
import certifi
import pandas as pd
import mysql.connector

import pymongo
import sqlalchemy
from sqlalchemy import create_engine

Connection and Configuration Setup: 

In [2]:
# Declare and assign connection variables for MYSQL and MongoDb servers

mysql_uid = "root"
mysql_pwd = "Passw0rd123"
mysql_hostname = "localhost"

atlas_cluster_name = "car.ekddkan"
atlas_user_name = "daphnepfoser"
atlas_password = "q1Yp5OIGlVDnMsjP"

conn_str = {"local" : f"mongodb://localhost:27017/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"
}

src_dbname = "car_database"
dst_dbname = "classicmodels"

print(f"Local Connection String: {conn_str['local']}")
print(f"Atlas Connection String: {conn_str['atlas']}")

Local Connection String: mongodb://localhost:27017/
Atlas Connection String: mongodb+srv://daphnepfoser:q1Yp5OIGlVDnMsjP@car.ekddkan.mongodb.net


MYSQL Database Connection Configuration:

In [5]:
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "Passw0rd123"

src_dbname = "classicmodels"
dst_dbname = "classicmodels_dw"

Database Interaction Functions: 

In [34]:
# This chunk defines the functions for interacting with MYSQL and MongoDB databases 
# Functions include quering data, setting data, and managing the collection

def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe

def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect() # creates a connection
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
 

    return dframe   

def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace') # load table 
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});") # further iteration 
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()
    

def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client



def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()


MongoDB Data Population:

In [9]:
# Mongo Configuration, MongoDB collections population, and MongoDB client interaction

mongodb_args = {
    "cluster_location": "atlas",  
    "user_name": "localhost",
    "password": "GoHoosYay",
    "cluster_name": "car",
    "cluster_subnet": "ekddkan",
    "db_name": "classicmodels" 
}

# Populate MongoDB with source data
client = get_mongo_client(**mongodb_args)

data_dir = os.path.join(os.getcwd(), 'data')

json_files = {
    "customers": 'car_customers.json',
    "employees": 'car_employees.json',
    "offices": 'car_offices.json',
    "orderdetails": 'car_orderdetails.json',
    "orders": 'car_orders.json',
    "payments": 'car_payments.json',
    "productlines": 'car_productlines.json',
    "products": 'car_products.json'
}

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)

client = get_mongo_client(**mongodb_args)


MYSQL Data Retrieval Function:

In [10]:
# Retrieve data from the MYSQL Database

def get_sql_dataframe(sql_query):
    # Connection details
    mysql_uid = "root"
    mysql_pwd = "Passw0rd123"
    mysql_hostname = "localhost"
    dst_dbname = "classicmodels"

    # Establish a connection to the database
    connection = mysql.connector.connect(
        user=mysql_uid,
        password=mysql_pwd,
        host=mysql_hostname,
        database=dst_dbname
    )

    
    df = pd.read_sql_query(sql_query, connection)

    
    connection.close()

    return df

# Data Preparation and Integration

## Customer Dimension Table

In [11]:
# Getting the customers collection from MongoDB:

client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "customers"

df_customers = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_customers.head(2)


Unnamed: 0,customerNumber,customerName,contactLastName,contactFirstName,phone,addressLine1,addressLine2,city,state,postalCode,country,salesRepEmployeeNumber,creditLimit
0,103,Atelier graphique,Schmitt,Carine,40.32.2555,"54, rue Royale",,Nantes,,44000,France,1370,21000.0
1,112,Signal Gift Stores,King,Jean,7025551838,8489 Strong St.,,Las Vegas,NV,83030,USA,1166,71800.0


In [12]:
# Perform any necessary transformations: drop columns and rename columns 

drop_cols=['contactLastName','contactFirstName','phone','addressLine1','addressLine2','salesRepEmployeeNumber','creditLimit']
df_customers.drop(drop_cols, axis=1, inplace=True)

df_customers.rename(columns={'customerNumber':'customer_id'}, inplace=True)
df_customers.rename(columns={'customerName':'customer_name', 'postalCode':'postal_code'}, inplace=True)
df_customers.head(2)

Unnamed: 0,customer_id,customer_name,city,state,postal_code,country
0,103,Atelier graphique,Nantes,,44000,France
1,112,Signal Gift Stores,Las Vegas,NV,83030,USA


In [46]:
# Insert a new column, with an ever-incrementing numeric value, to serve as the primary key

df_customers.insert(0, "customer_key", range(1, df_customers.shape[0]+1))
df_customers.head(2)

Unnamed: 0,customer_key,customer_id,customer_name,city,state,postal_code,country
0,1,103,Atelier graphique,Nantes,,44000,France
1,2,112,Signal Gift Stores,Las Vegas,NV,83030,USA


## Product Dimension Table

In [21]:
# Importing the product csv file: 

df_products = pd.read_csv('C:\\Users\\ds2002-student\\Documents\\Projects\\data\\car_products.csv', sep=';', encoding='latin1')
df_products.head(2)

Unnamed: 0,productCode,productName,productLine,productScale,productVendor,productDescription,quantityInStock,buyPrice,MSRP
0,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,1:10,Min Lin Diecast,"This replica features working kickstand, front...",7933,48.81,95.7
1,S10_1949,1952 Alpine Renault 1300,Classic Cars,1:10,Classic Metal Creations,Turnable front wheels; steering function; deta...,7305,98.58,214.3


In [22]:
# Perfoming any necessary transformations: drop columns and rename columns 

df_products.drop(['productScale','productVendor','productDescription'], axis=1, inplace=True)
df_products.head(2)

df_products.rename(columns={"productCode": "product_id"}, inplace=True)
df_products.rename(columns={"productName": "product_name", "productLine": "product_line","quantityInStock": "quantity_in_stock", "buyPrice": "buy_price"}, inplace=True)
df_products.head(2)

Unnamed: 0,product_id,product_name,product_line,quantity_in_stock,buy_price,MSRP
0,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,7933,48.81,95.7
1,S10_1949,1952 Alpine Renault 1300,Classic Cars,7305,98.58,214.3


In [45]:
# Insert a new column, with an ever-incrementing numeric value, to serve as the primary key

df_products.insert(0, "product_key", range(1, df_products.shape[0]+1))
df_products.head(2)

Unnamed: 0,product_key,product_id,product_name,product_line,quantity_in_stock,buy_price,MSRP
0,1,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,7933,48.81,95.7
1,2,S10_1949,1952 Alpine Renault 1300,Classic Cars,7305,98.58,214.3


## Order Fact Table

In [25]:
# Getting data from the first table (orderdetails) involved and also making required transformations

sql_orderdetails = "SELECT * FROM classicmodels.orderdetails;"
df_orderdetails = get_dataframe(user_id, pwd, host_name, src_dbname, sql_orderdetails)

# Droping the orderLineNumber column
drop_cols=['orderLineNumber']
df_orderdetails.drop(drop_cols, axis=1, inplace=True)

# Renaming the ordernumber and some other columns 
df_orderdetails.rename(columns={"orderNumber":"order_id"}, inplace=True)
df_orderdetails.rename(columns={'quantityOrdered':'quantity_ordered', 'priceEach':'price_each','productCode':'product_id'}, inplace=True)
df_orderdetails.head(2)

Unnamed: 0,order_id,product_id,quantity_ordered,price_each
0,10100,S18_1749,30,136.0
1,10100,S18_2248,50,55.09


In [26]:
# Getting data from the second table (order) involved and also making required transformations

sql_orders = "SELECT * FROM classicmodels.orders;"
df_orders = get_dataframe(user_id, pwd, host_name, src_dbname, sql_orders)

# Droping the requiredDate, shippedDate, status, and comments columns 
drop_cols=['requiredDate','shippedDate','status','comments']
df_orders.drop(drop_cols, axis=1, inplace=True)

# Renaming the orderNumber and some other columns 
df_orders.rename(columns={"orderNumber":"order_id"}, inplace=True)
df_orders.rename(columns={'orderDate':'order_date', 'customerNumber':'customer_id'}, inplace=True)
df_orders.head(2)

Unnamed: 0,order_id,order_date,customer_id
0,10100,2003-01-06,363
1,10101,2003-01-09,128


In [28]:
# Create the fact table (combine "order" and "order_details")

df_fact_orders = pd.merge(df_orders, df_orderdetails, on='order_id', how='inner')
df_fact_orders.head(2)

Unnamed: 0,order_id,order_date,customer_id,product_id,quantity_ordered,price_each
0,10100,2003-01-06,363,S18_1749,30,136.0
1,10100,2003-01-06,363,S18_2248,50,55.09


In [29]:
# Looking up the datekeys from the date dimension table 

sql_dim_date = "SELECT date_key, full_date FROM classicmodels.dim_date;"
df_dim_date = get_sql_dataframe(sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date

df_dim_date.head(2)

  df = pd.read_sql_query(sql_query, connection)


Unnamed: 0,date_key,full_date
0,20030101,2003-01-01
1,20030102,2003-01-02


In [30]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "order_date" Column.
df_dim_order_date = df_dim_date.rename(columns={"date_key" : "order_date_key", "full_date" : "order_date"})

df_fact_orders = pd.merge(df_fact_orders, df_dim_order_date, on='order_date', how='left')
df_fact_orders.drop(['order_date'], axis=1, inplace=True)
df_fact_orders.head(2)


Unnamed: 0,order_id,customer_id,product_id,quantity_ordered,price_each,order_date_key
0,10100,363,S18_1749,30,136.0,20030106
1,10100,363,S18_2248,50,55.09,20030106


In [37]:
# Insert a new column, with an ever-incrementing numeric value, to serve as the primary key

df_fact_orders.insert(0, "fact_order_key", range(1, df_fact_orders.shape[0]+1))
df_fact_orders.head(2)


Unnamed: 0,fact_order_key,order_id,customer_id,product_id,quantity_ordered,price_each,order_date_key
0,1,10100,363,S18_1749,30,136.0,20030106
1,2,10100,363,S18_2248,50,55.09,20030106


## Data Warehouse Creation

In [53]:
# Create a new database: classicmodels_dw 

dst_dbname = 'classicmodels_dw'

conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
connection = sqlEngine.connect() # creates a connection

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

connection.close() 

In [58]:
# Write the DataFrame("df_fact_orders") back to the DataBase

table_name = "df_fact_orders"
primary_key = "fact_order_key"
db_operation = "insert" 

set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_orders, table_name, primary_key, db_operation)


In [56]:
# Write the DataFrame("df_products") back to the DataBase

table_name = "df_products"
primary_key = "product_key"
db_operation = "insert" 

set_dataframe(user_id, pwd, host_name, dst_dbname, df_products, table_name, primary_key, db_operation)


In [57]:
# Write the DataFrame("df_customers") back to the DataBase

table_name = "df_customers"
primary_key = "customer_key"
db_operation = "insert" 

set_dataframe(user_id, pwd, host_name, dst_dbname, df_customers, table_name, primary_key, db_operation)

## Queries ( For your reference again, here are the queries written in SQL Workbench):

In [None]:
# 1. This query outputs the sales per day of the week: 

SELECT 
    d.day_name_of_week,
    SUM(o.quantity_ordered * o.price_each) AS total_sales
FROM 
    df_fact_orders o
JOIN 
    dim_date d ON o.order_date_key = d.full_date
GROUP BY 
    d.day_name_of_week, d.day_of_week
ORDER BY 
    d.day_of_week;

In [None]:
# 2. This query outputs the customer's purchases, including quantity and price 

SELECT 
    c.customer_name,
    COUNT(o.order_id) AS order_count,
    p.product_name,
    o.quantity_ordered,
    o.price_each
FROM 
    df_fact_orders o
JOIN 
    df_customers c ON o.customer_id = c.customer_id
JOIN 
    df_products p ON o.product_id = p.product_id
GROUP BY 
    c.customer_name, p.product_name, o.quantity_ordered, o.price_each;