In [1]:
import mysql.connector
import pandas as pd
import pymongo
from sqlalchemy import create_engine, text
import certifi
import json
import os
from sqlalchemy.exc import OperationalError
import numpy as np

In [2]:
# Database connection setup
mysql_args_northwind = {
    "uid": "root",
    "pwd": "password",
    "hostname": "localhost",
    "dbname": "northwind"
}

mysql_args_adventureworks = {
    "uid": "root",
    "pwd": "password",
    "hostname": "localhost",
    "dbname": "adventureworks"
}

mysql_args_adventureworks_dw2 = {
    "uid": "root",
    "pwd": "password",
    "hostname": "localhost",
    "dbname": "adventureworks_dw2"
}

mongodb_args = {
    "user_name": "banerjeeethan",
    "password": "VUHXrKOGS58xzQyE",
    "cluster_name": "Cluster0",
    "cluster_subnet": "38mdy",
    "cluster_location": "atlas",  # "local"
    "db_name": "northwind_purchasing"
}

In [3]:
# Connect to MySQL and fetch data
def get_sql_dataframe(sql_query, **args):
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    with sqlEngine.connect() as connection:
        df = pd.read_sql(text(sql_query), connection)
    return df

In [4]:
# Connect to MongoDB
def get_mongo_client(**args):
    if args["cluster_location"] == "atlas":
        connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
        client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
    else:
        client = pymongo.MongoClient("mongodb://localhost:27017/")
    return client

In [5]:

# Fetch MongoDB data
def get_mongo_dataframe(mongo_client, db_name, collection, query={}):
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    if '_id' in dframe and '_id' in dframe.columns:
        dframe.drop(['_id'], axis=1, inplace=True)
    return dframe

In [6]:
# Load Data into Data Warehouse
def load_dataframe_to_dw(df, table_name, mysql_args_dw2):
    if df.empty:
        print(f"Warning: Empty dataframe for {table_name}, skipping load")
        return
    conn_str = f"mysql+pymysql://{mysql_args_dw2['uid']}:{mysql_args_dw2['pwd']}@{mysql_args_dw2['hostname']}/{mysql_args_dw2['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    with sqlEngine.connect() as connection:
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
    print(f"Loaded {len(df)} rows to {table_name}")

In [7]:


# Extract Data from MySQL
df_aw_customers = get_sql_dataframe("""
    SELECT CustomerID AS customer_id, AccountNumber AS account_number,
           TerritoryID AS territory_id, CustomerType AS customer_type
    FROM customer;
""", **mysql_args_adventureworks)

df_aw_sales = get_sql_dataframe("""
    SELECT SalesOrderID AS sales_order_id, CustomerID AS customer_id,
           OrderDate AS order_date, TotalDue AS total_amount
    FROM salesorderheader;
""", **mysql_args_adventureworks)

df_aw_sales_details = get_sql_dataframe("""
    SELECT SalesOrderID AS sales_order_id, ProductID AS product_id,
           OrderQty AS quantity, UnitPrice AS unit_price, LineTotal AS line_total
    FROM salesorderdetail;
""", **mysql_args_adventureworks)

df_aw_products = get_sql_dataframe("SELECT ProductID AS product_id, Name AS product_name FROM product;", **mysql_args_adventureworks)

df_date_keys = get_sql_dataframe("SELECT date_key, full_date FROM dim_date", **mysql_args_adventureworks_dw2)
df_date_keys['full_date'] = pd.to_datetime(df_date_keys['full_date'])


In [8]:
# Extract Data from MongoDB
client = get_mongo_client(**mongodb_args)
df_nw_suppliers = get_mongo_dataframe(client, mongodb_args["db_name"], "suppliers")
client.close()


In [9]:

# Load Customer Data from JSON
with open("customers.json", "r", encoding="utf-8") as file:
    df_json_customers = pd.DataFrame(json.load(file))

In [10]:

