## DS 2002 Project 2
Project completed according to the requirements sent out via email/Canvas Course Announcement on April 15 (and also drawing from the requirements of the project document here that don't conflict with the version sent out via email: https://github.com/JTupitza-UVA/DS-2002/blob/main/Projects/DS-2002-Data-Project-2-Capstone.pdf)

### Project Info:
**Author Name**: Calvin Pan  
**Author Computing ID**: nqc8gh@virginia.edu  
**Instructor Name**: Jon Tupitza  
**Course**: DS 2002 Data Science Systems - Spring 2025 Section 001  
**Due Date**: 5/09/2025  

## Step 0: Setup
### Step 0.1: Import the Necessary Libraries

In [97]:
import findspark
findspark.init()
print(findspark.find())

import os
import sys
import json
import time
import pymongo
import certifi
import shutil
import pandas as pd

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window as W

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

/opt/anaconda3/lib/python3.12/site-packages/pyspark


### Step 0.2: Instantiate Global Variables

In [99]:
# --------------------------------------------------------------------------------
# Specify MySQL Server Connection Information
# --------------------------------------------------------------------------------
mysql_args = {
    "host_name" : "127.0.0.1",
    "port" : "3306",
    "db_name" : "adventureworks",
    "conn_props" : {
        "user" : "root",
        "password" : "#Pelican1",
        "driver" : "com.mysql.cj.jdbc.Driver"
    }
}

# --------------------------------------------------------------------------------
# Specify MongoDB Cluster Connection Information
# --------------------------------------------------------------------------------
mongodb_args = {
    "cluster_location" : "atlas", # "local"
    "user_name" : "calvinpan1",
    "password" : "Pelican1",
    "cluster_name" : "ds2002",
    "cluster_subnet" : "hc2ph",
    "db_name" : "lab6pyspark",
    "collection" : "project2",
    "null_column_threshold" : 0.5
}

# --------------------------------------------------------------------------------
# Specify Directory Structure for Source Data
# --------------------------------------------------------------------------------
base_dir = os.path.join(os.getcwd(), 'project_data')
data_dir = os.path.join(base_dir, 'adventureworks')
batch_dir = os.path.join(data_dir, 'batch')
stream_dir = os.path.join(data_dir, 'streaming')

# --------------------------------------------------------------------------------
# Create Directory Structure for Data Lakehouse Files
# --------------------------------------------------------------------------------
dest_database = "adventureworks_dlh"
sql_warehouse_dir = os.path.abspath('spark-warehouse')
dest_database_dir = f"{dest_database}.db"
database_dir = os.path.join(sql_warehouse_dir, dest_database_dir)

output_bronze = os.path.join(database_dir, 'bronze')
output_silver = os.path.join(database_dir, 'silver')
output_gold = os.path.join(database_dir, 'gold')

### Step 0.3. Define Global Functions

In [101]:
def get_file_info(path: str):
    file_sizes = []
    modification_times = []

    '''Fetch each item in the directory, and filter out any directories.'''
    items = os.listdir(path)
    files = sorted([item for item in items if os.path.isfile(os.path.join(path, item))])

    '''Populate lists with the Size and Last Modification DateTime for each file in the directory.'''
    for file in files:
        file_sizes.append(os.path.getsize(os.path.join(path, file)))
        modification_times.append(pd.to_datetime(os.path.getmtime(os.path.join(path, file)), unit='s'))

    data = list(zip(files, file_sizes, modification_times))
    column_names = ['name','size','modification_time']
    
    return pd.DataFrame(data=data, columns=column_names)


def wait_until_stream_is_ready(query, min_batches=1):
    while len(query.recentProgress) < min_batches:
        time.sleep(5)
        
    print(f"The stream has processed {len(query.recentProgress)} batchs")


def remove_directory_tree(path: str):
    '''If it exists, remove the entire contents of a directory structure at a given 'path' parameter's location.'''
    try:
        if os.path.exists(path):
            shutil.rmtree(path)
            return f"Directory '{path}' has been removed successfully."
        else:
            return f"Directory '{path}' does not exist."
            
    except Exception as e:
        return f"An error occurred: {e}"
        

def drop_null_columns(df, threshold):
    '''Drop Columns having a percentage of NULL values that exceeds the given 'threshold' parameter value.'''
    columns_with_nulls = [col for col in df.columns if df.filter(df[col].isNull()).count() / df.count() > threshold] 
    df_dropped = df.drop(*columns_with_nulls) 
    
    return df_dropped

def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['conn_props']['user']}:{args['conn_props']['password']}@{args['host_name']}/{args['db_name']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(text(sql_query), connection);
    connection.close()
    
    return dframe
    
def get_mysql_dataframe(spark_session, sql_query : str, **args):
    '''Create a JDBC URL to the MySQL Database'''
    jdbc_url = f"jdbc:mysql://{args['host_name']}:{args['port']}/{args['db_name']}"
    
    '''Invoke the spark.read.format("jdbc") function to query the database, and fill a DataFrame.'''
    dframe = spark_session.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("driver", args['conn_props']['driver']) \
    .option("user", args['conn_props']['user']) \
    .option("password", args['conn_props']['password']) \
    .option("query", sql_query) \
    .load()
    
    return dframe
    

def get_mongo_uri(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the 'cluster_location' parameter.")
        
    if args['cluster_location'] == "atlas":
        uri = f"mongodb+srv://{args['user_name']}:{args['password']}@"
        uri += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net/"
    else:
        uri = "mongodb://localhost:27017/"

    return uri


def get_spark_conf_args(spark_jars : list, **args):
    jars = ""
    for jar in spark_jars:
        jars += f"{jar}, "
    
    sparkConf_args = {
        "app_name" : "PySpark Northwind Data Lakehouse (Medallion Architecture)",
        "worker_threads" : f"local[{int(os.cpu_count()/2)}]",
        "shuffle_partitions" : int(os.cpu_count()),
        "mongo_uri" : get_mongo_uri(**args),
        "spark_jars" : jars[0:-2],
        "database_dir" : sql_warehouse_dir
    }
    
    return sparkConf_args
    

def get_spark_conf(**args):
    sparkConf = SparkConf().setAppName(args['app_name'])\
    .setMaster(args['worker_threads']) \
    .set('spark.driver.memory', '4g') \
    .set('spark.executor.memory', '2g') \
    .set('spark.jars', args['spark_jars']) \
    .set('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
    .set('spark.mongodb.input.uri', args['mongo_uri']) \
    .set('spark.mongodb.output.uri', args['mongo_uri']) \
    .set('spark.sql.adaptive.enabled', 'false') \
    .set('spark.sql.debug.maxToStringFields', 35) \
    .set('spark.sql.shuffle.partitions', args['shuffle_partitions']) \
    .set('spark.sql.streaming.forceDeleteTempCheckpointLocation', 'true') \
    .set('spark.sql.streaming.schemaInference', 'true') \
    .set('spark.sql.warehouse.dir', args['database_dir']) \
    .set('spark.streaming.stopGracefullyOnShutdown', 'true')
    
    return sparkConf


def get_mongo_client(**args):
    '''Get MongoDB Client Connection'''
    mongo_uri = get_mongo_uri(**args)
    if args['cluster_location'] == "atlas":
        client = pymongo.MongoClient(mongo_uri, tlsCAFile=certifi.where())

    elif args['cluster_location'] == "local":
        client = pymongo.MongoClient(mongo_uri)
        
    else:
        raise Exception("A MongoDB Client could not be created.")

    return client
    
    
# TODO: Rewrite this to leverage PySpark?
def set_mongo_collections(mongo_client, db_name : str, data_directory : str, json_files : list):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()
    

def get_mongodb_dataframe(spark_session, **args):
    '''Query MongoDB, and create a DataFrame'''
    uri = f"mongodb+srv://{args['user_name']}:{args['password']}@{args['cluster_subnet']}.{args['cluster_name']}.mongodb.net/{args['db_name']}?retryWrites=true&w=majority"
    
    dframe = spark_session.read.format("com.mongodb.spark.sql.DefaultSource") \
        .option("database", args['db_name']) \
        .option("collection", args['collection']).load()

    '''Drop the '_id' index column to clean up the response.'''
    dframe = dframe.drop('_id')
    
    '''Call the drop_null_columns() function passing in the dataframe.'''
    dframe = drop_null_columns(dframe, args['null_column_threshold'])
    
    return dframe

def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


### Step 0.4: Initialize Data Lakehouse Directory Structure
Remove the Data Lakehouse Database Directory Structure to Ensure Idempotency

In [103]:
remove_directory_tree(database_dir)

"Directory '/Users/calvin/Documents/Calvin/UVA/2024-25/DS 2002 - Data Science Systems/Projects/Project 2/spark-warehouse/adventureworks_dlh.db' has been removed successfully."

### Step 0.5: Create a New Spark Session

In [105]:
worker_threads = f"local[{int(os.cpu_count()/2)}]"

jars = []
mysql_spark_jar = os.path.join(os.getcwd(), "mysql-connector-j-9.1.0", "mysql-connector-j-9.1.0.jar")
mssql_spark_jar = os.path.join(os.getcwd(), "sqljdbc_12.8", "enu", "jars", "mssql-jdbc-12.8.1.jre11.jar")

jars.append(mysql_spark_jar)
#jars.append(mssql_spark_jar)

sparkConf_args = get_spark_conf_args(jars, **mongodb_args)

sparkConf = get_spark_conf(**sparkConf_args)
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
spark.sparkContext.setLogLevel("OFF")
spark

### Step 0.6" Create a New Metadata Database.

In [107]:
spark.sql(f"DROP DATABASE IF EXISTS {dest_database} CASCADE;")

sql_create_db = f"""
    CREATE DATABASE IF NOT EXISTS {dest_database}
    COMMENT 'DS-2002 PROJECT 2 Database'
    WITH DBPROPERTIES (contains_pii = true, purpose = 'DS-2002 Lab 6.0');
"""
spark.sql(sql_create_db)

DataFrame[]

## Step 1: Exporting adventureworks to everything
### Step 1.1: Downloading adventureworks and exporting it into MySQL

Done via running this script in my local SQL instance: https://github.com/JTupitza-UVA/DS-2002/blob/main/Projects/Scripts/AdventureWorks_MySQL.sql 

### Step 1.2 Exporting dim_date into the adventureworks schema in MySQL


Done via running in my local SQL instance a slightly edited version of the Lab 2b code for dim_date obtained here (only change was to replace USE northwind_dw with USE adventureworks): https://github.com/JTupitza-UVA/DS-2002/blob/main/01-SQL/Labs/Lab_02c_Create_Populate_Dim_Date.sql

### Step 1.3 Exporting PurchaseOrderHeader from the adventureworks table into MongoDB

In [113]:
# Export the SQL purchaseorderheader data to a dataframe:
sql_get_adventureworks_poh = 'SELECT * FROM adventureworks.purchaseorderheader'
df_dim_purchaseorderheader = get_sql_dataframe(sql_get_adventureworks_poh, **mysql_args)
df_dim_purchaseorderheader.head(2)

Unnamed: 0,PurchaseOrderID,RevisionNumber,Status,EmployeeID,VendorID,ShipMethodID,OrderDate,ShipDate,SubTotal,TaxAmt,Freight,TotalDue,ModifiedDate
0,1,0,4,244,83,3,2001-05-17,2001-05-26,201.04,16.0832,5.026,222.1492,2001-05-26
1,2,0,1,231,32,5,2001-05-17,2001-05-26,272.1015,21.7681,6.8025,300.6721,2001-05-26


In [114]:
# Export the dataframe to a JSON formatted dict:
dict_dim_purchaseorderheader = df_dim_purchaseorderheader.to_dict(orient='records')

# To avoid date type error, convert DataFrame to JSON with ISO dates and load back into a dictionary
dict_dim_purchaseorderheader = json.loads(
    df_dim_purchaseorderheader.to_json(date_format='iso', orient='records')
)

# Write the dict to a JSON file:
with open(os.path.join(batch_dir, 'purchaseorderheader.json'), "w") as o:
    json.dump(dict_dim_purchaseorderheader, o, indent=4)

In [115]:
# Load the JSON file to MongoDB 
client = get_mongo_client(**mongodb_args)
json_files = {'purchaseorderheader': 'purchaseorderheader.json'}
set_mongo_collections(client, mongodb_args["db_name"], batch_dir, json_files)

### Step 1.4 Exporting products from the SQL adventureworks table into a CSV through a PySQL query

In [117]:
# Export the SQL products data to a dataframe:
sql_get_adventureworks_products = 'SELECT * FROM adventureworks.product'
df_dim_products = get_sql_dataframe(sql_get_adventureworks_products, **mysql_args)
df_dim_products.head(2)

Unnamed: 0,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,...,ProductLine,Class,Style,ProductSubcategoryID,ProductModelID,SellStartDate,SellEndDate,DiscontinuedDate,rowguid,ModifiedDate
0,1,Adjustable Race,AR-5381,b'\x00',b'\x00',,1000,750,0.0,0.0,...,,,,,,1998-06-01,NaT,,b'\xb7\x15Bi\xf7\x08\rL\xac\xb1\xd74\xbaD\xc0\...,2004-03-11 10:01:36
1,2,Bearing Ball,BA-8327,b'\x00',b'\x00',,1000,750,0.0,0.0,...,,,,,,1998-06-01,NaT,,b' <\xaeX:OIG\xa7\xd4\xd5h\x80l\xc57',2004-03-11 10:01:36


In [118]:
# Export the dataframe to a CSV:
dest_file = (os.path.join(batch_dir, 'products.csv'))
csv_dim_products = df_dim_products.to_csv(dest_file)

### Step 1.5 Extracting purchaseorderdetail from the SQL adventureworks table into 3 separate jsons to be used for batch data

#### Step 1.5.1 Export from SQL adventureworks table into a dataframe

In [121]:
# Export from SQL adventureworks table into a dataframe
sql_get_pod = 'SELECT * FROM adventureworks.purchaseorderdetail'
df_pod = get_sql_dataframe(sql_get_pod, **mysql_args)
df_pod.head(2)

Unnamed: 0,PurchaseOrderID,PurchaseOrderDetailID,DueDate,OrderQty,ProductID,UnitPrice,LineTotal,ReceivedQty,RejectedQty,StockedQty,...,MaritalStatus,Gender,HireDate,SalariedFlag,VacationHours,SickLeaveHours,CurrentFlag,EmpModifiedDate,Employee_Key,PurchaseOrderHistory_Key
0,1,1,2001-05-31,4,1,50.26,201.04,3.0,0.0,3.0,...,S,F,2000-03-03,�,53,46,,2004-07-31,244,1
1,2,2,2001-05-31,3,359,45.12,135.36,3.0,0.0,3.0,...,M,M,2000-02-05,�,57,48,,2004-07-31,231,2


#### Step 1.5.2 Split this dataframe into 3 .jsons and save them each

In [123]:
# Get the total number of records
total_records = len(df_pod)

# Calculate the number of records per split
records_per_split = total_records // 3

# Determine the split points
split_points = [records_per_split, 2 * records_per_split]

# Split the DataFrame into three parts
df_part1 = df_pod.iloc[:split_points[0]]
df_part2 = df_pod.iloc[split_points[0]:split_points[1]]
df_part3 = df_pod.iloc[split_points[1]:]

# Define the base filename
base_filename = 'purchaseorderdetail_part'

# Save each part as a JSON file
dataframes_to_save = [df_part1, df_part2, df_part3]

for i, df_part in enumerate(dataframes_to_save):
    # Convert DataFrame to JSON formatted string with ISO dates
    json_string = df_part.to_json(date_format='iso', orient='records')

    # Load the JSON string into a dictionary
    dict_part = json.loads(json_string)

    # Define the output filename
    output_filename = f'{base_filename}{i + 1}.json'
    output_filepath = os.path.join(stream_dir, output_filename)

    # Write the dictionary to a JSON file
    with open(output_filepath, "w") as o:
        json.dump(dict_part, o, indent=4)

    print(f"Saved {len(df_part)} records to {output_filepath}")

Saved 2929 records to /Users/calvin/Documents/Calvin/UVA/2024-25/DS 2002 - Data Science Systems/Projects/Project 2/project_data/adventureworks/streaming/purchaseorderdetail_part1.json
Saved 2929 records to /Users/calvin/Documents/Calvin/UVA/2024-25/DS 2002 - Data Science Systems/Projects/Project 2/project_data/adventureworks/streaming/purchaseorderdetail_part2.json
Saved 2930 records to /Users/calvin/Documents/Calvin/UVA/2024-25/DS 2002 - Data Science Systems/Projects/Project 2/project_data/adventureworks/streaming/purchaseorderdetail_part3.json


## Step 2: Extracting all the BATCH data from all necessary sources (Cold Path)
### Step 2.1: Extracting employees from the MySQL version of adventureworks

In [125]:
sql_get_employees = 'SELECT * FROM adventureworks.employee'
df_dim_employees = get_mysql_dataframe(spark, sql_get_employees, **mysql_args)

In [126]:
# Create an EmployeeKey that ascends
df_dim_employees.createOrReplaceTempView("employees")
sql_employees = f"""
    SELECT *, ROW_NUMBER() OVER (ORDER BY EmployeeId) AS EmployeeKey
    FROM employees;
"""
df_dim_employees = spark.sql(sql_employees)

# Reorder columns and display the first two rows in a Pandas dataframe, dropping rowguid because it causes problems
ordered_columns = ['EmployeeKey', 'EmployeeID', 'NationalIDNumber', 'ContactID', 'LoginID', 'ManagerID', 'Title', 'BirthDate','MaritalStatus','Gender','HireDate','SalariedFlag','VacationHours','SickLeaveHours','CurrentFlag','ModifiedDate']
df_dim_employees = df_dim_employees[ordered_columns]
df_dim_employees.toPandas().head(2)

                                                                                

Unnamed: 0,EmployeeKey,EmployeeID,NationalIDNumber,ContactID,LoginID,ManagerID,Title,BirthDate,MaritalStatus,Gender,HireDate,SalariedFlag,VacationHours,SickLeaveHours,CurrentFlag,ModifiedDate
0,1,1,14417807,1209,adventure-works\guy1,16.0,Production Technician - WC60,1972-05-15,M,M,1996-07-31,False,21,30,True,2004-07-31
1,2,2,253022876,1030,adventure-works\kevin0,6.0,Marketing Assistant,1977-06-03,S,M,1997-02-26,False,42,41,True,2004-07-31


In [127]:
# Save to Data Lakehouse
df_dim_employees.write.saveAsTable(f"{dest_database}.dim_employees", mode="overwrite")

                                                                                

In [128]:
# Unit Test: Describe and Preview Table
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_employees;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_employees LIMIT 2").toPandas()

+--------------------+------------------+-------+
|            col_name|         data_type|comment|
+--------------------+------------------+-------+
|         EmployeeKey|               int|   NULL|
|          EmployeeID|               int|   NULL|
|    NationalIDNumber|       varchar(15)|   NULL|
|           ContactID|               int|   NULL|
|             LoginID|      varchar(256)|   NULL|
|           ManagerID|               int|   NULL|
|               Title|       varchar(50)|   NULL|
|           BirthDate|         timestamp|   NULL|
|       MaritalStatus|        varchar(1)|   NULL|
|              Gender|        varchar(1)|   NULL|
|            HireDate|         timestamp|   NULL|
|        SalariedFlag|           boolean|   NULL|
|       VacationHours|               int|   NULL|
|      SickLeaveHours|               int|   NULL|
|         CurrentFlag|           boolean|   NULL|
|        ModifiedDate|         timestamp|   NULL|
|                    |                  |       |


                                                                                

Unnamed: 0,EmployeeKey,EmployeeID,NationalIDNumber,ContactID,LoginID,ManagerID,Title,BirthDate,MaritalStatus,Gender,HireDate,SalariedFlag,VacationHours,SickLeaveHours,CurrentFlag,ModifiedDate
0,1,1,14417807,1209,adventure-works\guy1,16,Production Technician - WC60,1972-05-15,M,M,1996-07-31,False,21,30,True,2004-07-31
1,2,2,253022876,1030,adventure-works\kevin0,6,Marketing Assistant,1977-06-03,S,M,1997-02-26,False,42,41,True,2004-07-31


### Step 2.2: Extracting purchaseorderheader from the MongoDB version of adventureworks

In [130]:
mongodb_args["collection"] = "purchaseorderheader"

df_dim_poh = get_mongodb_dataframe(spark, **mongodb_args)
df_dim_poh.toPandas().head(2)

                                                                                

Unnamed: 0,EmployeeID,Freight,ModifiedDate,OrderDate,PurchaseOrderID,RevisionNumber,ShipDate,ShipMethodID,Status,SubTotal,TaxAmt,TotalDue,VendorID
0,244,5.026,2001-05-26T00:00:00.000,2001-05-17T00:00:00.000,1,0,2001-05-26T00:00:00.000,3,4,201.04,16.0832,222.1492,83
1,231,6.8025,2001-05-26T00:00:00.000,2001-05-17T00:00:00.000,2,0,2001-05-26T00:00:00.000,5,1,272.1015,21.7681,300.6721,32


In [131]:
# Add ascending Key
df_dim_poh.createOrReplaceTempView("poh")
sql_poh = f"""
    SELECT *, ROW_NUMBER() OVER (ORDER BY PurchaseOrderID) AS PurchaseOrderHeaderKey
    FROM poh;
"""
df_dim_poh = spark.sql(sql_poh)

# Reorder Columns and display first two rows in Pandas dataframe
ordered_columns = ['PurchaseOrderHeaderKey','PurchaseOrderID','EmployeeID','Freight',
                   'ModifiedDate','OrderDate','RevisionNumber',
                  'ShipDate','ShipMethodID','Status','SubTotal','TaxAmt','TotalDue','VendorID']
df_dim_poh = df_dim_poh [ordered_columns]
df_dim_poh.toPandas().head(2)

                                                                                

Unnamed: 0,PurchaseOrderHeaderKey,PurchaseOrderID,EmployeeID,Freight,ModifiedDate,OrderDate,RevisionNumber,ShipDate,ShipMethodID,Status,SubTotal,TaxAmt,TotalDue,VendorID
0,1,1,244,5.026,2001-05-26T00:00:00.000,2001-05-17T00:00:00.000,0,2001-05-26T00:00:00.000,3,4,201.04,16.0832,222.1492,83
1,2,2,231,6.8025,2001-05-26T00:00:00.000,2001-05-17T00:00:00.000,0,2001-05-26T00:00:00.000,5,1,272.1015,21.7681,300.6721,32


In [132]:
# Save as the dim_poh table in the Data Lakehouse
df_dim_poh.write.saveAsTable(f"{dest_database}.dim_poh", mode="overwrite")

In [133]:
# Unit Test: Describe and Preview Table
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_poh;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_poh LIMIT 2").toPandas()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|PurchaseOrderHead...|                 int|   NULL|
|     PurchaseOrderID|                 int|   NULL|
|          EmployeeID|                 int|   NULL|
|             Freight|              double|   NULL|
|        ModifiedDate|              string|   NULL|
|           OrderDate|              string|   NULL|
|      RevisionNumber|                 int|   NULL|
|            ShipDate|              string|   NULL|
|        ShipMethodID|                 int|   NULL|
|              Status|                 int|   NULL|
|            SubTotal|              double|   NULL|
|              TaxAmt|              double|   NULL|
|            TotalDue|              double|   NULL|
|            VendorID|                 int|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|           

Unnamed: 0,PurchaseOrderHeaderKey,PurchaseOrderID,EmployeeID,Freight,ModifiedDate,OrderDate,RevisionNumber,ShipDate,ShipMethodID,Status,SubTotal,TaxAmt,TotalDue,VendorID
0,1,1,244,5.026,2001-05-26T00:00:00.000,2001-05-17T00:00:00.000,0,2001-05-26T00:00:00.000,3,4,201.04,16.0832,222.1492,83
1,2,2,231,6.8025,2001-05-26T00:00:00.000,2001-05-17T00:00:00.000,0,2001-05-26T00:00:00.000,5,1,272.1015,21.7681,300.6721,32


### Step 2.3: Extracting products from our CSV

In [135]:
dest_file = (os.path.join(batch_dir, 'products.csv'))
df_dim_products = spark.read.format('csv').options(header='true', inferSchema='true').load(dest_file)
df_dim_products.toPandas().head(2)

Unnamed: 0,_c0,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,...,ProductLine,Class,Style,ProductSubcategoryID,ProductModelID,SellStartDate,SellEndDate,DiscontinuedDate,rowguid,ModifiedDate
0,0,1,Adjustable Race,AR-5381,b'\x00',b'\x00',,1000,750,0.0,...,,,,,,1998-06-01,,,b'\xb7\x15Bi\xf7\x08\rL\xac\xb1\xd74\xbaD\xc0\...,2004-03-11 10:01:36
1,1,2,Bearing Ball,BA-8327,b'\x00',b'\x00',,1000,750,0.0,...,,,,,,1998-06-01,,,b' <\xaeX:OIG\xa7\xd4\xd5h\x80l\xc57',2004-03-11 10:01:36


In [136]:
# Add Primary Key
df_dim_products.createOrReplaceTempView("products")
sql_products = f"""
    SELECT *, ROW_NUMBER() OVER (ORDER BY ProductID) AS ProductKey
    FROM products;
"""
df_dim_products = spark.sql(sql_products)

# Reorder columns and display the first two rows in a Pandas dataframe, dropping NAN and attachment columns
ordered_columns = ['ProductKey', 'ProductID', 'Name', 'ProductNumber','Color',
                   'SafetyStockLevel', 'ReorderPoint', 'StandardCost','ListPrice', 
                   'DaysToManufacture', 'Class','SellStartDate','SellEndDate',
                   'ModifiedDate']
df_dim_products = df_dim_products[ordered_columns]
df_dim_products.toPandas().head(2)

Unnamed: 0,ProductKey,ProductID,Name,ProductNumber,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,DaysToManufacture,Class,SellStartDate,SellEndDate,ModifiedDate
0,1,1,Adjustable Race,AR-5381,,1000,750,0.0,0.0,0,,1998-06-01,,2004-03-11 10:01:36
1,2,2,Bearing Ball,BA-8327,,1000,750,0.0,0.0,0,,1998-06-01,,2004-03-11 10:01:36


In [137]:
# Save as the dim_products table in the Data Lakehouse
df_dim_products.write.saveAsTable(f"{dest_database}.dim_products", mode="overwrite")

In [138]:
# Unit Test: Describe and Preview Table
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_products;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_products LIMIT 2").toPandas()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|          ProductKey|                 int|   NULL|
|           ProductID|                 int|   NULL|
|                Name|              string|   NULL|
|       ProductNumber|              string|   NULL|
|               Color|              string|   NULL|
|    SafetyStockLevel|                 int|   NULL|
|        ReorderPoint|                 int|   NULL|
|        StandardCost|              double|   NULL|
|           ListPrice|              double|   NULL|
|   DaysToManufacture|                 int|   NULL|
|               Class|              string|   NULL|
|       SellStartDate|                date|   NULL|
|         SellEndDate|                date|   NULL|
|        ModifiedDate|              string|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|           

Unnamed: 0,ProductKey,ProductID,Name,ProductNumber,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,DaysToManufacture,Class,SellStartDate,SellEndDate,ModifiedDate
0,1,1,Adjustable Race,AR-5381,,1000,750,0.0,0.0,0,,1998-06-01,,2004-03-11 10:01:36
1,2,2,Bearing Ball,BA-8327,,1000,750,0.0,0.0,0,,1998-06-01,,2004-03-11 10:01:36


### Step 2.4 Load in dim_date from the MySQL version of adventureworks

In [140]:
sql_dim_date = f"SELECT * FROM {mysql_args['db_name']}.dim_date"
df_dim_date = get_mysql_dataframe(spark, sql_dim_date, **mysql_args)
df_dim_date.write.saveAsTable(f"{dest_database}.dim_date", mode="overwrite")

                                                                                

In [141]:
# Save as the dim_date table in the Data Lakehouse
df_dim_date.write.saveAsTable(f"{dest_database}.dim_date", mode="overwrite")

In [142]:
# Unit Test: Describe and Preview Table
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_date;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_date LIMIT 2").toPandas()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|            date_key|      int|   NULL|
|           full_date|     date|   NULL|
|           date_name| char(11)|   NULL|
|        date_name_us| char(11)|   NULL|
|        date_name_eu| char(11)|   NULL|
|         day_of_week|  tinyint|   NULL|
|    day_name_of_week| char(10)|   NULL|
|        day_of_month|  tinyint|   NULL|
|         day_of_year|      int|   NULL|
|     weekday_weekend| char(10)|   NULL|
|        week_of_year|  tinyint|   NULL|
|          month_name| char(10)|   NULL|
|       month_of_year|  tinyint|   NULL|
|is_last_day_of_month|  char(1)|   NULL|
|    calendar_quarter|  tinyint|   NULL|
|       calendar_year|      int|   NULL|
| calendar_year_month| char(10)|   NULL|
|   calendar_year_qtr| char(10)|   NULL|
|fiscal_month_of_year|  tinyint|   NULL|
|      fiscal_quarter|  tinyint|   NULL|
+--------------------+---------+-------+
only showing top

Unnamed: 0,date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,...,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
0,20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
1,20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


### Step 2.5: Join dim_employees and dim_poh

In [144]:
# Join PurchaseOrderHeader with Employees on EmployeeID
df_dim_poh_new = df_dim_poh.join(
    df_dim_employees,
    on="EmployeeID",
    how="inner"
)

df_dim_poh_new.toPandas().head(2)

Unnamed: 0,EmployeeID,PurchaseOrderHeaderKey,PurchaseOrderID,Freight,ModifiedDate,OrderDate,RevisionNumber,ShipDate,ShipMethodID,Status,...,Title,BirthDate,MaritalStatus,Gender,HireDate,SalariedFlag,VacationHours,SickLeaveHours,CurrentFlag,ModifiedDate.1
0,164,5,5,509.9325,2001-06-09T00:00:00.000,2001-05-31T00:00:00.000,0,2001-06-09T00:00:00.000,4,4,...,Buyer,1974-09-18,S,M,1999-03-14,False,59,49,True,2004-07-31
1,164,15,15,2.5641,2002-01-23T00:00:00.000,2002-01-14T00:00:00.000,0,2002-01-23T00:00:00.000,5,4,...,Buyer,1974-09-18,S,M,1999-03-14,False,59,49,True,2004-07-31


In [145]:
df_dim_poh_new.printSchema()

root
 |-- EmployeeID: integer (nullable = true)
 |-- PurchaseOrderHeaderKey: integer (nullable = false)
 |-- PurchaseOrderID: integer (nullable = true)
 |-- Freight: double (nullable = true)
 |-- ModifiedDate: string (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- RevisionNumber: integer (nullable = true)
 |-- ShipDate: string (nullable = true)
 |-- ShipMethodID: integer (nullable = true)
 |-- Status: integer (nullable = true)
 |-- SubTotal: double (nullable = true)
 |-- TaxAmt: double (nullable = true)
 |-- TotalDue: double (nullable = true)
 |-- VendorID: integer (nullable = true)
 |-- EmployeeKey: integer (nullable = false)
 |-- NationalIDNumber: string (nullable = true)
 |-- ContactID: integer (nullable = true)
 |-- LoginID: string (nullable = true)
 |-- ManagerID: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- BirthDate: timestamp (nullable = true)
 |-- MaritalStatus: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- HireDate:

## Step 3: Integrating Cold Path Batch Data with Streaming Data (Hot Path)
### Step 3.1 Verify the location of source files on the file system

In [147]:
get_file_info(stream_dir)

Unnamed: 0,name,size,modification_time
0,purchaseorderdetail_part1.json,6064400,2025-05-10 03:59:37.888707638
1,purchaseorderdetail_part2.json,6071713,2025-05-10 03:59:41.548504353
2,purchaseorderdetail_part3.json,6073583,2025-05-10 03:59:43.252766131


### Step 3.2 Bronze Streaming

#### Step 3.2.1 Read "Raw" JSON file data into a Stream

In [150]:
df_bronze = (
    spark.readStream \
    .option("schemaLocation", output_bronze) \
    .option("maxFilesPerTrigger", 1) \
    .option("multiLine", "true") \
    .json(stream_dir)
)

df_bronze.isStreaming

                                                                                

True

#### Step 3.2.2 Write the streaming data to a Parquet file

In [152]:
checkpoint_bronze = os.path.join(output_bronze, '_checkpoint')

bronze_query = (
    df_bronze
    # Add Current Timestamp and Input Filename columns for Traceability
    .withColumn("receipt_time", current_timestamp())
    .withColumn("source_file", input_file_name())
    
    .writeStream \
    .format("parquet") \
    .outputMode("append") \
    .queryName("bronze")
    .trigger(availableNow = True) \
    .option("checkpointLocation", checkpoint_bronze) \
    .option("compression", "snappy") \
    .start(output_bronze)
)

#### Step 3.2.3 Unit Test: Implement Query Monitoring

In [154]:
print(f"Query ID: {bronze_query.id}")
print(f"Query Name: {bronze_query.name}")
print(f"Query Status: {bronze_query.status}")

Query ID: d96dc4f8-9113-4838-8d52-fe4f75753e46
Query Name: bronze
Query Status: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}


In [155]:
bronze_query.awaitTermination()

                                                                                

### Step 3.3. Create the Silver Layer: Integrate "Cold-path" Data & Make Transformations
#### Step 3.3.1 Prepare Role-Playing Dimension Primary and Business Keys

In [157]:
df_dim_due_date = df_dim_date.select(col("date_key").alias("DueDateKey"), col("full_date").alias("DueFullDate"))
df_dim_modified_date = df_dim_date.select(col("date_key").alias("ModifiedDateKey"), col("full_date").alias("ModifiedFullDate"))

#### Step 3.3.2 Define Silver Query to Join Streaming with Batch Data

In [159]:
df_silver = spark.readStream.format("parquet").load(output_bronze) \
    .join(df_dim_poh_new, "PurchaseOrderID") \
    .join(df_dim_products, "ProductID") \
    .join(df_dim_due_date, df_dim_due_date.DueFullDate.cast(DateType()) == col("DueDate").cast(DateType()), "left_outer") \
    .join(df_dim_modified_date, df_dim_modified_date.ModifiedFullDate.cast(DateType()) == col("ModifiedFullDate").cast(DateType()), "left_outer") \
    .select(col("PurchaseOrderID").cast(LongType()), \
            col("PurchaseOrderDetailID").cast(LongType()), \
            df_dim_poh_new.EmployeeKey.cast(LongType()), \
            df_dim_poh_new.PurchaseOrderHeaderKey.cast(LongType()), \
            df_dim_products.ProductKey.cast(LongType()), \
            df_dim_due_date.DueDateKey.cast(LongType()), \
            df_dim_modified_date.ModifiedDateKey.cast(LongType()), \
            col("OrderQty"), \
            col("UnitPrice"), \
            col("LineTotal"), \
            col("ReceivedQty"), \
            col("RejectedQty"), \
            col("StockedQty"), \
           )

In [160]:
df_silver.isStreaming

True

In [161]:
df_silver.printSchema()

root
 |-- PurchaseOrderID: long (nullable = true)
 |-- PurchaseOrderDetailID: long (nullable = true)
 |-- EmployeeKey: long (nullable = false)
 |-- PurchaseOrderHeaderKey: long (nullable = false)
 |-- ProductKey: long (nullable = false)
 |-- DueDateKey: long (nullable = true)
 |-- ModifiedDateKey: long (nullable = true)
 |-- OrderQty: long (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- LineTotal: double (nullable = true)
 |-- ReceivedQty: double (nullable = true)
 |-- RejectedQty: double (nullable = true)
 |-- StockedQty: double (nullable = true)



#### Step 3.3.3. Write the Transformed Streaming data to the Data Lakehouse

In [163]:
checkpoint_silver = os.path.join(output_silver, '_checkpoint')

silver_query = (
    df_silver.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .queryName("silver")
    .trigger(availableNow = True) \
    .option("checkpointLocation", checkpoint_silver) \
    .option("compression", "snappy") \
    .start(output_silver)
)

In [164]:
print(f"Query ID: {silver_query.id}")
print(f"Query Name: {silver_query.name}")
print(f"Query Status: {silver_query.status}")

Query ID: 852b14fe-ec1c-4770-929c-d91e93cebf03
Query Name: silver
Query Status: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}


In [None]:
silver_query.awaitTermination()

[Stage 85:>                                                         (0 + 1) / 1]

### Step 3.4. Create the Gold Layer: Perform Aggregations
#### Step 3.4.1 Define a Query to Create a Business Report
This query counts the number of ordes received per employee per month by:
Joining with the df_dim_date table on DueDateKey;
Grouping by EmployeeKey, month_of_year, and optionally month_name;
Counting the number of received orders (use PurchaseOrderID as a proxy)

In [None]:

df_avg_order_qty = spark.readStream.format("parquet").load(output_silver) \
    .join(df_dim_products, "ProductKey") \
    .join(df_dim_employees, "EmployeeKey") \
    .groupBy("EmployeeID", "ProductID") \
    .agg(avg("OrderQty").alias("Average Order Quantity")) \
    .orderBy("EmployeeID", "ProductID")

In [None]:
df_avg_order_qty.printSchema()

#### 3.4.2. Write the Streaming data to a Parquet File in "Complete" mode

In [None]:
gold_query = (
    df_avg_order_qty.writeStream \
    .format("memory") \
    .outputMode("complete") \
    .queryName("purchase_orders_by_average_order_quantity")
    .start()
)

In [None]:
wait_until_stream_is_ready(gold_query, 1)

#### 3.4.3. Query the Gold Data from Memory

In [None]:
df_purchase_orders_by_average_order_quantity = spark.sql("SELECT * FROM purchase_orders_by_average_order_quantity")
df_purchase_orders_by_average_order_quantity.printSchema()

#### 3.4.4 Create the Final Selection

In [None]:
df_purchase_orders_by_average_order_qty_gold_final = df_purchase_orders_by_average_order_quantity \
    .select(
        col("EmployeeID"),
        col("ProductID"),
        col("Average Order Quantity")
    ) \
    .orderBy(asc("Average Order Quantity"))

#### 3.4.5. Load the Final Results into a New Table and Display the Results

In [None]:
df_purchase_orders_by_average_order_qty_gold_final.write.saveAsTable(f"{dest_database}.orders_by_avg_order_qty", mode="overwrite")
spark.sql(f"SELECT * FROM {dest_database}.orders_by_avg_order_qty").toPandas()