## Lab 6: Building a Data Lakehouse with the PySpark Structured Streaming Medallion Architecture
This lab introduces many of the software libraries and programming techniques needed for the end-of-session capstone project in **DS-2002: Data Systems**. The capstone asks students to demonstrate a practical, working understanding of each of the data systems and architectural principles covered during the session.

**These include:**
- Relational Database Management Systems (e.g., MySQL, Microsoft SQL Server, Oracle, IBM DB2)
  - Online Transaction Processing Systems (OLTP): *Optimized for High-Volume Write Operations; Normalized to 3rd Normal Form.*
  - Online Analytical Processing Systems (OLAP): *Optimized for Read/Aggregation Operations; Dimensional Model (i.e., Star Schema)*
- NoSQL *(Not Only SQL)* Systems (e.g., MongoDB, CosmosDB, Cassandra, HBase, Redis)
- File System *(Data Lake)* Source Systems (e.g., AWS S3, Microsoft Azure Data Lake Storage)
  - Various Datafile Formats (e.g., JSON, CSV, Parquet, Text, Binary)
- Massively Parallel Processing *(MPP)* Data Integration Systems (e.g., Apache Spark/PySpark, Databricks)
- Data Integration Patterns (e.g., Extract-Transform-Load, Extract-Load-Transform, Extract-Load-Transform-Load, Lambda & Kappa Architectures)

## Section I: Prerequisites

### 1.0. Import Required Libraries

In [209]:
import findspark
findspark.init()
print(findspark.find())

import os
import sys
import json
import time
import pymongo
import certifi
import shutil
import pandas as pd

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window as W

/opt/anaconda3/envs/pysparkenv/lib/python3.12/site-packages/pyspark


### 2.0. Instantiate Global Variables

In [210]:
# --------------------------------------------------------------------------------
# Specify MySQL Server Connection Information
# --------------------------------------------------------------------------------
mysql_args = {
    "host_name" : "localhost",
    "port" : "3306",
    "db_name" : "adventureworks",
    "conn_props" : {
        "user" : "root",
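        "password" : os.environ.get("MYSQL_PASSWORD", "<your-mysql-password>"),  # read from the environment; never commit real credentials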
        "driver" : "com.mysql.cj.jdbc.Driver"
    }
}

# --------------------------------------------------------------------------------
# Specify MongoDB Cluster Connection Information
# --------------------------------------------------------------------------------
mongodb_args = {
    "cluster_location": "atlas",
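    "user_name": os.environ.get("MONGODB_USER", "<your-atlas-user>"),          # read from the environment
    "password": os.environ.get("MONGODB_PASSWORD", "<your-atlas-password>"),   # never commit real credentials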
    "cluster_name": "Cluster0",
    "cluster_subnet": "hytrbnr",
    "db_name": "adventureworks",
    "collection": "M2002",
    "null_column_threshold": 0.5
}

# --------------------------------------------------------------------------------
# Specify Directory Structure for Source Data
# --------------------------------------------------------------------------------
base_dir = os.path.join(os.getcwd(), 'Scripts')
data_dir = os.path.join(base_dir, 'adventureworks')
batch_dir = os.path.join(data_dir, 'batch')
stream_dir = os.path.join(data_dir, 'streaming')

orders_stream_dir = os.path.join(stream_dir, 'orders')
purchase_orders_stream_dir = os.path.join(stream_dir, 'purchase_orders')

# --------------------------------------------------------------------------------
# Create Directory Structure for Data Lakehouse Files
# --------------------------------------------------------------------------------
dest_database = "adventureworks_dlh"
sql_warehouse_dir = os.path.abspath('spark-warehouse')
dest_database_dir = f"{dest_database}.db"
database_dir = os.path.join(sql_warehouse_dir, dest_database_dir)

orders_output_bronze = os.path.join(database_dir, 'fact_sales_order_vw', 'bronze')
orders_output_silver = os.path.join(database_dir, 'fact_sales_order_vw', 'silver')
orders_output_gold = os.path.join(database_dir, 'fact_sales_order_vw', 'gold')
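
The three output paths above follow the medallion convention: bronze holds raw ingested data, silver holds cleaned and integrated data, and gold holds aggregated, report-ready data. A minimal sketch of a helper that builds the same layout for any table is shown below; `medallion_paths` and `MEDALLION_LAYERS` are hypothetical names that are not part of the lab.

```python
import os

# Layer names used by the medallion architecture, in processing order.
MEDALLION_LAYERS = ("bronze", "silver", "gold")


def medallion_paths(database_dir: str, table_name: str) -> dict:
    """Return {layer: path} for one table's bronze, silver, and gold folders."""
    return {
        layer: os.path.join(database_dir, table_name, layer)
        for layer in MEDALLION_LAYERS
    }


# Same relative layout the lab uses for the orders fact table.
paths = medallion_paths(
    os.path.join("spark-warehouse", "adventureworks_dlh.db"),
    "fact_sales_order_vw",
)
for layer, path in paths.items():
    print(f"{layer:<6} -> {path}")
```

Keeping the layer names in one place avoids typos when additional streaming fact tables are added.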

### 3.0. Define Global Functions

In [211]:
def get_file_info(path: str):
    file_sizes = []
    modification_times = []

    items = os.listdir(path)
    files = sorted([item for item in items if os.path.isfile(os.path.join(path, item))])

    for file in files:
        file_sizes.append(os.path.getsize(os.path.join(path, file)))
        modification_times.append(pd.to_datetime(os.path.getmtime(os.path.join(path, file)), unit='s'))

    data = list(zip(files, file_sizes, modification_times))
    column_names = ['name','size','modification_time']
    
    return pd.DataFrame(data=data, columns=column_names)


def wait_until_stream_is_ready(query, min_batches=1):
    while len(query.recentProgress) < min_batches:
        time.sleep(5)
        
    print(f"The stream has processed {len(query.recentProgress)} batches")


def remove_directory_tree(path: str):
    try:
        if os.path.exists(path):
            shutil.rmtree(path)
            return f"Directory '{path}' has been removed successfully."
        else:
            return f"Directory '{path}' does not exist."
    except Exception as e:
        return f"An error occurred: {e}"
        

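def drop_null_columns(df, threshold):
    '''Drop columns whose fraction of NULL values (0.0 to 1.0) exceeds the given 'threshold' parameter value.'''
    total_rows = df.count()
    if total_rows == 0:
        return df

    # Count the NULL values of every column in a single aggregation pass
    # instead of running two Spark jobs per column.
    null_counts = df.select([count(when(df[c].isNull(), 1)).alias(c) for c in df.columns]).first().asDict()
    columns_with_nulls = [c for c in df.columns if null_counts[c] / total_rows > threshold]

    return df.drop(*columns_with_nulls)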
    
    
def get_mysql_dataframe(spark_session, sql_query : str, **args):
    '''Query a MySQL database over JDBC and return the result as a Spark DataFrame.'''
    # Create a JDBC URL to the MySQL Database.
    jdbc_url = f"jdbc:mysql://{args['host_name']}:{args['port']}/{args['db_name']}"
    
    # Invoke the spark.read.format("jdbc") function to query the database, and fill a DataFrame.
    dframe = spark_session.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("driver", args['conn_props']['driver']) \
    .option("user", args['conn_props']['user']) \
    .option("password", args['conn_props']['password']) \
    .option("query", sql_query) \
    .load()
    
    return dframe
    

def get_mongo_uri(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the 'cluster_location' parameter.")
        
    if args['cluster_location'] == "atlas":
        uri = f"mongodb+srv://{args['user_name']}:{args['password']}@"
        uri += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net/"
    else:
        uri = "mongodb://localhost:27017/"

    return uri


def get_spark_conf_args(spark_jars : list, **args):
    # Spark expects 'spark.jars' as a comma-separated list of paths.
    jars = ",".join(spark_jars)
    
    sparkConf_args = {
        "app_name" : "PySpark AdventureWorks Data Lakehouse (Medallion Architecture)",
        "worker_threads" : f"local[{int(os.cpu_count()/2)}]",
        "shuffle_partitions" : int(os.cpu_count()),
        "mongo_uri" : get_mongo_uri(**args),
        "spark_jars" : jars,
        "database_dir" : sql_warehouse_dir
    }
    
    return sparkConf_args
    

def get_spark_conf(**args):
    sparkConf = SparkConf().setAppName(args['app_name'])\
    .setMaster(args['worker_threads']) \
    .set('spark.driver.memory', '4g') \
    .set('spark.executor.memory', '2g') \
    .set('spark.jars', args['spark_jars']) \
    .set('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
    .set('spark.mongodb.input.uri', args['mongo_uri']) \
    .set('spark.mongodb.output.uri', args['mongo_uri']) \
    .set('spark.sql.adaptive.enabled', 'false') \
    .set('spark.sql.debug.maxToStringFields', 35) \
    .set('spark.sql.shuffle.partitions', args['shuffle_partitions']) \
    .set('spark.sql.streaming.forceDeleteTempCheckpointLocation', 'true') \
    .set('spark.sql.streaming.schemaInference', 'true') \
    .set('spark.sql.warehouse.dir', args['database_dir']) \
    .set('spark.streaming.stopGracefullyOnShutdown', 'true')
    
    return sparkConf


def get_mongo_client(**args):
    '''Get MongoDB Client Connection'''
    mongo_uri = get_mongo_uri(**args)
    if args['cluster_location'] == "atlas":
        client = pymongo.MongoClient(mongo_uri, tlsCAFile=certifi.where())

    elif args['cluster_location'] == "local":
        client = pymongo.MongoClient(mongo_uri)
        
    else:
        raise Exception("A MongoDB Client could not be created.")

    return client
    
    
# TODO: Rewrite this to leverage PySpark?
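def set_mongo_collections(mongo_client, db_name : str, data_directory : str, json_files : dict):
    '''Load each JSON file into a collection of the same name, replacing any existing collection.'''
    db = mongo_client[db_name]
    
    # 'json_files' maps a collection name to the JSON file that populates it.
    for collection_name, file_name in json_files.items():
        db.drop_collection(collection_name)
        json_file = os.path.join(data_directory, file_name)
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            db[collection_name].insert_many(json_object)
        
    mongo_client.close()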
    

def get_mongodb_dataframe(spark_session, **args):
    '''Query MongoDB, and create a DataFrame'''
    dframe = spark_session.read.format("com.mongodb.spark.sql.DefaultSource") \
        .option("database", args['db_name']) \
        .option("collection", args['collection']).load()

    '''Drop the '_id' index column to clean up the response.'''
    dframe = dframe.drop('_id')
    
    '''Call the drop_null_columns() function passing in the dataframe.'''
    dframe = drop_null_columns(dframe, args['null_column_threshold'])
    
    return dframe
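
`get_mongo_uri()` places the user name and password into the connection string as-is. MongoDB connection strings require reserved characters such as `@`, `:` and `/` in credentials to be percent-encoded, so a password containing them would produce an invalid URI. A minimal sketch of an encoded variant follows; `build_atlas_uri` and the sample credentials are hypothetical and only illustrate the encoding step.

```python
from urllib.parse import quote_plus


def build_atlas_uri(user_name: str, password: str,
                    cluster_name: str, cluster_subnet: str) -> str:
    """Build a mongodb+srv URI with percent-encoded credentials."""
    credentials = f"{quote_plus(user_name)}:{quote_plus(password)}"
    return f"mongodb+srv://{credentials}@{cluster_name}.{cluster_subnet}.mongodb.net/"


# Made-up credentials that contain reserved characters.
uri = build_atlas_uri("demo_user", "p@ss:word/1", "Cluster0", "example")
print(uri)
```

Only the credentials are encoded; the host portion is left untouched.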

### 4.0. Initialize Data Lakehouse Directory Structure
Remove the Data Lakehouse Database Directory Structure to Ensure Idempotency

In [212]:
remove_directory_tree(database_dir)

"An error occurred: [Errno 66] Directory not empty: '/Users/emilymoore/Downloads/DS-2002-main/04-PySpark/spark-warehouse/adventureworks_dlh.db/fact_sales_order_vw/silver'"
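
The output above shows that the directory was not fully removed, so the next steps may run against leftover files. One possible cause (an assumption, not confirmed by the output) is that another process, such as a streaming query left running from a previous session, was still writing into the directory. A minimal sketch of a retrying variant that reports success as a boolean is shown below; `remove_directory_tree_with_retry` is a hypothetical helper, not part of the lab.

```python
import os
import shutil
import tempfile
import time


def remove_directory_tree_with_retry(path: str, attempts: int = 3,
                                     delay_seconds: float = 0.5) -> bool:
    """Delete 'path' recursively; return True once it no longer exists."""
    for attempt in range(1, attempts + 1):
        if not os.path.exists(path):
            return True
        try:
            shutil.rmtree(path)
        except OSError as error:
            # Typical when files are still being written; wait and try again.
            print(f"Attempt {attempt} failed: {error}")
            time.sleep(delay_seconds)
    return not os.path.exists(path)


# Demonstration on a throwaway directory tree.
demo_root = tempfile.mkdtemp()
os.makedirs(os.path.join(demo_root, "fact_sales_order_vw", "silver"))
removed = remove_directory_tree_with_retry(demo_root)
print(removed)
```

Returning a boolean makes it easy to stop the notebook early instead of continuing with a partially deleted warehouse directory.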

### 5.0. Create a New Spark Session

In [213]:
worker_threads = f"local[{int(os.cpu_count()/2)}]"

jars = []
mysql_spark_jar = os.path.join(os.getcwd(), "mysql-connector-j-9.1.0", "mysql-connector-j-9.1.0.jar")
mssql_spark_jar = os.path.join(os.getcwd(), "sqljdbc_12.8", "enu", "jars", "mssql-jdbc-12.8.1.jre11.jar")

jars.append(mysql_spark_jar)
#jars.append(mssql_spark_jar)

sparkConf_args = get_spark_conf_args(jars, **mongodb_args)

sparkConf = get_spark_conf(**sparkConf_args)
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
spark.sparkContext.setLogLevel("OFF")
spark
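
The expression `int(os.cpu_count()/2)` yields `0` on a single-core machine and raises a `TypeError` when `os.cpu_count()` returns `None`. A minimal sketch of a guarded variant follows; `local_master` is a hypothetical helper name.

```python
import os


def local_master(cpu_count=None) -> str:
    """Return a Spark master URL such as 'local[4]' that uses half of the
    available cores, but never fewer than one thread."""
    cores = cpu_count if cpu_count is not None else os.cpu_count()
    threads = max(1, (cores or 1) // 2)
    return f"local[{threads}]"


print(local_master(8))
print(local_master(1))
```

Passing `cpu_count` explicitly is only for demonstration; calling `local_master()` with no argument uses the machine's reported core count.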

### 6.0. Create a New Metadata Database.

In [214]:
spark.sql(f"DROP DATABASE IF EXISTS {dest_database} CASCADE;")

sql_create_db = f"""
    CREATE DATABASE IF NOT EXISTS {dest_database}
    COMMENT 'DS-2002 Capstone Project Database'
    WITH DBPROPERTIES (contains_pii = true, purpose = 'DS-2002 Capstone Project');
"""
spark.sql(sql_create_db)

DataFrame[]

## Section II: Populate Dimensions by Ingesting "Cold-path" Reference Data 
### 1.0. Fetch Data from the File System
#### 1.1. Verify the location of the source data files on the file system

In [215]:
get_file_info(batch_dir)

| | name | size | modification_time |
|---|---|---|---|
| 0 | .DS_Store | 6148 | 2025-12-09 21:07:35.477997780 |
| 1 | adventureworks_address.csv | 75516 | 2025-12-09 17:09:43.810978651 |
| 2 | adventureworks_customers.json | 149540 | 2025-12-09 20:47:35.457307816 |
| 3 | adventureworks_employees.json | 129366 | 2025-12-09 20:48:55.777734756 |
| 4 | adventureworks_products.json | 327276 | 2025-12-09 20:49:25.467095613 |
| 5 | adventureworks_territory.csv | 935 | 2025-12-09 17:29:33.271051168 |
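
The listing above includes `.DS_Store`, a macOS metadata file that is not source data. A minimal sketch of a listing that keeps only files with expected data extensions is shown below; `list_data_files` and the default extension list are assumptions made for illustration.

```python
import os
import tempfile


def list_data_files(path: str, extensions=(".csv", ".json")) -> list:
    """Return the sorted names of regular files in 'path' whose extension
    is one of 'extensions'."""
    return sorted(
        name for name in os.listdir(path)
        if os.path.isfile(os.path.join(path, name))
        and os.path.splitext(name)[1].lower() in extensions
    )


# Demonstration with empty placeholder files in a temporary directory.
demo_dir = tempfile.mkdtemp()
for name in (".DS_Store", "adventureworks_address.csv", "adventureworks_customers.json"):
    open(os.path.join(demo_dir, name), "w").close()

data_files = list_data_files(demo_dir)
print(data_files)
```

`os.path.splitext()` treats a leading dot as part of the file name, so hidden files such as `.DS_Store` have no extension and are filtered out.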


#### Populate the <span style="color:darkred">Address Dimension</span>
##### Use PySpark to Read data from a CSV file

In [216]:
address_csv = os.path.join(batch_dir, 'adventureworks_address.csv')
print(address_csv)

df_dim_address = spark.read.format('csv').options(header='true', inferSchema='true').load(address_csv)
df_dim_address.toPandas().head(2)

/Users/emilymoore/Downloads/DS-2002-main/04-PySpark/Scripts/adventureworks/batch/adventureworks_address.csv


| | AddressID | AddressLine1 | AddressLine2 | City | StateProvinceID | PostalCode | rowguid | ModifiedDate |
|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1970 Napa Ct. | | Bothell | 79 | 98011 | ... | 1998-01-04 |
| 1 | 2 | 9833 Mt. Dias Blv. | | Bothell | 79 | 98011 | ... | 1999-01-01 |


##### Make Necessary Transformations to the New DataFrame

In [217]:
# ----------------------------------------------------------------------------------
# Remove unnecessary columns -----------------------------------------------------
# ----------------------------------------------------------------------------------
df_dim_address = df_dim_address.drop("rowguid")

# ----------------------------------------------------------------------------------
# Add Primary Key column using SQL Windowing function: ROW_NUMBER() 
# ----------------------------------------------------------------------------------
df_dim_address.createOrReplaceTempView("address")
sql_address = f"""
    SELECT *, ROW_NUMBER() OVER (ORDER BY AddressID) AS AddressKey
    FROM address;
"""
df_dim_address = spark.sql(sql_address)

# ----------------------------------------------------------------------------------
# Reorder Columns and display the first two rows in a Pandas dataframe
# ----------------------------------------------------------------------------------
ordered_columns = ['AddressKey', 'AddressID', 'AddressLine1', 'AddressLine2'
                   , 'City', 'StateProvinceID', 'PostalCode', 'ModifiedDate']

df_dim_address = df_dim_address[ordered_columns]
df_dim_address.toPandas().head(2)

| | AddressKey | AddressID | AddressLine1 | AddressLine2 | City | StateProvinceID | PostalCode | ModifiedDate |
|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | 1970 Napa Ct. | | Bothell | 79 | 98011 | 1998-01-04 |
| 1 | 2 | 2 | 9833 Mt. Dias Blv. | | Bothell | 79 | 98011 | 1999-01-01 |
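
The surrogate key above comes from `ROW_NUMBER() OVER (ORDER BY AddressID)`, which numbers rows 1, 2, 3, ... in natural-key order regardless of gaps in the natural key. The sketch below runs the same window function in SQLite (from the Python standard library) as a stand-in for Spark SQL; it assumes a bundled SQLite of version 3.25 or later, and the three sample rows are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE address (AddressID INTEGER, City TEXT)")
# Natural keys are inserted out of order and with gaps on purpose.
conn.executemany(
    "INSERT INTO address VALUES (?, ?)",
    [(30, "Seattle"), (10, "Bothell"), (20, "Bothell")],
)

rows = conn.execute("""
    SELECT ROW_NUMBER() OVER (ORDER BY AddressID) AS AddressKey, AddressID, City
    FROM address
    ORDER BY AddressKey
""").fetchall()
conn.close()

for row in rows:
    print(row)
```

In Spark, a window with `ORDER BY` but no `PARTITION BY` moves all rows into a single partition. That is acceptable for small dimension tables like these, but it does not scale to large tables.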


##### Save as the <span style="color:darkred">dim_address</span> table in the Data Lakehouse

In [218]:
df_dim_address.write.saveAsTable(f"{dest_database}.dim_address", mode="overwrite")

##### Describe and Preview Table

In [219]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_address;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_address LIMIT 2").toPandas()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|          AddressKey|                 int|   NULL|
|           AddressID|                 int|   NULL|
|        AddressLine1|              string|   NULL|
|        AddressLine2|              string|   NULL|
|                City|              string|   NULL|
|     StateProvinceID|                 int|   NULL|
|          PostalCode|              string|   NULL|
|        ModifiedDate|           timestamp|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|  adventureworks_dlh|       |
|               Table|         dim_address|       |
|        Created Time|Tue Dec 09 19:16:...|       |
|         Last Access|             UNKNOWN|       |
|          Created By|         Spark 3.5.7|       |
|           

| | AddressKey | AddressID | AddressLine1 | AddressLine2 | City | StateProvinceID | PostalCode | ModifiedDate |
|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | 1970 Napa Ct. | | Bothell | 79 | 98011 | 1998-01-04 |
| 1 | 2 | 2 | 9833 Mt. Dias Blv. | | Bothell | 79 | 98011 | 1999-01-01 |


#### Populate the <span style="color:darkred">Territory Dimension</span>
##### Use PySpark to Read Data from a CSV File

In [220]:
territory_csv = os.path.join(batch_dir, 'adventureworks_territory.csv')
print(territory_csv)

df_dim_territory = spark.read.format('csv').options(header='true', inferSchema='true').load(territory_csv)
df_dim_territory.toPandas().head(2)


/Users/emilymoore/Downloads/DS-2002-main/04-PySpark/Scripts/adventureworks/batch/adventureworks_territory.csv


| | TerritoryID | Name | CountryRegionCode | Group | SalesYTD | SalesLastYear | CostYTD | CostLastYear | rowguid | ModifiedDate |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | Northwest | US | North America | 5767342.0 | 3298694.0 | 0 | 0 | ... | 1998-06-01 |
| 1 | 2 | Northeast | US | North America | 3857164.0 | 3607149.0 | 0 | 0 | ... | 1998-06-01 |


##### Make Necessary Transformations to the New DataFrame

In [221]:
# ----------------------------------------------------------------------------------
# Remove unnecessary columns -----------------------------------------------------
# ----------------------------------------------------------------------------------
df_dim_territory = df_dim_territory.drop("rowguid")

# ----------------------------------------------------------------------------------
# Add Primary Key column using SQL Windowing function: ROW_NUMBER() 
# ----------------------------------------------------------------------------------
df_dim_territory.createOrReplaceTempView("territory")
sql_territory = f"""
    SELECT *, ROW_NUMBER() OVER (ORDER BY TerritoryID) AS Territory_Key
    FROM territory;
"""
df_dim_territory = spark.sql(sql_territory)

# ----------------------------------------------------------------------------------
# Reorder Columns and display the first two rows in a Pandas dataframe
# ----------------------------------------------------------------------------------
ordered_columns = ['Territory_Key', 'TerritoryID', 'Name'
                   , 'CountryRegionCode', 'Group', 'SalesYTD', 'SalesLastYear', 'CostYTD',
                  'CostLastYear', 'ModifiedDate']

df_dim_territory = df_dim_territory[ordered_columns]
df_dim_territory.toPandas().head(2)

| | Territory_Key | TerritoryID | Name | CountryRegionCode | Group | SalesYTD | SalesLastYear | CostYTD | CostLastYear | ModifiedDate |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | Northwest | US | North America | 5767342.0 | 3298694.0 | 0 | 0 | 1998-06-01 |
| 1 | 2 | 2 | Northeast | US | North America | 3857164.0 | 3607149.0 | 0 | 0 | 1998-06-01 |


##### Save as the <span style="color:darkred">dim_territory</span> table in the Data Lakehouse

In [222]:
df_dim_territory.write.saveAsTable(f"{dest_database}.dim_territory", mode="overwrite")

##### 1.3.4. Unit Test: Describe and Preview Table

In [223]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_territory;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_territory LIMIT 2").toPandas()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|       Territory_Key|                 int|   NULL|
|         TerritoryID|                 int|   NULL|
|                Name|              string|   NULL|
|   CountryRegionCode|              string|   NULL|
|               Group|              string|   NULL|
|            SalesYTD|              double|   NULL|
|       SalesLastYear|              double|   NULL|
|             CostYTD|                 int|   NULL|
|        CostLastYear|                 int|   NULL|
|        ModifiedDate|           timestamp|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|  adventureworks_dlh|       |
|               Table|       dim_territory|       |
|        Created Time|Tue Dec 09 19:16:...|       |
|         La

| | Territory_Key | TerritoryID | Name | CountryRegionCode | Group | SalesYTD | SalesLastYear | CostYTD | CostLastYear | ModifiedDate |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | Northwest | US | North America | 5767342.0 | 3298694.0 | 0 | 0 | 1998-06-01 |
| 1 | 2 | 2 | Northeast | US | North America | 3857164.0 | 3607149.0 | 0 | 0 | 1998-06-01 |


### 2.0. Fetch Reference Data from a MongoDB Atlas Database
#### 2.1. Create a New MongoDB Database, and Load Each JSON File into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collections()** function **is** idempotent: it drops each collection before reloading it from its JSON file.

In [224]:
client = get_mongo_client(**mongodb_args)

json_files = {"customers" : "adventureworks_customers.json",
              "employees" : 'adventureworks_employees.json',
              "products" : 'adventureworks_products.json'
             }

set_mongo_collections(client, mongodb_args["db_name"], batch_dir, json_files) 
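
The load above is idempotent because each collection is dropped before its documents are inserted, so a second run replaces the data instead of appending to it. The sketch below shows the same drop-then-insert pattern with SQLite as a stand-in for MongoDB; `reload_table` and the sample rows are illustrative assumptions.

```python
import sqlite3


def reload_table(conn, table_name: str, rows: list) -> None:
    """Drop and recreate 'table_name', then insert 'rows'; this mirrors
    drop_collection() followed by insert_many()."""
    conn.execute(f"DROP TABLE IF EXISTS {table_name}")
    conn.execute(f"CREATE TABLE {table_name} (CustomerID INTEGER, AccountNumber TEXT)")
    conn.executemany(f"INSERT INTO {table_name} VALUES (?, ?)", rows)


conn = sqlite3.connect(":memory:")
customers = [(1, "AW00000001"), (2, "AW00000002")]

reload_table(conn, "customers", customers)
reload_table(conn, "customers", customers)  # second run replaces, it does not append

row_count = conn.execute("SELECT COUNT(*) FROM customers").fetchone()[0]
conn.close()
print(row_count)
```

Without the drop step, the second call would double the row count, which is the failure mode idempotency prevents.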

In [225]:
mongodb_args["collection"] = "customers"

df_dim_customers = get_mongodb_dataframe(spark, **mongodb_args)
df_dim_customers.toPandas().head(2)

| | AccountNumber | CustomerID | CustomerType | ModifiedDate | TerritoryID |
|---|---|---|---|---|---|
| 0 | AW00000001 | 1 | S | 2004-10-13 11:15:07 | 1 |
| 1 | AW00000002 | 2 | S | 2004-10-13 11:15:07 | 1 |


In [226]:
# ----------------------------------------------------------------------------------
# Remove unnecessary columns -----------------------------------------------------
# ----------------------------------------------------------------------------------
df_dim_customers = df_dim_customers.drop("rowguid")

# ----------------------------------------------------------------------------------
# Add Primary Key column using the SQL Windowing function: ROW_NUMBER() 
# ----------------------------------------------------------------------------------
df_dim_customers.createOrReplaceTempView("customers")
sql_customers = f"""
    SELECT *, ROW_NUMBER() OVER (ORDER BY CustomerID) AS CustomerKey
    FROM customers;
"""
df_dim_customers = spark.sql(sql_customers)

# ----------------------------------------------------------------------------------
# Reorder Columns and display the first two rows in a Pandas dataframe
# ----------------------------------------------------------------------------------
ordered_columns = ['CustomerKey', 'CustomerID', 'TerritoryID', 'AccountNumber', 'ModifiedDate']

df_dim_customers = df_dim_customers[ordered_columns]
df_dim_customers.toPandas().head(2)

| | CustomerKey | CustomerID | TerritoryID | AccountNumber | ModifiedDate |
|---|---|---|---|---|---|
| 0 | 1 | 1 | 1 | AW00000001 | 2004-10-13 11:15:07 |
| 1 | 2 | 2 | 1 | AW00000002 | 2004-10-13 11:15:07 |


In [227]:
df_dim_customers.write.saveAsTable(f"{dest_database}.dim_customers", mode="overwrite")

In [228]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_customers;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_customers LIMIT 2").toPandas()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|         CustomerKey|                 int|   NULL|
|          CustomerID|                 int|   NULL|
|         TerritoryID|                 int|   NULL|
|       AccountNumber|              string|   NULL|
|        ModifiedDate|              string|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|  adventureworks_dlh|       |
|               Table|       dim_customers|       |
|        Created Time|Tue Dec 09 19:16:...|       |
|         Last Access|             UNKNOWN|       |
|          Created By|         Spark 3.5.7|       |
|                Type|             MANAGED|       |
|            Provider|             parquet|       |
|            Location|file:/Users/emily...|       |
+-----------

| | CustomerKey | CustomerID | TerritoryID | AccountNumber | ModifiedDate |
|---|---|---|---|---|---|
| 0 | 1 | 1 | 1 | AW00000001 | 2004-10-13 11:15:07 |
| 1 | 2 | 2 | 1 | AW00000002 | 2004-10-13 11:15:07 |


In [229]:
mongodb_args["collection"] = "employees"

df_dim_employees = get_mongodb_dataframe(spark, **mongodb_args)
df_dim_employees.toPandas().head(2)

| | BirthDate | ContactID | CurrentFlag | EmployeeID | Gender | HireDate | LoginID | ManagerID | MaritalStatus | ModifiedDate | NationalIDNumber | SalariedFlag | SickLeaveHours | Title | VacationHours |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1972-05-15 00:00:00 | 1209 | 1 | 1 | M | 1996-07-31 00:00:00 | adventure-works\guy1 | 16.0 | M | 2004-07-31 00:00:00 | 14417807 | 0 | 30 | Production Technician - WC60 | 21 |
| 1 | 1977-06-03 00:00:00 | 1030 | 1 | 2 | M | 1997-02-26 00:00:00 | adventure-works\kevin0 | 6.0 | S | 2004-07-31 00:00:00 | 253022876 | 0 | 41 | Marketing Assistant | 42 |


In [230]:
# ----------------------------------------------------------------------------------
# Drop unnecessary columns
# ----------------------------------------------------------------------------------
df_dim_employees = df_dim_employees.drop("NationalIDNumber", "BirthDate", "MaritalStatus", "Gender", "HireDate"
                                        , "SalariedFlag", "VacationHours", "SickLeaveHours", "CurrentFlag", "rowguid")

# ----------------------------------------------------------------------------------
# Add Primary Key column using ROW_NUMBER()
# ----------------------------------------------------------------------------------
df_dim_employees.createOrReplaceTempView("employees")
sql_employees = f"""
    SELECT *, ROW_NUMBER() OVER (ORDER BY EmployeeID) AS EmployeeKey
    FROM employees;
"""
df_dim_employees = spark.sql(sql_employees)

# ----------------------------------------------------------------------------------
# Reorder Columns and display the first two rows in a Pandas dataframe
# ----------------------------------------------------------------------------------
ordered_columns = ['EmployeeKey', 'EmployeeID', 'ContactID', 'LoginID', 'ManagerID', 'Title']
df_dim_employees = df_dim_employees[ordered_columns]
df_dim_employees.toPandas().head(2)

| | EmployeeKey | EmployeeID | ContactID | LoginID | ManagerID | Title |
|---|---|---|---|---|---|---|
| 0 | 1 | 1 | 1209 | adventure-works\guy1 | 16.0 | Production Technician - WC60 |
| 1 | 2 | 2 | 1030 | adventure-works\kevin0 | 6.0 | Marketing Assistant |


In [231]:
df_dim_employees.write.saveAsTable(f"{dest_database}.dim_employees", mode="overwrite")

##### 2.3.4. Unit Test: Describe and Preview Table

In [232]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_employees;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_employees LIMIT 2").toPandas()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|         EmployeeKey|                 int|   NULL|
|          EmployeeID|                 int|   NULL|
|           ContactID|                 int|   NULL|
|             LoginID|              string|   NULL|
|           ManagerID|                 int|   NULL|
|               Title|              string|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|  adventureworks_dlh|       |
|               Table|       dim_employees|       |
|        Created Time|Tue Dec 09 19:16:...|       |
|         Last Access|             UNKNOWN|       |
|          Created By|         Spark 3.5.7|       |
|                Type|             MANAGED|       |
|            Provider|             parquet|       |
|           

| | EmployeeKey | EmployeeID | ContactID | LoginID | ManagerID | Title |
|---|---|---|---|---|---|---|
| 0 | 1 | 1 | 1209 | adventure-works\guy1 | 16 | Production Technician - WC60 |
| 1 | 2 | 2 | 1030 | adventure-works\kevin0 | 6 | Marketing Assistant |


#### Populate the <span style="color:darkred">Products Dimension</span>
##### Fetch Data from the New MongoDB <span style="color:darkred">Products</span> Collection

In [233]:
mongodb_args["collection"] = "products"

df_dim_products = get_mongodb_dataframe(spark, **mongodb_args)
df_dim_products.toPandas().head(2)

                                                                                

| | Color | DaysToManufacture | FinishedGoodsFlag | ListPrice | MakeFlag | ModifiedDate | Name | ProductID | ProductLine | ProductModelID | ProductNumber | ProductSubcategoryID | ReorderPoint | SafetyStockLevel | SellStartDate | StandardCost |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | | 0 | 0 | 0.0 | 0 | 2004-03-11 10:01:36 | Adjustable Race | 1 | | | AR-5381 | | 750 | 1000 | 1998-06-01 00:00:00 | 0.0 |
| 1 | | 0 | 0 | 0.0 | 0 | 2004-03-11 10:01:36 | Bearing Ball | 2 | | | BA-8327 | | 750 | 1000 | 1998-06-01 00:00:00 | 0.0 |
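
`get_mongodb_dataframe()` passes the result through `drop_null_columns()` with `null_column_threshold` set to `0.5`, so a column is removed only when more than half of its values are NULL. The sketch below illustrates that rule in plain Python on made-up records (not rows from the dataset); both function names are hypothetical.

```python
def null_fraction_by_column(records: list) -> dict:
    """Return {column: fraction of records where the column is missing or None}."""
    total = len(records)
    if total == 0:
        return {}
    columns = sorted({key for record in records for key in record})
    return {
        column: sum(1 for record in records if record.get(column) is None) / total
        for column in columns
    }


def columns_to_drop(records: list, threshold: float) -> list:
    """List the columns whose null fraction is strictly greater than 'threshold'."""
    return [
        column
        for column, fraction in null_fraction_by_column(records).items()
        if fraction > threshold
    ]


# Illustrative sample: Color is 50% null, Size is 75% null.
sample = [
    {"ProductID": 1, "Color": None, "Size": None},
    {"ProductID": 2, "Color": None, "Size": None},
    {"ProductID": 3, "Color": "Black", "Size": None},
    {"ProductID": 4, "Color": "Red", "Size": "M"},
]
dropped = columns_to_drop(sample, threshold=0.5)
print(dropped)
```

Because the comparison is strict, a column that is exactly 50% null is kept, which is why sparsely populated columns can still appear in the preview.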


##### Make Necessary Transformations to the New DataFrame

In [234]:
# ----------------------------------------------------------------------------------
# Drop unnecessary columns
# ----------------------------------------------------------------------------------
df_dim_products = df_dim_products.drop("Size", "SizeUnitMeasureCode", "WeightUnitMeasureCode", "Weight", "DaysToManufacture"
                                        , "Style", "Class", "ProductSubcategoryID", "rowguid", "ProductModelID", 
                                      "SellStartDate", "SellEndDate", "DiscontinuedDate", "SafetyStockLevel", "ReorderPoint", "MakeFlag",
                                      "FinishedGoodsFlag")

# ----------------------------------------------------------------------------------
# Add Primary Key column using ROW_NUMBER()
# ----------------------------------------------------------------------------------
df_dim_products.createOrReplaceTempView("products")
sql_products = f"""
    SELECT *, ROW_NUMBER() OVER (ORDER BY ProductID) AS ProductKey
    FROM products;
"""
df_dim_products = spark.sql(sql_products)

# ----------------------------------------------------------------------------------
# Reorder Columns and display the first two rows in a Pandas dataframe
# ----------------------------------------------------------------------------------
ordered_columns = ['ProductKey', 'ProductID', 'Name', 'ProductNumber', "ProductLine", 'Color', 'StandardCost', 'ListPrice', 'ModifiedDate']
df_dim_products = df_dim_products[ordered_columns]
df_dim_products.toPandas().head(2)

| | ProductKey | ProductID | Name | ProductNumber | ProductLine | Color | StandardCost | ListPrice | ModifiedDate |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | Adjustable Race | AR-5381 | | | 0.0 | 0.0 | 2004-03-11 10:01:36 |
| 1 | 2 | 2 | Bearing Ball | BA-8327 | | | 0.0 | 0.0 | 2004-03-11 10:01:36 |


##### 2.4.3. Save as the <span style="color:darkred">dim_products</span> table in the Data Lakehouse

In [235]:
df_dim_products.write.saveAsTable(f"{dest_database}.dim_products", mode="overwrite")

##### 2.4.4. Unit Test: Describe and Preview Table

In [236]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_products;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_products LIMIT 2").toPandas()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|          ProductKey|                 int|   NULL|
|           ProductID|                 int|   NULL|
|                Name|              string|   NULL|
|       ProductNumber|              string|   NULL|
|         ProductLine|              string|   NULL|
|               Color|              string|   NULL|
|        StandardCost|              double|   NULL|
|           ListPrice|              double|   NULL|
|        ModifiedDate|              string|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|  adventureworks_dlh|       |
|               Table|        dim_products|       |
|        Created Time|Tue Dec 09 19:17:...|       |
|         Last Access|             UNKNOWN|       |
|          C

| | ProductKey | ProductID | Name | ProductNumber | ProductLine | Color | StandardCost | ListPrice | ModifiedDate |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | Adjustable Race | AR-5381 | | | 0.0 | 0.0 | 2004-03-11 10:01:36 |
| 1 | 2 | 2 | Bearing Ball | BA-8327 | | | 0.0 | 0.0 | 2004-03-11 10:01:36 |


### 3.0. Fetch Reference Data from a MySQL Database
#### 3.1. Populate the <span style="color:darkred">Date Dimension</span>
##### 3.1.1 Fetch data from the <span style="color:darkred">dim_date</span> table in MySQL

In [237]:
sql_dim_date = f"SELECT * FROM {mysql_args['db_name']}.dim_date"
df_dim_date = get_mysql_dataframe(spark, sql_dim_date, **mysql_args)

##### 3.1.2. Save as the <span style="color:darkred">dim_date</span> table in the Data Lakehouse

In [238]:
df_dim_date.write.saveAsTable(f"{dest_database}.dim_date", mode="overwrite")

##### 3.1.3. Unit Test: Describe and Preview Table

In [239]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_date;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_date LIMIT 2").toPandas()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|            date_key|      int|   NULL|
|           full_date|     date|   NULL|
|           date_name| char(11)|   NULL|
|        date_name_us| char(11)|   NULL|
|        date_name_eu| char(11)|   NULL|
|         day_of_week|  tinyint|   NULL|
|    day_name_of_week| char(10)|   NULL|
|        day_of_month|  tinyint|   NULL|
|         day_of_year|      int|   NULL|
|     weekday_weekend| char(10)|   NULL|
|        week_of_year|  tinyint|   NULL|
|          month_name| char(10)|   NULL|
|       month_of_year|  tinyint|   NULL|
|is_last_day_of_month|  char(1)|   NULL|
|    calendar_quarter|  tinyint|   NULL|
|       calendar_year|      int|   NULL|
| calendar_year_month| char(10)|   NULL|
|   calendar_year_qtr| char(10)|   NULL|
|fiscal_month_of_year|  tinyint|   NULL|
|      fiscal_quarter|  tinyint|   NULL|
+--------------------+---------+-------+
only showing top

Unnamed: 0,date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,...,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
0,20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
1,20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


### 4.0. Verify Dimension Tables

In [240]:
spark.sql(f"USE {dest_database};")
spark.sql("SHOW TABLES").toPandas()

Unnamed: 0,namespace,tableName,isTemporary
0,adventureworks_dlh,dim_address,False
1,adventureworks_dlh,dim_customers,False
2,adventureworks_dlh,dim_date,False
3,adventureworks_dlh,dim_employees,False
4,adventureworks_dlh,dim_products,False
5,adventureworks_dlh,dim_territory,False
6,,address,True
7,,customers,True
8,,employees,True
9,,products,True
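
The listing above can also be checked programmatically rather than by eye. The following is a minimal sketch, not part of the original lab: `missing_tables` is a hypothetical helper, and the expected names are simply the six `dim_` tables shown in the output above. The `actual` list here is made up so that the example runs without a Spark session.

```python
def missing_tables(actual_names, expected_names):
    """Return the expected table names that are absent, sorted for stable output."""
    return sorted(set(expected_names) - set(actual_names))

# Expected dimension tables, copied from the SHOW TABLES output above.
EXPECTED_DIMENSIONS = [
    "dim_address", "dim_customers", "dim_date",
    "dim_employees", "dim_products", "dim_territory",
]

# Made-up input for illustration. In the notebook the names could come from
# the catalog instead, e.g.:
#   actual = [t.name for t in spark.catalog.listTables(dest_database)]
actual = ["dim_address", "dim_customers", "dim_date", "dim_products"]

print(missing_tables(actual, EXPECTED_DIMENSIONS))
```

An empty list means every expected dimension table is present.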


## Section III: Integrate Reference Data with Real-Time Data
### 6.0. Use PySpark Structured Streaming to Process (Hot Path) <span style="color:darkred">Orders</span> Fact Data  
#### 6.1. Verify the location of the source data files on the file system

In [241]:
get_file_info(orders_stream_dir)

Unnamed: 0,name,size,modification_time
0,.DS_Store,6148,2025-12-09 21:07:40.494396448
1,adventureworks_orders_01.json,42484498,2025-12-09 16:58:42.263897657
2,adventureworks_orders_02.json,41868307,2025-12-09 17:02:42.386663437
3,adventureworks_orders_03.json,41802302,2025-12-09 17:03:12.437009811


#### 6.2. Create the Bronze Layer: Stage <span style="color:darkred">Orders Fact table</span> Data
##### 6.2.1. Read "Raw" JSON file data into a Stream

In [242]:
df_orders_bronze = (
    spark.readStream
    .option("schemaLocation", orders_output_bronze)
    .option("maxFilesPerTrigger", 1)
    .option("multiLine", "true")
    .json(orders_stream_dir)
)

df_orders_bronze.isStreaming

True
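
The reader above does not declare a schema, so it presumably relies on schema inference being enabled for streaming file sources. If you would rather declare the schema up front, `DataStreamReader.schema` accepts a DDL-formatted string. The sketch below only builds such a string. `to_ddl` is a hypothetical helper, the column names come from those used later in this lab, and the types are guesses based on the Silver schema rather than the real JSON layout.

```python
def to_ddl(columns):
    """Build a DDL schema string such as 'a BIGINT, b DOUBLE' from (name, type) pairs."""
    return ", ".join(f"{name} {dtype}" for name, dtype in columns)

# ASSUMPTION: a partial, illustrative column list. The types are guesses and the
# real order files contain more fields. Any field left out of a declared schema
# would be dropped, so a real schema must list every field that is needed.
ORDERS_COLUMNS = [
    ("CustomerID", "BIGINT"),
    ("ProductID", "BIGINT"),
    ("OrderQty", "BIGINT"),
    ("UnitPrice", "DOUBLE"),
    ("TaxAmt", "DOUBLE"),
]

orders_ddl = to_ddl(ORDERS_COLUMNS)
print(orders_ddl)

# Hypothetical usage in the notebook:
#   spark.readStream.schema(orders_ddl).option("multiLine", "true").json(orders_stream_dir)
```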

##### 6.2.2. Write the Streaming Data to a Parquet file

In [243]:
orders_checkpoint_bronze = os.path.join(orders_output_bronze, '_checkpoint')

orders_bronze_query = (
    df_orders_bronze
    .withColumn("receipt_time", current_timestamp())
    .withColumn("source_file", input_file_name())
    .writeStream
    .format("parquet")
    .outputMode("append")
    .queryName("orders_bronze")
    .trigger(availableNow = True)
    .option("checkpointLocation", orders_checkpoint_bronze)
    .option("compression", "snappy")
    .start(orders_output_bronze)
)

##### 6.2.3. Unit Test: Implement Query Monitoring

In [244]:
print(f"Query ID: {orders_bronze_query.id}")
print(f"Query Name: {orders_bronze_query.name}")
print(f"Query Status: {orders_bronze_query.status}")

Query ID: 7d717ee2-0c5d-4e3e-8182-b8916c312393
Query Name: orders_bronze
Query Status: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': True}


In [245]:
orders_bronze_query.awaitTermination()
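
Once the query has terminated, its `lastProgress` attribute can show how much data the final micro-batch handled. The sketch below assumes `lastProgress` is a dictionary (or dictionary-like) with keys such as `batchId` and `numInputRows`, and is `None` when no batch has run. `summarize_progress` is a hypothetical helper, and the sample values are made up rather than taken from this lab.

```python
def summarize_progress(progress):
    """Pick a few fields out of a progress dictionary; return None if no batch has run."""
    if not progress:
        return None
    return {
        "batch_id": progress.get("batchId"),
        "input_rows": progress.get("numInputRows", 0),
        "rows_per_second": progress.get("processedRowsPerSecond"),
    }

# Made-up sample using the key names Spark reports; not real output from this lab.
sample = {"batchId": 2, "numInputRows": 1500, "processedRowsPerSecond": 320.5}
print(summarize_progress(sample))
print(summarize_progress(None))

# Hypothetical usage in the notebook:
#   summarize_progress(orders_bronze_query.lastProgress)
```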

#### 6.3. Create the Silver Layer: Integrate "Cold-path" Data & Make Transformations
##### 6.3.1. Prepare Role-Playing Dimension Primary and Business Keys

In [246]:
df_dim_order_date = df_dim_date.select(col("date_key").alias("order_date_key"), col("full_date").alias("order_full_date"))
df_dim_paid_date = df_dim_date.select(col("date_key").alias("paid_date_key"), col("full_date").alias("paid_full_date"))
df_dim_shipped_date = df_dim_date.select(col("date_key").alias("shipped_date_key"), col("full_date").alias("shipped_full_date"))
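
The three statements above follow one pattern: select `date_key` and `full_date` from the date dimension and prefix each with a role name. As a sketch only, the pattern could be wrapped in a helper. Both function names are hypothetical, and the helper uses `df[...]` column access instead of `col(...)` so that it needs no imports.

```python
def role_column_names(role):
    """Map the date dimension's source columns to role-specific names."""
    return {"date_key": f"{role}_date_key", "full_date": f"{role}_full_date"}

def role_playing_date_dim(df_date, role):
    """Return a DataFrame holding only the renamed key columns for one role."""
    names = role_column_names(role)
    return df_date.select(*[df_date[src].alias(dst) for src, dst in names.items()])

print(role_column_names("shipped"))

# Hypothetical usage in the notebook:
#   df_dim_order_date = role_playing_date_dim(df_dim_date, "order")
#   df_dim_paid_date = role_playing_date_dim(df_dim_date, "paid")
```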

##### 6.3.2. Define Silver Query to Join Streaming with Batch Data

In [247]:
df_orders_silver = spark.readStream.format("parquet").load(orders_output_bronze) \
    .join(df_dim_customers, "CustomerID") \
    .join(df_dim_employees, "ContactID") \
    .join(df_dim_products, "ProductID") \
    .join(df_dim_order_date, df_dim_order_date.order_full_date.cast(DateType()) == col("OrderDate").cast(DateType()), "inner") \
    .join(df_dim_shipped_date, df_dim_shipped_date.shipped_full_date.cast(DateType()) == col("ShipDate").cast(DateType()), "left_outer") \
    .join(df_dim_paid_date, df_dim_paid_date.paid_full_date.cast(DateType()) == col("DueDate").cast(DateType()), "left_outer") \
    .select(col("order_key").cast(LongType()),
            col("OrderDate").cast(LongType()),
            df_dim_customers.CustomerKey.cast(LongType()),
            df_dim_employees.EmployeeKey.cast(LongType()),
            df_dim_products.ProductKey.cast(LongType()),
            df_dim_order_date.order_date_key.cast(LongType()),
            df_dim_paid_date.paid_date_key.cast(LongType()),
            df_dim_shipped_date.shipped_date_key.cast(LongType()),
            col("OrderQty"),
            col("UnitPrice"),
            col("ShipRate"),
            col("TaxAmt"),
            col("Status"),
    )

In [248]:
df_orders_silver.isStreaming

True

In [249]:
df_orders_silver.printSchema()

root
 |-- order_key: long (nullable = true)
 |-- OrderDate: long (nullable = true)
 |-- CustomerKey: long (nullable = false)
 |-- EmployeeKey: long (nullable = false)
 |-- ProductKey: long (nullable = false)
 |-- order_date_key: long (nullable = true)
 |-- paid_date_key: long (nullable = true)
 |-- shipped_date_key: long (nullable = true)
 |-- OrderQty: long (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- ShipRate: double (nullable = true)
 |-- TaxAmt: double (nullable = true)
 |-- Status: long (nullable = true)



##### 6.3.3. Write the Transformed Streaming data to the Data Lakehouse

In [250]:
orders_checkpoint_silver = os.path.join(orders_output_silver, '_checkpoint')

orders_silver_query = (
    df_orders_silver.writeStream
    .format("parquet")
    .outputMode("append")
    .queryName("orders_silver")
    .trigger(availableNow = True)
    .option("checkpointLocation", orders_checkpoint_silver)
    .option("compression", "snappy")
    .start(orders_output_silver)
)

##### 6.3.4. Unit Test: Implement Query Monitoring

In [251]:
print(f"Query ID: {orders_silver_query.id}")
print(f"Query Name: {orders_silver_query.name}")
print(f"Query Status: {orders_silver_query.status}")

Query ID: c87fa999-5ca5-4213-af3b-effa2d68b60e
Query Name: orders_silver
Query Status: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}


In [252]:
orders_silver_query.awaitTermination()

#### 6.4. Create Gold Layer: Perform Aggregations
##### 6.4.1. Define a Query to Create a Business Report
Create a new Gold table using the PySpark API. The table should report the number of Products sold per Category each Month. The results should include the Month, the Product Category, and the Number of Products sold, sorted by the month number in which the orders were placed (e.g., January, February, March).

In [261]:
df_orders_by_product_category_gold = spark.readStream.format("parquet").load(orders_output_silver) \
    .join(df_dim_products, "ProductKey") \
    .join(df_dim_date, df_dim_date.date_key.cast(IntegerType()) == col("order_date_key").cast(IntegerType())) \
    .groupBy("month_of_year", "Name", "month_name") \
    .agg(count("ProductKey").alias("product_count")) \
    .orderBy(asc("month_of_year"), desc("product_count"))

In [262]:
df_orders_by_product_category_gold.printSchema()

root
 |-- month_of_year: byte (nullable = true)
 |-- Name: string (nullable = true)
 |-- month_name: string (nullable = true)
 |-- product_count: long (nullable = false)
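
To make the grouping, counting, and ordering in the Gold query concrete, here is a plain-Python sketch of the same logic on a few made-up rows. It illustrates the semantics only and is not the Spark implementation. Note that `Counter` counts every row, whereas Spark's `count("ProductKey")` counts only non-null keys.

```python
from collections import Counter

# Made-up sample rows (month_of_year, month_name, product name); not data from this lab.
rows = [
    (1, "January", "Bearing Ball"),
    (1, "January", "Bearing Ball"),
    (1, "January", "Adjustable Race"),
    (2, "February", "Adjustable Race"),
]

# Equivalent of groupBy(...).agg(count(...)): one count per distinct group key.
counts = Counter(rows)

# Equivalent of orderBy(asc("month_of_year"), desc("product_count")):
# month ascending, then count descending within each month.
report = sorted(
    ((month, name, product, n) for (month, name, product), n in counts.items()),
    key=lambda r: (r[0], -r[3]),
)

for line in report:
    print(line)
```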



##### 6.4.2. Write the Streaming Data to an In-Memory Table in "Complete" Mode

In [264]:
orders_gold_query = (
    df_orders_by_product_category_gold.writeStream
    .format("memory")
    .outputMode("complete")
    .queryName("fact_orders_by_product_quantity")
    .start()
)

In [265]:
wait_until_stream_is_ready(orders_gold_query, 1)

The stream has processed 1 batchs


##### 6.4.3. Query the Gold Data from Memory

In [266]:
df_fact_orders_by_product_quantity = spark.sql("SELECT * FROM fact_orders_by_product_quantity")
df_fact_orders_by_product_quantity.printSchema()

root
 |-- month_of_year: byte (nullable = true)
 |-- Name: string (nullable = true)
 |-- month_name: string (nullable = true)
 |-- product_count: long (nullable = false)



##### 6.4.4. Create the Final Selection

In [267]:
df_fact_orders_by_product_quantity_gold_final = df_fact_orders_by_product_quantity \
.select(col("month_of_year").alias("Month"),
        col("Name").alias("Product"),
        col("product_count").alias("Product Count")) \
.orderBy(asc("Month"), desc("Product Count"))

##### 6.4.5. Load the Final Results into a New Table and Display the Results

In [268]:
df_fact_orders_by_product_quantity_gold_final.write.saveAsTable(f"{dest_database}.fact_orders_by_product_quantity", mode="overwrite")
spark.sql(f"SELECT * FROM {dest_database}.fact_orders_by_product_quantity").toPandas()

Unnamed: 0,Month,Product,Product Count
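
The result above contains only a header row. One way to see where rows stop flowing is to count the rows in each layer's Parquet directory. The sketch below is a diagnostic aid under stated assumptions: both helpers are hypothetical, the counts in the example are made up, and the notebook usage assumes the `orders_output_bronze` and `orders_output_silver` paths defined earlier.

```python
def layer_row_counts(spark, layer_paths):
    """Count rows in each Parquet directory; layer_paths maps a label to a path."""
    return {label: spark.read.parquet(path).count() for label, path in layer_paths.items()}

def first_empty_layer(counts):
    """Return the first layer (in insertion order) with zero rows, or None."""
    for label, n in counts.items():
        if n == 0:
            return label
    return None

# Made-up counts for illustration only; not measured in this lab.
print(first_empty_layer({"bronze": 120, "silver": 0}))

# Hypothetical usage in the notebook:
#   counts = layer_row_counts(spark, {"bronze": orders_output_bronze,
#                                     "silver": orders_output_silver})
#   print(counts, first_empty_layer(counts))
```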


### 9.0. Stop the Spark Session

In [269]:
spark.stop()
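
The Gold query in 6.4.2 is started without an `availableNow` trigger, so it is presumably still active when the session is stopped. As a sketch, any active streaming queries could be stopped explicitly before calling `spark.stop()`. `stop_active_queries` is a hypothetical helper built on `spark.streams.active` and `StreamingQuery.stop()`.

```python
def stop_active_queries(spark):
    """Stop every active streaming query and return the names that were stopped."""
    stopped = []
    # Copy the list first so stopping a query cannot disturb the iteration.
    for query in list(spark.streams.active):
        query.stop()
        stopped.append(query.name)
    return stopped

# Hypothetical usage in the notebook, before spark.stop():
#   print(stop_active_queries(spark))
```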