# Final Project
## Part 1: Prerequisites
### Importing required libraries

In [1]:
import findspark
findspark.init()
print(findspark.find())

import os
import sys
import json
import time
import pymongo
import certifi
import shutil
import pandas as pd

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window as W

import math
import builtins

C:\spark-3.5.4-bin-hadoop3


### Instantiating global variables

In [2]:
# --------------------------------------------------------------------------------
# Specify MySQL Server Connection Information
# --------------------------------------------------------------------------------
mysql_args = {
    "host_name" : "localhost",
    "port" : "3306",
    "db_name" : "adventureworks",
    "conn_props" : {
        "user" : "sneham",
        "password" : "password",
        "driver" : "com.mysql.cj.jdbc.Driver"
    }
}

# --------------------------------------------------------------------------------
# Specify MongoDB Cluster Connection Information
# --------------------------------------------------------------------------------
mongodb_args = {
    "cluster_location" : "atlas", # "atlas"
    "user_name" : "snehasmoothedan",
    "password" : "password101",
    "cluster_name" : "Sandbox",
    "cluster_subnet" : "eylzz",
    "db_name" : "adventureworks",
    "collection" : "",
    "null_column_threshold" : 0.5
}

# --------------------------------------------------------------------------------
# Specify Directory Structure for Source Data
# --------------------------------------------------------------------------------
base_dir = os.path.join(os.getcwd(), 'data')
data_dir = os.path.join(base_dir, 'adventureworks')
mongo_dir = os.path.join(data_dir, 'mongodb')
batch_dir = os.path.join(data_dir, 'batch')
stream_dir = os.path.join(data_dir, 'streaming')

purchase_orders_stream_dir = os.path.join(stream_dir, 'purchase_orders')

# --------------------------------------------------------------------------------
# Create Directory Structure for Data Lakehouse Files
# --------------------------------------------------------------------------------
dest_database = "adventureworks_dlh"
sql_warehouse_dir = os.path.abspath('spark-warehouse')
dest_database_dir = f"{dest_database}.db"
database_dir = os.path.join(sql_warehouse_dir, dest_database_dir)

purchase_orders_output_bronze = os.path.join(database_dir, 'fact_purchase_orders', 'bronze')
purchase_orders_output_silver = os.path.join(database_dir, 'fact_purchase_orders', 'silver')
purchase_orders_output_gold = os.path.join(database_dir, 'fact_purchase_orders', 'gold')

### Defining global functions

In [3]:
def get_file_info(path: str):
    file_sizes = []
    modification_times = []

    '''Fetch each item in the directory, and filter out any directories.'''
    items = os.listdir(path)
    files = sorted([item for item in items if os.path.isfile(os.path.join(path, item))])

    '''Populate lists with the Size and Last Modification DateTime for each file in the directory.'''
    for file in files:
        file_sizes.append(os.path.getsize(os.path.join(path, file)))
        modification_times.append(pd.to_datetime(os.path.getmtime(os.path.join(path, file)), unit='s'))

    data = list(zip(files, file_sizes, modification_times))
    column_names = ['name','size','modification_time']
    
    return pd.DataFrame(data=data, columns=column_names)


def wait_until_stream_is_ready(query, min_batches=1):
    while len(query.recentProgress) < min_batches:
        time.sleep(5)
        
    print(f"The stream has processed {len(query.recentProgress)} batchs")


def remove_directory_tree(path: str):
    '''If it exists, remove the entire contents of a directory structure at a given 'path' parameter's location.'''
    try:
        if os.path.exists(path):
            shutil.rmtree(path)
            return f"Directory '{path}' has been removed successfully."
        else:
            return f"Directory '{path}' does not exist."
            
    except Exception as e:
        return f"An error occurred: {e}"
        

def drop_null_columns(df, threshold):
    '''Drop Columns having a percentage of NULL values that exceeds the given 'threshold' parameter value.'''
    columns_with_nulls = [col for col in df.columns if df.filter(df[col].isNull()).count() / df.count() > threshold] 
    df_dropped = df.drop(*columns_with_nulls) 
    
    return df_dropped
    
    