# Prepare Dimension Tables
df_dim_customer_final = pd.merge(df_aw_customers, df_json_customers, left_on='customer_id', right_on='id', how='left')
df_dim_customer_final = df_dim_customer_final[['customer_id', 'account_number', 'territory_id', 'customer_type', 'email', 'first', 'last', 'company', 'created_at']]
df_dim_customer_final.rename(columns={'first': 'first_name', 'last': 'last_name'}, inplace=True)
df_dim_customer_final["created_at"] = pd.to_datetime(df_dim_customer_final["created_at"]).dt.strftime('%Y-%m-%d %H:%M:%S')
df_dim_customer_final.dropna(subset=['customer_id'], inplace=True)

df_dim_product = df_aw_products.copy()
df_dim_product.dropna(subset=['product_id'], inplace=True)

df_dim_supplier = df_nw_suppliers[['id', 'company', 'first_name', 'last_name', 'job_title']].rename(columns={'id': 'supplier_id'})
df_dim_supplier.dropna(subset=['supplier_id'], inplace=True)


In [11]:
# Prepare Fact Transaction Table
df_fact_transaction = pd.merge(df_aw_sales, df_aw_sales_details, on='sales_order_id', how='inner')
df_fact_transaction['order_date'] = pd.to_datetime(df_fact_transaction['order_date'])
df_fact_transaction = pd.merge(df_fact_transaction, df_date_keys, left_on='order_date', right_on='full_date', how='left')
df_fact_transaction.drop(['full_date', 'order_date'], axis=1, inplace=True)
df_fact_transaction['customer_key'] = df_fact_transaction['customer_id']
df_fact_transaction['product_key'] = df_fact_transaction['product_id']
df_fact_transaction['supplier_key'] = None
df_fact_transaction = df_fact_transaction[['sales_order_id', 'date_key', 'customer_key', 'product_key', 'supplier_key', 'quantity', 'unit_price', 'line_total']].rename(columns={'line_total': 'amount'})
df_fact_transaction.dropna(subset=['customer_key', 'product_key', 'date_key'], inplace=True)

In [12]:
# Load tables only when need
load_dataframe_to_dw(df_dim_customer_final, 'dim_customer', mysql_args_adventureworks_dw2)
load_dataframe_to_dw(df_dim_product, 'dim_product', mysql_args_adventureworks_dw2)
load_dataframe_to_dw(df_dim_supplier, 'dim_supplier', mysql_args_adventureworks_dw2)
load_dataframe_to_dw(df_fact_transaction, 'fact_transaction', mysql_args_adventureworks_dw2)

Loaded 19185 rows to dim_customer
Loaded 504 rows to dim_product
Loaded 10 rows to dim_supplier
Loaded 121317 rows to fact_transaction


In [13]:
# Display Data Samples
print("Customer Dimension Table:")
print(df_dim_customer_final.head(), "\n")

print("Product Dimension Table:")
print(df_dim_product.head(), "\n")

print("Supplier Dimension Table:")
print(df_dim_supplier.head(), "\n")

print("Fact Transaction Table:")
print(df_fact_transaction.head(), "\n")

Customer Dimension Table:
   customer_id account_number  territory_id customer_type  \
0            1     AW00000001             1             S   
1            2     AW00000002             1             S   
2            3     AW00000003             4             S   
3            4     AW00000004             4             S   
4            5     AW00000005             4             S   

                    email first_name last_name  \
0  isidro_von@hotmail.com     Torrey      Veum   
1  frederique19@gmail.com      Micah   Sanford   
2       fredy54@gmail.com     Hollis     Swift   
3   braxton29@hotmail.com      Perry   Leffler   
4      turner59@gmail.com    Janelle   Hagenes   

                          company           created_at  
0          Hilll, Mayert and Wolf  2014-12-25 04:06:27  
1                  Stokes-Reichel  2014-07-03 16:08:17  
2  Rodriguez, Cartwright and Kuhn  2014-08-18 06:15:16  
3        Sipes, Feeney and Hansen  2014-07-10 11:31:40  
4             Lesch a

