# DS-2002 Final Project
# Part 1
#### (Download this .ipynb file to see the full notebook; GitHub's preview does not display all of it.)
#### **1. Overview**
This project builds an ETL (Extract, Transform, Load) pipeline that uses MySQL, MongoDB, and the local file system as data sources to populate a dimensional data mart. Structured and semi-structured data from these sources are processed and stored in a data warehouse for analysis.
The data mart is designed for customer order analysis in a retail business and supports historical sales performance tracking, customer purchasing behavior, and inventory insights.

#### **2. Data Sources**
- **Provided Scripts**
  - `AdventureWorks_MySQL` (source database)
  - `AdventureWorks_Queries_MySQL` (views for AdventureWorks)
  - `Lab_02c_Create_Populate_Dim_Date` (date dimension)
- **MySQL (AdventureWorks_DW database)**
  - Extracted `fact_sales_orders` (Fact Table)
  - Extracted `dim_customers` (Dimension Table)
  - Extracted `dim_products` (Dimension Table)
  - Extracted `dim_date` (Date Dimension)
- **MongoDB**
  - Inserted `fact_sales_orders`, `dim_customers`, and `dim_products` into MongoDB collections
  - Retrieved and verified the data from MongoDB
- **Local File System**
  - Saved `dim_products.csv` locally for further analysis


### **3. ETL Process**
#### **Step 1: Extract Data**
- Queried MySQL database to retrieve fact and dimension tables.
- Transformed data to align with the data warehouse schema.
- Saved extracted data in JSON format before inserting it into MongoDB.

#### **Step 2: Transform Data**
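- Renamed business-key columns (e.g., `CustomerID` to `customer_id`) for consistency.
- Added surrogate primary keys (`customer_key`, `product_key`, `fact_sales_order_key`) and the foreign keys that link the fact table to the dimensions.
- Mapped `OrderDate`, `ShipDate`, and `DueDate` to `order_date_key`, `shipped_date_key`, and `due_date_key` by looking them up in `dim_date`.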

#### **Step 3: Load Data**
- Loaded the dimension and fact tables into the `adventureworks_dw` MySQL database.
- Inserted the exported JSON data into MongoDB.
- Exported the `dim_products` dimension as a CSV file.



### **4. Deployment Strategy**
- Connections to MySQL and MongoDB are handled via helper functions (`get_sql_dataframe()`, `get_mongo_dataframe()`, etc.).
- Data is exported in multiple formats for different use cases (JSON for MongoDB, CSV for local storage).
- The process follows **Lab 3 and Lab 4 methodologies**, ensuring compliance with project requirements.


### **5. Verification and Testing**
- **SQL Queries** were used to verify the integrity of foreign key mappings.
- **DataFrames** were displayed after each major step to confirm transformations.
- **MongoDB `count_documents()`** was used to confirm successful insertion.
- **CSV file validation** ensured the local file was saved and accessible.
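The CSV export and validation check described above can be sketched as follows. This is a minimal sketch. It assumes a hypothetical helper `export_and_validate_csv` and a temporary output path, and the project's actual file location and checks may differ.

Writing with `index=False` matters here. Without it, pandas writes the index as an unnamed first column, and that column reappears as `Unnamed: 0` when the file is read back.

```python
import os
import tempfile

import pandas as pd


def export_and_validate_csv(df: pd.DataFrame, path: str) -> pd.DataFrame:
    """Hypothetical helper: write df to CSV, read it back, and check nothing was lost."""
    # index=False avoids an extra "Unnamed: 0" column when the file is re-read
    df.to_csv(path, index=False)

    if not os.path.isfile(path):
        raise FileNotFoundError(path)

    df_check = pd.read_csv(path)
    # Compare shape and column names; dtypes can legitimately change on a CSV round trip
    if df_check.shape != df.shape or list(df_check.columns) != list(df.columns):
        raise ValueError(f"CSV round trip mismatch: {df.shape} vs {df_check.shape}")
    return df_check


# Toy stand-in for the dim_products DataFrame
df_demo = pd.DataFrame({"product_key": [1, 2], "product_id": [1, 2],
                        "Name": ["Adjustable Race", "Bearing Ball"]})
csv_path = os.path.join(tempfile.mkdtemp(), "dim_products.csv")
df_roundtrip = export_and_validate_csv(df_demo, csv_path)
print(df_roundtrip.shape)
```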


### 1.0 Import the Necessary Libraries

In [1]:
import os
import json
import shutil
import numpy as np
import datetime
import certifi
import time
import pandas as pd

from pymongo import MongoClient
import sqlalchemy
from sqlalchemy import create_engine, text
from IPython.display import display  # display() renders DataFrames in the cells below

### Getting Started: Initial Scripts to Run
From the GitHub repository, download `AdventureWorks_MySQL` and run it in MySQL Workbench to create the source database we will be working with. Once the database has been populated, download and run `AdventureWorks_Queries_MySQL`, which adds the views to the source database (e.g., `dim_customers_vw`) that are queried later in this notebook.

### 1.1 Declare & Assign Connection Variables for the MongoDB Server, the MySQL Server & Databases with which You'll be Working 

In [2]:
# MySQL Connection Variables (from Lab 3 & Lab 4 format)
mysql_args = {
    "uid": "root",  # Change if necessary
    "pwd": os.environ.get("MYSQL_PWD", "your_password"),  # Set MYSQL_PWD, or replace with your MySQL password
    "hostname": "localhost",
    "port": "3306",
    "src_dbname": "adventureworks",  # Source database (OLTP)
    "dst_dbname": "adventureworks_dw"  # Data warehouse database (OLAP)
}

# JDBC connection properties; not used by the pymysql-based helper functions below
conn_props = {
    "user": mysql_args["uid"],
    "password": mysql_args["pwd"],
    "driver": "com.mysql.cj.jdbc.Driver"
}
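Two problems come with the current approach. Hard-coding the password keeps it in the notebook. Interpolating it into a URL string breaks if the password contains characters such as `@` or `/`.

A minimal sketch of an alternative follows. It assumes SQLAlchemy 1.4 or later, which provides `URL.create`, and a hypothetical `MYSQL_PWD` environment variable.

```python
import os

from sqlalchemy.engine import URL

# Hypothetical: read the password from an environment variable so it never lives in the notebook
mysql_url = URL.create(
    drivername="mysql+pymysql",
    username="root",
    password=os.environ.get("MYSQL_PWD", "p@ss:w/rd"),  # special characters are escaped by URL
    host="localhost",
    port=3306,
    database="adventureworks",
)

# Safe to print or log: the password is masked as ***
print(mysql_url.render_as_string(hide_password=True))
```

The resulting `mysql_url` object can be passed to `create_engine(mysql_url, pool_recycle=3600)` in place of the f-string URLs used by the helper functions. This requires the `pymysql` driver to be installed.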

In [3]:
# MongoDB Connection Variables (following Lab 4 format)
mongodb_args = {
    "user_name": "",  # Leave empty for local connection
    "password": "",  # No password needed for local MongoDB
    "cluster_name": "",  # Not needed for local MongoDB
    "cluster_subnet": "",  # Not needed for local MongoDB
    "cluster_location": "local",  # Running locally
    "db_name": "adventureworks_nosql",  # Target MongoDB database
    "collection" : "",
    "null_column_threshold": 0.5
}



### 1.2 Define Functions for Getting Data From and Setting Data Into Databases

In [4]:
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL source database and return a Pandas DataFrame.'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}:{args['port']}/{args['src_dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    # The context manager closes the connection even if the query raises an error
    with sqlEngine.connect() as connection:
        dframe = pd.read_sql(text(sql_query), connection)

    return dframe


def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Insert or update data in the MySQL destination database using a Pandas DataFrame.'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}:{args['port']}/{args['dst_dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    # begin() commits on success and rolls back on error (SQLAlchemy 2.x connections do not autocommit)
    with sqlEngine.begin() as connection:
        if db_operation == "insert":
            # Replace the table, then declare its primary key
            df.to_sql(table_name, con=connection, index=False, if_exists='replace')
            connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
        elif db_operation == "update":
            df.to_sql(table_name, con=connection, index=False, if_exists='append')
        else:
            raise ValueError("db_operation must be 'insert' or 'update'")


def get_mongo_client(**args):
    '''Validate MongoDB connection and return a client instance.'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = MongoClient(connect_str)
            
        elif args["cluster_location"] == "local":
            client = MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query=None):
    '''Query MongoDB, retrieve documents, and return a Pandas DataFrame.'''
    db = mongo_client[db_name]
    # Avoid a mutable default argument; an empty filter matches every document
    dframe = pd.DataFrame(list(db[collection].find(query or {})))

    # Drop MongoDB's default `_id` column
    if "_id" in dframe.columns:
        dframe.drop(["_id"], axis=1, inplace=True)

    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    '''Load JSON data from a directory and insert it into MongoDB collections.'''
    db = mongo_client[db_name]

    # json_files maps each collection name to the JSON file that holds its records
    for collection_name, file_name in json_files.items():
        db.drop_collection(collection_name)  # Drop the existing collection to avoid duplicates
        json_file = os.path.join(data_directory, file_name)
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
        db[collection_name].insert_many(json_object)
        
    #mongo_client.close()
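The SQL helpers above build a new engine, and therefore a new connection pool, on every call.

Below is a sketch of one alternative. It assumes a hypothetical cached `get_engine` helper. It is demonstrated against an in-memory SQLite database, so it runs without a MySQL server.

```python
from functools import lru_cache

import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine


@lru_cache(maxsize=None)
def get_engine(conn_str: str) -> Engine:
    """Hypothetical helper: return one cached Engine (and pool) per connection string."""
    return create_engine(conn_str, pool_recycle=3600)


def read_sql_df(sql_query: str, conn_str: str) -> pd.DataFrame:
    """Run a query and return a DataFrame; the connection is released even on error."""
    with get_engine(conn_str).connect() as connection:
        return pd.read_sql(text(sql_query), connection)


# Demo against in-memory SQLite instead of MySQL
engine = get_engine("sqlite://")
with engine.begin() as conn:  # begin() commits on success, rolls back on error
    conn.execute(text("CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)"))
    conn.execute(text("INSERT INTO t VALUES (1, 'a'), (2, 'b')"))

df = read_sql_df("SELECT * FROM t ORDER BY id", "sqlite://")
print(df)
```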


In [5]:
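# Quick MongoDB read test; if the "products" collection does not exist yet, the result is an empty DataFrame
mongo_client = get_mongo_client(**mongodb_args)
df_mongo_test = get_mongo_dataframe(mongo_client, mongodb_args["db_name"], "products", {})
mongo_client.close()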


### 1.3 Create adventureworks_dw

In [6]:
# Ensure adventureworks_dw exists before testing connection
engine = create_engine(f"mysql+pymysql://{mysql_args['uid']}:{mysql_args['pwd']}@{mysql_args['hostname']}:{mysql_args['port']}")
with engine.connect() as connection:
    connection.execute(text("CREATE DATABASE IF NOT EXISTS adventureworks_dw"))
print("Database 'adventureworks_dw' created successfully!")


Database 'adventureworks_dw' created successfully!


### 1.4 Test MySQL and MongoDB Connections

In [7]:
# Test MySQL Connections for Both Databases
try:
    # Test Source Database (OLTP)
    with create_engine(
        f"mysql+pymysql://{mysql_args['uid']}:{mysql_args['pwd']}@{mysql_args['hostname']}:{mysql_args['port']}/{mysql_args['src_dbname']}"
    ).connect() as connection:
        print(f"Successfully connected to MySQL source database: {mysql_args['src_dbname']}")

    # Test Destination Database (OLAP)
    with create_engine(
        f"mysql+pymysql://{mysql_args['uid']}:{mysql_args['pwd']}@{mysql_args['hostname']}:{mysql_args['port']}/{mysql_args['dst_dbname']}"
    ).connect() as connection:
        print(f"Successfully connected to MySQL destination database: {mysql_args['dst_dbname']}")

except Exception as e:
    print("Error connecting to MySQL:", e)


Successfully connected to MySQL source database: adventureworks
Successfully connected to MySQL destination database: adventureworks_dw


In [8]:
# Test MongoDB Connection
try:
    client = get_mongo_client(**mongodb_args)
    db = client[mongodb_args["db_name"]]
    db.command("ping")  # Ping MongoDB server
    print(f"Successfully connected to MongoDB database: {mongodb_args['db_name']}")
    
    client.close()  

except Exception as e:
    print("Error connecting to MongoDB:", e)


Successfully connected to MongoDB database: adventureworks_nosql


### 2.0 Create & Populate the Dimension Tables

In [9]:
# Extract customers from the predefined view
sql_customers = "SELECT * FROM adventureworks.dim_customers_vw;"
df_customers = get_sql_dataframe(sql_customers, **mysql_args)
df_customers.head(2)


| | CustomerID | AccountNumber | CustomerType | AddressType | AddressLine1 | AddressLine2 | City | StateProvinceCode | State_Province | IsOnlyStateProvinceFlag | PostalCode | CountryRegionCode | Country_Region | Sales Territory Group | Sales Territory |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | AW00000001 | S | Main Office | 2251 Elliot Avenue | | Seattle | WA | Washington | `b'\x00'` | 98104 | US | United States | North America | Northwest |
| 1 | 2 | AW00000002 | S | Shipping | 7943 Walnut Ave | | Renton | WA | Washington | `b'\x00'` | 98055 | US | United States | North America | Northwest |


In [10]:
sql_products = "SELECT * FROM adventureworks.dim_products_vw;"
df_products = get_sql_dataframe(sql_products, **mysql_args)
df_products.head(2)



| | ProductID | Name | ProductNumber | MakeFlag | FinishedGoodsFlag | Color | SafetyStockLevel | ReorderPoint | StandardCost | ListPrice | ... | DaysToManufacture | ProductLine | Class | Style | ProductCategory | ProductSubcategory | ProductModel | SellStartDate | SellEndDate | DiscontinuedDate |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | Adjustable Race | AR-5381 | `b'\x00'` | `b'\x00'` | | 1000 | 750 | 0.0 | 0.0 | ... | 0 | | | | | | | 1998-06-01 | NaT | |
| 1 | 2 | Bearing Ball | BA-8327 | `b'\x00'` | `b'\x00'` | | 1000 | 750 | 0.0 | 0.0 | ... | 0 | | | | | | | 1998-06-01 | NaT | |


### 2.1 Create the Date Dimension Table
At this point, **execute the `Lab_02c_Create_Populate_Dim_Date` script from Lab 2c**, which creates and populates a **Date Dimension** table. Be certain to run it against the new data warehouse database **(adventureworks_dw)**, because the date keys it produces are needed later.
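The Lab 2c script is what actually builds `dim_date`. For readers without it, a pandas sketch of the same idea is shown below.

The sketch assumes only the columns this notebook queries later: `date_key`, `full_date`, `calendar_year`, `month_of_year`, and `month_name`. The real script likely defines more.

The `date_key` follows the `YYYYMMDD` integer format visible in the `dim_date` output, e.g., `20000101` for 2000-01-01.

```python
import pandas as pd


def build_dim_date(start: str, end: str) -> pd.DataFrame:
    """Hypothetical helper: a minimal date dimension with a YYYYMMDD integer key."""
    dates = pd.date_range(start=start, end=end, freq="D")
    return pd.DataFrame({
        "date_key": dates.strftime("%Y%m%d").astype(int),  # e.g. 20000101
        "full_date": dates.date,                            # Python date objects, no time part
        "calendar_year": dates.year,
        "month_of_year": dates.month,
        "month_name": dates.month_name(),
    })


df_dim_date_demo = build_dim_date("2000-01-01", "2000-01-03")
print(df_dim_date_demo)
```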

In [11]:
# Extract dim_date from adventureworks_dw
sql_dim_date = "SELECT date_key, full_date FROM adventureworks_dw.dim_date;"
df_dim_date = get_sql_dataframe(sql_dim_date, **mysql_args)

# Convert `full_date` to Python date objects (drops the time component) so it matches the fact-table dates during lookups
df_dim_date['full_date'] = pd.to_datetime(df_dim_date['full_date']).dt.date

# Display the first rows for validation
df_dim_date.head(2)

| | date_key | full_date |
|---|---|---|
| 0 | 20000101 | 2000-01-01 |
| 1 | 20000102 | 2000-01-02 |


### 2.2 Extract the Sales Orders Fact View

In [12]:
sql_fact_sales = "SELECT * FROM adventureworks.fact_sales_orders_vw;"
df_fact_sales = get_sql_dataframe(sql_fact_sales, **mysql_args)
df_fact_sales.head(2)


| | SalesOrderID | RevisionNumber | OrderDate | DueDate | ShipDate | Status | OnlineOrderFlag | SalesOrderNumber | PurchaseOrderNumber | AccountNumber | ... | CreditCardApprovalCode | SubTotal | TaxAmt | Freight | TotalDue | CarrierTrackingNumber | OrderQty | ProductID | UnitPrice | LineTotal |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 43659 | 1 | 2001-07-01 | 2001-07-13 | 2001-07-08 | 5 | `b'\x00'` | SO43659 | PO522145787 | 10-4020-000676 | ... | 105041Vi84182 | 24643.9362 | 1971.5149 | 616.0984 | 27231.5495 | 4911-403C-98 | 4 | 711 | 20.1865 | 80.746 |
| 1 | 43659 | 1 | 2001-07-01 | 2001-07-13 | 2001-07-08 | 5 | `b'\x00'` | SO43659 | PO522145787 | 10-4020-000676 | ... | 105041Vi84182 | 24643.9362 | 1971.5149 | 616.0984 | 27231.5495 | 4911-403C-98 | 2 | 712 | 5.1865 | 10.373 |


In [13]:
print(df_customers.columns)

Index(['CustomerID', 'AccountNumber', 'CustomerType', 'AddressType',
       'AddressLine1', 'AddressLine2', 'City', 'StateProvinceCode',
       'State_Province', 'IsOnlyStateProvinceFlag', 'PostalCode',
       'CountryRegionCode', 'Country_Region', 'Sales Territory Group',
       'Sales Territory'],
      dtype='object')


#### Perform Any Necessary Transformations

In [14]:
# Rename CustomerID to serve as the business key
if "CustomerID" in df_customers.columns:
    df_customers.rename(columns={"CustomerID": "customer_id"}, inplace=True)

# Insert a new surrogate key for the Data Warehouse
df_customers.insert(0, "customer_key", range(1, df_customers.shape[0] + 1))

# Verify changes
df_customers.head(2)



| | customer_key | customer_id | AccountNumber | CustomerType | AddressType | AddressLine1 | AddressLine2 | City | StateProvinceCode | State_Province | IsOnlyStateProvinceFlag | PostalCode | CountryRegionCode | Country_Region | Sales Territory Group | Sales Territory |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | AW00000001 | S | Main Office | 2251 Elliot Avenue | | Seattle | WA | Washington | `b'\x00'` | 98104 | US | United States | North America | Northwest |
| 1 | 2 | 2 | AW00000002 | S | Shipping | 7943 Walnut Ave | | Renton | WA | Washington | `b'\x00'` | 98055 | US | United States | North America | Northwest |


In [15]:
print(df_products.columns)


Index(['ProductID', 'Name', 'ProductNumber', 'MakeFlag', 'FinishedGoodsFlag',
       'Color', 'SafetyStockLevel', 'ReorderPoint', 'StandardCost',
       'ListPrice', 'Size', 'SizeUnitMeasureCode', 'WeightUnitMeasureCode',
       'Weight', 'DaysToManufacture', 'ProductLine', 'Class', 'Style',
       'ProductCategory', 'ProductSubcategory', 'ProductModel',
       'SellStartDate', 'SellEndDate', 'DiscontinuedDate'],
      dtype='object')


In [16]:
# Drop unnecessary columns only if they exist
drop_cols = ['ProductNumber', 'MakeFlag', 'FinishedGoodsFlag', 'Size', 'SizeUnitMeasureCode', 
             'WeightUnitMeasureCode', 'ProductLine', 'Class', 'Style', 'SellEndDate', 'DiscontinuedDate']

df_products.drop(columns=[col for col in drop_cols if col in df_products.columns], inplace=True, errors='ignore')

# Rename ProductID for business key lookups
if "ProductID" in df_products.columns:
    df_products.rename(columns={"ProductID": "product_id"}, inplace=True)

# Insert a new surrogate key for the Data Warehouse
df_products.insert(0, "product_key", range(1, df_products.shape[0] + 1))

# Verify changes
df_products.head(2)


| | product_key | product_id | Name | Color | SafetyStockLevel | ReorderPoint | StandardCost | ListPrice | Weight | DaysToManufacture | ProductCategory | ProductSubcategory | ProductModel | SellStartDate |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | Adjustable Race | | 1000 | 750 | 0.0 | 0.0 | | 0 | | | | 1998-06-01 |
| 1 | 2 | 2 | Bearing Ball | | 1000 | 750 | 0.0 | 0.0 | | 0 | | | | 1998-06-01 |


### 2.3 Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables


In [17]:
db_operation = "insert"

tables = [
    ('dim_customers', df_customers, 'customer_key'),
    ('dim_products', df_products, 'product_key')
]

for table_name, dataframe, primary_key in tables:
    set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)


### 2.4 Extract the Sales Order Header and Detail Tables

In [18]:
# Extract sales order headers
sql_sales_order_header = "SELECT * FROM adventureworks.SalesOrderHeader;"
df_sales_order_header = get_sql_dataframe(sql_sales_order_header, **mysql_args)
df_sales_order_header.rename(columns={"SalesOrderID": "sales_order_id"}, inplace=True)
df_sales_order_header.head(2)

| | sales_order_id | RevisionNumber | OrderDate | DueDate | ShipDate | Status | OnlineOrderFlag | SalesOrderNumber | PurchaseOrderNumber | AccountNumber | ... | CreditCardID | CreditCardApprovalCode | CurrencyRateID | SubTotal | TaxAmt | Freight | TotalDue | Comment | rowguid | ModifiedDate |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 43659 | 1 | 2001-07-01 | 2001-07-13 | 2001-07-08 | 5 | `b'\x00'` | SO43659 | PO522145787 | 10-4020-000676 | ... | 16281.0 | 105041Vi84182 | | 24643.9362 | 1971.5149 | 616.0984 | 27231.5495 | | `b'!S\xb6y\xca9\x15A\x9c\xba\x8f\xe0\x90>\x12\xe6'` | 2001-07-08 |
| 1 | 43660 | 1 | 2001-07-01 | 2001-07-13 | 2001-07-08 | 5 | `b'\x00'` | SO43660 | PO18850127500 | 10-4020-000117 | ... | 5618.0 | 115213Vi29411 | | 1553.1035 | 124.2483 | 38.8276 | 1716.1794 | | `b'-\xc4\x8ds;\xd0\xa1H\x98"\xf9Zg\xeas\x89'` | 2001-07-08 |


In [19]:
# Extract sales order details
sql_sales_order_detail = "SELECT * FROM adventureworks.SalesOrderDetail;"
df_sales_order_detail = get_sql_dataframe(sql_sales_order_detail, **mysql_args)
df_sales_order_detail.rename(columns={"SalesOrderID": "sales_order_id", "SalesOrderDetailID": "sales_order_detail_id"}, inplace=True)
df_sales_order_detail.head(2)

| | sales_order_id | sales_order_detail_id | CarrierTrackingNumber | OrderQty | ProductID | SpecialOfferID | UnitPrice | UnitPriceDiscount | LineTotal | rowguid | ModifiedDate |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 43659 | 1 | 4911-403C-98 | 1 | 776 | 1 | 2024.994 | 0.0 | 2024.994 | `b'm\xc9\x07\xb2\xe6\xd9+@\x84p,\xc1v\xc4"\x83'` | 2001-07-01 |
| 1 | 43659 | 2 | 4911-403C-98 | 3 | 777 | 1 | 2024.994 | 0.0 | 6074.982 | `` b'\r`\xbbzw\x1e\xbeA\x9f\xe5\xb9\x14,\xfc\x08\... `` | 2001-07-01 |


In [20]:
print(df_fact_sales.columns)

Index(['SalesOrderID', 'RevisionNumber', 'OrderDate', 'DueDate', 'ShipDate',
       'Status', 'OnlineOrderFlag', 'SalesOrderNumber', 'PurchaseOrderNumber',
       'AccountNumber', 'CustomerID', 'ContactID', 'SalesPersonID',
       'Sales Territory Group', 'Sales Territory', 'BillToAddressID',
       'ShipToAddressID', 'ShipMethod', 'ShipBase', 'ShipRate',
       'Credit Card Type', 'Credit Card Number', 'Credit Card ExpMonth',
       'Credit Card ExpYear', 'CreditCardApprovalCode', 'SubTotal', 'TaxAmt',
       'Freight', 'TotalDue', 'CarrierTrackingNumber', 'OrderQty', 'ProductID',
       'UnitPrice', 'LineTotal'],
      dtype='object')


### 3.0 Transforming Sales Data

This step merges the sales order header and detail tables and drops unneeded columns. It then standardizes key names, adds foreign keys from the dimension tables, and keeps the date fields needed for the date-key lookups in the next step.


In [21]:
# Merge Sales Order Header with Sales Order Detail
df_fact_sales = df_sales_order_detail.merge(df_sales_order_header, on="sales_order_id", how="inner")

# Drop irrelevant columns (DO NOT drop ShipDate or DueDate)
drop_cols = ['rowguid', 'ModifiedDate', 'OnlineOrderFlag']
df_fact_sales.drop(columns=[col for col in drop_cols if col in df_fact_sales.columns], inplace=True, errors='ignore')

# Rename keys for consistency
df_fact_sales.rename(columns={"CustomerID": "customer_id", "ProductID": "product_id"}, inplace=True)

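# Add foreign keys by merging with dimension tables
# Note: dim_customers can hold more than one row per customer_id (e.g., customer_id 2 has both a
# Main Office and a Shipping address), so this left merge can duplicate fact rows for those customers
df_fact_sales = df_fact_sales.merge(df_customers[['customer_id', 'customer_key']], on="customer_id", how="left")
df_fact_sales = df_fact_sales.merge(df_products[['product_id', 'product_key']], on="product_id", how="left")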

# Drop original business keys (we only need the foreign keys)
df_fact_sales.drop(columns=["customer_id", "product_id"], inplace=True)

# KEEP ShipDate and DueDate in the final selection!
df_fact_sales = df_fact_sales[['sales_order_id', 'sales_order_detail_id', 'customer_key',
                               'product_key', 'OrderDate', 'ShipDate', 'DueDate', 'TotalDue']]

# Verify transformations
df_fact_sales.head(2)


| | sales_order_id | sales_order_detail_id | customer_key | product_key | OrderDate | ShipDate | DueDate | TotalDue |
|---|---|---|---|---|---|---|---|---|
| 0 | 43659 | 1 | 687 | 281 | 2001-07-01 | 2001-07-08 | 2001-07-13 | 27231.5495 |
| 1 | 43659 | 2 | 687 | 282 | 2001-07-01 | 2001-07-08 | 2001-07-13 | 27231.5495 |
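The left merges above assume each `customer_id` and `product_id` appears only once in its dimension. That assumption does not hold here. The `dim_customers` sample retrieved from MongoDB in section 5.3 shows `customer_id` 2 under both `customer_key` 2 and 3, so a plain merge can silently duplicate fact rows.

A minimal sketch on toy data follows. It uses pandas' `validate="many_to_one"` option to turn that situation into an error. Dropping duplicate business keys is only one assumed remedy, not the project's rule.

```python
import pandas as pd

# Toy data modeled on the dim_customers output: customer_id 2 appears twice
dim_customers = pd.DataFrame({"customer_key": [1, 2, 3], "customer_id": [1, 2, 2]})
fact = pd.DataFrame({"sales_order_id": [100, 101], "customer_id": [1, 2]})

# Without validation, the second fact row is silently duplicated
rows_unvalidated = len(fact.merge(dim_customers, on="customer_id", how="left"))

try:
    fact.merge(dim_customers, on="customer_id", how="left", validate="many_to_one")
    duplicates_detected = False
except pd.errors.MergeError:
    duplicates_detected = True  # the right side has repeated customer_id values

# One possible remedy (an assumption): keep the first surrogate key per customer_id
dim_unique = dim_customers.drop_duplicates(subset="customer_id", keep="first")
fact_keyed = fact.merge(dim_unique, on="customer_id", how="left", validate="many_to_one")
print(rows_unvalidated, duplicates_detected, len(fact_keyed))
```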


In [22]:
print(df_fact_sales.columns)


Index(['sales_order_id', 'sales_order_detail_id', 'customer_key',
       'product_key', 'OrderDate', 'ShipDate', 'DueDate', 'TotalDue'],
      dtype='object')


### 3.1 Assigning Date Keys

Replaces the business date fields (`OrderDate`, `ShipDate`, `DueDate`) with foreign keys that reference the date dimension's surrogate primary key (`date_key`), which makes lookups and joins efficient.


In [23]:
# Lookup the Surrogate Primary Key (date_key) for OrderDate
df_dim_order_date = df_dim_date.rename(columns={"date_key": "order_date_key", "full_date": "OrderDate"})
df_fact_sales['OrderDate'] = pd.to_datetime(df_fact_sales['OrderDate']).dt.date
df_fact_sales = pd.merge(df_fact_sales, df_dim_order_date, on='OrderDate', how='left')
df_fact_sales.drop(['OrderDate'], axis=1, inplace=True)

# Lookup the Surrogate Primary Key (date_key) for ShipDate
df_dim_shipped_date = df_dim_date.rename(columns={"date_key": "shipped_date_key", "full_date": "ShipDate"})
df_fact_sales['ShipDate'] = pd.to_datetime(df_fact_sales['ShipDate']).dt.date
df_fact_sales = pd.merge(df_fact_sales, df_dim_shipped_date, on='ShipDate', how='left')
df_fact_sales.drop(['ShipDate'], axis=1, inplace=True)

# Lookup the Surrogate Primary Key (date_key) for DueDate
df_dim_due_date = df_dim_date.rename(columns={"date_key": "due_date_key", "full_date": "DueDate"})
df_fact_sales['DueDate'] = pd.to_datetime(df_fact_sales['DueDate']).dt.date
df_fact_sales = pd.merge(df_fact_sales, df_dim_due_date, on='DueDate', how='left')
df_fact_sales.drop(['DueDate'], axis=1, inplace=True)

# Verify final structure
df_fact_sales.head(2)


| | sales_order_id | sales_order_detail_id | customer_key | product_key | TotalDue | order_date_key | shipped_date_key | due_date_key |
|---|---|---|---|---|---|---|---|---|
| 0 | 43659 | 1 | 687 | 281 | 27231.5495 | 20010701 | 20010708 | 20010713 |
| 1 | 43659 | 2 | 687 | 282 | 27231.5495 | 20010701 | 20010708 | 20010713 |
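The three lookups above repeat one rename/convert/merge/drop pattern. The sketch below folds them into a loop. It assumes a hypothetical helper `add_date_keys` and uses toy data taken from order 43659.

A date with no match in `dim_date` would come back as `NaN`, which is worth checking before loading.

```python
import pandas as pd


def add_date_keys(fact: pd.DataFrame, dim_date: pd.DataFrame, mapping: dict) -> pd.DataFrame:
    """Hypothetical helper: replace each date column in `mapping` with its dim_date key."""
    for date_col, key_col in mapping.items():
        lookup = dim_date.rename(columns={"date_key": key_col, "full_date": date_col})
        # Normalize to Python dates so they compare equal to dim_date.full_date
        fact = fact.assign(**{date_col: pd.to_datetime(fact[date_col]).dt.date})
        fact = fact.merge(lookup[[key_col, date_col]], on=date_col, how="left").drop(columns=date_col)
    return fact


dim_date = pd.DataFrame({
    "date_key": [20010701, 20010708, 20010713],
    "full_date": pd.to_datetime(["2001-07-01", "2001-07-08", "2001-07-13"]).date,
})
fact = pd.DataFrame({"sales_order_id": [43659], "OrderDate": ["2001-07-01"],
                     "ShipDate": ["2001-07-08"], "DueDate": ["2001-07-13"]})

fact = add_date_keys(fact, dim_date, {"OrderDate": "order_date_key",
                                      "ShipDate": "shipped_date_key",
                                      "DueDate": "due_date_key"})
print(fact)
```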


In [24]:
df_fact_sales.insert(0, "fact_sales_order_key", range(1, len(df_fact_sales) + 1))


In [25]:
df_fact_sales.head(5)


| | fact_sales_order_key | sales_order_id | sales_order_detail_id | customer_key | product_key | TotalDue | order_date_key | shipped_date_key | due_date_key |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 43659 | 1 | 687 | 281 | 27231.5495 | 20010701 | 20010708 | 20010713 |
| 1 | 2 | 43659 | 2 | 687 | 282 | 27231.5495 | 20010701 | 20010708 | 20010713 |
| 2 | 3 | 43659 | 3 | 687 | 283 | 27231.5495 | 20010701 | 20010708 | 20010713 |
| 3 | 4 | 43659 | 4 | 687 | 276 | 27231.5495 | 20010701 | 20010708 | 20010713 |
| 4 | 5 | 43659 | 5 | 687 | 277 | 27231.5495 | 20010701 | 20010708 | 20010713 |


In [26]:
set_dataframe(df_fact_sales, "fact_sales_orders", "fact_sales_order_key", "insert", **mysql_args)


### 4.0 Data Validation and Sales Summary

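Checks the first 10 rows of `fact_sales_orders`, verifies foreign key integrity by ensuring every `customer_key` and `product_key` value exists in its dimension table, and aggregates total revenue. Note that `TotalDue` is an order-level amount (`SubTotal + TaxAmt + Freight`) repeated on every line of an order. Summing it over fact rows therefore counts each order total once per line item and overstates revenue; the same applies to the aggregations in sections 4.1 through 4.4.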
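In the fact table output above, `TotalDue` (27231.5495) repeats on every line of order 43659 because it is an order-level amount.

A toy example with hypothetical values shows why summing it over fact rows overstates revenue. It also shows two alternatives:
- sum `TotalDue` once per order;
- sum the line-level `LineTotal` from `SalesOrderDetail`, which excludes tax and freight and is dropped by the final column selection in section 3.0.

```python
import pandas as pd

# Toy fact rows: order 43659 has two lines; TotalDue is the order total repeated on each line
fact = pd.DataFrame({
    "sales_order_id": [43659, 43659, 43660],
    "LineTotal": [80.0, 20.0, 50.0],
    "TotalDue": [110.0, 110.0, 55.0],  # order-level: lines + tax + freight
})

naive_total = fact["TotalDue"].sum()                                     # counts order 43659 twice
order_total = fact.drop_duplicates("sales_order_id")["TotalDue"].sum()  # once per order
line_total = fact["LineTotal"].sum()                                     # additive line-level measure

print(naive_total, order_total, line_total)
```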


In [27]:
# Check the First 10 Rows in fact_sales_orders
sql_check = "SELECT * FROM adventureworks_dw.fact_sales_orders LIMIT 10;"
df_check = get_sql_dataframe(sql_check, **mysql_args)
display(df_check)

# Verify Foreign Key Mappings in fact_sales_orders
sql_check_fk = """
SELECT f.sales_order_id, 
       f.customer_key AS fact_customer_key, 
       f.product_key AS fact_product_key, 
       c.customer_key AS dim_customer_key, 
       p.product_key AS dim_product_key
FROM adventureworks_dw.fact_sales_orders f
LEFT JOIN adventureworks_dw.dim_customers c ON f.customer_key = c.customer_key
LEFT JOIN adventureworks_dw.dim_products p ON f.product_key = p.product_key
WHERE c.customer_key IS NULL OR p.product_key IS NULL
LIMIT 10;
"""
df_fk_check = get_sql_dataframe(sql_check_fk, **mysql_args)
display(df_fk_check)

# Aggregate Total Sales from fact_sales_orders
sql_total_sales = "SELECT SUM(TotalDue) AS total_revenue FROM adventureworks_dw.fact_sales_orders;"
df_sales_summary = get_sql_dataframe(sql_total_sales, **mysql_args)
display(df_sales_summary)


| | fact_sales_order_key | sales_order_id | sales_order_detail_id | customer_key | product_key | TotalDue | order_date_key | shipped_date_key | due_date_key |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 43659 | 1 | 687 | 281 | 27231.5495 | 20010701 | 20010708 | 20010713 |
| 1 | 2 | 43659 | 2 | 687 | 282 | 27231.5495 | 20010701 | 20010708 | 20010713 |
| 2 | 3 | 43659 | 3 | 687 | 283 | 27231.5495 | 20010701 | 20010708 | 20010713 |
| 3 | 4 | 43659 | 4 | 687 | 276 | 27231.5495 | 20010701 | 20010708 | 20010713 |
| 4 | 5 | 43659 | 5 | 687 | 277 | 27231.5495 | 20010701 | 20010708 | 20010713 |
| 5 | 6 | 43659 | 6 | 687 | 278 | 27231.5495 | 20010701 | 20010708 | 20010713 |
| 6 | 7 | 43659 | 7 | 687 | 279 | 27231.5495 | 20010701 | 20010708 | 20010713 |
| 7 | 8 | 43659 | 8 | 687 | 219 | 27231.5495 | 20010701 | 20010708 | 20010713 |
| 8 | 9 | 43659 | 9 | 687 | 221 | 27231.5495 | 20010701 | 20010708 | 20010713 |
| 9 | 10 | 43659 | 10 | 687 | 214 | 27231.5495 | 20010701 | 20010708 | 20010713 |


| | sales_order_id | fact_customer_key | fact_product_key | dim_customer_key | dim_product_key |
|---|---|---|---|---|---|

*(No rows returned.)*


| | total_revenue |
|---|---|
| 0 | 3568775000.0 |


### 4.1 Foreign Key Integrity and Sales Verification

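Ensures that every `customer_key` and `product_key` value in `fact_sales_orders` exists in its dimension table, then reports the total sales amount and the number of fact rows. Each fact row is one order line, so `total_orders` counts line items rather than distinct sales orders.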
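The same orphan check can be run in pandas before the fact table is loaded, which catches missing keys earlier.

A minimal sketch on toy data follows, in which key `9999` is a hypothetical orphan. It uses `merge(..., indicator=True)` so that unmatched rows are labeled `left_only`.

```python
import pandas as pd

fact = pd.DataFrame({"fact_sales_order_key": [1, 2, 3], "customer_key": [687, 687, 9999]})
dim_customers = pd.DataFrame({"customer_key": [687, 688]})

# indicator=True adds a "_merge" column: "both" for matches, "left_only" for orphans
checked = fact.merge(dim_customers, on="customer_key", how="left", indicator=True)
orphans = checked.loc[checked["_merge"] == "left_only", ["fact_sales_order_key", "customer_key"]]
print(orphans)
```

An empty `orphans` frame corresponds to the empty SQL results in this section.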


In [28]:
# Check if all customer_key values in fact_sales_orders exist in dim_customers
sql_check_customers = """
SELECT f.customer_key
FROM adventureworks_dw.fact_sales_orders f
LEFT JOIN adventureworks_dw.dim_customers d ON f.customer_key = d.customer_key
WHERE d.customer_key IS NULL;
"""

df_missing_customers = get_sql_dataframe(sql_check_customers, **mysql_args)
display(df_missing_customers)

# Check if all product_key values in fact_sales_orders exist in dim_products
sql_check_products = """
SELECT f.product_key
FROM adventureworks_dw.fact_sales_orders f
LEFT JOIN adventureworks_dw.dim_products d ON f.product_key = d.product_key
WHERE d.product_key IS NULL;
"""

df_missing_products = get_sql_dataframe(sql_check_products, **mysql_args)
display(df_missing_products)

# Verify total sales amount in fact_sales_orders
sql_total_sales = """
SELECT SUM(TotalDue) AS total_sales, COUNT(*) AS total_orders
FROM adventureworks_dw.fact_sales_orders;
"""

df_total_sales = get_sql_dataframe(sql_total_sales, **mysql_args)
display(df_total_sales)


| | customer_key |
|---|---|

*(No rows returned.)*


| | product_key |
|---|---|

*(No rows returned.)*


| | total_sales | total_orders |
|---|---|---|
| 0 | 3568775000.0 | 122579 |


### 4.2 Top Spending Customers

Identifies the top 10 customers by total spending in `fact_sales_orders`, grouping by `customer_key` and ordering by total amount spent in descending order.
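The query returns only surrogate keys. Below is a pandas sketch of the same top-N aggregation that also attaches a readable attribute (`AccountNumber`) from the customer dimension.

In the sketch, `amount` is a hypothetical additive measure column and the data are toy values.

```python
import pandas as pd

fact = pd.DataFrame({"customer_key": [1, 1, 2, 3], "amount": [10.0, 5.0, 30.0, 7.0]})
dim_customers = pd.DataFrame({"customer_key": [1, 2, 3],
                              "AccountNumber": ["AW00000001", "AW00000002", "AW00000003"]})

top = (
    fact.groupby("customer_key", as_index=False)["amount"].sum()  # total per customer
        .nlargest(2, "amount")                                    # top N, descending
        .merge(dim_customers, on="customer_key", how="left")      # label with business attributes
)
print(top)
```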

In [29]:
sql_top_customers = """
SELECT customer_key, SUM(TotalDue) AS total_spent
FROM adventureworks_dw.fact_sales_orders
GROUP BY customer_key
ORDER BY total_spent DESC
LIMIT 10;
"""
df_top_customers = get_sql_dataframe(sql_top_customers, **mysql_args)
display(df_top_customers)


| | customer_key | total_spent |
|---|---|---|
| 0 | 689 | 50812250.0 |
| 1 | 649 | 46351580.0 |
| 2 | 523 | 46083590.0 |
| 3 | 27 | 45699510.0 |
| 4 | 505 | 45447880.0 |
| 5 | 75 | 45421580.0 |
| 6 | 179 | 44536840.0 |
| 7 | 555 | 42905550.0 |
| 8 | 336 | 42756520.0 |
| 9 | 159 | 40663890.0 |


### 4.3 Top Selling Products

Retrieves the top 10 products by total revenue, aggregating order count and sales amount from `fact_sales_orders`, grouped by `product_key` and sorted in descending order of revenue.

In [30]:
sql_top_products = """
SELECT product_key, COUNT(*) AS total_orders, SUM(TotalDue) AS total_revenue
FROM adventureworks_dw.fact_sales_orders
GROUP BY product_key
ORDER BY total_revenue DESC
LIMIT 10;
"""
df_top_products = get_sql_dataframe(sql_top_products, **mysql_args)
display(df_top_products)


| | product_key | total_orders | total_revenue |
|---|---|---|---|
| 0 | 217 | 3408 | 60593360.0 |
| 1 | 220 | 1659 | 58276220.0 |
| 2 | 216 | 3112 | 56083670.0 |
| 3 | 213 | 3032 | 54860840.0 |
| 4 | 212 | 3107 | 52886510.0 |
| 5 | 219 | 1234 | 48377230.0 |
| 6 | 221 | 1092 | 42882140.0 |
| 7 | 364 | 1096 | 36577200.0 |
| 8 | 243 | 612 | 33010640.0 |
| 9 | 298 | 717 | 31728900.0 |


### 4.4 Monthly Sales Analysis

Aggregates total sales per month by joining `fact_sales_orders` with `dim_date`, grouping by year and month, and ordering chronologically to analyze sales trends over time.
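Because `order_date_key` uses the `YYYYMMDD` integer format, year and month can also be derived arithmetically without a join. The sketch below uses toy data, with `amount` as a hypothetical measure.

The join with `dim_date` remains the better choice when attributes such as `month_name` are needed.

```python
import pandas as pd

fact = pd.DataFrame({"order_date_key": [20010701, 20010715, 20010802],
                     "amount": [100.0, 50.0, 25.0]})

monthly = (
    fact.assign(calendar_year=fact["order_date_key"] // 10000,        # 20010701 -> 2001
                month_of_year=fact["order_date_key"] // 100 % 100)    # 20010701 -> 7
        .groupby(["calendar_year", "month_of_year"], as_index=False)["amount"].sum()
        .sort_values(["calendar_year", "month_of_year"])
)
print(monthly)
```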

In [31]:
sql_monthly_sales = """
SELECT d.calendar_year, d.month_name, d.month_of_year, SUM(f.TotalDue) AS total_sales
FROM adventureworks_dw.fact_sales_orders f
JOIN adventureworks_dw.dim_date d ON f.order_date_key = d.date_key
GROUP BY d.calendar_year, d.month_name, d.month_of_year
ORDER BY d.calendar_year, d.month_of_year;
"""
df_monthly_sales = get_sql_dataframe(sql_monthly_sales, **mysql_args)
display(df_monthly_sales)


| | calendar_year | month_name | month_of_year | total_sales |
|---|---|---|---|---|
| 0 | 2001 | July | 7 | 10045560.0 |
| 1 | 2001 | August | 8 | 37574840.0 |
| 2 | 2001 | September | 9 | 24625160.0 |
| 3 | 2001 | October | 10 | 20796210.0 |
| 4 | 2001 | November | 11 | 64270960.0 |
| 5 | 2001 | December | 12 | 44729050.0 |
| 6 | 2002 | January | 1 | 12825170.0 |
| 7 | 2002 | February | 2 | 42435290.0 |
| 8 | 2002 | March | 3 | 31202800.0 |
| 9 | 2002 | April | 4 | 21099560.0 |


### 5.0 MongoDB Section

In [32]:
# Connect to MongoDB (Local Instance)
mongo_client = get_mongo_client(**mongodb_args)

# Use the specified database name
mongo_db_name = mongodb_args["db_name"]  # "adventureworks_nosql"


### 5.1 Extracting and Exporting Data to JSON

Retrieves `fact_sales_orders`, `dim_customers`, and `dim_products` from the MySQL data warehouse and saves each as a JSON file (an array of records) in a local `data` directory, ready to be loaded into MongoDB.
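Two details of this export affect what lands in MongoDB:
- **Dates.** The MongoDB samples in section 5.3 show `SellStartDate` as `896659200000`, which is 1998-06-01 in milliseconds since the Unix epoch. pandas' `to_json` writes epoch values here unless `date_format="iso"` is passed.
- **BIT flags.** The MySQL BIT flags arrive in pandas as bytes such as `b'\x00'` and come back from MongoDB as raw control characters rather than 0/1.

A minimal sketch of cleaning both before export follows, on toy data.

```python
import json

import pandas as pd

df = pd.DataFrame({
    "product_key": [1, 2],
    "MakeFlag": [b"\x00", b"\x01"],  # BIT(1) values as they appear in the DataFrames above
    "SellStartDate": pd.to_datetime(["1998-06-01", "1998-06-01"]),
})

# Convert single-byte BIT values to plain integers before serializing
df["MakeFlag"] = df["MakeFlag"].apply(
    lambda v: int.from_bytes(v, "big") if isinstance(v, (bytes, bytearray)) else v
)

# date_format="iso" writes ISO 8601 strings instead of epoch milliseconds
records = json.loads(df.to_json(orient="records", date_format="iso"))
print(records[0])
```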


In [33]:
# Define MongoDB collection names
json_files = {
    "fact_sales_orders": "fact_sales_orders.json",
    "dim_customers": "dim_customers.json",
    "dim_products": "dim_products.json"  
}

# Extract fact_sales_orders from MySQL
sql_fact_sales = "SELECT * FROM adventureworks_dw.fact_sales_orders;"
df_fact_sales = get_sql_dataframe(sql_fact_sales, **mysql_args)

# Extract dim_customers from MySQL
sql_dim_customers = "SELECT * FROM adventureworks_dw.dim_customers;"
df_dim_customers = get_sql_dataframe(sql_dim_customers, **mysql_args)

# Extract dim_products from MySQL
sql_dim_products = "SELECT * FROM adventureworks_dw.dim_products;"
df_dim_products = get_sql_dataframe(sql_dim_products, **mysql_args)

# Ensure 'data' directory exists
data_dir = os.path.join(os.getcwd(), 'data')
os.makedirs(data_dir, exist_ok=True)

# Define JSON file paths
fact_sales_path = os.path.join(data_dir, json_files["fact_sales_orders"])
dim_customers_path = os.path.join(data_dir, json_files["dim_customers"])
dim_products_path = os.path.join(data_dir, json_files["dim_products"])

# Save DataFrames as JSON Files
df_fact_sales.to_json(fact_sales_path, orient="records", indent=4)
df_dim_customers.to_json(dim_customers_path, orient="records", indent=4)
df_dim_products.to_json(dim_products_path, orient="records", indent=4)  

print(f"JSON Export Complete: {fact_sales_path} & {dim_customers_path} & {dim_products_path}")


JSON Export Complete: C:\Users\student.DESKTOP-0PHPGHC\Downloads\data\fact_sales_orders.json & C:\Users\student.DESKTOP-0PHPGHC\Downloads\data\dim_customers.json & C:\Users\student.DESKTOP-0PHPGHC\Downloads\data\dim_products.json


### 5.2 Loading Data into MongoDB

Inserts JSON files into MongoDB collections and verifies successful insertion by counting records in `fact_sales_orders`, `dim_customers`, and `dim_products`.
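Printing the counts shows what was inserted, but not whether it matches the exports.

The sketch below derives the expected counts from the JSON files themselves, assuming a hypothetical helper `expected_counts`. The comparison against `count_documents({})` is left as a comment because it needs a running server.

```python
import json
import os
import tempfile


def expected_counts(data_directory: str, json_files: dict) -> dict:
    """Hypothetical helper: count the records in each JSON export, keyed by collection name."""
    counts = {}
    for collection_name, file_name in json_files.items():
        with open(os.path.join(data_directory, file_name), "r") as f:
            counts[collection_name] = len(json.load(f))
    return counts


# Demo with a temporary directory and one toy export
tmp_dir = tempfile.mkdtemp()
with open(os.path.join(tmp_dir, "dim_products.json"), "w") as f:
    json.dump([{"product_key": 1}, {"product_key": 2}], f)

counts = expected_counts(tmp_dir, {"dim_products": "dim_products.json"})
print(counts)

# Against a live server, each collection could then be checked, e.g.:
# for name, n in counts.items():
#     assert mongo_client[mongo_db_name][name].count_documents({}) == n
```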

In [34]:
# Insert JSON files into MongoDB
set_mongo_collections(mongo_client, mongo_db_name, data_dir, json_files)

# Confirm Insertion
fact_sales_count = mongo_client[mongo_db_name]["fact_sales_orders"].count_documents({})
dim_customers_count = mongo_client[mongo_db_name]["dim_customers"].count_documents({})
dim_products_count = mongo_client[mongo_db_name]["dim_products"].count_documents({})  

print(f"Inserted {fact_sales_count} sales records, {dim_customers_count} customer records, and {dim_products_count} product records into MongoDB.")

Inserted 122579 sales records, 19220 customer records, and 504 product records into MongoDB.


### 5.3 Retrieving Sample Data from MongoDB

Fetches sample records from `fact_sales_orders`, `dim_customers`, and `dim_products` collections in MongoDB and displays the first five records for verification.


In [35]:
# Retrieve a sample from MongoDB
df_mongo_sales = get_mongo_dataframe(mongo_client, mongo_db_name, "fact_sales_orders", {})
df_mongo_customers = get_mongo_dataframe(mongo_client, mongo_db_name, "dim_customers", {})
df_mongo_products = get_mongo_dataframe(mongo_client, mongo_db_name, "dim_products", {})  

# Display the first 5 records
display(df_mongo_sales.head(), df_mongo_customers.head(), df_mongo_products.head())


| | fact_sales_order_key | sales_order_id | sales_order_detail_id | customer_key | product_key | TotalDue | order_date_key | shipped_date_key | due_date_key |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 43659 | 1 | 687 | 281 | 27231.5495 | 20010701 | 20010708 | 20010713 |
| 1 | 2 | 43659 | 2 | 687 | 282 | 27231.5495 | 20010701 | 20010708 | 20010713 |
| 2 | 3 | 43659 | 3 | 687 | 283 | 27231.5495 | 20010701 | 20010708 | 20010713 |
| 3 | 4 | 43659 | 4 | 687 | 276 | 27231.5495 | 20010701 | 20010708 | 20010713 |
| 4 | 5 | 43659 | 5 | 687 | 277 | 27231.5495 | 20010701 | 20010708 | 20010713 |


|   | customer_key | customer_id | AccountNumber | CustomerType | AddressType | AddressLine1 | AddressLine2 | City | StateProvinceCode | State_Province | IsOnlyStateProvinceFlag | PostalCode | CountryRegionCode | Country_Region | Sales Territory Group | Sales Territory |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | AW00000001 | S | Main Office | 2251 Elliot Avenue |  | Seattle | WA | Washington | � | 98104 | US | United States | North America | Northwest |
| 1 | 2 | 2 | AW00000002 | S | Shipping | 7943 Walnut Ave |  | Renton | WA | Washington | � | 98055 | US | United States | North America | Northwest |
| 2 | 3 | 2 | AW00000002 | S | Main Office | 3207 S Grady Way |  | Renton | WA | Washington | � | 98055 | US | United States | North America | Northwest |
| 3 | 4 | 3 | AW00000003 | S | Main Office | 12345 Sterling Avenue |  | Irving | TX | Texas | � | 75061 | US | United States | North America | Southwest |
| 4 | 5 | 4 | AW00000004 | S | Main Office | 800 Interchange Blvd. | Suite 2501 | Austin | TX | Texas | � | 78701 | US | United States | North America | Southwest |


|   | product_key | product_id | Name | Color | SafetyStockLevel | ReorderPoint | StandardCost | ListPrice | Weight | DaysToManufacture | ProductCategory | ProductSubcategory | ProductModel | SellStartDate |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | Adjustable Race |  | 1000 | 750 | 0.0 | 0.0 |  | 0 |  |  |  | 896659200000 |
| 1 | 2 | 2 | Bearing Ball |  | 1000 | 750 | 0.0 | 0.0 |  | 0 |  |  |  | 896659200000 |
| 2 | 3 | 3 | BB Ball Bearing |  | 800 | 600 | 0.0 | 0.0 |  | 1 |  |  |  | 896659200000 |
| 3 | 4 | 4 | Headset Ball Bearings |  | 800 | 600 | 0.0 | 0.0 |  | 0 |  |  |  | 896659200000 |
| 4 | 5 | 316 | Blade |  | 800 | 600 | 0.0 | 0.0 |  | 1 |  |  |  | 896659200000 |


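## 5.4 Verification of Correctness

Reconnects to MongoDB, reads each collection back in full, and confirms that the document counts match the counts reported after insertion.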

In [36]:
# Connect to MongoDB
mongo_client = get_mongo_client(**mongodb_args)

# Extract data from "fact_sales_orders"
collection = "fact_sales_orders"
query = {}  # Get all documents
df_fact_sales_mongo = get_mongo_dataframe(mongo_client, mongodb_args["db_name"], collection, query)
print(f" Fact Sales Orders Count: {len(df_fact_sales_mongo)}") 
df_fact_sales_mongo.head(2)  # Verify

 Fact Sales Orders Count: 122579


|   | fact_sales_order_key | sales_order_id | sales_order_detail_id | customer_key | product_key | TotalDue | order_date_key | shipped_date_key | due_date_key |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 43659 | 1 | 687 | 281 | 27231.5495 | 20010701 | 20010708 | 20010713 |
| 1 | 2 | 43659 | 2 | 687 | 282 | 27231.5495 | 20010701 | 20010708 | 20010713 |


In [37]:
# Extract data from "dim_customers"
mongo_client = get_mongo_client(**mongodb_args)

collection = "dim_customers"
query = {}  # Get all documents
df_dim_customers_mongo = get_mongo_dataframe(mongo_client, mongodb_args["db_name"], collection, query)
print(f" Customers Count: {len(df_dim_customers_mongo)}")  
df_dim_customers_mongo.head(2)  # Verify


 Customers Count: 19220


|   | customer_key | customer_id | AccountNumber | CustomerType | AddressType | AddressLine1 | AddressLine2 | City | StateProvinceCode | State_Province | IsOnlyStateProvinceFlag | PostalCode | CountryRegionCode | Country_Region | Sales Territory Group | Sales Territory |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | AW00000001 | S | Main Office | 2251 Elliot Avenue |  | Seattle | WA | Washington | � | 98104 | US | United States | North America | Northwest |
| 1 | 2 | 2 | AW00000002 | S | Shipping | 7943 Walnut Ave |  | Renton | WA | Washington | � | 98055 | US | United States | North America | Northwest |


In [38]:
# Extract data from "dim_products" 
collection = "dim_products"
query = {}  # Get all documents
df_dim_products_mongo = get_mongo_dataframe(mongo_client, mongodb_args["db_name"], collection, query)
print(f" Products Count: {len(df_dim_products_mongo)}")  
df_dim_products_mongo.head(2)  # Verify

 Products Count: 504


|   | product_key | product_id | Name | Color | SafetyStockLevel | ReorderPoint | StandardCost | ListPrice | Weight | DaysToManufacture | ProductCategory | ProductSubcategory | ProductModel | SellStartDate |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | Adjustable Race |  | 1000 | 750 | 0.0 | 0.0 |  | 0 |  |  |  | 896659200000 |
| 1 | 2 | 2 | Bearing Ball |  | 1000 | 750 | 0.0 | 0.0 |  | 0 |  |  |  | 896659200000 |


## 5.5 Exporting Customers Data to CSV

Extracts `dim_customers` from MySQL, saves it as a CSV file in the `data` directory, and verifies successful file creation.


In [39]:
import os

# Ensure 'data' directory exists
os.makedirs(data_dir, exist_ok=True)

# Fixed, idempotent file name
csv_file_path = os.path.join(data_dir, "dim_customers.csv")

# Extract dim_customers from MySQL
sql_dim_customers = "SELECT * FROM adventureworks_dw.dim_customers;"
df_dim_customers = get_sql_dataframe(sql_dim_customers, **mysql_args)

# Save as CSV — overwrites every time
df_dim_customers.to_csv(csv_file_path, index=False)

print(f"✅ CSV Export Complete (idempotent): {csv_file_path}")


✅ CSV Export Complete (idempotent): C:\Users\student.DESKTOP-0PHPGHC\Downloads\data\dim_customers.csv


In [40]:
import os

# Verify if the file was saved correctly
if os.path.exists(csv_file_path):
    print(f" CSV File Found: {csv_file_path}")
else:
    print("CSV File Not Found. Check the directory and try again.")


 CSV File Found: C:\Users\student.DESKTOP-0PHPGHC\Downloads\data\dim_customers.csv
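Writing to a fixed file name is what makes the export idempotent: rerunning the cell replaces the file rather than adding a new one. If the notebook could be interrupted mid-write, a common refinement (not used in the original cell) is to write to a temporary file in the same directory and then swap it into place with `os.replace`, so readers never see a half-written CSV. The sketch below shows that pattern with the standard-library `csv` module. `export_csv_atomically` is a hypothetical helper, and the rows are assumed to be dicts that share the same keys.

```python
import csv
import os
import tempfile


def export_csv_atomically(rows: list, fieldnames: list, csv_path: str) -> str:
    """Write rows to csv_path via a temp file, then atomically replace the target."""
    directory = os.path.dirname(os.path.abspath(csv_path))
    os.makedirs(directory, exist_ok=True)
    # Create the temp file in the same directory so os.replace stays on one filesystem
    fd, tmp_path = tempfile.mkstemp(suffix=".tmp", dir=directory)
    try:
        with os.fdopen(fd, "w", newline="") as tmp:
            writer = csv.DictWriter(tmp, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(rows)
        os.replace(tmp_path, csv_path)  # overwrites any previous export
    except BaseException:
        os.remove(tmp_path)
        raise
    return csv_path
```

Running the function twice with the same rows leaves a single, byte-identical file, which is the idempotency property the cell above relies on.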


# Part 2
# Project Overview: Data Lakehouse Implementation with Spark Streaming

This solution demonstrates an implementation of the **medallion Lakehouse architecture** (Bronze → Silver → Gold) popularized by Databricks, built with Apache Spark. It processes **streaming fact data** alongside **batch reference data** and integrates them into a unified analytical pipeline that supports incremental data loading and real-time aggregation.


## Functional Requirements Coverage

1. **Batch Execution with Incremental Load**  
   The reference dimension tables (`dim_customers`, `dim_products`, and `dim_date`) were loaded in batch from a CSV file, MongoDB, and MySQL, respectively. They represent **static data** used to enrich the fact table during the Silver layer transformations.

2. **Streaming Real-Time Fact Data**  
   The calendar-year 2001 rows of the `fact_sales_orders` table were split into **three separate JSON files** to simulate incremental, real-time arrival from a streaming source. Spark Structured Streaming (`spark.readStream`) ingested these files as a **file streaming source** with `maxFilesPerTrigger` set to 1, so each file is processed as its own micro-batch.

3. **Bronze Layer**  
   The streamed JSON records were written to the **Bronze layer** in Parquet format, with the metadata columns `receipt_time` and `source_file` added for traceability.

4. **Silver Layer**  
   The Bronze data was enriched by **joining it with the reference dimension tables**: inner joins to `dim_customers` and `dim_products` on `customer_key` and `product_key`, and a left outer join to `dim_date` on `order_date_key` to resolve `order_full_date`. The resulting Silver table holds **business-ready transactional data** with consistently typed columns such as `sales_order_id`, `customer_key`, `product_key`, `TotalDue`, and `order_full_date`.

5. **Gold Layer**  
   An aggregate table focused on **products** was built from the Silver layer, counting order lines per `ProductCategory` and month (`month_of_year`, `month_name`).

6. **Execution in a Single Notebook**  
   All operations (data ingestion, transformation, streaming queries, and output generation) were performed in a single notebook using PySpark, running on a local Spark session (`local[...]` master, initialized through `findspark`).
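The Bronze and Silver steps in items 3 and 4 can be modeled in plain Python to make the data flow concrete. This is a simplified sketch, not the Spark code used in this notebook. Records are dicts, and `to_bronze` and `to_silver` are hypothetical helpers. The dimensions are assumed to be dicts keyed by their surrogate keys. The join rules mirror the notebook's Silver query: inner joins to customers and products, and a left outer join to dates.

```python
from datetime import datetime, timezone


def to_bronze(records: list, source_file: str) -> list:
    """Bronze: keep raw records as-is, adding ingestion metadata columns."""
    receipt_time = datetime.now(timezone.utc)
    return [{**r, "receipt_time": receipt_time, "source_file": source_file} for r in records]


def to_silver(bronze: list, customers: dict, products: dict, dates: dict) -> list:
    """Silver: inner-join on customer/product keys, left-join on order_date_key."""
    silver = []
    for r in bronze:
        # Inner joins: drop rows whose customer or product key has no dimension match
        if r["customer_key"] not in customers or r["product_key"] not in products:
            continue
        silver.append({
            "sales_order_id": r["sales_order_id"],
            "customer_key": r["customer_key"],
            "product_key": r["product_key"],
            "TotalDue": float(r["TotalDue"]),
            "order_date_key": r["order_date_key"],
            # Left outer join: keep the row even if the date key is unknown
            "order_full_date": dates.get(r["order_date_key"]),
            "receipt_time": r["receipt_time"],
            "source_file": r["source_file"],
        })
    return silver
```

Rows with an unknown customer or product disappear at the Silver step. A row with an unknown date survives with `order_full_date` set to `None`, which matches how the Spark inner and left outer joins treat unmatched keys.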

## Section I: Prerequisites

### 1.0. Import and Setup

In [41]:
import os

# Replace this with the correct path if your Java 21 is installed somewhere else
os.environ["JAVA_HOME"] = "C:\\Java\\jdk-21"


In [42]:
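import json
import shutil
import time

import numpy as np
import pandas as pd
from sqlalchemy import create_engine

import findspark
findspark.init()

# Import pyspark after findspark.init() so it resolves to the Spark installation found above
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window as W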

### 2.0. Instantiate Global Variables

In [43]:
base_dir = os.path.join(os.getcwd(), 'data')
data_dir = os.path.join(base_dir, 'adventureworks')
stream_dir = os.path.join(data_dir, 'streaming')

orders_stream_dir = os.path.join(stream_dir, 'fact_sales_orders')
batch_dir = os.path.join(data_dir, 'batch')

# Output paths for bronze/silver/gold
dest_database = "adventureworks_dlh"
sql_warehouse_dir = os.path.abspath('spark-warehouse')
database_dir = os.path.join(sql_warehouse_dir, f"{dest_database}.db")

sales_output_bronze = os.path.join(database_dir, 'fact_sales_orders', 'bronze')
sales_output_silver = os.path.join(database_dir, 'fact_sales_orders', 'silver')
sales_output_gold = os.path.join(database_dir, 'fact_sales_orders', 'gold')


### 3.0. Define Global Functions

In [44]:
def get_file_info(path: str):
    '''Return a pandas DataFrame with the name, size (bytes), and last-modified time of each file in 'path'.'''
    # Fetch each item in the directory, and filter out any subdirectories
    items = os.listdir(path)
    files = sorted([item for item in items if os.path.isfile(os.path.join(path, item))])

    # Collect the size and last modification datetime for each file
    file_sizes = []
    modification_times = []
    for file in files:
        file_sizes.append(os.path.getsize(os.path.join(path, file)))
        modification_times.append(pd.to_datetime(os.path.getmtime(os.path.join(path, file)), unit='s'))

    data = list(zip(files, file_sizes, modification_times))
    column_names = ['name', 'size', 'modification_time']

    return pd.DataFrame(data=data, columns=column_names)

def wait_until_stream_is_ready(query, min_batches=1):
    '''Block until the streaming query has reported at least 'min_batches' micro-batches.'''
    while len(query.recentProgress) < min_batches:
        time.sleep(5)

    print(f"The stream has processed {len(query.recentProgress)} batch(es)")

def remove_directory_tree(path: str):
    '''If it exists, remove the entire contents of a directory structure at a given 'path' parameter's location.'''
    try:
        if os.path.exists(path):
            shutil.rmtree(path)
            return f"Directory '{path}' has been removed successfully."
        else:
            return f"Directory '{path}' does not exist."
            
    except Exception as e:
        return f"An error occurred: {e}"


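def drop_null_columns(df, threshold):
    '''Drop columns whose fraction of NULL values exceeds the given 'threshold' (0.0 to 1.0).'''
    total_rows = df.count()
    if total_rows == 0:
        return df

    # Count the NULLs in every column with a single aggregation instead of one job per column
    null_counts = df.select(
        [count(when(df[c].isNull(), 1)).alias(c) for c in df.columns]
    ).first().asDict()
    columns_with_nulls = [c for c, n in null_counts.items() if n / total_rows > threshold]

    return df.drop(*columns_with_nulls)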
    

def get_mongo_uri(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the 'cluster_location' parameter.")
        
    if args['cluster_location'] == "atlas":
        uri = f"mongodb+srv://{args['user_name']}:{args['password']}@"
        uri += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net/"
    else:
        uri = "mongodb://localhost:27017/"

    return uri

def get_spark_conf(**args):
    sparkConf = SparkConf().setAppName(args['app_name'])\
    .setMaster(args['worker_threads']) \
    .set('spark.driver.memory', '4g') \
    .set('spark.executor.memory', '2g') \
    .set('spark.jars', args['spark_jars']) \
    .set('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
    .set('spark.mongodb.input.uri', args['mongo_uri']) \
    .set('spark.mongodb.output.uri', args['mongo_uri']) \
    .set('spark.sql.adaptive.enabled', 'false') \
    .set('spark.sql.debug.maxToStringFields', 35) \
    .set('spark.sql.shuffle.partitions', args['shuffle_partitions']) \
    .set('spark.sql.streaming.forceDeleteTempCheckpointLocation', 'true') \
    .set('spark.sql.streaming.schemaInference', 'true') \
    .set('spark.sql.warehouse.dir', args['database_dir']) \
    .set('spark.streaming.stopGracefullyOnShutdown', 'true')
    
    return sparkConf


def get_spark_conf_args(spark_jars: list, **args):
    '''Assemble the arguments consumed by get_spark_conf().'''
    # Use half of the CPU cores (at least one) as local worker threads
    cpu_count = os.cpu_count() or 1

    sparkConf_args = {
        "app_name" : "PySpark AdventureWorks Data Lakehouse (Medallion Architecture)",
        "worker_threads" : f"local[{max(1, cpu_count // 2)}]",
        "shuffle_partitions" : cpu_count,
        "mongo_uri" : get_mongo_uri(**args),
        "spark_jars" : ",".join(spark_jars),  # spark.jars expects a comma-separated list
        "database_dir" : sql_warehouse_dir
    }

    return sparkConf_args
    
def get_mongodb_dataframe(spark_session, **args):
    '''Query a MongoDB collection and return it as a PySpark DataFrame.'''
    dframe = spark_session.read.format("com.mongodb.spark.sql.DefaultSource") \
        .option("database", args['db_name']) \
        .option("collection", args['collection']).load()

    # Drop the '_id' index column to clean up the response
    dframe = dframe.drop('_id')

    # Drop columns whose share of NULLs exceeds the configured threshold
    dframe = drop_null_columns(dframe, args['null_column_threshold'])

    return dframe

def set_mongo_collections(mongo_client, db_name: str, data_directory: str, json_files: dict):
    '''(Re)load MongoDB collections from JSON files; 'json_files' maps collection name -> file name.'''
    db = mongo_client[db_name]

    for collection_name, file_name in json_files.items():
        # Drop first so reruns replace the data instead of duplicating it
        db.drop_collection(collection_name)
        json_file = os.path.join(data_directory, file_name)
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
        db[collection_name].insert_many(json_object)

    mongo_client.close()

def get_mysql_dataframe(spark_session, sql_query: str, db="src", **args):
    '''
    Query MySQL over JDBC and return a PySpark DataFrame.
    Set `db="src"` to use the OLTP (adventureworks) database,
    or `db="dst"` to use your OLAP-style database (adventureworks_dw).
    '''
    db_name = args['src_dbname'] if db == "src" else args['dst_dbname']

    jdbc_url = f"jdbc:mysql://{args['hostname']}:{args['port']}/{db_name}"

    # Take the credentials from the same **args (mysql_args) used to build the URL
    # instead of a separate global; the driver class ships with mysql-connector-j
    dframe = spark_session.read.format("jdbc") \
        .option("url", jdbc_url) \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("user", args['uid']) \
        .option("password", args['pwd']) \
        .option("query", sql_query) \
        .load()

    return dframe
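`drop_null_columns()` keeps a column only if its share of NULLs is at most `threshold`. The rule is easier to see on plain Python rows. In the sketch below, `null_fractions` and `columns_to_drop` are hypothetical helpers, and rows are assumed to be dicts that use `None` for NULL. Every column's NULL fraction is computed in a single pass over the data.

```python
def null_fractions(rows: list, columns: list) -> dict:
    """Return {column: fraction of rows where the value is None} in one pass."""
    null_counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            if row.get(c) is None:
                null_counts[c] += 1
    total = len(rows)
    return {c: (null_counts[c] / total if total else 0.0) for c in columns}


def columns_to_drop(rows: list, columns: list, threshold: float) -> list:
    """Columns whose NULL fraction strictly exceeds threshold (same rule as drop_null_columns)."""
    fractions = null_fractions(rows, columns)
    return [c for c in columns if fractions[c] > threshold]
```

Because the comparison is strict, a column that is exactly `threshold` NULL is kept. For example, with `threshold=0.75`, a column that is three-quarters empty survives.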


### 4.0. Initialize Data Lakehouse Directory Structure
Remove the Data Lakehouse Database Directory Structure to Ensure Idempotency

In [45]:
remove_directory_tree(database_dir)

"Directory 'C:\\Users\\student.DESKTOP-0PHPGHC\\Downloads\\spark-warehouse\\adventureworks_dlh.db' has been removed successfully."

### 5.0. Create a New Spark Session

In [46]:
from pyspark import SparkConf

worker_threads = f"local[{int(os.cpu_count()/2)}]"

# Required JARs
mysql_spark_jar = os.path.join(os.getcwd(), "mysql-connector-j-9.1.0.jar")

print("JAR path exists:", os.path.exists(mysql_spark_jar))
print("JAR path:", mysql_spark_jar)

jars = [mysql_spark_jar]

# Build Spark Configuration
sparkConf_args = get_spark_conf_args(jars, **mongodb_args)
sparkConf = get_spark_conf(**sparkConf_args)

# Create Spark Session
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
spark.sparkContext.setLogLevel("OFF")

spark  # show SparkSession info

JAR path exists: True
JAR path: C:\Users\student.DESKTOP-0PHPGHC\Downloads\mysql-connector-j-9.1.0.jar


### 6.0. Create a New Metadata Database

In [47]:
# Drop the database if it exists (clean slate)
spark.sql(f"DROP DATABASE IF EXISTS {dest_database} CASCADE")

# Create the Data Lakehouse database
sql_create_db = f"""
    CREATE DATABASE IF NOT EXISTS {dest_database}
    COMMENT 'DS-2002 Final Project: AdventureWorks Data Lakehouse'
    WITH DBPROPERTIES (
        contains_pii = false,
        project_phase = 'Final',
        source = 'adventureworks'
    )
"""
spark.sql(sql_create_db)


DataFrame[]

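### 7.0. Export Streaming Batches
This code pulls the calendar-year 2001 sales order lines from the `fact_sales_orders` table in MySQL, splits them into three roughly equal parts, and exports each part as a JSON file for the streaming query to ingest.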
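The split in the following cell relies on `np.array_split`, which, unlike `np.split`, accepts a row count that does not divide evenly. When n rows are split into k parts, the first n % k parts each get one extra row. The sketch below reproduces that sizing rule with the standard library. `split_sizes` and `split_rows` are hypothetical helpers, not part of the notebook.

```python
def split_sizes(n_rows: int, n_parts: int) -> list:
    """Sizes of the parts np.array_split would produce for n_rows into n_parts."""
    base, extra = divmod(n_rows, n_parts)
    return [base + 1 if i < extra else base for i in range(n_parts)]


def split_rows(rows: list, n_parts: int) -> list:
    """Split rows into n_parts contiguous chunks with near-equal sizes."""
    chunks, start = [], 0
    for size in split_sizes(len(rows), n_parts):
        chunks.append(rows[start:start + size])
        start += size
    return chunks
```

Because the chunks are contiguous, the three JSON files together contain every queried row exactly once and in the original order.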



In [48]:
import glob
# SQL query: only necessary fields for dimensional joins + metrics
sales_query = """
SELECT 
    fact_sales_order_key,
    sales_order_id,
    customer_key,
    product_key,
    TotalDue,
    order_date_key
FROM fact_sales_orders
WHERE order_date_key BETWEEN 20010101 AND 20011231;
"""

# Build SQLAlchemy connection to your MySQL DB
engine = create_engine(
    f"mysql+pymysql://{mysql_args['uid']}:{mysql_args['pwd']}@{mysql_args['hostname']}:{mysql_args['port']}/{mysql_args['dst_dbname']}"
)

# Load query results into a DataFrame
df = pd.read_sql(sales_query, engine)

# Set up the directory to export streaming JSON files (orders_stream_dir, defined in 2.0)
os.makedirs(orders_stream_dir, exist_ok=True)

# Remove old batch files, if any, so a rerun starts clean
for old_file in glob.glob(os.path.join(orders_stream_dir, "fact_sales_part*.json")):
    os.remove(old_file)

# Split into 3 JSON files ("micro-batches" for streaming).
# Splitting the row positions and selecting with .iloc avoids the pandas
# FutureWarning that np.array_split(df, 3) emits on recent pandas versions.
for i, row_positions in enumerate(np.array_split(np.arange(len(df)), 3), start=1):
    filename = f"fact_sales_part{i}.json"
    path = os.path.join(orders_stream_dir, filename)
    df.iloc[row_positions].to_json(path, orient="records", lines=False, date_format="iso")
    print(f"✅ Exported: {filename}")

✅ Exported: fact_sales_part1.json
✅ Exported: fact_sales_part2.json
✅ Exported: fact_sales_part3.json




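### 8.0. Folder Setup

Moves the dimension files exported in Part 1 into the `batch` directory that Section II reads from.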

In [49]:
# Part 1 exported the dimension files here
source_dir = os.path.expanduser("~/Downloads/data")
# Move them into the batch directory that Section II reads from (batch_dir, defined in 2.0)
target_dir = batch_dir

# Create the target directory if it doesn't exist
os.makedirs(target_dir, exist_ok=True)

# List of files to move
files_to_move = [
    "dim_customers.csv",
    "dim_customers.json",
    "dim_products.json"
]

# Move the files
for filename in files_to_move:
    src = os.path.join(source_dir, filename)
    dst = os.path.join(target_dir, filename)
    if os.path.exists(src):
        shutil.move(src, dst)
        print(f"✅ Moved: {filename}")
    else:
        print(f"⚠️ File not found: {filename}")

✅ Moved: dim_customers.csv
✅ Moved: dim_customers.json
✅ Moved: dim_products.json


## Section II: Populate Dimensions by Ingesting "Cold-path" Reference Data 
### 1.0. Fetch Data from the File System
#### 1.1. Verify the location of the source data files on the file system

In [50]:
get_file_info(batch_dir)

|   | name | size | modification_time |
|---|---|---|---|
| 0 | dim_customers.csv | 2279580 | 2025-05-07 19:57:55.084141254 |
| 1 | dim_customers.json | 11043443 | 2025-05-07 19:57:43.349634409 |
| 2 | dim_products.json | 225575 | 2025-05-07 19:57:43.359987736 |


#### 1.2. Populate the <span style="color:darkred">Customers Dimension</span>
##### 1.2.1. Use PySpark to Read data from a CSV file

In [51]:
customer_csv = os.path.join(batch_dir, 'dim_customers.csv')
df_dim_customers = spark.read.format('csv').options(header='true', inferSchema='true').load(customer_csv)
df_dim_customers.toPandas().head(2)


|   | customer_key | customer_id | AccountNumber | CustomerType | AddressType | AddressLine1 | AddressLine2 | City | StateProvinceCode | State_Province | IsOnlyStateProvinceFlag | PostalCode | CountryRegionCode | Country_Region | Sales Territory Group | Sales Territory |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | AW00000001 | S | Main Office | 2251 Elliot Avenue |  | Seattle | WA | Washington | � | 98104 | US | United States | North America | Northwest |
| 1 | 2 | 2 | AW00000002 | S | Shipping | 7943 Walnut Ave |  | Renton | WA | Washington | � | 98055 | US | United States | North America | Northwest |


##### 1.2.2. Make Necessary Transformations to the New DataFrame

In [52]:
# Load your CSV
customer_csv = os.path.join(batch_dir, 'dim_customers.csv')
df_dim_customers = spark.read.format('csv').options(header='true', inferSchema='true').load(customer_csv)

# Just reorder columns (customer_key already exists and is valid)
ordered_columns = [
    'customer_key', 'customer_id', 'AccountNumber', 'CustomerType',
    'AddressType', 'AddressLine1', 'AddressLine2', 'City',
    'StateProvinceCode', 'State_Province', 'IsOnlyStateProvinceFlag',
    'PostalCode', 'CountryRegionCode', 'Country_Region',
    'Sales Territory Group', 'Sales Territory'
]

df_dim_customers = df_dim_customers.select(ordered_columns)

# Preview
df_dim_customers.toPandas().head(2)



|   | customer_key | customer_id | AccountNumber | CustomerType | AddressType | AddressLine1 | AddressLine2 | City | StateProvinceCode | State_Province | IsOnlyStateProvinceFlag | PostalCode | CountryRegionCode | Country_Region | Sales Territory Group | Sales Territory |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | AW00000001 | S | Main Office | 2251 Elliot Avenue |  | Seattle | WA | Washington | � | 98104 | US | United States | North America | Northwest |
| 1 | 2 | 2 | AW00000002 | S | Shipping | 7943 Walnut Ave |  | Renton | WA | Washington | � | 98055 | US | United States | North America | Northwest |


##### 1.2.3. Save as the <span style="color:darkred">dim_customers</span> table in the Data Lakehouse

In [53]:
df_dim_customers.write.mode("overwrite").saveAsTable(f"{dest_database}.dim_customers")


##### 1.2.4. Unit Test: Describe and Preview Table

In [54]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_customers").show(truncate=False)
spark.sql(f"SELECT * FROM {dest_database}.dim_customers LIMIT 2").toPandas()


+----------------------------+------------------+-------+
|col_name                    |data_type         |comment|
+----------------------------+------------------+-------+
|customer_key                |int               |NULL   |
|customer_id                 |int               |NULL   |
|AccountNumber               |string            |NULL   |
|CustomerType                |string            |NULL   |
|AddressType                 |string            |NULL   |
|AddressLine1                |string            |NULL   |
|AddressLine2                |string            |NULL   |
|City                        |string            |NULL   |
|StateProvinceCode           |string            |NULL   |
|State_Province              |string            |NULL   |
|IsOnlyStateProvinceFlag     |string            |NULL   |
|PostalCode                  |string            |NULL   |
|CountryRegionCode           |string            |NULL   |
|Country_Region              |string            |NULL   |
|Sales Territo

|   | customer_key | customer_id | AccountNumber | CustomerType | AddressType | AddressLine1 | AddressLine2 | City | StateProvinceCode | State_Province | IsOnlyStateProvinceFlag | PostalCode | CountryRegionCode | Country_Region | Sales Territory Group | Sales Territory |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | AW00000001 | S | Main Office | 2251 Elliot Avenue |  | Seattle | WA | Washington | � | 98104 | US | United States | North America | Northwest |
| 1 | 2 | 2 | AW00000002 | S | Shipping | 7943 Walnut Ave |  | Renton | WA | Washington | � | 98055 | US | United States | North America | Northwest |


### 2.0. Fetch Reference Data from a MySQL Database
#### 2.1. Populate the <span style="color:darkred">Date Dimension</span>
##### 2.1.1 Fetch data from the <span style="color:darkred">dim_date</span> table in MySQL

In [55]:
# Define query to pull from MySQL
sql_dim_date = f"SELECT * FROM {mysql_args['dst_dbname']}.dim_date"

# Run the query over JDBC with the get_mysql_dataframe helper (returns a PySpark DataFrame)
df_dim_date = get_mysql_dataframe(spark, sql_dim_date, **mysql_args)


##### 2.1.2. Save as the <span style="color:darkred">dim_date</span> table in the Data Lakehouse

In [56]:
df_dim_date.write.mode("overwrite").saveAsTable(f"{dest_database}.dim_date")

##### 2.1.3. Unit Test: Describe and Preview Table

In [57]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_date;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_date LIMIT 2").toPandas()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|            date_key|      int|   NULL|
|           full_date|     date|   NULL|
|           date_name| char(11)|   NULL|
|        date_name_us| char(11)|   NULL|
|        date_name_eu| char(11)|   NULL|
|         day_of_week|  tinyint|   NULL|
|    day_name_of_week| char(10)|   NULL|
|        day_of_month|  tinyint|   NULL|
|         day_of_year|      int|   NULL|
|     weekday_weekend| char(10)|   NULL|
|        week_of_year|  tinyint|   NULL|
|          month_name| char(10)|   NULL|
|       month_of_year|  tinyint|   NULL|
|is_last_day_of_month|  char(1)|   NULL|
|    calendar_quarter|  tinyint|   NULL|
|       calendar_year|      int|   NULL|
| calendar_year_month| char(10)|   NULL|
|   calendar_year_qtr| char(10)|   NULL|
|fiscal_month_of_year|  tinyint|   NULL|
|      fiscal_quarter|  tinyint|   NULL|
+--------------------+---------+-------+
only showing top

|   | date_key | full_date | date_name | date_name_us | date_name_eu | day_of_week | day_name_of_week | day_of_month | day_of_year | weekday_weekend | ... | is_last_day_of_month | calendar_quarter | calendar_year | calendar_year_month | calendar_year_qtr | fiscal_month_of_year | fiscal_quarter | fiscal_year | fiscal_year_month | fiscal_year_qtr |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 20000101 | 2000-01-01 | 2000/01/01 | 01/01/2000 | 01/01/2000 | 7 | Saturday | 1 | 1 | Weekend | ... | N | 1 | 2000 | 2000-01 | 2000Q1 | 7 | 3 | 2000 | 2000-07 | 2000Q3 |
| 1 | 20000102 | 2000-01-02 | 2000/01/02 | 01/02/2000 | 02/01/2000 | 1 | Sunday | 2 | 2 | Weekend | ... | N | 1 | 2000 | 2000-01 | 2000Q1 | 7 | 3 | 2000 | 2000-07 | 2000Q3 |
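The `date_key` column is a "smart" integer key in `YYYYMMDD` form (for example, `20000101` for `2000-01-01`). This is why fact-table values such as `order_date_key = 20010701` can be joined directly to `dim_date`. Because integer order matches date order, range predicates such as `BETWEEN 20010101 AND 20011231` in section 7.0 also work on the key itself. Converting between the two forms is simple arithmetic. `to_date_key` and `from_date_key` are hypothetical helpers:

```python
from datetime import date


def to_date_key(d: date) -> int:
    """Encode a date as a YYYYMMDD integer key."""
    return d.year * 10000 + d.month * 100 + d.day


def from_date_key(key: int) -> date:
    """Decode a YYYYMMDD integer key back into a date (raises ValueError if invalid)."""
    year, rest = divmod(key, 10000)
    month, day = divmod(rest, 100)
    return date(year, month, day)
```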


### 3.0. Populate the <span style="color:darkred">Product Dimension</span>
#### 3.1. Fetch data from the <span style="color:darkred">dim_products</span> collection in MongoDB

In [58]:
mongodb_args["collection"] = "dim_products"

df_dim_products = get_mongodb_dataframe(spark, **mongodb_args)
df_dim_products.toPandas().head(2)


|   | Color | DaysToManufacture | ListPrice | Name | ProductCategory | ProductModel | ProductSubcategory | ReorderPoint | SafetyStockLevel | SellStartDate | StandardCost | product_id | product_key |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 |  | 0 | 0.0 | Adjustable Race |  |  |  | 750 | 1000 | 896659200000 | 0.0 | 1 | 1 |
| 1 |  | 0 | 0.0 | Bearing Ball |  |  |  | 750 | 1000 | 896659200000 | 0.0 | 2 | 2 |


#### 3.2. Make Necessary Transformations to the New DataFrame

In [59]:
ordered_columns = [
    'product_key', 'product_id', 'Name', 'Color',
    'SafetyStockLevel', 'ReorderPoint', 'StandardCost',
    'ListPrice', 'DaysToManufacture', 'SellStartDate',
    'ProductCategory', 'ProductSubcategory', 'ProductModel'
]
df_dim_products = df_dim_products.select(*ordered_columns)
df_dim_products.toPandas().head(2)


|   | product_key | product_id | Name | Color | SafetyStockLevel | ReorderPoint | StandardCost | ListPrice | DaysToManufacture | SellStartDate | ProductCategory | ProductSubcategory | ProductModel |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | Adjustable Race |  | 1000 | 750 | 0.0 | 0.0 | 0 | 896659200000 |  |  |  |
| 1 | 2 | 2 | Bearing Ball |  | 1000 | 750 | 0.0 | 0.0 | 0 | 896659200000 |  |  |  |


#### 3.3. Save as the <span style="color:darkred">dim_products</span> table in the Data Lakehouse

In [60]:
df_dim_products.write.mode("overwrite").saveAsTable(f"{dest_database}.dim_products")


#### 3.4. Unit Test: Describe and Preview Table

In [61]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_products").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_products LIMIT 2").toPandas()


+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|         product_key|                 int|   NULL|
|          product_id|                 int|   NULL|
|                Name|              string|   NULL|
|               Color|              string|   NULL|
|    SafetyStockLevel|                 int|   NULL|
|        ReorderPoint|                 int|   NULL|
|        StandardCost|              double|   NULL|
|           ListPrice|              double|   NULL|
|   DaysToManufacture|                 int|   NULL|
|       SellStartDate|              bigint|   NULL|
|     ProductCategory|              string|   NULL|
|  ProductSubcategory|              string|   NULL|
|        ProductModel|              string|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|           

|   | product_key | product_id | Name | Color | SafetyStockLevel | ReorderPoint | StandardCost | ListPrice | DaysToManufacture | SellStartDate | ProductCategory | ProductSubcategory | ProductModel |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | Adjustable Race |  | 1000 | 750 | 0.0 | 0.0 | 0 | 896659200000 |  |  |  |
| 1 | 2 | 2 | Bearing Ball |  | 1000 | 750 | 0.0 | 0.0 | 0 | 896659200000 |  |  |  |


### 4.0. Verify Dimension Tables

In [62]:
spark.sql(f"USE {dest_database};")
spark.sql("SHOW TABLES").toPandas()

|   | namespace | tableName | isTemporary |
|---|---|---|---|
| 0 | adventureworks_dlh | dim_customers | False |
| 1 | adventureworks_dlh | dim_date | False |
| 2 | adventureworks_dlh | dim_products | False |


## Section III: Integrate Reference Data with Real-Time Data
### 5.0. Use PySpark Structured Streaming to Process (Hot Path) <span style="color:darkred">Orders</span> Fact Data  
#### 5.1. Verify the location of the source data files on the file system

In [63]:
get_file_info(orders_stream_dir)

|   | name | size | modification_time |
|---|---|---|---|
| 0 | fact_sales_part1.json | 240603 | 2025-05-07 19:58:16.696646690 |
| 1 | fact_sales_part2.json | 241982 | 2025-05-07 19:58:16.704794884 |
| 2 | fact_sales_part3.json | 242034 | 2025-05-07 19:58:16.715305328 |


#### 5.2. Create the Bronze Layer: Stage <span style="color:darkred">Sales Fact table</span> Data
##### 5.2.1. Read "Raw" JSON file data into a Stream

In [64]:
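# No explicit schema is given: it is inferred from the JSON files because
# 'spark.sql.streaming.schemaInference' is set to 'true' in the SparkConf
df_sales_bronze = (
    spark.readStream
    .option("maxFilesPerTrigger", 1)   # one JSON file per micro-batch
    .option("multiLine", "true")       # each file holds a single JSON array
    .json(orders_stream_dir)
)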

df_sales_bronze.isStreaming


True

##### 5.2.2. Write the Streaming Data to a Parquet file

In [65]:
# Define checkpoint path inside your Bronze layer output folder
sales_checkpoint_bronze = os.path.join(sales_output_bronze, '_checkpoint')

# WriteStream query for Bronze ingestion
sales_bronze_query = (
    df_sales_bronze
    .withColumn("receipt_time", current_timestamp())
    .withColumn("source_file", input_file_name())
    .writeStream
    .format("parquet")
    .outputMode("append")
    .queryName("sales_bronze")
    .trigger(availableNow=True)
    .option("checkpointLocation", sales_checkpoint_bronze)
    .option("compression", "snappy")
    .start(sales_output_bronze)
)
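With `maxFilesPerTrigger` set to 1 and `trigger(availableNow=True)`, the query processes every file present when it starts, one file per micro-batch, and then stops. The checkpoint records which files have already been committed, so rerunning the cell against the same checkpoint does not ingest them again. The sketch below is a simplified model of that behavior, not Spark's implementation. `plan_micro_batches` is hypothetical, the checkpoint is reduced to a set of file names, and files are ordered by name for simplicity.

```python
def plan_micro_batches(available_files: list, committed: set, max_files_per_trigger: int) -> list:
    """Group not-yet-committed files into micro-batches of at most max_files_per_trigger.

    The committed set plays the role of the checkpoint and is updated in place.
    """
    pending = sorted(f for f in available_files if f not in committed)
    batches = []
    for start in range(0, len(pending), max_files_per_trigger):
        batch = pending[start:start + max_files_per_trigger]
        batches.append(batch)
        committed.update(batch)  # "commit" the batch to the checkpoint
    return batches
```

With the three `fact_sales_part*.json` files and an empty checkpoint, the first run yields three single-file batches and a second run yields none. A new file dropped into the folder later would be picked up on its own.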


##### 5.2.3. Unit Test: Implement Query Monitoring

In [66]:
print(f"Query ID: {sales_bronze_query.id}")
print(f"Query Name: {sales_bronze_query.name}")
print(f"Query Status: {sales_bronze_query.status}")


Query ID: 4939261c-bed6-44fb-8bb3-8d43e111bc4c
Query Name: sales_bronze
Query Status: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}


In [67]:
sales_bronze_query.awaitTermination()

#### 5.3. Create the Silver Layer: Integrate "Cold-path" Data & Make Transformations
##### 5.3.1. Prepare Role-Playing Dimension Primary and Business Keys

In [68]:
df_order_date = df_dim_date.select(
    col("date_key").alias("order_date_key"),
    col("full_date").alias("order_full_date")
)

df_shipped_date = df_dim_date.select(
    col("date_key").alias("shipped_date_key"),
    col("full_date").alias("shipped_full_date")
)

df_due_date = df_dim_date.select(
    col("date_key").alias("due_date_key"),
    col("full_date").alias("due_full_date")
)
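`dim_date` is a role-playing dimension: the same table serves as the order, ship, and due date dimension. The cell above creates one renamed projection per role, so each projection can be joined on its own foreign key without column-name collisions. The sketch below applies the same idea to plain dicts. `role_view` and `attach_dates` are hypothetical helpers, not part of the notebook.

```python
def role_view(dim_date_rows: list, role: str) -> dict:
    """Rename dim_date columns for one role and index the result by date_key."""
    return {
        row["date_key"]: {f"{role}_full_date": row["full_date"]}
        for row in dim_date_rows
    }


def attach_dates(fact_row: dict, views: dict) -> dict:
    """Look up each role's full date using the fact row's <role>_date_key column."""
    enriched = dict(fact_row)
    for role, view in views.items():
        key = fact_row.get(f"{role}_date_key")
        # Behaves like a left outer join: unknown keys yield None
        enriched.update(view.get(key, {f"{role}_full_date": None}))
    return enriched
```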


##### 5.3.2. Define Silver Query to Join Streaming with Batch Data

In [69]:
df_sales_silver = (
    spark.readStream
    .format("parquet")
    .load(sales_output_bronze)
    .join(df_dim_customers, on="customer_key", how="inner")
    .join(df_dim_products, on="product_key", how="inner")
    .join(df_order_date, on="order_date_key", how="left_outer")
    # shipped_date_key and due_date_key were not exported to the streaming files, so only the order date is joined
    .select(
        col("sales_order_id").cast(LongType()),
        col("customer_key").cast(LongType()),
        col("product_key").cast(LongType()),
        col("TotalDue").cast(DoubleType()),
        col("order_date_key").cast(LongType()),
        col("order_full_date").cast(DateType()),
        col("receipt_time"),
        col("source_file")
    )
)


In [70]:
df_sales_silver.isStreaming

True

In [71]:
df_sales_silver.printSchema()

root
 |-- sales_order_id: long (nullable = true)
 |-- customer_key: long (nullable = true)
 |-- product_key: long (nullable = true)
 |-- TotalDue: double (nullable = true)
 |-- order_date_key: long (nullable = true)
 |-- order_full_date: date (nullable = true)
 |-- receipt_time: timestamp (nullable = true)
 |-- source_file: string (nullable = true)



##### 5.3.3. Write the Transformed Streaming data to the Data Lakehouse

In [72]:
# Define the Silver checkpoint directory (inside the Silver output folder)
sales_checkpoint_silver = os.path.join(sales_output_silver, '_checkpoint')

# Start streaming write from your Silver DataFrame
sales_silver_query = (
    df_sales_silver.writeStream
    .format("parquet")
    .outputMode("append")
    .queryName("sales_silver")
    .trigger(availableNow=True)
    .option("checkpointLocation", sales_checkpoint_silver)
    .option("compression", "snappy")
    .start(sales_output_silver)
)

##### 5.3.4. Unit Test: Implement Query Monitoring

In [73]:
print(f"Query ID: {sales_silver_query.id}")
print(f"Query Name: {sales_silver_query.name}")
print(f"Query Status: {sales_silver_query.status}")

Query ID: 2c3f26c8-21c8-4367-98b1-cbc75cfd41ab
Query Name: sales_silver
Query Status: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}
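The status above only says the query is still initializing. A small helper can combine `status` with `lastProgress` into a one-line summary. This sketch assumes both are plain dictionaries, as PySpark 3.x returns them, and that `lastProgress` is `None` until the first micro-batch completes; the helper name `summarize_query` is hypothetical.

```python
def summarize_query(status, last_progress):
    """Return a one-line health summary from a query's status and lastProgress dicts."""
    # status mirrors StreamingQuery.status, e.g. {'message': ..., 'isDataAvailable': ..., 'isTriggerActive': ...}
    line = f"status={status['message']!r}, data_available={status['isDataAvailable']}"
    # lastProgress is None until the first micro-batch finishes.
    if last_progress is None:
        return line + ", no batch completed yet"
    return line + f", batch={last_progress['batchId']}, rows={last_progress['numInputRows']}"


# Using the status printed above:
print(summarize_query(
    {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False},
    None,
))
# → status='Initializing sources', data_available=False, no batch completed yet
```

In the notebook it would be called as `summarize_query(sales_silver_query.status, sales_silver_query.lastProgress)`.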


In [74]:
# Block until the availableNow run has processed all available input and stopped
sales_silver_query.awaitTermination()

#### 5.4. Create Gold Layer: Perform Aggregations
##### 5.4.1. Define a Query to Create a Business Report
This query aggregates the Silver sales stream by month and product category, counting the sales rows in each group, to produce the summarized Gold layer.
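The aggregation is a group-by count followed by a two-key sort: month number ascending, then count descending. Because the rows are inner-joined on `product_key`, `count("product_key")` never skips a null and so equals the number of sales rows per group. The plain-Python sketch below reproduces that logic on a few hypothetical rows to make the ordering rule concrete.

```python
from collections import Counter

# Hypothetical joined Silver rows: (month_of_year, month_name, ProductCategory).
rows = [
    (9, "September", "Clothing"),
    (9, "September", "Components"),
    (9, "September", "Components"),
    (7, "July", "Accessories"),
]

# Equivalent of groupBy(month_of_year, ProductCategory, month_name) + count(...)
counts = Counter(rows)

# Equivalent of orderBy(asc(month_of_year), desc(product_count))
report = sorted(counts.items(), key=lambda kv: (kv[0][0], -kv[1]))
for (month_num, month_name, category), n in report:
    print(month_name, category, n)
# → July Accessories 1
# → September Components 2
# → September Clothing 1
```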

In [75]:
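from pyspark.sql.functions import col, count, asc, desc
from pyspark.sql.types import IntegerType

df_sales_by_category_gold = (
    spark.readStream.format("parquet").load(sales_output_silver)
    # Bring in ProductCategory from the product dimension
    .join(df_dim_products, "product_key")
    # Match each sale to its calendar date (keys cast to a common integer type)
    .join(df_dim_date, df_dim_date.date_key.cast(IntegerType()) == col("order_date_key").cast(IntegerType()))
    # Count sales rows per month and product category
    .groupBy("month_of_year", "ProductCategory", "month_name")
    .agg(count("product_key").alias("product_count"))
    # Sorting a stream is only allowed after an aggregation in complete output mode
    .orderBy(asc("month_of_year"), desc("product_count"))
)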


In [76]:
df_sales_by_category_gold.printSchema()

root
 |-- month_of_year: byte (nullable = true)
 |-- ProductCategory: string (nullable = true)
 |-- month_name: string (nullable = true)
 |-- product_count: long (nullable = false)



##### 5.4.2. Write the Streaming Data to an In-Memory Table in "Complete" Mode

In [77]:
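sales_gold_query = (
    df_sales_by_category_gold.writeStream
    .format("memory")                               # keep results as an in-memory table on the driver
    .outputMode("complete")                         # rewrite the whole aggregated table on every trigger
    .queryName("fact_sales_by_product_category")    # name of the in-memory table to query with SQL
    .start()
)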
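In complete mode the sink receives the entire updated result table after every trigger, not just the new rows, which is why an aggregated and sorted stream can be written this way. The sketch below models that behavior in plain Python with hypothetical category batches: running counts are kept across micro-batches and the full table is emitted each time.

```python
from collections import Counter


def complete_mode(batches):
    """Yield the full aggregated result table after each micro-batch."""
    state = Counter()  # running aggregation state kept across batches
    for batch in batches:
        state.update(batch)  # fold the new rows into the state
        # Complete mode: emit every group, not only the ones that changed.
        yield sorted(state.items())


for table in complete_mode([["Bikes", "Clothing"], ["Bikes"]]):
    print(table)
# → [('Bikes', 1), ('Clothing', 1)]
# → [('Bikes', 2), ('Clothing', 1)]
```

Because the memory sink collects the whole result into driver memory, it suits small summaries like this report; larger outputs belong in a file or table sink.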


In [78]:
wait_until_stream_is_ready(sales_gold_query, 1)

The stream has processed 1 batchs
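`wait_until_stream_is_ready` is defined in an earlier cell and is not shown here. One plausible shape for such a helper is sketched below. It assumes the helper polls the query's `recentProgress` list until enough progress reports exist and adds a timeout so it cannot hang forever. The name `wait_until_stream_is_ready_sketch` and its `timeout_s`/`poll_s` parameters are hypothetical. Note that `recentProgress` retains only a bounded number of recent reports, which Spark controls with `spark.sql.streaming.numRecentProgressUpdates`.

```python
import time


def wait_until_stream_is_ready_sketch(query, min_batches, timeout_s=60.0, poll_s=1.0):
    """Poll query.recentProgress until at least min_batches progress reports exist."""
    deadline = time.monotonic() + timeout_s
    while len(query.recentProgress) < min_batches:
        if time.monotonic() > deadline:
            raise TimeoutError(f"only {len(query.recentProgress)} batch(es) after {timeout_s}s")
        time.sleep(poll_s)  # wait before checking progress again
    processed = len(query.recentProgress)
    print(f"The stream has processed {processed} batch(es)")
    return processed


# Hypothetical usage in the notebook:
# wait_until_stream_is_ready_sketch(sales_gold_query, 1)
```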


##### 5.4.3. Query the Gold Data from Memory

In [79]:
df_fact_sales_by_product_category = spark.sql("SELECT * FROM fact_sales_by_product_category")
df_fact_sales_by_product_category.printSchema()

root
 |-- month_of_year: byte (nullable = true)
 |-- ProductCategory: string (nullable = true)
 |-- month_name: string (nullable = true)
 |-- product_count: long (nullable = false)



##### 5.4.4. Create the Final Selection

In [80]:
from pyspark.sql.functions import col, asc, desc

# Sort while month_of_year is still available, then keep and rename the report columns
df_fact_sales_by_product_category_gold_final = (
    df_fact_sales_by_product_category
    .orderBy(asc("month_of_year"), desc("product_count"))
    .select(
        col("month_name").alias("Month"),
        col("ProductCategory").alias("Product Category"),
        col("product_count").alias("Product Count")
    )
)


##### 5.4.5. Load the Final Results into a New Table and Display the Results

In [81]:
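# Persist the report as a table in the destination database
df_fact_sales_by_product_category_gold_final.write.saveAsTable(
    f"{dest_database}.fact_sales_by_product_category",
    mode="overwrite"
)

# Read the table back; rows are not guaranteed to come back in the sorted order
spark.sql(f"SELECT * FROM {dest_database}.fact_sales_by_product_category").toPandas()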


|   | Month | Product Category | Product Count |
|---|---|---|---|
| 0 | September | Components | 117 |
| 1 | September | Clothing | 96 |
| 2 | September | Accessories | 64 |
| 3 | December | Components | 178 |
| 4 | December | Clothing | 124 |
| 5 | December | Accessories | 65 |
| 6 | October | Accessories | 36 |
| 7 | November | Bikes | 833 |
| 8 | November | Components | 239 |
| 9 | July | Accessories | 37 |
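The rows above are not in calendar order even though the DataFrame was sorted before saving, because reading a table back does not guarantee row order. The saved table also keeps only the month name, so an `ORDER BY Month` would sort alphabetically. The sketch below restores the intended order for the displayed rows in plain Python. It maps month names back to numbers with the standard-library `calendar.month_name`, which assumes an English (default C) locale.

```python
import calendar

# Rows copied from the table above: (Month, Product Category, Product Count).
rows = [
    ("September", "Components", 117),
    ("September", "Clothing", 96),
    ("September", "Accessories", 64),
    ("December", "Components", 178),
    ("December", "Clothing", 124),
    ("December", "Accessories", 65),
    ("October", "Accessories", 36),
    ("November", "Bikes", 833),
    ("November", "Components", 239),
    ("July", "Accessories", 37),
]

# Map month names to 1..12 (calendar.month_name[0] is an empty string, so skip it).
month_number = {name: i for i, name in enumerate(calendar.month_name) if name}

# Calendar month ascending, then count descending, matching the Gold query's ordering.
ordered = sorted(rows, key=lambda r: (month_number[r[0]], -r[2]))
for month, category, count in ordered:
    print(f"{month:<10} {category:<12} {count}")
```

Inside Spark, the equivalent fix would be to keep `month_of_year` in the saved table and sort on it when querying.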