def get_mysql_dataframe(spark_session, sql_query : str, **args):
    '''Create a JDBC URL to the MySQL Database'''
    jdbc_url = f"jdbc:mysql://{args['host_name']}:{args['port']}/{args['db_name']}"
    
    '''Invoke the spark.read.format("jdbc") function to query the database, and fill a DataFrame.'''
    dframe = spark_session.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("driver", args['conn_props']['driver']) \
    .option("user", args['conn_props']['user']) \
    .option("password", args['conn_props']['password']) \
    .option("query", sql_query) \
    .load()
    
    return dframe
    

def get_mongo_uri(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the 'cluster_location' parameter.")
        
    if args['cluster_location'] == "atlas":
        uri = f"mongodb+srv://{args['user_name']}:{args['password']}@"
        uri += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net/"
    else:
        uri = "mongodb://localhost:27017/"

    return uri


def get_spark_conf_args(spark_jars : list, **args):
    jars = ""
    for jar in spark_jars:
        jars += f"{jar}, "
    
    sparkConf_args = {
        "app_name" : "PySpark Adventureworks Data Lakehouse (Medallion Architecture)",
        "worker_threads" : f"local[{int(os.cpu_count()/2)}]",
        "shuffle_partitions" : int(os.cpu_count()),
        "mongo_uri" : get_mongo_uri(**args),
        "spark_jars" : jars[0:-2],
        "database_dir" : sql_warehouse_dir
    }
    
    return sparkConf_args
    

def get_spark_conf(**args):
    sparkConf = SparkConf().setAppName(args['app_name'])\
    .setMaster(args['worker_threads']) \
    .set('spark.driver.memory', '4g') \
    .set('spark.executor.memory', '2g') \
    .set('spark.jars', args['spark_jars']) \
    .set('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
    .set('spark.mongodb.input.uri', args['mongo_uri']) \
    .set('spark.mongodb.output.uri', args['mongo_uri']) \
    .set('spark.sql.adaptive.enabled', 'false') \
    .set('spark.sql.debug.maxToStringFields', 35) \
    .set('spark.sql.shuffle.partitions', args['shuffle_partitions']) \
    .set('spark.sql.streaming.forceDeleteTempCheckpointLocation', 'true') \
    .set('spark.sql.streaming.schemaInference', 'true') \
    .set('spark.sql.warehouse.dir', args['database_dir']) \
    .set('spark.streaming.stopGracefullyOnShutdown', 'true')
    
    return sparkConf


def get_mongo_client(**args):
    '''Get MongoDB Client Connection'''
    mongo_uri = get_mongo_uri(**args)
    if args['cluster_location'] == "atlas":
        client = pymongo.MongoClient(mongo_uri, tlsCAFile=certifi.where())

    elif args['cluster_location'] == "local":
        client = pymongo.MongoClient(mongo_uri)
        
    else:
        raise Exception("A MongoDB Client could not be created.")

    return client
    
    
# TODO: Rewrite this to leverage PySpark?
def set_mongo_collections(mongo_client, db_name : str, data_directory : str, json_files : list):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = [json.loads(line) for line in openfile]
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()
    

def get_mongodb_dataframe(spark_session, **args):
    '''Query MongoDB, and create a DataFrame'''
    dframe = spark_session.read.format("com.mongodb.spark.sql.DefaultSource") \
        .option("database", args['db_name']) \
        .option("collection", args['collection']).load()

    '''Drop the '_id' index column to clean up the response.'''
    dframe = dframe.drop('_id')
    
    '''Call the drop_null_columns() function passing in the dataframe.'''
    dframe = drop_null_columns(dframe, args['null_column_threshold'])
    
    return dframe

### Initializing Data Lakehouse Directory Structure
Removing the current Date Lakehouse Directory Structure

In [4]:
remove_directory_tree(database_dir)

"Directory 'C:\\Users\\sneha\\Documents\\Spring Sem 2025\\DS2002\\Final Project\\spark-warehouse\\adventureworks_dlh.db' has been removed successfully."

### Creating a new Spark session

In [5]:
worker_threads = f"local[{int(os.cpu_count()/2)}]"

jars = []
mysql_spark_jar = os.path.join(os.getcwd(), "mysql-connector-j-9.1.0", "mysql-connector-j-9.1.0.jar")
mssql_spark_jar = os.path.join(os.getcwd(), "sqljdbc_12.8", "enu", "jars", "mssql-jdbc-12.8.1.jre11.jar")

jars.append(mysql_spark_jar)
#jars.append(mssql_spark_jar)

sparkConf_args = get_spark_conf_args(jars, **mongodb_args)

sparkConf = get_spark_conf(**sparkConf_args)
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
spark.sparkContext.setLogLevel("OFF")
spark

### Creating a new metadata database

In [6]:
spark.sql(f"DROP DATABASE IF EXISTS {dest_database} CASCADE;")

sql_create_db = f"""
    CREATE DATABASE IF NOT EXISTS {dest_database}
    COMMENT 'DS-2002 Final Project'
    WITH DBPROPERTIES (contains_pii = true, purpose = 'DS-2002 Final Project');
"""
spark.sql(sql_create_db)

DataFrame[]

In [7]:
get_file_info(batch_dir)

Unnamed: 0,name,size,modification_time
0,vendor.csv,8448,2025-05-07 18:21:54.641071796


## Creating JSON files

### For MongoDB
Creating the JSON file for the employee data

In [8]:
sql_employee = f"SELECT * FROM {mysql_args['db_name']}.employee"
df_employee = get_mysql_dataframe(spark, sql_employee, **mysql_args)

In [9]:
df_employee = df_employee.coalesce(1)
temp_dir = "data/adventureworks/temp"
df_employee.write.mode("overwrite").json(temp_dir)

for file in os.listdir(temp_dir):
    if file.startswith("part-") and file.endswith(".json"):
        part_file = file
        break

shutil.move(os.path.join(temp_dir, part_file), "data/adventureworks/mongodb/employee.json")
shutil.rmtree(temp_dir)

#### Uploading the file onto MongoDB

In [10]:
client = get_mongo_client(**mongodb_args)
json_files = {"employee" : 'employee.json'}

set_mongo_collections(client, mongodb_args["db_name"], mongo_dir, json_files)

### For local file
Creating one local CSV file for vendors

In [11]:
sql_vendor = f"SELECT * FROM {mysql_args['db_name']}.vendor"
df_vendor = get_mysql_dataframe(spark, sql_vendor, **mysql_args)

In [12]:
df_vendor = df_vendor.coalesce(1)
temp_dir = "data/adventureworks/temp"
df_vendor.write.mode("overwrite").option("header", True).csv(temp_dir)

for file in os.listdir(temp_dir):
    if file.startswith("part-") and file.endswith(".csv"):
        part_file = file
        break

shutil.move(os.path.join(temp_dir, part_file), "data/adventureworks/batch/vendor.csv")
shutil.rmtree(temp_dir)

### For Spark
Splitting the fact table data into three parts and saving them as JSON files

In [13]:
def split_spark_dataframe_to_single_json_files(df, base_filename, output_dir=".", num_parts=3):
    total_rows = df.count()
    rows_per_part = math.ceil(total_rows / num_parts)

    for i in range(num_parts):
        start_idx = i * rows_per_part
        
        end_idx = builtins.min((i + 1) * rows_per_part, total_rows)
        df_part = df.limit(end_idx).subtract(df.limit(start_idx))

        temp_path = os.path.join(output_dir, f"tmp_{base_filename}_{i+1}")
        df_part.coalesce(1).write.mode("overwrite").json(temp_path)

        for fname in os.listdir(temp_path):
            if fname.startswith("part-") and fname.endswith(".json"):
                src_path = os.path.join(temp_path, fname)
                dest_path = os.path.join(output_dir, f"{base_filename}_{i+1}.json")
                shutil.move(src_path, dest_path)
                break

        shutil.rmtree(temp_path)

In [14]:
sql_po_header = f"SELECT * FROM {mysql_args['db_name']}.purchaseorderheader"
df_po_header = get_mysql_dataframe(spark, sql_po_header, **mysql_args)

In [15]:
split_spark_dataframe_to_single_json_files(df_po_header, base_filename="po_part", output_dir="data/adventureworks/streaming", num_parts=3)

## Part 2: Populating the dimension tables

In [16]:
get_file_info(batch_dir)

Unnamed: 0,name,size,modification_time
0,vendor.csv,8448,2025-05-07 18:39:08.228654861


### Populating the Vendors Dimension
Using PySpark to read data from the vendors.csv file

In [17]:
vendors_csv = os.path.join(batch_dir, 'vendor.csv')
print(vendors_csv)

df_dim_vendors = spark.read.format('csv').options(header='true', inferSchema='true').load(vendors_csv)
df_dim_vendors.toPandas().head(2)

C:\Users\sneha\Documents\Spring Sem 2025\DS2002\Final Project\data\adventureworks\batch\vendor.csv


Unnamed: 0,VendorID,AccountNumber,Name,CreditRating,PreferredVendorStatus,ActiveFlag,PurchasingWebServiceURL,ModifiedDate
0,1,INTERNAT0001,International,1,True,True,,2002-02-25
1,2,ELECTRON0002,Electronic Bike Repair & Supplies,1,True,True,,2002-02-17


#### Make necessary transformations to the new dataframe

In [18]:
# ----------------------------------------------------------------------------------
# Rename columnss
# ----------------------------------------------------------------------------------
df_dim_vendors = df_dim_vendors.withColumnRenamed("VendorID", "vendor_id")
df_dim_vendors = df_dim_vendors.withColumnRenamed("Name", "vendor_name")
df_dim_vendors = df_dim_vendors.withColumnRenamed("PreferredVendorStatus", "preferred_vendor_status")
df_dim_vendors = df_dim_vendors.withColumnRenamed("CreditRating", "credit_rating")
df_dim_vendors = df_dim_vendors.withColumnRenamed("AccountNumber", "account_number")

# ----------------------------------------------------------------------------------
# Add Primary Key column using SQL Windowing function: ROW_NUMBER() 
# ----------------------------------------------------------------------------------
df_dim_vendors.createOrReplaceTempView("vendors")
sql_vendors = f"""
    SELECT *, ROW_NUMBER() OVER (PARTITION BY vendor_id ORDER BY vendor_id) AS vendor_key
    FROM vendors;
"""
df_dim_vendors = spark.sql(sql_vendors)

# ----------------------------------------------------------------------------------
# Reorder Columns and display the first two rows in a Pandas dataframe
# ----------------------------------------------------------------------------------
ordered_columns = ['vendor_key', 'vendor_id', 'vendor_name', 'account_number'
                   , 'preferred_vendor_status', 'credit_rating']

df_dim_vendors = df_dim_vendors[ordered_columns]
df_dim_vendors.toPandas().head(2)

Unnamed: 0,vendor_key,vendor_id,vendor_name,account_number,preferred_vendor_status,credit_rating
0,1,12,"Compete, Inc.",COMPETE0002,True,1
1,1,13,Beaumont Bikes,BEAUMONT0001,False,1


In [19]:
df_dim_vendors.write.saveAsTable(f"{dest_database}.dim_vendors", mode="overwrite")

### Populate the Employees Dimension
Fetching the employee collection from MongoDB

In [20]:
mongodb_args["collection"] = "employee"

df_dim_employees = get_mongodb_dataframe(spark, **mongodb_args)
df_dim_employees.toPandas().head(2)

Unnamed: 0,BirthDate,ContactID,CurrentFlag,EmployeeID,Gender,HireDate,LoginID,ManagerID,MaritalStatus,ModifiedDate,NationalIDNumber,SalariedFlag,SickLeaveHours,Title,VacationHours,rowguid
0,1972-05-15T00:00:00.000-04:00,1209,True,1,M,1996-07-31T00:00:00.000-04:00,adventure-works\guy1,16.0,M,2004-07-31T00:00:00.000-04:00,14417807,False,30,Production Technician - WC60,21,StDhqjfCdEm01ZNSR3N3GA==
1,1977-06-03T00:00:00.000-04:00,1030,True,2,M,1997-02-26T00:00:00.000-05:00,adventure-works\kevin0,6.0,S,2004-07-31T00:00:00.000-04:00,253022876,False,41,Marketing Assistant,42,QAJIG8CVD0GnF+splDyIhg==


#### Make necessary transformations to the new dataframe

In [21]:
# ----------------------------------------------------------------------------------
# Rename the 'id' column to 'customer_id' ------------------------------------------
# ----------------------------------------------------------------------------------
df_dim_employees = df_dim_employees.withColumnRenamed("EmployeeID", "employee_id")
df_dim_employees = df_dim_employees.withColumnRenamed("Gender", "gender")
df_dim_employees = df_dim_employees.withColumnRenamed("NationalIDNumber", "national_id_num")
df_dim_employees = df_dim_employees.withColumnRenamed("MaritalStatus", "marital_status")
df_dim_employees = df_dim_employees.withColumnRenamed("Title", "title")

# ----------------------------------------------------------------------------------
# Add Primary Key column using the SQL Windowing function: ROW_NUMBER() 
# ----------------------------------------------------------------------------------
df_dim_employees.createOrReplaceTempView("employees")
sql_employees = f"""
    SELECT *, ROW_NUMBER() OVER (PARTITION BY employee_id ORDER BY employee_id) AS employee_key
    FROM employees;
"""
df_dim_employees = spark.sql(sql_employees)

# ----------------------------------------------------------------------------------
# Reorder Columns and display the first two rows in a Pandas dataframe
# ----------------------------------------------------------------------------------
ordered_columns = ['employee_key', 'employee_id', 'title', 'gender', 'national_id_num', 'marital_status']

df_dim_employees = df_dim_employees[ordered_columns]
df_dim_employees.toPandas().head(2)


Unnamed: 0,employee_key,employee_id,title,gender,national_id_num,marital_status
0,1,12,Vice President of Engineering,F,245797967,S
1,1,13,Production Technician - WC10,M,844973625,M


In [22]:
df_dim_employees.write.saveAsTable(f"{dest_database}.dim_employees", mode="overwrite")

### Populate the Product Dimension
Fetching product table from MySQL

In [23]:
sql_product = f"SELECT *, ROW_NUMBER() OVER (PARTITION BY ProductID ORDER BY ProductID) AS product_key FROM {mysql_args['db_name']}.product"
df_dim_products = get_mysql_dataframe(spark, sql_product, **mysql_args)

df_dim_products.toPandas().head(2)

Unnamed: 0,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,...,Class,Style,ProductSubcategoryID,ProductModelID,SellStartDate,SellEndDate,DiscontinuedDate,rowguid,ModifiedDate,product_key
0,1,Adjustable Race,AR-5381,False,False,,1000,750,0.0,0.0,...,,,,,1998-06-01,NaT,NaT,"[183, 21, 66, 105, 247, 8, 13, 76, 172, 177, 2...",2004-03-11 10:01:36,1
1,2,Bearing Ball,BA-8327,False,False,,1000,750,0.0,0.0,...,,,,,1998-06-01,NaT,NaT,"[32, 60, 174, 88, 58, 79, 73, 71, 167, 212, 21...",2004-03-11 10:01:36,1


#### Temporary tables for Product Sub Category and Category

In [24]:
sql_product_subcat = f"SELECT * FROM {mysql_args['db_name']}.productsubcategory"
df_product_subcat = get_mysql_dataframe(spark, sql_product_subcat, **mysql_args)

df_product_subcat.toPandas().head(2)

Unnamed: 0,ProductSubcategoryID,ProductCategoryID,Name,rowguid,ModifiedDate
0,1,1,Mountain Bikes,"[222, 74, 54, 45, 74, 38, 60, 67, 176, 146, 79...",1998-06-01
1,2,1,Road Bikes,"[192, 16, 3, 0, 200, 188, 196, 66, 176, 195, 6...",1998-06-01


In [25]:
sql_product_cat = f"SELECT * FROM {mysql_args['db_name']}.productcategory"
df_product_cat = get_mysql_dataframe(spark, sql_product_cat, **mysql_args)

df_product_cat.toPandas().head(2)

Unnamed: 0,ProductCategoryID,Name,rowguid,ModifiedDate
0,1,Bikes,"[92, 162, 189, 207, 113, 223, 167, 71, 184, 27...",1998-06-01
1,2,Components,"[141, 130, 87, 198, 8, 216, 186, 74, 145, 163,...",1998-06-01


#### Make necessary transformations to the new dataframe

In [26]:
# ----------------------------------------------------------------------------------
# Rename columns
# ----------------------------------------------------------------------------------
df_dim_products = df_dim_products.withColumnRenamed("ProductID", "product_id")
df_dim_products = df_dim_products.withColumnRenamed("Name", "product_name")
df_dim_products = df_dim_products.withColumnRenamed("SafetyStockLevel", "safety_stock_level")
df_dim_products = df_dim_products.withColumnRenamed("ReorderPoint", "reorder_point")
df_dim_products = df_dim_products.withColumnRenamed("StandardCost", "standard_cost")
df_dim_products = df_dim_products.withColumnRenamed("ListPrice", "list_price")
df_dim_products = df_dim_products.withColumnRenamed("ProductLine", "product_line")
df_dim_products = df_dim_products.withColumnRenamed("Weight", "weight")

# ----------------------------------------------------------------------------------
# Join subcategory and category to get category name
# ----------------------------------------------------------------------------------
df_dim_products = df_dim_products \
    .join(df_product_subcat, on="ProductSubcategoryID", how="inner") \
    .join(df_product_cat, on="ProductCategoryID", how = 'inner') \
    .select(
        df_dim_products["*"],
        df_product_cat["name"].alias("category")
    )

# ----------------------------------------------------------------------------------
# Drop unwanted columns (description and attachments)
# ----------------------------------------------------------------------------------
df_dim_products.drop('ProductSubcategoryID','MakeFlag','FinishedGoodsFlags','Color','Size','SizeUnitMeasureCode','WeightUnitMeasureCode','DaysToManufacture','Class','Style','ProductSubcategoryID','ProductModelID','SellStartDate','SellEndDate','DiscontinuedDate','rowguid','ModifiedDate')

# ----------------------------------------------------------------------------------
# Reorder Columns and display the first two rows in a Pandas dataframe
# ----------------------------------------------------------------------------------
reordered_columns = ['product_key','product_id', 'product_name','safety_stock_level'
                     ,'reorder_point','standard_cost','list_price'
                     ,'product_line','weight','category']

df_dim_products = df_dim_products[reordered_columns]
df_dim_products.toPandas().head(2)

Unnamed: 0,product_key,product_id,product_name,safety_stock_level,reorder_point,standard_cost,list_price,product_line,weight,category
0,1,771,"Mountain-100 Silver, 38",100,75,1912.1544,3399.99,M,20.35,Bikes
1,1,772,"Mountain-100 Silver, 42",100,75,1912.1544,3399.99,M,20.77,Bikes


In [27]:
df_dim_products.write.saveAsTable(f"{dest_database}.dim_products", mode="overwrite")

### Populate the Date Dimension
Fetching the dim_date table from MySQL

In [28]:
sql_dim_date = f"SELECT * FROM {mysql_args['db_name']}.dim_date"
df_dim_date = get_mysql_dataframe(spark, sql_dim_date, **mysql_args)

In [29]:
df_dim_date.write.saveAsTable(f"{dest_database}.dim_date", mode="overwrite")

In [30]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_date;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_date LIMIT 2").toPandas()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|            date_key|      int|   NULL|
|           full_date|     date|   NULL|
|           date_name| char(11)|   NULL|
|        date_name_us| char(11)|   NULL|
|        date_name_eu| char(11)|   NULL|
|         day_of_week|  tinyint|   NULL|
|    day_name_of_week| char(10)|   NULL|
|        day_of_month|  tinyint|   NULL|
|         day_of_year|      int|   NULL|
|     weekday_weekend| char(10)|   NULL|
|        week_of_year|  tinyint|   NULL|
|          month_name| char(10)|   NULL|
|       month_of_year|  tinyint|   NULL|
|is_last_day_of_month|  char(1)|   NULL|
|    calendar_quarter|  tinyint|   NULL|
|       calendar_year|      int|   NULL|
| calendar_year_month| char(10)|   NULL|
|   calendar_year_qtr| char(10)|   NULL|
|fiscal_month_of_year|  tinyint|   NULL|
|      fiscal_quarter|  tinyint|   NULL|
+--------------------+---------+-------+
only showing top

Unnamed: 0,date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,...,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
0,20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
1,20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


#### Temporary table for Purchase Order Details

In [31]:
sql_po_details = f"SELECT * FROM {mysql_args['db_name']}.purchaseorderdetail"
df_po_details = get_mysql_dataframe(spark, sql_po_details, **mysql_args)

df_po_details.toPandas().head(2)

Unnamed: 0,PurchaseOrderID,PurchaseOrderDetailID,DueDate,OrderQty,ProductID,UnitPrice,LineTotal,ReceivedQty,RejectedQty,StockedQty,ModifiedDate
0,1,1,2001-05-31,4,1,50.26,201.04,3.0,0.0,3.0,2001-05-24
1,2,2,2001-05-31,3,359,45.12,135.36,3.0,0.0,3.0,2001-05-24


### Verify dimension tables

In [32]:
spark.sql(f"USE {dest_database};")
spark.sql("SHOW TABLES").toPandas()

Unnamed: 0,namespace,tableName,isTemporary
0,adventureworks_dlh,dim_date,False
1,adventureworks_dlh,dim_employees,False
2,adventureworks_dlh,dim_products,False
3,adventureworks_dlh,dim_vendors,False
4,,employees,True
5,,vendors,True


## Part 3: Integrating reference data with real-time data

In [33]:
get_file_info(stream_dir)

Unnamed: 0,name,size,modification_time
0,po_part_1.json,416407,2025-05-07 18:39:10.284090996
1,po_part_2.json,417429,2025-05-07 18:39:11.674629450
2,po_part_3.json,416764,2025-05-07 18:39:12.969245434


### Creating the Bronze Layer

#### Read raw JSON files into stream

In [34]:
df_purchase_orders = (
    spark.readStream \
    .option("maxFilesPerTrigger", 1) \
    .option("multiLine", "false") \
    .json(stream_dir)
)

df_purchase_orders.isStreaming

True

#### Write streaming data to parquet file

In [35]:
purchase_orders_checkpoint_bronze = os.path.join(purchase_orders_output_bronze, '_checkpoint')

purchase_orders_bronze_query = (
    df_purchase_orders
    .withColumn("receipt_time", current_timestamp())
    .withColumn("source_file", input_file_name())
    
    .writeStream \
    .format("parquet") \
    .outputMode("append") \
    .queryName("purchase_orders_bronze")
    .trigger(availableNow = True) \
    .option("checkpointLocation", purchase_orders_checkpoint_bronze) \
    .option("compression", "snappy") \
    .start(purchase_orders_output_bronze)
)

In [36]:
purchase_orders_bronze_query.awaitTermination()

### Creating the Silver Layer

#### Preparing the role-playing dimension keys

In [37]:
df_dim_order_date = df_dim_date.select(col("date_key").alias("order_date_key"), col("full_date").alias("order_full_date"))
df_dim_ship_date = df_dim_date.select(col("date_key").alias("ship_date_key"), col("full_date").alias("ship_full_date"))
df_dim_due_date = df_dim_date.select(col("date_key").alias("due_date_key"), col("full_date").alias("due_full_date"))

#### Silver query to join streaming with the batch data

In [38]:
df_purchase_orders_silver = spark.readStream.format("parquet").load(purchase_orders_output_bronze) \
    .join(df_po_details, 'PurchaseOrderID') \
    .join(df_dim_products, df_dim_products.product_id == col("ProductID")) \
    .join(df_dim_employees, df_dim_employees.employee_id == col("EmployeeID")) \
    .join(df_dim_vendors, df_dim_vendors.vendor_id == col("VendorID")) \
    .join(df_dim_order_date, df_dim_order_date.order_full_date.cast(DateType()) == col("OrderDate").cast(DateType()), "inner") \
    .join(df_dim_due_date, df_dim_due_date.due_full_date.cast(DateType()) == col("DueDate").cast(DateType()), "inner") \
    .join(df_dim_ship_date, df_dim_ship_date.ship_full_date.cast(DateType()) == col("ShipDate").cast(DateType()), "left_outer") \
    .select(col("PurchaseOrderID").cast(LongType()).alias("purchase_order_id"), \
            df_po_details.OrderQty.cast(LongType()).alias("order_qty"), \
            df_po_details.UnitPrice.cast(LongType()).alias("unit_price"), \
            df_po_details.LineTotal.cast(LongType()).alias("line_total"), \
            df_dim_products.product_key.cast(IntegerType()), \
            df_dim_vendors.vendor_key.cast(IntegerType()), \
            df_dim_employees.employee_key.cast(IntegerType()), \
            df_dim_order_date.order_date_key.cast(LongType()), \
            df_dim_due_date.due_date_key.cast(LongType()), \
            df_dim_ship_date.ship_date_key.cast(LongType()), \
            col("SubTotal").alias("subtotal"), \
            col("TaxAmt").alias("tax_amt"), \
            col("Freight").alias("freight"), \
            col("TotalDue").alias("total_due") \
           )


In [39]:
df_purchase_orders_silver.isStreaming

True

In [40]:
df_purchase_orders_silver.printSchema()

root
 |-- purchase_order_id: long (nullable = true)
 |-- order_qty: long (nullable = true)
 |-- unit_price: long (nullable = true)
 |-- line_total: long (nullable = true)
 |-- product_key: integer (nullable = true)
 |-- vendor_key: integer (nullable = false)
 |-- employee_key: integer (nullable = false)
 |-- order_date_key: long (nullable = true)
 |-- due_date_key: long (nullable = true)
 |-- ship_date_key: long (nullable = true)
 |-- subtotal: double (nullable = true)
 |-- tax_amt: double (nullable = true)
 |-- freight: double (nullable = true)
 |-- total_due: double (nullable = true)



#### Write the transformed streaming data to the Data Lakehouse

In [41]:
purchase_orders_checkpoint_silver = os.path.join(purchase_orders_output_silver, '_checkpoint')

purchase_orders_silver_query = (
    df_purchase_orders_silver.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .queryName("purchase_orders_silver")
    .trigger(availableNow = True) \
    .option("checkpointLocation", purchase_orders_checkpoint_silver) \
    .option("compression", "snappy") \
    .start(purchase_orders_output_silver)
)

In [42]:
purchase_orders_silver_query.awaitTermination()

### Creating Gold Layer

Creating a new Gold table using the PySpark API. This table includes the number of products sold per category for each month, sorted by the month number when orders were placed.

In [43]:
df_purchase_orders_by_product_category_gold = spark.readStream.format("parquet").load(purchase_orders_output_silver) \
.join(df_dim_products, "product_key") \
.join(df_dim_date, df_dim_date.date_key.cast(IntegerType()) == col("order_date_key").cast(IntegerType())) \
.groupBy("month_of_year", "category", "month_name") \
.agg(sum("order_qty").alias("product_count")) \
.orderBy(asc("month_of_year"), desc("product_count"))

In [44]:
df_purchase_orders_by_product_category_gold.printSchema()

root
 |-- month_of_year: byte (nullable = true)
 |-- category: string (nullable = true)
 |-- month_name: string (nullable = true)
 |-- product_count: long (nullable = true)



#### Writing the streaming data to a parquet file in "complete" mode

In [45]:
purchase_orders_gold_query = (
    df_purchase_orders_by_product_category_gold.writeStream \
    .format("memory") \
    .outputMode("complete") \
    .queryName("fact_purchase_orders_by_product_category")
    .start()
)

In [46]:
wait_until_stream_is_ready(purchase_orders_gold_query, 1)

The stream has processed 1 batchs


#### Query the Gold Data from memory

In [47]:
df_fact_purchase_orders_by_product_category = spark.sql("SELECT * FROM fact_purchase_orders_by_product_category")
df_fact_purchase_orders_by_product_category.printSchema()

root
 |-- month_of_year: byte (nullable = true)
 |-- category: string (nullable = true)
 |-- month_name: string (nullable = true)
 |-- product_count: long (nullable = true)



#### Creating the final selection

In [61]:
df_fact_purchase_orders_by_product_category_gold_final = df_fact_purchase_orders_by_product_category \
.select(col("month_name").alias("Month"), \
        col("month_of_year").alias("Month Num"), \
        col("category").alias("Product Category"), \
        col("product_count").alias("Product Count")) \
.orderBy(asc("Month Num"), desc("Product Count"))

#### Loading the final results into a new table and displaying the results

In [63]:
df_fact_purchase_orders_by_product_category_gold_final.write.saveAsTable(f"{dest_database}.fact_purchase_orders_by_product_category", mode="overwrite")
spark.sql(f"SELECT Month, `Product Category`, `Product Count` FROM {dest_database}.fact_purchase_orders_by_product_category ORDER BY `Month Num` ASC, `Product Count` DESC").toPandas()

Unnamed: 0,Month,Product Category,Product Count
0,January,Components,10129060
1,January,Bikes,7332230
2,January,Clothing,2645650
3,January,Accessories,2192110
4,February,Components,10784320
5,February,Bikes,7806560
6,February,Clothing,2816800
7,February,Accessories,2333920
8,March,Components,11021500
9,March,Bikes,7978250