In [14]:
import mysql.connector
import pandas as pd
from sqlalchemy import create_engine, text

# MySQL connection setup
mysql_args_adventureworks_dw2 = {
    "uid": "root",
    "pwd": "password",
    "hostname": "localhost",
    "dbname": "adventureworks_dw2"
}

# Connect to MySQL and fetch data
def get_sql_dataframe(sql_query, **args):
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    df = pd.read_sql(text(sql_query), connection)
    connection.close()
    return df

# Example SQL queries
# 1. Top 5 Customers by Total Sales
top_customers_query = """
SELECT c.first_name, c.last_name, SUM(t.amount) AS total_sales
FROM fact_transaction t
JOIN dim_customer c ON t.customer_key = c.customer_id
GROUP BY t.customer_key, c.first_name, c.last_name
ORDER BY total_sales DESC
LIMIT 5;
"""
df_top_customers = get_sql_dataframe(top_customers_query, **mysql_args_adventureworks_dw2)
print("Top 5 Customers by Total Sales:")
print(df_top_customers)

# 2. Total Sales per Month
monthly_sales_query = """
SELECT DATE_FORMAT(d.full_date, '%Y-%m') AS month, SUM(t.amount) AS total_sales
FROM fact_transaction t
JOIN dim_date d ON t.date_key = d.date_key
GROUP BY month
ORDER BY month;
"""
df_monthly_sales = get_sql_dataframe(monthly_sales_query, **mysql_args_adventureworks_dw2)
print("\nTotal Sales per Month:")
print(df_monthly_sales)

# 3. Average Sales Amount per Customer
avg_sales_per_customer_query = """
SELECT c.first_name, c.last_name, AVG(t.amount) AS average_sale
FROM fact_transaction t
JOIN dim_customer c ON t.customer_key = c.customer_id
GROUP BY t.customer_key, c.first_name, c.last_name;
"""
df_avg_sales_per_customer = get_sql_dataframe(avg_sales_per_customer_query, **mysql_args_adventureworks_dw2)
print("\nAverage Sales Amount per Customer:")
print(df_avg_sales_per_customer)

# 4. Total Quantity of Products Sold
total_product_quantity_query = """
SELECT p.product_name, SUM(t.quantity) AS total_quantity
FROM fact_transaction t
JOIN dim_product p ON t.product_key = p.product_id
GROUP BY t.product_key, p.product_name
ORDER BY total_quantity DESC
LIMIT 10;
"""
df_total_product_quantity = get_sql_dataframe(total_product_quantity_query, **mysql_args_adventureworks_dw2)
print("\nTotal Quantity of Products Sold:")
print(df_total_product_quantity)


Top 5 Customers by Total Sales:
  first_name last_name    total_sales
0     Jessie  McKenzie  877107.192221
1      Vince       Rau  853849.179524
2    Britney    Becker  841908.770707
3       Leda   Hermann  816755.576276
4     Dillon   Schmitt  799277.895062

Total Sales per Month:
      month   total_sales
0   2001-07  9.627167e+05
1   2001-08  2.044600e+06
2   2001-09  1.639840e+06
3   2001-10  1.358050e+06
4   2001-11  2.868129e+06
5   2001-12  2.458472e+06
6   2002-01  1.309863e+06
7   2002-02  2.451606e+06
8   2002-03  2.099416e+06
9   2002-04  1.546592e+06
10  2002-05  2.942673e+06
11  2002-06  1.678567e+06
12  2002-07  2.894055e+06
13  2002-08  4.147192e+06
14  2002-09  3.235826e+06
15  2002-10  2.217544e+06
16  2002-11  3.388911e+06
17  2002-12  2.762527e+06
18  2003-01  1.756407e+06
19  2003-02  2.873937e+06
20  2003-03  2.049530e+06
21  2003-04  2.371678e+06
22  2003-05  3.443525e+06
23  2003-06  2.542672e+06
24  2003-07  3.554092e+06
25  2003-08  5.068342e+06
26  2003-09  5