# DS-2002 Final Project: Rental Data Lakehouse

This project builds a dimensional **Data Lakehouse** using Azure Databricks and PySpark, designed to support the post hoc summarization and analysis of a simple **rental business process**. It integrates batch and streaming data from multiple sources into a unified schema optimized for analysis.

### Business Process
The pipeline models a movie rental process using the AdventureWorks database as a base. It integrates:

- **Customers, Stores, and Products** as dimension tables
- **Rental Orders** as the fact table with transaction details

### Data Sources
- **MySQL**: Core AdventureWorks views (batch)
- **MongoDB Atlas**: Store details (semi-structured batch)
- **CSV**: Date and Customer dimensions
- **Streaming JSON**: Simulated real-time order data in 3 intervals

### Architecture Used
- **Medallion Architecture**: Bronze → Silver → Gold
- **ETL / ELT patterns**
- **Structured Streaming** using AutoLoader
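The Bronze → Silver → Gold layering can be sketched without Spark. This plain-Python example is illustrative only. The record fields (`order_id`, `store_id`, `amount`) and the sample values are hypothetical. In the project, each layer would be a Spark DataFrame or Lakehouse table rather than a list of dicts.

```python
# Illustrative Bronze -> Silver -> Gold flow over hypothetical order records.
from collections import defaultdict
from datetime import datetime, timezone


def to_bronze(raw_records, source):
    """Bronze: keep records exactly as received, adding only ingestion metadata."""
    ingested_at = datetime.now(timezone.utc).isoformat()
    return [{**record, "_source": source, "_ingested_at": ingested_at} for record in raw_records]


def to_silver(bronze_records):
    """Silver: enforce types and drop records that fail validation."""
    silver = []
    for record in bronze_records:
        try:
            row = {
                "order_id": record["order_id"],
                "store_id": int(record["store_id"]),
                "amount": float(record["amount"]),
            }
        except (KeyError, TypeError, ValueError):
            continue  # reject malformed rows rather than propagating them downstream
        silver.append(row)
    return silver


def to_gold(silver_records):
    """Gold: business-level aggregate (total order amount per store)."""
    totals = defaultdict(float)
    for row in silver_records:
        totals[row["store_id"]] += row["amount"]
    return dict(totals)


# Hypothetical sample input; the last record is deliberately malformed.
raw_orders = [
    {"order_id": 1, "store_id": "1", "amount": "4.99"},
    {"order_id": 2, "store_id": "2", "amount": "0.99"},
    {"order_id": 3, "store_id": "1", "amount": "2.99"},
    {"order_id": 4, "store_id": "2", "amount": "n/a"},
]
bronze = to_bronze(raw_orders, source="orders_stream")
silver = to_silver(bronze)
gold = to_gold(silver)
print(gold)
```

Here, rejected rows are simply skipped. A fuller Silver layer might route them to a quarantine table instead.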

### Goal
Demonstrate value by enabling aggregated queries (e.g., orders by store or customer segment) to support historical business analysis and planning.
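To sketch the kind of aggregate query the star schema is meant to answer, the example below uses Python's built-in `sqlite3` as a stand-in for Spark SQL. The `dim_stores` and `dim_customers` names mirror the Lakehouse tables. The `fact_orders` columns and every row, however, are hypothetical. In the notebook, the same SQL would run through `spark.sql(...)` against `rental_dlh`.

```python
# Sketch: orders by store and customer segment over a tiny, hypothetical star schema.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_stores (store_key INTEGER PRIMARY KEY, store_id INTEGER);
    CREATE TABLE dim_customers (customer_key INTEGER PRIMARY KEY, customer_id INTEGER,
                                loyalty_segment INTEGER);
    CREATE TABLE fact_orders (order_id INTEGER, store_key INTEGER, customer_key INTEGER,
                              amount REAL);

    INSERT INTO dim_stores VALUES (1, 1), (2, 2);
    INSERT INTO dim_customers VALUES (1, 101, 0), (2, 102, 3);
    INSERT INTO fact_orders VALUES (100, 1, 1, 4.99), (101, 1, 2, 2.99), (102, 2, 2, 0.99);
""")

# Join the fact table to both dimensions, then aggregate by store and segment.
query = """
    SELECT s.store_id, c.loyalty_segment,
           COUNT(*) AS order_count, ROUND(SUM(f.amount), 2) AS total_amount
    FROM fact_orders AS f
    JOIN dim_stores AS s ON f.store_key = s.store_key
    JOIN dim_customers AS c ON f.customer_key = c.customer_key
    GROUP BY s.store_id, c.loyalty_segment
    ORDER BY s.store_id, c.loyalty_segment
"""
results = conn.execute(query).fetchall()
for row in results:
    print(row)
conn.close()
```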

## Section I: Prerequisites

### 1.0. Import Required Libraries

In [1]:
import findspark
findspark.init()
print(findspark.find())

/opt/anaconda3/envs/pysparkenv/lib/python3.12/site-packages/pyspark


In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Final Project Spark Test") \
    .master("local[*]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

spark.version



'3.5.5'

In [4]:
import os
import sys
import json
import time
import pymongo
import certifi
import shutil
import pandas as pd

from pyspark import SparkConf
from pyspark.sql import SparkSession
# Note: this wildcard import shadows Python built-ins such as sum, max, min, and round.
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window as W


### 2.0. Instantiate Final Global Variables

In [6]:
# --------------------------------------------------------------------------------
# Specify MySQL Server Connection Information
# --------------------------------------------------------------------------------
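mysql_args = {
    "host_name" : "ds2002-mysql.mysql.database.azure.com",
    "port" : "3306",
    "db_name" : "rental_dw",
    "conn_props" : {
        "user" : "root",
        "password" : os.environ.get("MYSQL_PASSWORD", ""),  # keep secrets out of the notebook
        "driver" : "com.mysql.cj.jdbc.Driver"  # driver class shipped in the mysql-connector-j jar
    }
}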

# --------------------------------------------------------------------------------
# Specify MongoDB Cluster Connection Information
# --------------------------------------------------------------------------------
mongodb_args = {
    "cluster_location": "atlas",
    "user_name": "sonia_sadani",
    "password": os.environ.get("MONGODB_PASSWORD", ""),  # keep secrets out of the notebook
    "cluster_name": "Cluster0",
    "cluster_subnet": "s7ikf",
    "db_name": "northwind_purchasing",
    "collection": "purchase_orders",  # You can try with "inventory_transactions" too
    "null_column_threshold": 0.5
}

# --------------------------------------------------------------------------------
# Specify Directory Structure for Source Data
# --------------------------------------------------------------------------------
base_dir = os.path.join(os.getcwd(), 'lab_data')
data_dir = os.path.join(base_dir, 'northwind')
batch_dir = os.path.join(base_dir, 'retail-org')
stream_dir = os.path.join(data_dir, 'streaming')

orders_stream_dir = os.path.join(base_dir, 'retail-org', 'streaming', 'orders')
purchase_orders_stream_dir = os.path.join(stream_dir, 'purchase_orders')
inventory_trans_stream_dir = os.path.join(stream_dir, 'inventory_transactions')
# --------------------------------------------------------------------------------
# Create Directory Structure for Data Lakehouse Files
# --------------------------------------------------------------------------------
dest_database = "rental_dlh"
sql_warehouse_dir = os.path.abspath('spark-warehouse')
dest_database_dir = f"{dest_database}.db"
database_dir = os.path.join(sql_warehouse_dir, dest_database_dir)

orders_output_bronze = os.path.join(database_dir, 'fact_orders', 'bronze')
orders_output_silver = os.path.join(database_dir, 'fact_orders', 'silver')
orders_output_gold = os.path.join(database_dir, 'fact_orders', 'gold')

purchase_orders_output_bronze = os.path.join(database_dir, 'fact_purchase_orders', 'bronze')
purchase_orders_output_silver = os.path.join(database_dir, 'fact_purchase_orders', 'silver')
purchase_orders_output_gold = os.path.join(database_dir, 'fact_purchase_orders', 'gold')

inventory_trans_output_bronze = os.path.join(database_dir, 'fact_inventory_transactions', 'bronze')
inventory_trans_output_silver = os.path.join(database_dir, 'fact_inventory_transactions', 'silver')
inventory_trans_output_gold = os.path.join(database_dir, 'fact_inventory_transactions', 'gold')
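`get_mysql_dataframe()` and `get_mongo_uri()` interpolate these settings directly into connection strings. MongoDB's connection-string rules add one caveat: a user name or password containing characters such as `@`, `:`, or `/` must be percent-encoded before it goes into a `mongodb+srv://` URI. Below is a minimal sketch using `urllib.parse.quote_plus`. The helper names and the sample password are hypothetical.

```python
from urllib.parse import quote_plus


def build_jdbc_url(host_name, port, db_name):
    """MySQL JDBC URL in the same shape that get_mysql_dataframe() builds."""
    return f"jdbc:mysql://{host_name}:{port}/{db_name}"


def build_atlas_uri(user_name, password, cluster_name, cluster_subnet):
    """mongodb+srv URI with percent-encoded credentials (hypothetical helper)."""
    return (f"mongodb+srv://{quote_plus(user_name)}:{quote_plus(password)}@"
            f"{cluster_name}.{cluster_subnet}.mongodb.net/")


jdbc_url = build_jdbc_url("ds2002-mysql.mysql.database.azure.com", "3306", "rental_dw")
# Hypothetical password containing reserved characters.
atlas_uri = build_atlas_uri("example_user", "p@ss:word/1", "Cluster0", "s7ikf")
print(jdbc_url)
print(atlas_uri)
```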

### 3.0. Define Final Project Utility Functions

In [8]:
def get_file_info(path: str):
    file_sizes = []
    modification_times = []

    # Fetch each item in the directory, and filter out any directories.
    items = os.listdir(path)
    files = sorted([item for item in items if os.path.isfile(os.path.join(path, item))])

    # Populate lists with the size (bytes) and last-modification datetime for each file.
    for file in files:
        file_sizes.append(os.path.getsize(os.path.join(path, file)))
        modification_times.append(pd.to_datetime(os.path.getmtime(os.path.join(path, file)), unit='s'))

    data = list(zip(files, file_sizes, modification_times))
    column_names = ['name','size','modification_time']

    return pd.DataFrame(data=data, columns=column_names)


def wait_until_stream_is_ready(query, min_batches=1):
    '''Block until the streaming 'query' has completed at least 'min_batches' micro-batches.'''
    while len(query.recentProgress) < min_batches:
        time.sleep(5)

    print(f"The stream has processed {len(query.recentProgress)} batches")


def remove_directory_tree(path: str):
    '''If it exists, remove the entire contents of a directory structure at a given 'path' parameter's location.'''
    try:
        if os.path.exists(path):
            shutil.rmtree(path)
            return f"Directory '{path}' has been removed successfully."
        else:
            return f"Directory '{path}' does not exist."
            
    except Exception as e:
        return f"An error occurred: {e}"
        

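def drop_null_columns(df, threshold):
    '''Drop columns whose fraction of NULL values exceeds the given 'threshold' (0.0 to 1.0).'''
    total_rows = df.count()
    if total_rows == 0:
        return df  # nothing to measure; avoids division by zero
    # Count NULLs for all columns in one pass rather than one Spark job per column.
    null_counts = df.select([count(when(df[c].isNull(), 1)).alias(c) for c in df.columns]).first()
    columns_with_nulls = [c for c in df.columns if null_counts[c] / total_rows > threshold]
    return df.drop(*columns_with_nulls)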

def get_mysql_dataframe(spark_session, sql_query : str, **args):
    # Create a JDBC URL to the MySQL database.
    jdbc_url = f"jdbc:mysql://{args['host_name']}:{args['port']}/{args['db_name']}"

    # Invoke spark.read.format("jdbc") to run the query and fill a DataFrame.
    dframe = spark_session.read.format("jdbc") \
        .option("url", jdbc_url) \
        .option("driver", args['conn_props']['driver']) \
        .option("user", args['conn_props']['user']) \
        .option("password", args['conn_props']['password']) \
        .option("query", sql_query) \
        .load()

    return dframe
    

def get_mongo_uri(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the 'cluster_location' parameter.")
        
    if args['cluster_location'] == "atlas":
        uri = f"mongodb+srv://{args['user_name']}:{args['password']}@"
        uri += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net/"
    else:
        uri = "mongodb://localhost:27017/"

    return uri


def get_spark_conf_args(spark_jars : list, **args):
    sparkConf_args = {
        "app_name" : "PySpark Rental Data Lakehouse (Medallion Architecture)",
        "worker_threads" : f"local[{max(1, os.cpu_count() // 2)}]",  # half the cores, at least one
        "shuffle_partitions" : int(os.cpu_count()),
        "mongo_uri" : get_mongo_uri(**args),
        "spark_jars" : ",".join(spark_jars),  # spark.jars expects a comma-separated list
        "database_dir" : sql_warehouse_dir
    }

    return sparkConf_args
    

def get_spark_conf(**args):
    sparkConf = SparkConf().setAppName(args['app_name'])\
    .setMaster(args['worker_threads']) \
    .set('spark.driver.memory', '4g') \
    .set('spark.executor.memory', '2g') \
    .set('spark.jars', args['spark_jars']) \
    .set('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
    .set('spark.mongodb.input.uri', args['mongo_uri']) \
    .set('spark.mongodb.output.uri', args['mongo_uri']) \
    .set('spark.sql.adaptive.enabled', 'false') \
    .set('spark.sql.debug.maxToStringFields', 35) \
    .set('spark.sql.shuffle.partitions', args['shuffle_partitions']) \
    .set('spark.sql.streaming.forceDeleteTempCheckpointLocation', 'true') \
    .set('spark.sql.streaming.schemaInference', 'true') \
    .set('spark.sql.warehouse.dir', args['database_dir']) \
    .set('spark.streaming.stopGracefullyOnShutdown', 'true')
    
    return sparkConf


def get_mongo_client(**args):
    '''Get MongoDB Client Connection'''
    mongo_uri = get_mongo_uri(**args)
    if args['cluster_location'] == "atlas":
        client = pymongo.MongoClient(mongo_uri, tlsCAFile=certifi.where())

    elif args['cluster_location'] == "local":
        client = pymongo.MongoClient(mongo_uri)
        
    else:
        raise Exception("A MongoDB Client could not be created.")

    return client
    
    
# TODO: Rewrite this to leverage PySpark?
def set_mongo_collections(mongo_client, db_name : str, data_directory : str, json_files : dict):
    '''Drop and reload one collection per entry in 'json_files' ({collection_name: file_name}).'''
    db = mongo_client[db_name]

    for collection_name, file_name in json_files.items():
        db.drop_collection(collection_name)
        json_file = os.path.join(data_directory, file_name)
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
        db[collection_name].insert_many(json_object)

    mongo_client.close()
    

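def get_mongodb_dataframe(spark_session, **args):
    '''Query MongoDB Atlas using the "mongodb" data source of the MongoDB Spark Connector 10.x.
    NOTE: the 3.0.1 connector configured in Section 5.0 registers the older "mongo" source instead.'''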
    uri = get_mongo_uri(**args)

    dframe = spark_session.read.format("mongodb") \
        .option("spark.mongodb.read.connection.uri", uri) \
        .option("spark.mongodb.read.database", args['db_name']) \
        .option("spark.mongodb.read.collection", args['collection']) \
        .load()

    dframe = dframe.drop('_id')  # Remove Mongo internal column
    dframe = drop_null_columns(dframe, args['null_column_threshold'])  # Optional cleanup

    return dframe
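`drop_null_columns()` drops a column only when its NULL fraction is strictly greater than `threshold`. With the configured `null_column_threshold` of 0.5, a column that is exactly half NULL is therefore kept. Here is a plain-Python sketch of that rule over hypothetical rows. The helper names are illustrative and not part of the project.

```python
def null_fractions(rows, columns):
    """Fraction of None values per column, computed in one pass over the rows."""
    if not rows:
        return {c: 0.0 for c in columns}
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            if row.get(c) is None:
                counts[c] += 1
    return {c: counts[c] / len(rows) for c in columns}


def columns_to_drop(rows, columns, threshold):
    """Columns whose NULL fraction is strictly greater than threshold."""
    fractions = null_fractions(rows, columns)
    return [c for c in columns if fractions[c] > threshold]


# Hypothetical rows: manager_staff_id is exactly 50% NULL, address_id is 100% NULL.
rows = [
    {"store_id": 1, "manager_staff_id": 1, "address_id": None},
    {"store_id": 2, "manager_staff_id": None, "address_id": None},
]
cols = ["store_id", "manager_staff_id", "address_id"]
print(columns_to_drop(rows, cols, 0.5))  # → ['address_id']
```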

### 4.0. Initialize Final Project Data Lakehouse Directory Structure

Remove the Data Lakehouse database directory structure to ensure idempotency.

In [10]:
remove_directory_tree(database_dir)

"Directory '/Users/soniasadani/Documents/04-PySpark/spark-warehouse/rental_dlh.db' has been removed successfully."

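`remove_directory_tree()` makes reruns idempotent by deleting the previous output before anything is written. The stand-alone sketch below shows the same idea. It uses a temporary directory instead of the real `spark-warehouse` path, and the `reset_directory` helper is hypothetical. Resetting twice leaves the same empty state.

```python
import os
import shutil
import tempfile


def reset_directory(path):
    """Remove 'path' if it exists, recreate it empty, and return its contents."""
    shutil.rmtree(path, ignore_errors=True)
    os.makedirs(path)
    return sorted(os.listdir(path))


root = tempfile.mkdtemp()
target = os.path.join(root, "rental_dlh.db")
os.makedirs(os.path.join(target, "dim_stores"))  # simulate output left by a previous run

first = reset_directory(target)
second = reset_directory(target)  # a second run ends in the same state
print(first == second == [])
shutil.rmtree(root)  # clean up the temporary directory
```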
### 5.0. Create Spark Session with Final Project Configuration

In [17]:
# Section 5.0: Create a New Spark Session

jars = []

# MongoDB Spark Connector (absolute path)
mongo_spark_jar = "/Users/soniasadani/Documents/04-PySpark/mongo-spark-connector_2.12-3.0.1.jar"
jars.append(mongo_spark_jar)

# MySQL Connector (absolute path)
mysql_spark_jar = os.path.join(os.getcwd(), "mysql-connector-j-9.1.0", "mysql-connector-j-9.1.0.jar")
jars.append(mysql_spark_jar)

# Get Spark configuration
sparkConf_args = get_spark_conf_args(jars, **mongodb_args)

sparkConf = SparkConf().setAppName("Data Lakehouse Project") \
    .setMaster(f"local[{int(os.cpu_count()/2)}]") \
    .set("spark.driver.memory", "4g") \
    .set("spark.executor.memory", "2g") \
    .set("spark.jars", ",".join(jars)) \
    .set("spark.driver.extraClassPath", os.pathsep.join(jars)) \
    .set("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .set("spark.sql.adaptive.enabled", "false") \
    .set("spark.sql.debug.maxToStringFields", "35") \
    .set("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
    .set("spark.sql.streaming.schemaInference", "true") \
    .set("spark.sql.warehouse.dir", sql_warehouse_dir)

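# getOrCreate() returns any session already running in this kernel (e.g., the test session from
# In [3]) and ignores startup-only settings such as spark.jars and spark.sql.warehouse.dir, so stop
# it first. JVM launch options (spark.driver.extraClassPath, spark.jars.packages) need a fresh kernel.
active_session = SparkSession.getActiveSession()
if active_session is not None:
    active_session.stop()

spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
spark.sparkContext.setLogLevel("OFF")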
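The two jar settings in the cell above use different separators. `spark.jars` takes a comma-separated list. `spark.driver.extraClassPath` is a JVM classpath, so its entries are joined with `os.pathsep` (`:` on macOS and Linux, `;` on Windows). The helper names and jar paths below are hypothetical, and the sketch only builds the setting strings without starting Spark.

```python
import os


def jar_settings(jar_paths):
    """Build jar-related Spark settings; each one uses its own separator."""
    return {
        "spark.jars": ",".join(jar_paths),                          # comma-separated list
        "spark.driver.extraClassPath": os.pathsep.join(jar_paths),  # JVM classpath entries
    }


def local_master(cpu_count=None):
    """local[N] master string using half the cores, but never fewer than one thread."""
    cores = cpu_count if cpu_count is not None else (os.cpu_count() or 1)
    return f"local[{max(1, cores // 2)}]"


# Hypothetical jar locations.
jar_paths = ["/jars/mongo-spark-connector.jar", "/jars/mysql-connector-j.jar"]
settings = jar_settings(jar_paths)
for key, value in settings.items():
    print(key, "=", value)
print(local_master(1), local_master(8))
```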

### 6.0. Create Metadata Database and Load Initial Dimension (Store)

In [20]:
json_path = os.path.join(batch_dir, 'store_data.json')
df_store_json = spark.read.option("multiLine", "true").json(json_path)
df_store_json = df_store_json.drop("_id")
df_store_json.show(5, truncate=False)
spark.sql(f"CREATE DATABASE IF NOT EXISTS {dest_database}")

+----------+-------------+----------------+--------+
|address_id|last_update  |manager_staff_id|store_id|
+----------+-------------+----------------+--------+
|1         |1139979432000|1               |1       |
|2         |1139979432000|2               |2       |
+----------+-------------+----------------+--------+



DataFrame[]

## Section II: Populate Dimensions by Ingesting "Cold-path" Reference Data 
### 1.0. Fetch Data from the File System
#### 1.1. Verify the location of the source data files on the file system

In [22]:
# 1.1. Verify the location of the source data files on the file system
cold_path_dir = os.path.join(base_dir, 'retail-org')
get_file_info(cold_path_dir)

|   | name | size | modification_time |
|---|------|------|-------------------|
| 0 | .DS_Store | 6148 | 2025-05-24 17:07:54.204872847 |
| 1 | customers.csv | 4550361 | 2025-03-26 22:08:45.000000000 |
| 2 | dim_date.csv | 554937 | 2025-05-20 22:25:30.383128643 |
| 3 | film_data.json | 456530 | 2025-03-27 04:15:43.000000000 |
| 4 | store_data.json | 212 | 2025-05-23 00:55:57.441878796 |


#### 1.2. Populate the <span style="color:darkred">Customers Dimension</span>
##### 1.2.1. Use PySpark to Read data from a CSV file

In [24]:
customer_csv = os.path.join(batch_dir, 'customers.csv')
print(customer_csv)

df_dim_customers = spark.read.format('csv') \
    .options(header='true', inferSchema='true') \
    .load(customer_csv)

df_dim_customers.show(2, truncate=False)

/Users/soniasadani/Documents/04-PySpark/lab_data/retail-org/customers.csv
+-----------+------+--------+----------------------+-----+-------+--------+-----------+------+----+-------+--------+------------+----------+-------------------------------+----------+-------------+---------------+---------------+
|customer_id|tax_id|tax_code|customer_name         |state|city   |postcode|street     |number|unit|region |district|lon         |lat       |ship_to_address                |valid_from|valid_to     |units_purchased|loyalty_segment|
+-----------+------+--------+----------------------+-----+-------+--------+-----------+------+----+-------+--------+------------+----------+-------------------------------+----------+-------------+---------------+---------------+
|11123757   |NULL  |NULL    |SMITH,  SHIRLEY       |IN   |BREMEN |46506.0 |N CENTER ST|521.0 |NULL|Indiana|50.0    |-86.1465825 |41.4507625|IN, 46506.0, N CENTER ST, 521.0|1532824233|1.548137353E9|34.0           |3              |
|30585

##### 1.2.2. Transform the Customers Dimension Table

In [26]:
# Rename 'id' column if needed (not applicable here since it's 'customer_id')
# Add surrogate key
df_dim_customers.createOrReplaceTempView("customers")
sql_customers = """
    SELECT *, ROW_NUMBER() OVER (ORDER BY customer_id) AS customer_key
    FROM customers
"""
df_dim_customers = spark.sql(sql_customers)

# Choose and reorder relevant columns
ordered_columns = ['customer_key', 'customer_id', 'customer_name', 'tax_id', 'tax_code',
                   'street', 'number', 'unit', 'city', 'state', 'postcode', 'country_region',
                   'region', 'district', 'lon', 'lat', 'loyalty_segment']

# Not every listed column exists in this CSV (e.g., 'country_region'), so print the actual columns
print(df_dim_customers.columns)

# Keep only the listed columns that are present, in the listed order

df_dim_customers = df_dim_customers.select([col for col in ordered_columns if col in df_dim_customers.columns])
df_dim_customers.toPandas().head(2)

['customer_id', 'tax_id', 'tax_code', 'customer_name', 'state', 'city', 'postcode', 'street', 'number', 'unit', 'region', 'district', 'lon', 'lat', 'ship_to_address', 'valid_from', 'valid_to', 'units_purchased', 'loyalty_segment', 'customer_key']


|   | customer_key | customer_id | customer_name | tax_id | tax_code | street | number | unit | city | state | postcode | region | district | lon | lat | loyalty_segment |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1668 | NGUYEN, LINH THI MY | | | COLBY RD | 1360 | | LUNENBURG | VT | 5906 | VT | ESSEX | -71.673995 | 44.513836 | 0 |
| 1 | 2 | 2405 | intel security | 766739662.0 | A | HARVEY ST | 83 | | SAINT JOHNSBURY | VT | 5819 | VT | | -72.023713 | 44.425748 | 0 |
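`ROW_NUMBER() OVER (ORDER BY customer_id)` numbers the rows 1..N in natural-key order. The keys are deterministic only while `customer_id` is unique. Because the table is rebuilt with `mode="overwrite"`, a new customer whose ID sorts early would also shift every later key on the next load. Spark additionally moves all rows to a single partition for a window with no `PARTITION BY`, which is acceptable at this table size. The plain-Python sketch below mirrors the numbering (the helper name is hypothetical).

```python
def assign_surrogate_keys(rows, natural_key, key_name):
    """Mimic ROW_NUMBER() OVER (ORDER BY natural_key): sort, then number from 1."""
    ordered = sorted(rows, key=lambda r: r[natural_key])
    return [{key_name: i, **row} for i, row in enumerate(ordered, start=1)]


customers = [{"customer_id": 2405}, {"customer_id": 1668}, {"customer_id": 11123757}]
dim = assign_surrogate_keys(customers, "customer_id", "customer_key")
print([(r["customer_key"], r["customer_id"]) for r in dim])
# → [(1, 1668), (2, 2405), (3, 11123757)]
```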


##### 1.2.3. Save as the `dim_customers` Table in the Data Lakehouse

In [30]:
df_dim_customers.write.saveAsTable(f"{dest_database}.dim_customers", mode="overwrite")


##### 1.2.4. Unit Test: Describe and Preview Table

In [33]:
# === Unit Test: Describe and Preview dim_customers table ===
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_customers;").show(truncate=False)
spark.sql(f"SELECT * FROM {dest_database}.dim_customers LIMIT 2").toPandas()

+----------------------------+-------------+-------+
|col_name                    |data_type    |comment|
+----------------------------+-------------+-------+
|customer_key                |int          |NULL   |
|customer_id                 |int          |NULL   |
|customer_name               |string       |NULL   |
|tax_id                      |double       |NULL   |
|tax_code                    |string       |NULL   |
|street                      |string       |NULL   |
|number                      |string       |NULL   |
|unit                        |string       |NULL   |
|city                        |string       |NULL   |
|state                       |string       |NULL   |
|postcode                    |string       |NULL   |
|region                      |string       |NULL   |
|district                    |string       |NULL   |
|lon                         |double       |NULL   |
|lat                         |double       |NULL   |
|loyalty_segment             |int          |NU

|   | customer_key | customer_id | customer_name | tax_id | tax_code | street | number | unit | city | state | postcode | region | district | lon | lat | loyalty_segment |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1668 | NGUYEN, LINH THI MY | | | COLBY RD | 1360 | | LUNENBURG | VT | 5906 | VT | ESSEX | -71.673995 | 44.513836 | 0 |
| 1 | 2 | 2405 | intel security | 766739662.0 | A | HARVEY ST | 83 | | SAINT JOHNSBURY | VT | 5819 | VT | | -72.023713 | 44.425748 | 0 |
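The unit test above is a visual check. The table could also be checked programmatically for the usual dimension invariants: unique, contiguous surrogate keys and unique, non-NULL natural keys. In Spark, this would typically compare `df.count()` with `df.select("customer_key").distinct().count()`. The plain-Python sketch below shows the checks themselves (the `validate_dimension` helper is hypothetical).

```python
def validate_dimension(rows, surrogate_key, natural_key):
    """Return a list of problems; an empty list means every check passed."""
    problems = []
    keys = [row[surrogate_key] for row in rows]
    naturals = [row[natural_key] for row in rows]

    if len(set(keys)) != len(keys):
        problems.append(f"duplicate {surrogate_key} values")
    if sorted(keys) != list(range(1, len(keys) + 1)):
        problems.append(f"{surrogate_key} is not a contiguous 1..N sequence")
    if any(value is None for value in naturals):
        problems.append(f"NULL {natural_key} values")
    if len(set(naturals)) != len(naturals):
        problems.append(f"duplicate {natural_key} values")
    return problems


good_rows = [{"customer_key": 1, "customer_id": 1668}, {"customer_key": 2, "customer_id": 2405}]
bad_rows = [{"customer_key": 1, "customer_id": 1668}, {"customer_key": 1, "customer_id": 1668}]
print(validate_dimension(good_rows, "customer_key", "customer_id"))
print(validate_dimension(bad_rows, "customer_key", "customer_id"))
```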


### 2.0. Fetch Reference Data from a MongoDB Atlas Database
#### 2.1. Load the Store Reference Data (Simulated MongoDB Collection)
**NOTE:** MongoDB Atlas is not queried here; `store_data.json` is read directly from the file system instead. The following cell **can** be run more than once because it only reads the file and rebuilds the DataFrame.

In [36]:
# === Section 2.0: Simulate MongoDB Reference Data ===
# Instead of connecting to MongoDB Atlas, load the store_data.json file directly for the final project

# Load the JSON data
json_path = os.path.join(batch_dir, 'store_data.json')

df_dim_stores = spark.read.option("multiLine", "true").json(json_path)
df_dim_stores = df_dim_stores.drop("_id")  # Just in case it exists

# Add surrogate key
df_dim_stores.createOrReplaceTempView("stores")
sql_stores = """
    SELECT *, ROW_NUMBER() OVER (ORDER BY store_id) AS store_key
    FROM stores
"""
df_dim_stores = spark.sql(sql_stores)

# Reorder columns based on real schema
ordered_columns = ['store_key', 'store_id', 'manager_staff_id', 'address_id', 'last_update']
df_dim_stores = df_dim_stores.select(*ordered_columns)

# Preview result
df_dim_stores.show(2, truncate=False)


+---------+--------+----------------+----------+-------------+
|store_key|store_id|manager_staff_id|address_id|last_update  |
+---------+--------+----------------+----------+-------------+
|1        |1       |1               |1         |1139979432000|
|2        |2       |2               |2         |1139979432000|
+---------+--------+----------------+----------+-------------+
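By default, Spark's JSON reader expects JSON Lines (one complete object per line). `multiLine=True` lets it parse a file whose records span several lines, such as a pretty-printed array. Assuming `store_data.json` is stored that way (the file itself is not shown here), this stand-alone sketch shows that both layouts carry the same records. The sample strings and helper names are hypothetical.

```python
import json

# The same two store records in the two layouts.
json_array_text = """[
  {"store_id": 1, "manager_staff_id": 1},
  {"store_id": 2, "manager_staff_id": 2}
]"""
json_lines_text = '{"store_id": 1, "manager_staff_id": 1}\n{"store_id": 2, "manager_staff_id": 2}\n'


def parse_json_array(text):
    """One JSON document (an array) spanning several lines: needs multiLine in Spark."""
    return json.loads(text)


def parse_json_lines(text):
    """One JSON object per line: Spark's default JSON reader format."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


print(parse_json_array(json_array_text) == parse_json_lines(json_lines_text))  # → True
```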



##### 2.2.2. Make Necessary Transformations to the New Dataframe

In [41]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Rename ID column
df_dim_stores = df_dim_stores.withColumnRenamed("id", "store_id")

# Add surrogate primary key
df_dim_stores = df_dim_stores.withColumn(
    "store_key",
    row_number().over(Window.orderBy("store_id"))
)

# Reorder columns (put store_key first)
cols = ["store_key"] + [col for col in df_dim_stores.columns if col != "store_key"]
df_dim_stores = df_dim_stores.select(cols)

# Optional: Preview
df_dim_stores.show(5, truncate=False)

+---------+--------+----------------+----------+-------------+
|store_key|store_id|manager_staff_id|address_id|last_update  |
+---------+--------+----------------+----------+-------------+
|1        |1       |1               |1         |1139979432000|
|2        |2       |2               |2         |1139979432000|
+---------+--------+----------------+----------+-------------+



##### 2.2.3. Save as the <span style="color:darkred">dim_stores</span> table in the Data Lakehouse

In [44]:
df_dim_stores.write.saveAsTable(f"{dest_database}.dim_stores", mode="overwrite")
spark.sql(f"SELECT * FROM {dest_database}.dim_stores LIMIT 5").show(truncate=False)

+---------+--------+----------------+----------+-------------+
|store_key|store_id|manager_staff_id|address_id|last_update  |
+---------+--------+----------------+----------+-------------+
|1        |1       |1               |1         |1139979432000|
|2        |2       |2               |2         |1139979432000|
+---------+--------+----------------+----------+-------------+



##### 2.2.4. Validate Store Dimension Table Structure and Contents

In [47]:
# Describe the table structure
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_stores").show(truncate=False)

# Preview first 5 rows
spark.sql(f"SELECT * FROM {dest_database}.dim_stores LIMIT 5").show(truncate=False)

+----------------------------+-------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                            |comment|
+----------------------------+-------------------------------------------------------------------------------------+-------+
|store_key                   |int                                                                                  |NULL   |
|store_id                    |bigint                                                                               |NULL   |
|manager_staff_id            |bigint                                                                               |NULL   |
|address_id                  |bigint                                                                               |NULL   |
|last_update                 |bigint                                                                               |NULL   |


#### 2.4. Ingest and Transform Store Dimension (Simulated MongoDB JSON)
##### 2.4.1. Load Store JSON Data into a Spark DataFrame

In [50]:
# === Load store_data.json as replacement for MongoDB store dimension ===
store_json = os.path.join(batch_dir, 'store_data.json')
print(f"Loading store data from: {store_json}")

# Load JSON
df_dim_stores = (
    spark.read
         .option("multiLine", True)
         .json(store_json)
)

# Print schema and preview
df_dim_stores.printSchema()
df_dim_stores.show(5, truncate=False)

Loading store data from: /Users/soniasadani/Documents/04-PySpark/lab_data/retail-org/store_data.json
root
 |-- address_id: long (nullable = true)
 |-- last_update: long (nullable = true)
 |-- manager_staff_id: long (nullable = true)
 |-- store_id: long (nullable = true)

+----------+-------------+----------------+--------+
|address_id|last_update  |manager_staff_id|store_id|
+----------+-------------+----------------+--------+
|1         |1139979432000|1               |1       |
|2         |1139979432000|2               |2       |
+----------+-------------+----------------+--------+



##### 2.4.2. Apply Transformations and Add Surrogate Key to Store Dimension

In [53]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# --- Rename 'id' to 'store_id' if needed (depends on your JSON structure) ---
df_dim_stores = df_dim_stores.withColumnRenamed("id", "store_id")

# --- Add surrogate key ---
window_spec = Window.orderBy("store_id")
df_dim_stores = df_dim_stores.withColumn("store_key", row_number().over(window_spec))

# --- Reorder columns (adjust based on your actual schema) ---
cols = ["store_key"] + [col for col in df_dim_stores.columns if col != "store_key"]
df_dim_stores = df_dim_stores.select(cols)

# --- Preview ---
df_dim_stores.show(2, truncate=False)

+---------+----------+-------------+----------------+--------+
|store_key|address_id|last_update  |manager_staff_id|store_id|
+---------+----------+-------------+----------------+--------+
|1        |1         |1139979432000|1               |1       |
|2        |2         |1139979432000|2               |2       |
+---------+----------+-------------+----------------+--------+



##### 2.4.3. Save Store Dimension as dim_stores Table in the Lakehouse

In [56]:
df_dim_stores.write.saveAsTable(f"{dest_database}.dim_stores", mode="overwrite")

##### 2.4.4. Validate Store Dimension Table Structure and Preview Records

In [59]:
spark.sql(f"DESCRIBE {dest_database}.dim_stores").show(truncate=False)
spark.sql(f"SELECT * FROM {dest_database}.dim_stores LIMIT 5").show(truncate=False)

+----------------+---------+-------+
|col_name        |data_type|comment|
+----------------+---------+-------+
|store_key       |int      |NULL   |
|address_id      |bigint   |NULL   |
|last_update     |bigint   |NULL   |
|manager_staff_id|bigint   |NULL   |
|store_id        |bigint   |NULL   |
+----------------+---------+-------+

+---------+----------+-------------+----------------+--------+
|store_key|address_id|last_update  |manager_staff_id|store_id|
+---------+----------+-------------+----------------+--------+
|1        |1         |1139979432000|1               |1       |
|2        |2         |1139979432000|2               |2       |
+---------+----------+-------------+----------------+--------+
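`last_update` is stored as a `bigint` of epoch milliseconds. Converting it makes the value readable. In Spark SQL, `timestamp_millis(last_update)` (Spark 3.1+) does this, and the stand-alone sketch below shows the same arithmetic with `datetime` (the helper name is hypothetical).

```python
from datetime import datetime, timezone


def epoch_millis_to_utc(millis):
    """Convert an epoch-milliseconds value (like last_update) to an aware UTC datetime."""
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc)


print(epoch_millis_to_utc(1139979432000).isoformat())  # → 2006-02-15T04:57:12+00:00
```

The `dim_stores` sample value `1139979432000` therefore decodes to 15 February 2006, 04:57:12 UTC.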



### 3.0. Ingest the Date Dimension (CSV Fallback for the MySQL Source)
#### 3.1. Populate the <span style="color:darkred">Date Dimension</span>
##### 3.1.1. Load dim_date from a Local CSV File

In [62]:
# NOTE: Reading from MySQL over JDBC failed here with a Java driver error (the same driver had
# worked in earlier labs), so the date dimension is loaded from a local CSV export instead.

# Set the path to the date dimension CSV (adjust path if needed)
dim_date_csv = os.path.join(batch_dir, 'dim_date.csv')
print(f"Loading dim_date from: {dim_date_csv}")

# Load into Spark
df_dim_date = (
    spark.read
         .option("header", True)
         .option("inferSchema", True)
         .csv(dim_date_csv)
)

# Preview
df_dim_date.printSchema()
df_dim_date.show(5, truncate=False)

Loading dim_date from: /Users/soniasadani/Documents/04-PySpark/lab_data/retail-org/dim_date.csv
root
 |-- date_key: integer (nullable = true)
 |-- full_date: date (nullable = true)
 |-- date_name: string (nullable = true)
 |-- date_name_us: string (nullable = true)
 |-- date_name_eu: string (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- day_name_of_week: string (nullable = true)
 |-- day_of_month: integer (nullable = true)
 |-- day_of_year: integer (nullable = true)
 |-- weekday_weekend: string (nullable = true)
 |-- week_of_year: integer (nullable = true)
 |-- month_name: string (nullable = true)
 |-- month_of_year: integer (nullable = true)
 |-- is_last_day_of_month: string (nullable = true)
 |-- calendar_quarter: integer (nullable = true)
 |-- calendar_year: integer (nullable = true)
 |-- calendar_year_month: timestamp (nullable = true)
 |-- calendar_year_qtr: string (nullable = true)
 |-- fiscal_month_of_year: integer (nullable = true)
 |-- fiscal_quarter: integ
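The schema shows that `date_key` is an integer but not what format it uses. If it follows the common `YYYYMMDD` smart-key convention (an assumption not confirmed by the output above), a fact row's date key can be derived from its timestamp with simple arithmetic. This sketch derives a few of the listed attributes. The ISO weekday numbering (1 = Monday) is also an assumption, and the helpers are hypothetical.

```python
from datetime import date, timedelta


def date_key(d):
    """Smart key in YYYYMMDD form (assumed convention)."""
    return d.year * 10000 + d.month * 100 + d.day


def date_row(d):
    """A few dim_date-style attributes derived from a calendar date."""
    return {
        "date_key": date_key(d),
        "full_date": d,
        "day_of_week": d.isoweekday(),  # 1 = Monday ... 7 = Sunday (assumed numbering)
        "day_of_month": d.day,
        "day_of_year": d.timetuple().tm_yday,
        "month_of_year": d.month,
        "calendar_quarter": (d.month - 1) // 3 + 1,
        "calendar_year": d.year,
        "weekday_weekend": "Weekend" if d.isoweekday() >= 6 else "Weekday",
    }


start = date(2006, 2, 14)
rows = [date_row(start + timedelta(days=i)) for i in range(3)]
for row in rows:
    print(row["date_key"], row["day_of_year"], row["weekday_weekend"])
```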

##### 3.1.2. Save as the <span style="color:darkred">dim_date</span> table in the Data Lakehouse

In [65]:
df_dim_date.write.saveAsTable(f"{dest_database}.dim_date", mode="overwrite")

##### 3.1.3. Unit Test: Describe and Preview Table

In [68]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_date").show(truncate=False)
spark.sql(f"SELECT * FROM {dest_database}.dim_date LIMIT 5").show(truncate=False)

+--------------------+---------+-------+
|col_name            |data_type|comment|
+--------------------+---------+-------+
|date_key            |int      |NULL   |
|full_date           |date     |NULL   |
|date_name           |string   |NULL   |
|date_name_us        |string   |NULL   |
|date_name_eu        |string   |NULL   |
|day_of_week         |int      |NULL   |
|day_name_of_week    |string   |NULL   |
|day_of_month        |int      |NULL   |
|day_of_year         |int      |NULL   |
|weekday_weekend     |string   |NULL   |
|week_of_year        |int      |NULL   |
|month_name          |string   |NULL   |
|month_of_year       |int      |NULL   |
|is_last_day_of_month|string   |NULL   |
|calendar_quarter    |int      |NULL   |
|calendar_year       |int      |NULL   |
|calendar_year_month |timestamp|NULL   |
|calendar_year_qtr   |string   |NULL   |
|fiscal_month_of_year|int      |NULL   |
|fiscal_quarter      |int      |NULL   |
+--------------------+---------+-------+
only showing top

#### 3.2. Populate the <span style="color:darkred">Product Dimension</span>
##### 3.2.1. Fetch <span style="color:darkred">Products</span> data from film_data.json

In [71]:
# 3.2.1 Fetch data from film_data.json (Product Dimension)

# Set the correct path to your JSON file
products_json = os.path.join(batch_dir, 'film_data.json')
print(f"Loading products from: {products_json}")

# Load into Spark DataFrame
df_dim_products = (
    spark.read
         .option("multiLine", True)
         .json(products_json)
)

# Preview structure and content
df_dim_products.printSchema()
df_dim_products.show(5, truncate=False)

Loading products from: /Users/soniasadani/Documents/04-PySpark/lab_data/retail-org/film_data.json
root
 |-- description: string (nullable = true)
 |-- film_id: long (nullable = true)
 |-- language_id: long (nullable = true)
 |-- last_update: long (nullable = true)
 |-- length: long (nullable = true)
 |-- original_language_id: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- release_year: long (nullable = true)
 |-- rental_duration: long (nullable = true)
 |-- rental_rate: double (nullable = true)
 |-- replacement_cost: double (nullable = true)
 |-- special_features: string (nullable = true)
 |-- title: string (nullable = true)

+---------------------------------------------------------------------------------------------------------------------+-------+-----------+-------------+------+--------------------+------+------------+---------------+-----------+----------------+--------------------------------+----------------+
|description                                    

##### 3.2.2. Perform any Necessary Transformations

In [74]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Rename 'film_id' to 'product_id' so the business key matches the dimension naming
df_dim_products = df_dim_products.withColumnRenamed("film_id", "product_id")

# Drop columns the dimension does not need; only names that actually exist are dropped
columns_to_drop = ['description', 'attachments']
existing_cols = df_dim_products.columns
df_dim_products = df_dim_products.drop(*[c for c in columns_to_drop if c in existing_cols])

# Add a surrogate primary key numbered 1..n in product_id order
window_spec = Window.orderBy("product_id")
df_dim_products = df_dim_products.withColumn("product_key", row_number().over(window_spec))

# Put the surrogate key first
cols = ["product_key"] + [c for c in df_dim_products.columns if c != "product_key"]
df_dim_products = df_dim_products.select(cols)

# Preview the first two rows
df_dim_products.toPandas().head(2)

| | product_key | product_id | language_id | last_update | length | original_language_id | rating | release_year | rental_duration | rental_rate | replacement_cost | special_features | title |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | 1 | 1139979822000 | 86 | | PG | 2006 | 6 | 0.99 | 20.99 | Deleted Scenes,Behind the Scenes | ACADEMY DINOSAUR |
| 1 | 2 | 2 | 1 | 1139979822000 | 48 | | G | 2006 | 3 | 4.99 | 12.99 | Trailers,Deleted Scenes | ACE GOLDFINGER |
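`row_number().over(Window.orderBy("product_id"))` numbers the rows 1, 2, 3, ... in `product_id` order, which is why `product_key` and `product_id` happen to coincide here. Because the window has no `partitionBy`, Spark moves all rows into a single partition to number them (and logs a performance warning), which is acceptable for a dimension this small. A minimal plain-Python sketch of the same numbering, using hypothetical sample rows and a hypothetical helper name, shows that the surrogate key follows sort order rather than the business key's values:

```python
def assign_surrogate_keys(rows, business_key, key_name):
    """Mimic row_number() over a window ordered by business_key:
    sort the rows, then number them 1, 2, 3, ... in that order."""
    ordered = sorted(rows, key=lambda r: r[business_key])
    return [{key_name: i, **r} for i, r in enumerate(ordered, start=1)]

# Hypothetical sample rows, deliberately out of order and with gaps in the IDs
films = [
    {"product_id": 30, "title": "FILM C"},
    {"product_id": 10, "title": "FILM A"},
    {"product_id": 20, "title": "FILM B"},
]

for row in assign_surrogate_keys(films, "product_id", "product_key"):
    print(row["product_key"], row["product_id"], row["title"])
```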


##### 3.2.3. Save as the <span style="color:darkred">dim_products</span> table in the Data Lakehouse

In [77]:
df_dim_products.write.saveAsTable(f"{dest_database}.dim_products", mode="overwrite")

##### 3.2.4. Unit Test: Describe and Preview Table

In [80]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_products").show(truncate=False)
spark.sql(f"SELECT * FROM {dest_database}.dim_products LIMIT 5").show(truncate=False)

+----------------------------+----------------------------+-------+
|col_name                    |data_type                   |comment|
+----------------------------+----------------------------+-------+
|product_key                 |int                         |NULL   |
|product_id                  |bigint                      |NULL   |
|language_id                 |bigint                      |NULL   |
|last_update                 |bigint                      |NULL   |
|length                      |bigint                      |NULL   |
|original_language_id        |string                      |NULL   |
|rating                      |string                      |NULL   |
|release_year                |bigint                      |NULL   |
|rental_duration             |bigint                      |NULL   |
|rental_rate                 |double                      |NULL   |
|replacement_cost            |double                      |NULL   |
|special_features            |string            

### 4.0. Verify Dimension Tables

In [83]:
spark.sql(f"USE {dest_database};")
spark.sql("SHOW TABLES").toPandas()

| | namespace | tableName | isTemporary |
|---|---|---|---|
| 0 | rental_dlh | dim_customers | False |
| 1 | rental_dlh | dim_date | False |
| 2 | rental_dlh | dim_products | False |
| 3 | rental_dlh | dim_stores | False |
| 4 | | customers | True |
| 5 | | stores | True |


## Section III: Integrate Reference Data with Real-Time Data
### 6.0. Use PySpark Structured Streaming to Process (Hot Path) <span style="color:darkred">Orders</span> Fact Data  
#### 6.1. Verify the location of the source data files on the file system

In [86]:
get_file_info(orders_stream_dir)

| | name | size | modification_time |
|---|---|---|---|
| 0 | northwind_orders_01.json | 9609 | 2025-03-26 22:08:45 |
| 1 | northwind_orders_02.json | 9103 | 2025-03-26 22:08:45 |
| 2 | northwind_orders_03.json | 9008 | 2025-03-26 22:08:45 |


#### 6.2. Create the Bronze Layer: Stage <span style="color:darkred">Orders Fact table</span> Data
##### 6.2.1. Read "Raw" JSON file data into a Stream

In [89]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

orders_schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("order_date", StringType(), True),  # values look like '2006-01-15 00:00:00'; cast to a date downstream
    StructField("ship_date", StringType(), True),
    StructField("status", StringType(), True),
    StructField("total", DoubleType(), True)
])

In [90]:
# Clean old outputs (recommended before starting)
remove_directory_tree(orders_output_bronze)
remove_directory_tree(os.path.join(orders_output_bronze, '_checkpoint'))

# Load the streaming orders data (Bronze layer)
df_orders_bronze = (
    spark.readStream
         .format("json")
         .schema(orders_schema)  
         .option("maxFilesPerTrigger", 1)
         .option("multiLine", "true")
         .load(orders_stream_dir)
)
df_orders_bronze.isStreaming

True
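With `maxFilesPerTrigger` set to 1, each micro-batch reads at most one new file, so the three order files arrive as three separate batches; this is how the notebook simulates data landing in intervals. `Trigger.AvailableNow`, used when the stream is written below, is documented to respect source limits such as `maxFilesPerTrigger` (unlike the older `Trigger.Once`). The plain-Python sketch below, with a hypothetical helper name, only illustrates the batching rule; it does not model how Spark discovers, orders, or tracks files.

```python
def plan_micro_batches(files, max_files_per_trigger):
    """Split newly discovered files into micro-batches of at most
    max_files_per_trigger files each, preserving the input order."""
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be at least 1")
    return [files[i:i + max_files_per_trigger]
            for i in range(0, len(files), max_files_per_trigger)]

files = ["northwind_orders_01.json", "northwind_orders_02.json", "northwind_orders_03.json"]
for batch_id, batch in enumerate(plan_micro_batches(files, 1)):
    print(batch_id, batch)
```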

In [91]:
remove_directory_tree(orders_output_bronze)
remove_directory_tree(os.path.join(orders_output_bronze, '_checkpoint'))
remove_directory_tree(orders_output_silver)
remove_directory_tree(os.path.join(orders_output_silver, '_checkpoint'))

"Directory '/Users/soniasadani/Documents/04-PySpark/spark-warehouse/rental_dlh.db/fact_orders/silver/_checkpoint' does not exist."

##### 6.2.2. Write the Streaming Data to a Parquet file

In [96]:
import os

# List the raw order files waiting in the streaming source directory
os.listdir(orders_stream_dir)

['northwind_orders_02.json',
 'northwind_orders_03.json',
 'northwind_orders_01.json']

In [98]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import current_timestamp, input_file_name

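# NOTE: the raw order JSON has no 'ship_date', 'status', or 'total' fields
# (it uses 'shipped_date' and 'order_status' and carries no total), so these
# three columns load as NULL in every row, as the Bronze output below shows.
orders_schema = StructType([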
    StructField("order_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("order_date", StringType(), True),
    StructField("ship_date", StringType(), True),
    StructField("status", StringType(), True),
    StructField("total", DoubleType(), True)
])

# Clean any existing outputs
remove_directory_tree(orders_output_bronze)
remove_directory_tree(os.path.join(orders_output_bronze, '_checkpoint'))

# Bronze streaming read
df_orders_bronze = (
    spark.readStream
         .format("json")
         .schema(orders_schema)
         .option("maxFilesPerTrigger", 1)
         .option("multiLine", "true")
         .load(orders_stream_dir)
         .withColumn("receipt_time", current_timestamp())
         .withColumn("source_file", input_file_name())
)

# Bronze streaming write
orders_bronze_query = (
    df_orders_bronze.writeStream
        .format("parquet")
        .outputMode("append")
        .queryName("orders_bronze")
        .trigger(availableNow=True)
        .option("checkpointLocation", os.path.join(orders_output_bronze, '_checkpoint'))
        .option("compression", "snappy")
        .start(orders_output_bronze)
)

orders_bronze_query.awaitTermination()

In [99]:
df_orders_bronze_staged = spark.read.parquet(orders_output_bronze)
df_orders_bronze_staged.show(5, truncate=False) 

+--------+-----------+-------------------+---------+------+-----+-----------------------+------------------------------------------------------------------------------------------------------------+
|order_id|customer_id|order_date         |ship_date|status|total|receipt_time           |source_file                                                                                                 |
+--------+-----------+-------------------+---------+------+-----+-----------------------+------------------------------------------------------------------------------------------------------------+
|30      |27         |2006-01-15 00:00:00|NULL     |NULL  |NULL |2025-05-26 17:16:58.196|file:///Users/soniasadani/Documents/04-PySpark/lab_data/retail-org/streaming/orders/northwind_orders_01.json|
|30      |27         |2006-01-15 00:00:00|NULL     |NULL  |NULL |2025-05-26 17:16:58.196|file:///Users/soniasadani/Documents/04-PySpark/lab_data/retail-org/streaming/orders/northwind_orders_01.json|
|31  

##### 6.2.3. Unit Test: Implement Query Monitoring

In [101]:
print(f"Query ID: {orders_bronze_query.id}")
print(f"Query Name: {orders_bronze_query.name}")
print(f"Query Status: {orders_bronze_query.status}")

Query ID: 8cca1f39-919b-4825-923e-79202c2c7a7a
Query Name: orders_bronze
Query Status: {'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}
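The notebook later calls `wait_until_stream_is_ready(orders_gold_query, 3)`, a helper whose definition is not part of this section. A hypothetical equivalent, `wait_for_batches`, can be built on two real `StreamingQuery` properties: `recentProgress` (a list of recent progress reports) and `isActive`. It returns once enough reports exist or the query has stopped, and raises after a timeout. Spark can also emit progress reports while a query is idle, so treat the count as an approximation of completed batches.

```python
import time

def wait_for_batches(query, min_batches, poll_seconds=1.0, timeout_seconds=60.0):
    """Poll a StreamingQuery-like object until it has reported at least
    min_batches progress updates or has stopped; raise TimeoutError otherwise.
    Returns the number of progress reports seen."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        batches = len(query.recentProgress)
        if batches >= min_batches or not query.isActive:
            return batches
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"only {batches} of {min_batches} batches after {timeout_seconds}s")
        time.sleep(poll_seconds)
```

Against a live query it would be called as `wait_for_batches(orders_gold_query, 3)`.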


In [102]:
orders_bronze_query.awaitTermination()

In [103]:
get_file_info(orders_stream_dir)

| | name | size | modification_time |
|---|---|---|---|
| 0 | northwind_orders_01.json | 9609 | 2025-03-26 22:08:45 |
| 1 | northwind_orders_02.json | 9103 | 2025-03-26 22:08:45 |
| 2 | northwind_orders_03.json | 9008 | 2025-03-26 22:08:45 |


In [109]:
spark.read.option("multiLine", "true").json(orders_stream_dir).show(truncate=False)

+-----------+--------+-----------+-------------------+---------------+--------------------+--------+------------+-------------------+------------+----------+--------+-------------------+----------+------------+--------+-----+----------+
|customer_id|discount|employee_id|order_date         |order_detail_id|order_details_status|order_id|order_status|paid_date          |payment_type|product_id|quantity|shipped_date       |shipper_id|shipping_fee|tax_rate|taxes|unit_price|
+-----------+--------+-----------+-------------------+---------------+--------------------+--------+------------+-------------------+------------+----------+--------+-------------------+----------+------------+--------+-----+----------+
|27         |0       |9          |2006-01-15 00:00:00|27             |Invoiced            |30      |Closed      |2006-01-15 00:00:00|Check       |34        |100.0   |2006-01-22 00:00:00|2         |200.0       |0       |0.0  |14.0      |
|27         |0       |9          |2006-01-15 00:00:0
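Comparing this raw read with the Bronze output explains the NULL columns: the source records use `shipped_date` and `order_status`, and carry `quantity`, `unit_price`, and `discount` instead of a total. When a user-supplied schema names a field that the JSON lacks, Spark's JSON reader returns null for it rather than raising an error. The plain-Python sketch below mimics that projection on an abbreviated copy of the first raw row above, then shows one possible corrected field list. The helper name is hypothetical, and deriving a line total as `quantity * unit_price * (1 - discount)` is an assumption for illustration, not something the source data defines.

```python
def apply_schema(record, field_names):
    """Project a parsed JSON record onto a fixed list of field names.
    Missing fields become None, the way Spark yields null for schema
    fields that are absent from the source data."""
    return {name: record.get(name) for name in field_names}

# Abbreviated copy of the first raw order row shown above
raw = {
    "order_id": 30, "customer_id": 27, "order_date": "2006-01-15 00:00:00",
    "shipped_date": "2006-01-22 00:00:00", "order_status": "Closed",
    "quantity": 100.0, "unit_price": 14.0, "discount": 0,
}

# Field names used by orders_schema in the notebook
notebook_fields = ["order_id", "customer_id", "order_date", "ship_date", "status", "total"]
print(apply_schema(raw, notebook_fields))

# One possible correction: use the names the source actually has
corrected_fields = ["order_id", "customer_id", "order_date", "shipped_date",
                    "order_status", "quantity", "unit_price", "discount"]
fixed = apply_schema(raw, corrected_fields)
# Assumed definition of a line total; not defined by the source data
fixed["line_total"] = fixed["quantity"] * fixed["unit_price"] * (1 - fixed["discount"])
print(fixed["order_status"], fixed["line_total"])
```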

In [111]:
spark.read.parquet(orders_output_bronze).show(5, truncate=False)

+--------+-----------+-------------------+---------+------+-----+-----------------------+------------------------------------------------------------------------------------------------------------+
|order_id|customer_id|order_date         |ship_date|status|total|receipt_time           |source_file                                                                                                 |
+--------+-----------+-------------------+---------+------+-----+-----------------------+------------------------------------------------------------------------------------------------------------+
|30      |27         |2006-01-15 00:00:00|NULL     |NULL  |NULL |2025-05-26 17:16:58.196|file:///Users/soniasadani/Documents/04-PySpark/lab_data/retail-org/streaming/orders/northwind_orders_01.json|
|30      |27         |2006-01-15 00:00:00|NULL     |NULL  |NULL |2025-05-26 17:16:58.196|file:///Users/soniasadani/Documents/04-PySpark/lab_data/retail-org/streaming/orders/northwind_orders_01.json|
|31  

In [113]:
spark.read.parquet(orders_output_bronze).select("order_date", "customer_id").distinct().show(20, truncate=False)

+-------------------+-----------+
|order_date         |customer_id|
+-------------------+-----------+
|2006-01-22 00:00:00|12         |
|2006-06-05 00:00:00|29         |
|2006-04-05 00:00:00|9          |
|2006-03-06 00:00:00|6          |
|2006-03-24 00:00:00|1          |
|2006-04-30 00:00:00|8          |
|2006-06-07 00:00:00|28         |
|2006-06-08 00:00:00|6          |
|2006-02-06 00:00:00|4          |
|2006-04-07 00:00:00|28         |
|2006-03-24 00:00:00|11         |
|2006-04-05 00:00:00|25         |
|2006-04-05 00:00:00|8          |
|2006-04-08 00:00:00|6          |
|2006-03-24 00:00:00|7          |
|2006-04-03 00:00:00|6          |
|2006-04-05 00:00:00|26         |
|2006-03-10 00:00:00|28         |
|2006-03-22 00:00:00|8          |
|2006-01-20 00:00:00|4          |
+-------------------+-----------+
only showing top 20 rows



In [114]:
df_dim_customers.select("customer_id").distinct().show(20, truncate=False)
df_dim_date.select("full_date", "date_key").distinct().show(20, truncate=False)

+-----------+
|customer_id|
+-----------+
|23200537   |
|31701287   |
|61992418   |
|16243030   |
|17185921   |
|14362584   |
|15424531   |
|14397430   |
|15271602   |
|25765433   |
|16903400   |
|4101697    |
|21442856   |
|3609418    |
|19035609   |
|12866703   |
|19506364   |
|16605520   |
|5404337    |
|26801418   |
+-----------+
only showing top 20 rows

+----------+--------+
|full_date |date_key|
+----------+--------+
|2002-08-27|20020827|
|2004-01-19|20040119|
|2004-02-18|20040218|
|2005-04-16|20050416|
|2005-09-02|20050902|
|2005-10-30|20051030|
|2006-03-01|20060301|
|2006-08-09|20060809|
|2006-12-15|20061215|
|2007-04-11|20070411|
|2007-09-21|20070921|
|2008-06-05|20080605|
|2008-11-29|20081129|
|2009-01-04|20090104|
|2009-01-06|20090106|
|2009-02-09|20090209|
|2009-12-18|20091218|
|2010-05-07|20100507|
|2010-07-04|20100704|
|2010-08-23|20100823|
+----------+--------+
only showing top 20 rows
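These two outputs show a key mismatch: the order stream uses one- and two-digit `customer_id` values, while the Section II `dim_customers` table holds eight-digit IDs, and `dim_date` uses `yyyymmdd` smart keys rather than the order dates' format. An inner join between them would likely match few or no rows. A cheap guard is to check key overlap before joining; the plain-Python sketch below (hypothetical helper name) does this on values copied from the outputs above. In PySpark the same check could be written as a `left_anti` join of the fact keys against the dimension keys.

```python
def orphan_keys(fact_keys, dim_keys):
    """Return fact-side keys with no match in the dimension, i.e. the rows an
    inner join would silently drop (what a left_anti join would return)."""
    return sorted(set(fact_keys) - set(dim_keys))

# Sample values copied from the outputs above
order_customer_ids = [12, 29, 9, 6, 1, 8, 28, 4]
dim_customer_ids = [23200537, 31701287, 61992418, 16243030, 4101697, 3609418]

missing = orphan_keys(order_customer_ids, dim_customer_ids)
coverage = 1 - len(missing) / len(set(order_customer_ids))
print(f"{len(missing)} unmatched customer IDs, coverage {coverage:.0%}")
```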



In [115]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

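# NOTE: dim_customers from Section II uses 8-digit customer IDs that do not match
# the order stream's customer_id values, so it is rebuilt here from the Bronze
# orders. Saving it overwrites the earlier table and keeps only the key columns.

# Reload the Bronze layer as a batch DataFrame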
df_orders_bronze = spark.read.parquet(orders_output_bronze)

# Extract distinct customer IDs
df_dim_customers = df_orders_bronze.select("customer_id").distinct()

# Add surrogate key
window_spec = Window.orderBy("customer_id")
df_dim_customers = df_dim_customers.withColumn("customer_key", row_number().over(window_spec))

# Reorder columns
df_dim_customers = df_dim_customers.select("customer_key", "customer_id")

# Save updated dimension
df_dim_customers.write.saveAsTable(f"{dest_database}.dim_customers", mode="overwrite")

In [117]:
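# NOTE: this rebuilds dim_date from the distinct order dates only, replacing the
# original yyyymmdd smart keys with sequential row_number() keys and overwriting
# the earlier table (its calendar attributes are dropped).
# Extract distinct order dates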
df_dim_date = df_orders_bronze.selectExpr("CAST(order_date AS DATE) AS full_date").distinct()

# Add surrogate key
window_spec = Window.orderBy("full_date")
df_dim_date = df_dim_date.withColumn("date_key", row_number().over(window_spec))

# Reorder
df_dim_date = df_dim_date.select("date_key", "full_date")

# Save updated dimension
df_dim_date.write.saveAsTable(f"{dest_database}.dim_date", mode="overwrite")
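Sequential `row_number()` date keys are compact, but they depend on which dates happen to be present: if a later load brings in an earlier date, every key after it shifts, and the table no longer covers dates with no orders. The original `yyyymmdd` smart keys (e.g. `20060301` for 2006-03-01) are stable across loads. The plain-Python sketch below, with hypothetical helper names and sample dates, contrasts the two schemes; in PySpark a smart key could be computed with `date_format(col("full_date"), "yyyyMMdd").cast("int")`.

```python
from datetime import date

def smart_date_key(d):
    """yyyymmdd integer key, as in the original dim_date (e.g. 20060301)."""
    return d.year * 10000 + d.month * 100 + d.day

def row_number_keys(dates):
    """1-based keys in date order, like row_number() over Window.orderBy(full_date)."""
    return {d: i for i, d in enumerate(sorted(set(dates)), start=1)}

first_load = [date(2006, 1, 15), date(2006, 1, 20), date(2006, 1, 22)]
second_load = first_load + [date(2006, 1, 10)]  # an earlier date arrives later

print(row_number_keys(first_load)[date(2006, 1, 15)])   # key in the first load
print(row_number_keys(second_load)[date(2006, 1, 15)])  # same date, different key
print(smart_date_key(date(2006, 1, 15)))                # stable across loads
```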

#### 6.3. Create the Silver Layer: Integrate "Cold-path" Data & Make Transformations
##### 6.3.1. Prepare Role-Playing Dimension Primary and Business Keys

In [120]:
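from pyspark.sql.functions import col

# Create role-playing versions of the dim_date table, one per date role.
# df_dim_paid_date is not joined below: the Bronze schema has no paid-date column.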
df_dim_order_date = df_dim_date.select(col("date_key").alias("order_date_key"), col("full_date").alias("order_full_date"))
df_dim_paid_date = df_dim_date.select(col("date_key").alias("paid_date_key"), col("full_date").alias("paid_full_date"))
df_dim_shipped_date = df_dim_date.select(col("date_key").alias("shipped_date_key"), col("full_date").alias("shipped_full_date"))
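A role-playing dimension is one physical table joined several times under different column aliases, once per date role. The plain-Python sketch below uses a dictionary as a stand-in for `dim_date` and a hypothetical helper that performs one left-outer lookup per role: a missing date (like the always-NULL `ship_date` in this data) yields a null key instead of dropping the order, which matches the `left_outer` join used for shipped dates in the Silver query.

```python
from datetime import date

# Stand-in for dim_date: full_date -> date_key
dim_date = {date(2006, 1, 15): 1, date(2006, 1, 22): 2}

def resolve_date_keys(order, roles):
    """Look up one date_key per role with left-outer semantics:
    unknown or missing dates map to None instead of removing the order."""
    out = {"order_id": order["order_id"]}
    for role in roles:
        out[f"{role}_date_key"] = dim_date.get(order.get(f"{role}_date"))
    return out

order = {"order_id": 30, "order_date": date(2006, 1, 15), "shipped_date": None}
print(resolve_date_keys(order, ["order", "shipped"]))
```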

##### 6.3.2. Define Silver Query to Join Streaming with Batch Data

In [123]:
from pyspark.sql.functions import col
from pyspark.sql.types import LongType, DateType

df_dim_order_date = df_dim_date.select(
    col("date_key").alias("order_date_key"),
    col("full_date").alias("order_full_date")
)

df_dim_shipped_date = df_dim_date.select(
    col("date_key").alias("shipped_date_key"),
    col("full_date").alias("shipped_full_date")
)

df_orders_silver = (
    spark.readStream
        .format("parquet")
        .load(orders_output_bronze)
        .join(df_dim_customers.select("customer_id", "customer_key"), on="customer_id", how="inner")
        .join(df_dim_order_date, col("order_date").cast(DateType()) == col("order_full_date").cast(DateType()), "inner")
        # ship_date is NULL in every Bronze row, so this left join leaves shipped_date_key NULL
        .join(df_dim_shipped_date, col("ship_date").cast(DateType()) == col("shipped_full_date").cast(DateType()), "left_outer")
        .select(
            col("order_id").cast(LongType()),
            col("customer_key").cast(LongType()),
            col("order_date_key").cast(LongType()),
            col("shipped_date_key").cast(LongType()),
            col("status"),
            col("total"),
            col("receipt_time"),
            col("source_file")
        )
)

In [124]:
df_orders_silver.isStreaming

True

In [127]:
df_orders_silver.printSchema()

root
 |-- order_id: long (nullable = true)
 |-- customer_key: long (nullable = false)
 |-- order_date_key: long (nullable = false)
 |-- shipped_date_key: long (nullable = true)
 |-- status: string (nullable = true)
 |-- total: double (nullable = true)
 |-- receipt_time: timestamp (nullable = true)
 |-- source_file: string (nullable = true)



##### 6.3.3. Write the Transformed Streaming data to the Data Lakehouse

In [132]:
# Define checkpoint directory for Silver output
orders_checkpoint_silver = os.path.join(orders_output_silver, '_checkpoint')

# Write the Silver stream to Parquet files
orders_silver_query = (
    df_orders_silver.writeStream
        .format("parquet")
        .outputMode("append")
        .queryName("orders_silver")
        .trigger(availableNow=True)
        .option("checkpointLocation", orders_checkpoint_silver)
        .option("compression", "snappy")
        .start(orders_output_silver)
)

# Wait until all files in the stream are processed
orders_silver_query.awaitTermination()
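With `trigger(availableNow=True)`, the query processes whatever input is available when it starts and then stops, and the checkpoint directory records which input has already been committed. Rerunning the cell therefore picks up only input added since the last run, which is why the earlier cells clear both the output directory and its `_checkpoint` before reprocessing from scratch. The plain-Python sketch below models that bookkeeping with a set that persists between runs; the helper and file names are hypothetical, and this is a simplified illustration, not Spark's checkpoint format.

```python
def run_available_now(input_files, checkpoint):
    """Process every file not yet recorded in the checkpoint, then stop.
    `checkpoint` is a set that persists between runs (a stand-in for the
    _checkpoint directory). Returns the files processed in this run."""
    new_files = [f for f in input_files if f not in checkpoint]
    checkpoint.update(new_files)  # commit progress after processing
    return new_files

checkpoint = set()
files = ["part-00000.parquet", "part-00001.parquet", "part-00002.parquet"]

print(run_available_now(files, checkpoint))  # first run: all three files
print(run_available_now(files, checkpoint))  # rerun: nothing new
files.append("part-00003.parquet")
print(run_available_now(files, checkpoint))  # only the new file
checkpoint.clear()  # like deleting the _checkpoint directory
print(len(run_available_now(files, checkpoint)))  # everything is reprocessed
```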

In [133]:
df_silver_check = spark.read.parquet(orders_output_silver)

df_silver_check.printSchema()

df_silver_check.select("order_id", "customer_key", "order_date_key").show(10, truncate=False)

root
 |-- order_id: long (nullable = true)
 |-- customer_key: long (nullable = false)
 |-- order_date_key: long (nullable = false)
 |-- shipped_date_key: long (nullable = true)
 |-- status: string (nullable = true)
 |-- total: double (nullable = true)
 |-- receipt_time: timestamp (nullable = true)
 |-- source_file: string (nullable = true)

+--------+------------+--------------+
|order_id|customer_key|order_date_key|
+--------+------------+--------------+
|30      |13          |1             |
|30      |13          |1             |
|31      |3           |2             |
|31      |3           |2             |
|31      |3           |2             |
|32      |10          |3             |
|32      |10          |3             |
|33      |6           |4             |
|34      |3           |5             |
|35      |15          |6             |
+--------+------------+--------------+
only showing top 10 rows



In [134]:
spark.read.parquet(orders_output_silver).show(5)
spark.read.parquet(orders_output_silver).show(5, truncate=False)

+--------+------------+--------------+----------------+------+-----+--------------------+--------------------+
|order_id|customer_key|order_date_key|shipped_date_key|status|total|        receipt_time|         source_file|
+--------+------------+--------------+----------------+------+-----+--------------------+--------------------+
|      30|          13|             1|            NULL|  NULL| NULL|2025-05-26 17:16:...|file:///Users/son...|
|      30|          13|             1|            NULL|  NULL| NULL|2025-05-26 17:16:...|file:///Users/son...|
|      31|           3|             2|            NULL|  NULL| NULL|2025-05-26 17:16:...|file:///Users/son...|
|      31|           3|             2|            NULL|  NULL| NULL|2025-05-26 17:16:...|file:///Users/son...|
|      31|           3|             2|            NULL|  NULL| NULL|2025-05-26 17:16:...|file:///Users/son...|
+--------+------------+--------------+----------------+------+-----+--------------------+--------------------+
o

In [135]:
spark.read.parquet(orders_output_silver).select("order_id", "order_date_key", "customer_key").show(20)

+--------+--------------+------------+
|order_id|order_date_key|customer_key|
+--------+--------------+------------+
|      30|             1|          13|
|      30|             1|          13|
|      31|             2|           3|
|      31|             2|           3|
|      31|             2|           3|
|      32|             3|          10|
|      32|             3|          10|
|      33|             4|           6|
|      34|             5|           3|
|      35|             6|          15|
|      36|             7|           2|
|      37|             8|           4|
|      38|             9|          14|
|      39|            10|           6|
|      40|            11|           8|
|      41|            11|           5|
|      42|            11|           8|
|      42|            11|           8|
|      42|            11|           8|
|      43|            11|           9|
+--------+--------------+------------+
only showing top 20 rows



##### 6.3.4. Unit Test: Implement Query Monitoring

In [139]:
print(f"Query ID: {orders_silver_query.id}")
print(f"Query Name: {orders_silver_query.name}")
print(f"Query Status: {orders_silver_query.status}")

Query ID: 18d303ec-f6ad-451e-a5c9-525d5cb23541
Query Name: orders_silver
Query Status: {'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}


In [143]:
orders_silver_query.awaitTermination()

In [145]:
spark.read.parquet(orders_output_silver).show(5, truncate=False)

+--------+------------+--------------+----------------+------+-----+-----------------------+------------------------------------------------------------------------------------------------------------+
|order_id|customer_key|order_date_key|shipped_date_key|status|total|receipt_time           |source_file                                                                                                 |
+--------+------------+--------------+----------------+------+-----+-----------------------+------------------------------------------------------------------------------------------------------------+
|30      |13          |1             |NULL            |NULL  |NULL |2025-05-26 17:16:58.196|file:///Users/soniasadani/Documents/04-PySpark/lab_data/retail-org/streaming/orders/northwind_orders_01.json|
|30      |13          |1             |NULL            |NULL  |NULL |2025-05-26 17:16:58.196|file:///Users/soniasadani/Documents/04-PySpark/lab_data/retail-org/streaming/orders/northwind_orders

In [147]:
spark.read.parquet(orders_output_silver).count()

58

In [149]:
spark.read.parquet(orders_output_bronze).select("order_id").distinct().count()

40
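Silver has 58 rows while Bronze has only 40 distinct `order_id` values because each source record is an order line (note `order_detail_id` in the raw data), so an order with several products appears several times. The Gold query's `count("*")` therefore counts order lines rather than distinct orders. Exact distinct aggregations such as `countDistinct` are not supported on streaming DataFrames; `approx_count_distinct` should work in a streaming aggregation, or the exact distinct count can be computed in a batch query over Silver. A plain-Python illustration of the difference, on a sample shaped like the Silver rows above (all January):

```python
from collections import Counter

# (order_id, customer_key, month_of_year) per order line
order_lines = [
    (30, 13, 1), (30, 13, 1),
    (31, 3, 1), (31, 3, 1), (31, 3, 1),
    (32, 10, 1), (32, 10, 1),
    (33, 6, 1),
]

# What count("*") measures: one per order line
lines_per_customer = Counter((cust, month) for _, cust, month in order_lines)
# What a distinct-order count measures: duplicates of the same order collapse
orders_per_customer = Counter((cust, month) for _, cust, month in set(order_lines))

print(lines_per_customer[(3, 1)], orders_per_customer[(3, 1)])
```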

In [150]:
df_silver_check.printSchema()

root
 |-- order_id: long (nullable = true)
 |-- customer_key: long (nullable = false)
 |-- order_date_key: long (nullable = false)
 |-- shipped_date_key: long (nullable = true)
 |-- status: string (nullable = true)
 |-- total: double (nullable = true)
 |-- receipt_time: timestamp (nullable = true)
 |-- source_file: string (nullable = true)



In [152]:
df_silver_check.select("order_id", "customer_key", "order_date_key").show(10, truncate=False)

+--------+------------+--------------+
|order_id|customer_key|order_date_key|
+--------+------------+--------------+
|30      |13          |1             |
|30      |13          |1             |
|31      |3           |2             |
|31      |3           |2             |
|31      |3           |2             |
|32      |10          |3             |
|32      |10          |3             |
|33      |6           |4             |
|34      |3           |5             |
|35      |15          |6             |
+--------+------------+--------------+
only showing top 10 rows



In [155]:
df_silver_check.printSchema()

root
 |-- order_id: long (nullable = true)
 |-- customer_key: long (nullable = false)
 |-- order_date_key: long (nullable = false)
 |-- shipped_date_key: long (nullable = true)
 |-- status: string (nullable = true)
 |-- total: double (nullable = true)
 |-- receipt_time: timestamp (nullable = true)
 |-- source_file: string (nullable = true)



In [157]:
spark.read.parquet(orders_output_silver).select("order_date_key").distinct().orderBy("order_date_key").show(20)

+--------------+
|order_date_key|
+--------------+
|             1|
|             2|
|             3|
|             4|
|             5|
|             6|
|             7|
|             8|
|             9|
|            10|
|            11|
|            12|
|            13|
|            14|
|            15|
|            16|
|            17|
|            18|
|            19|
|            20|
+--------------+
only showing top 20 rows



In [158]:
df_dim_date.select("date_key").distinct().orderBy("date_key").show(20)

+--------+
|date_key|
+--------+
|       1|
|       2|
|       3|
|       4|
|       5|
|       6|
|       7|
|       8|
|       9|
|      10|
|      11|
|      12|
|      13|
|      14|
|      15|
|      16|
|      17|
|      18|
|      19|
|      20|
+--------+
only showing top 20 rows



In [160]:
spark.read.parquet(orders_output_silver).select("order_date_key").distinct().orderBy("order_date_key").show(20)

+--------------+
|order_date_key|
+--------------+
|             1|
|             2|
|             3|
|             4|
|             5|
|             6|
|             7|
|             8|
|             9|
|            10|
|            11|
|            12|
|            13|
|            14|
|            15|
|            16|
|            17|
|            18|
|            19|
|            20|
+--------------+
only showing top 20 rows



In [162]:
df_dim_date.select("date_key").distinct().orderBy("date_key").show(20)

+--------+
|date_key|
+--------+
|       1|
|       2|
|       3|
|       4|
|       5|
|       6|
|       7|
|       8|
|       9|
|      10|
|      11|
|      12|
|      13|
|      14|
|      15|
|      16|
|      17|
|      18|
|      19|
|      20|
+--------+
only showing top 20 rows



In [164]:
from pyspark.sql.functions import month, date_format

# Add month columns
df_dim_date = df_dim_date.withColumn("month_of_year", month(col("full_date").cast("date")))
df_dim_date = df_dim_date.withColumn("month_name", date_format(col("full_date").cast("date"), "MMMM"))

# Overwrite the table so downstream cells have access to these columns
df_dim_date.write.mode("overwrite").saveAsTable(f"{dest_database}.dim_date")

In [166]:
df_dim_date = spark.read.table(f"{dest_database}.dim_date")

In [167]:
from pyspark.sql.functions import col, count

# Static join test: simulate the Gold aggregation with a batch read of Silver
df_orders_silver = spark.read.parquet(orders_output_silver)

df_static_test = (
    df_orders_silver
        .join(
            df_dim_date,
            df_dim_date.date_key.cast("int") == col("order_date_key").cast("int")
        )
        .groupBy("month_of_year", "month_name", "customer_key")
        .agg(count("*").alias("orders_placed"))
        .orderBy("month_of_year", "customer_key")
)

df_static_test.show(20, truncate=False)

+-------------+----------+------------+-------------+
|month_of_year|month_name|customer_key|orders_placed|
+-------------+----------+------------+-------------+
|1            |January   |3           |3            |
|1            |January   |6           |1            |
|1            |January   |10          |2            |
|1            |January   |13          |2            |
|2            |February  |2           |1            |
|2            |February  |3           |1            |
|2            |February  |15          |1            |
|3            |March     |1           |3            |
|3            |March     |4           |1            |
|3            |March     |5           |1            |
|3            |March     |6           |1            |
|3            |March     |8           |4            |
|3            |March     |9           |2            |
|3            |March     |14          |1            |
|4            |April     |2           |4            |
|4            |April     |3 

In [170]:
df_test_join = (
    spark.read.parquet(orders_output_silver)
         .join(df_dim_date, col("order_date_key").cast("int") == df_dim_date["date_key"].cast("int"))
)

df_test_join.select("order_id", "customer_key", "order_date_key", "month_name").show(10, truncate=False)

+--------+------------+--------------+----------+
|order_id|customer_key|order_date_key|month_name|
+--------+------------+--------------+----------+
|30      |13          |1             |January   |
|30      |13          |1             |January   |
|31      |3           |2             |January   |
|31      |3           |2             |January   |
|31      |3           |2             |January   |
|32      |10          |3             |January   |
|32      |10          |3             |January   |
|33      |6           |4             |January   |
|34      |3           |5             |February  |
|35      |15          |6             |February  |
+--------+------------+--------------+----------+
only showing top 10 rows



#### 6.4. Create Gold Layer: Perform Aggregations
##### 6.4.1. Define a Query to Create a Business Report
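Create a new Gold table using the PySpark API. The report specification calls for the number of Products sold per Category each Month, returning the month, product category, and number of products sold, sorted by the month number in which the orders were placed (e.g., January, February, March). The query below implements a variant of this report: it counts the orders placed by each customer in each month and returns the month number, month name, customer key, and order count, sorted by month number and then customer key.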
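The aggregation is a group-by on (month, customer) followed by a sort on the month number, so months come out in calendar order rather than alphabetically. The plain-Python sketch below, with hypothetical input data, mirrors that logic: `calendar.month_name` plays the role of `date_format(..., "MMMM")`, and sorting on the (month, customer) tuple plays the role of `orderBy(asc("month_of_year"), asc("customer_key"))`.

```python
import calendar
from collections import Counter
from datetime import date

# Hypothetical (customer_key, order_date) pairs, one per order line
lines = [
    (3, date(2006, 3, 24)), (13, date(2006, 1, 15)),
    (3, date(2006, 1, 20)), (13, date(2006, 1, 15)),
    (8, date(2006, 2, 6)),
]

counts = Counter((d.month, cust) for cust, d in lines)
report = [
    (month, calendar.month_name[month], cust, n)
    for (month, cust), n in sorted(counts.items())  # month number, then customer
]
for row in report:
    print(row)
```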

In [174]:
from pyspark.sql.functions import month, date_format

df_dim_date = df_dim_date.withColumn("month_of_year", month(col("full_date").cast("date")))
df_dim_date = df_dim_date.withColumn("month_name", date_format(col("full_date").cast("date"), "MMMM"))

In [176]:
from pyspark.sql.functions import count, asc
from pyspark.sql.types import IntegerType

df_orders_by_customer_gold = (
    spark.readStream
         .format("parquet")
         .load(orders_output_silver)
         .join(
             df_dim_date,
             df_dim_date.date_key.cast(IntegerType()) == col("order_date_key").cast(IntegerType())
         )
         .groupBy("month_of_year", "month_name", "customer_key")
         .agg(count("*").alias("orders_placed"))
         .orderBy(asc("month_of_year"), asc("customer_key"))
)

df_orders_by_customer_gold.printSchema()

root
 |-- month_of_year: integer (nullable = true)
 |-- month_name: string (nullable = true)
 |-- customer_key: long (nullable = true)
 |-- orders_placed: long (nullable = false)



In [178]:
# Check 10 example values from Silver fact
spark.read.parquet(orders_output_silver).select("order_date_key").distinct().show(10)

# Check 10 values from the Date Dimension
df_dim_date.select("date_key").distinct().orderBy("date_key").show(10)

+--------------+
|order_date_key|
+--------------+
|            19|
|            22|
|             7|
|             6|
|             9|
|            17|
|             5|
|             1|
|            10|
|             3|
+--------------+
only showing top 10 rows

+--------+
|date_key|
+--------+
|       1|
|       2|
|       3|
|       4|
|       5|
|       6|
|       7|
|       8|
|       9|
|      10|
+--------+
only showing top 10 rows



In [179]:
df_dim_date.select("date_key", "month_of_year", "month_name").orderBy("date_key").show(10, truncate=False)

+--------+-------------+----------+
|date_key|month_of_year|month_name|
+--------+-------------+----------+
|1       |1            |January   |
|2       |1            |January   |
|3       |1            |January   |
|4       |1            |January   |
|5       |2            |February  |
|6       |2            |February  |
|7       |2            |February  |
|8       |3            |March     |
|9       |3            |March     |
|10      |3            |March     |
+--------+-------------+----------+
only showing top 10 rows



##### 6.4.2. Write the Streaming Data to an In-Memory Table in "Complete" Mode

In [183]:
# Stop any streaming queries that are still active before starting the Gold query
for q in spark.streams.active:
    q.stop()

In [185]:
from pyspark.sql.functions import count, asc
from pyspark.sql.types import IntegerType

df_orders_by_customer_gold = (
    spark.readStream
         .format("parquet")
         .load(orders_output_silver)
         .join(
             df_dim_date,
             df_dim_date.date_key.cast(IntegerType()) == col("order_date_key").cast(IntegerType())
         )
         .groupBy("month_of_year", "month_name", "customer_key")
         .agg(count("*").alias("orders_placed"))
         .orderBy(asc("month_of_year"), asc("customer_key"))
)

In [187]:
orders_gold_query = (
    df_orders_by_customer_gold.writeStream
        .format("memory")
        .outputMode("complete")
        .queryName("fact_orders_by_customer_month")
        .start()
)

wait_until_stream_is_ready(orders_gold_query, 3)


The stream has processed 3 batchs
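In `complete` output mode the sink receives the entire aggregate table after every micro-batch, with counts carried over from earlier batches; `append` mode is not allowed for a streaming aggregation like this one unless a watermark is defined. Complete mode is also what permits the `orderBy` in this query, since Structured Streaming supports sorting only after an aggregation in complete mode. The file (Parquet) sink supports only append mode, which is why the results go to the `memory` sink here; that sink keeps its table in driver memory and is intended for debugging and small outputs. The plain-Python sketch below, with a hypothetical helper name and made-up keys, models how complete mode re-emits the running totals after each batch.

```python
from collections import Counter

def complete_mode(batches):
    """Yield the full aggregate table after each micro-batch, the way
    outputMode('complete') rewrites the whole result on every trigger."""
    state = Counter()                # running aggregation state
    for batch in batches:
        state.update(batch)          # fold this batch's keys into the totals
        yield sorted(state.items())  # emit every key, not only the changed ones

# Hypothetical (month_of_year, customer_key) keys arriving in three micro-batches
batches = [
    [(1, 13), (1, 13)],
    [(1, 3), (2, 15)],
    [(1, 13)],
]
for i, table in enumerate(complete_mode(batches)):
    print(i, table)
```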


In [188]:
spark.sql("SELECT * FROM fact_orders_by_customer_month ORDER BY month_of_year, customer_key").show(20, truncate=False)

+-------------+----------+------------+-------------+
|month_of_year|month_name|customer_key|orders_placed|
+-------------+----------+------------+-------------+
|1            |January   |3           |3            |
|1            |January   |6           |1            |
|1            |January   |10          |2            |
|1            |January   |13          |2            |
|2            |February  |2           |1            |
|2            |February  |3           |1            |
|2            |February  |15          |1            |
|3            |March     |1           |3            |
|3            |March     |4           |1            |
|3            |March     |5           |1            |
|3            |March     |6           |1            |
|3            |March     |8           |4            |
|3            |March     |9           |2            |
|3            |March     |14          |1            |
|4            |April     |2           |4            |
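|4            |April     |3           |3            |
|4            |April     |4           |2            |
|4            |April     |6           |3            |
|4            |April     |7           |2            |
|4            |April     |11          |1            |
+-------------+----------+------------+-------------+
only showing top 20 rows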

##### 6.4.3. Query the Gold Data from Memory

In [190]:
df_fact_orders_by_customer = spark.sql("SELECT * FROM fact_orders_by_customer_month")
df_fact_orders_by_customer.printSchema()
df_fact_orders_by_customer.show(20, truncate=False)

root
 |-- month_of_year: integer (nullable = true)
 |-- month_name: string (nullable = true)
 |-- customer_key: long (nullable = true)
 |-- orders_placed: long (nullable = false)

+-------------+----------+------------+-------------+
|month_of_year|month_name|customer_key|orders_placed|
+-------------+----------+------------+-------------+
|1            |January   |3           |3            |
|1            |January   |6           |1            |
|1            |January   |10          |2            |
|1            |January   |13          |2            |
|2            |February  |2           |1            |
|2            |February  |3           |1            |
|2            |February  |15          |1            |
|3            |March     |1           |3            |
|3            |March     |4           |1            |
|3            |March     |5           |1            |
|3            |March     |6           |1            |
|3            |March     |8           |4            |
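|3            |March     |9           |2            |
|3            |March     |14          |1            |
|4            |April     |2           |4            |
|4            |April     |3           |3            |
|4            |April     |4           |2            |
|4            |April     |6           |3            |
|4            |April     |7           |2            |
|4            |April     |11          |1            |
+-------------+----------+------------+-------------+
only showing top 20 rows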

##### 6.4.4. Create the Final Selection

In [192]:
# Optional sanity check
spark.sql("SELECT COUNT(*) FROM fact_orders_by_customer_month").show()

# Final formatted selection for customer-level Gold table
df_fact_orders_by_customer_gold_final = df_fact_orders_by_customer \
    .select(
        col("month_name").alias("Month"),
        col("customer_key").alias("Customer Key"),
        col("orders_placed").alias("Total Orders")
    ) \
    .orderBy(asc("month_of_year"), asc("Customer Key"))

# Preview the final selection
df_fact_orders_by_customer_gold_final.show(20, truncate=False)

+--------+
|count(1)|
+--------+
|      33|
+--------+

+--------+------------+------------+
|Month   |Customer Key|Total Orders|
+--------+------------+------------+
|January |3           |3           |
|January |6           |1           |
|January |10          |2           |
|January |13          |2           |
|February|2           |1           |
|February|3           |1           |
|February|15          |1           |
|March   |1           |3           |
|March   |4           |1           |
|March   |5           |1           |
|March   |6           |1           |
|March   |8           |4           |
|March   |9           |2           |
|March   |14          |1           |
|April   |2           |4           |
|April   |3           |3           |
|April   |4           |2           |
|April   |6           |3           |
|April   |7           |2           |
|April   |11          |1           |
+--------+------------+------------+
only showing top 20 rows
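The final selection still orders on `month_of_year` even though that column is not projected, because sorting on the month name would be alphabetical rather than chronological. The plain-Python check below shows the difference, using the month names that appear in the output above:

```python
CALENDAR = ["January", "February", "March", "April", "May", "June", "July",
            "August", "September", "October", "November", "December"]
MONTH_NUMBER = {name: i for i, name in enumerate(CALENDAR, start=1)}

observed = ["March", "January", "April", "February"]

# Sorting on the name alone gives alphabetical order
alphabetical = sorted(observed)
# Sorting on the month number (what month_of_year provides) gives calendar order
chronological = sorted(observed, key=MONTH_NUMBER.__getitem__)

print(alphabetical)   # ['April', 'February', 'January', 'March']
print(chronological)  # ['January', 'February', 'March', 'April']
```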



##### 6.4.5. Load the Final Results into a New Table and Display the Results

In [194]:
# Save the final Gold-layer results into a managed table
df_fact_orders_by_customer_gold_final.write.saveAsTable(
    f"{dest_database}.fact_orders_by_customer_month",
    mode="overwrite"
)

# Confirm it's saved correctly
spark.sql(f"SELECT * FROM {dest_database}.fact_orders_by_customer_month").show()

+--------+------------+------------+
|   Month|Customer Key|Total Orders|
+--------+------------+------------+
|February|           2|           1|
|February|           3|           1|
|February|          15|           1|
| January|           3|           3|
| January|           6|           1|
| January|          13|           2|
| January|          10|           2|
|   April|           2|           4|
|   March|           4|           1|
|   April|          15|           1|
|   March|          14|           1|
|   April|          11|           1|
|   April|           3|           3|
|   March|           5|           1|
|   March|           6|           1|
|   March|           1|           3|
|   April|           6|           3|
|   April|          12|           3|
|   March|           8|           4|
|   April|           7|           2|
+--------+------------+------------+
only showing top 20 rows
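The rows read back from the managed table are unordered, because a table does not preserve the sort applied before it was written. The saved table also keeps only `Month`, `Customer Key`, and `Total Orders`. A calendar-ordered read therefore has to rebuild the month number, and the column names containing spaces must be quoted with backticks in Spark SQL. The sketch below generates such a query string. `quote_ident` and `ordered_month_query` are hypothetical helpers and are not from the original notebook.

```python
CALENDAR = ["January", "February", "March", "April", "May", "June", "July",
            "August", "September", "October", "November", "December"]

def quote_ident(name):
    """Quote a Spark SQL identifier with backticks, doubling any embedded backtick."""
    return "`" + name.replace("`", "``") + "`"

def ordered_month_query(table):
    """Build a SELECT that orders rows by calendar month, then by customer key."""
    month_col = quote_ident("Month")
    customer_col = quote_ident("Customer Key")
    orders_col = quote_ident("Total Orders")
    # Simple CASE expression mapping each month name back to its number
    cases = " ".join(f"WHEN '{name}' THEN {i}" for i, name in enumerate(CALENDAR, start=1))
    return (
        f"SELECT {month_col}, {customer_col}, {orders_col} "
        f"FROM {table} "
        f"ORDER BY CASE {month_col} {cases} END, {customer_col}"
    )

# Usage (assumes `spark` and `dest_database` from the notebook):
# spark.sql(ordered_month_query(f"{dest_database}.fact_orders_by_customer_month")).show()
print(ordered_month_query("rental_dlh.fact_orders_by_customer_month"))
```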



### 7.0. Final Project Wrap-Up & Validation
#### 7.1. Validate All Tables Exist

In [196]:
spark.sql(f"USE {dest_database}")
spark.sql("SHOW TABLES").show()

+----------+--------------------+-----------+
| namespace|           tableName|isTemporary|
+----------+--------------------+-----------+
|rental_dlh|       dim_customers|      false|
|rental_dlh|            dim_date|      false|
|rental_dlh|        dim_products|      false|
|rental_dlh|          dim_stores|      false|
|rental_dlh|fact_orders_by_cu...|      false|
|          |           customers|       true|
|          |fact_orders_by_cu...|       true|
|          |              stores|       true|
+----------+--------------------+-----------+
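`SHOW TABLES` lists temporary views alongside the managed tables and truncates long names in its display. A programmatic check can compare the persistent tables against the names the pipeline is expected to create. The sketch below assumes the table list comes from `spark.catalog.listTables(dest_database)`, whose entries carry `name` and `isTemporary` attributes. The `missing_tables` helper and the expected-name set are illustrative.

```python
EXPECTED_TABLES = {
    "dim_customers", "dim_date", "dim_products", "dim_stores",
    "fact_orders_by_customer_month",
}

def missing_tables(tables, expected=EXPECTED_TABLES):
    """Return the expected table names absent from the persistent (non-temporary) tables.

    `tables` is an iterable of (name, is_temporary) pairs.
    """
    persistent = {name for name, is_temporary in tables if not is_temporary}
    return sorted(set(expected) - persistent)

# Usage in the notebook (assumes `spark` and `dest_database`):
# listed = [(t.name, t.isTemporary) for t in spark.catalog.listTables(dest_database)]
# assert not missing_tables(listed), missing_tables(listed)
```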



#### 7.2. Optional: Save Gold Table as Parquet

In [198]:
# Define the output directory for the Gold layer
gold_output_dir = os.path.join(base_dir, "spark-warehouse", dest_database, "fact_orders", "gold")

In [199]:
df_fact_orders_by_customer_gold_final.write.mode("overwrite").parquet(
    os.path.join(gold_output_dir, "fact_orders_by_customer_month")
)
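One way to confirm that the Parquet export landed is to inspect the output directory. With Spark's default Hadoop file output committer, a successful write leaves an empty `_SUCCESS` marker file next to the `part-*.parquet` files. The stdlib-only sketch below relies on that assumption; other committers or configurations may not write the marker. `parquet_output_ok` is a hypothetical helper.

```python
import os

def parquet_output_ok(path):
    """Return True if `path` holds a _SUCCESS marker and at least one .parquet part file."""
    if not os.path.isdir(path):
        return False
    entries = os.listdir(path)
    has_marker = "_SUCCESS" in entries
    has_parts = any(name.endswith(".parquet") for name in entries)
    return has_marker and has_parts

# Usage (assumes gold_output_dir from the cell above):
# parquet_output_ok(os.path.join(gold_output_dir, "fact_orders_by_customer_month"))
```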


### 8.0. Stop the Spark Session

In [202]:
spark.stop()