### 1.0. Import Required Libraries

In [1]:
import findspark
findspark.init()
print(findspark.find())

import os
import sys
import json
import time
import pymongo
import certifi
import shutil
import pandas as pd

os.environ["SPARK_LOCAL_IP"] = "127.0.0.1" # for some reason if i don't do this then it won't create a new spark session ...

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window as W

C:\spark-3.5.4-bin-hadoop3


### 2.0. Instantiate Global Variables

In [2]:
# --------------------------------------------------------------------------------
# Specify MySQL Server Connection Information
# --------------------------------------------------------------------------------
mysql_args = {
    "host_name" : "localhost",
    "port" : "3306",
    "db_name" : "chinook",
    "conn_props" : {
        "user" : "your_mysql_username",
        "password" : "your_mysql_password",
        "driver" : "com.mysql.cj.jdbc.Driver"
    }
}

# --------------------------------------------------------------------------------
# Specify MongoDB Cluster Connection Information
# --------------------------------------------------------------------------------
mongodb_args = {
    "cluster_location" : "atlas", # or "local" if running MongoDB locally
    "user_name" : "your_mongodb_username",
    "password" : "your_mongodb_password",
    "cluster_name" : "your_cluster_name",
    "cluster_subnet" : "your_cluster_subnet",
    "db_name" : "chinook",
    "collection" : "",
    "null_column_threshold" : 0.5
}

# --------------------------------------------------------------------------------
# Specify Directory Structure for Source Data
# --------------------------------------------------------------------------------
base_dir = os.path.join(os.getcwd(), 'lab_data')
data_dir = os.path.join(base_dir, 'chinook')
batch_dir = os.path.join(data_dir, 'batch')
stream_dir = os.path.join(data_dir, 'streaming')

invoices_stream_dir = os.path.join(stream_dir, 'invoices')

# --------------------------------------------------------------------------------
# Create Directory Structure for Data Lakehouse Files
# --------------------------------------------------------------------------------
dest_database = "chinook_dlh"
sql_warehouse_dir = os.path.abspath('spark-warehouse')
dest_database_dir = f"{dest_database}.db"
database_dir = os.path.join(sql_warehouse_dir, dest_database_dir)

invoice_sales_output_bronze = os.path.join(database_dir, 'fact_invoice_sales', 'bronze')
invoice_sales_output_silver = os.path.join(database_dir, 'fact_invoice_sales', 'silver')
invoice_sales_output_gold = os.path.join(database_dir, 'fact_invoice_sales', 'gold')

### 3.0 Define Global Functions

In [3]:
def get_file_info(path: str):
    file_sizes = []
    modification_times = []

    '''Fetch each item in the directory, and filter out any directories.'''
    items = os.listdir(path)
    files = sorted([item for item in items if os.path.isfile(os.path.join(path, item))])

    '''Populate lists with the Size and Last Modification DateTime for each file in the directory.'''
    for file in files:
        file_sizes.append(os.path.getsize(os.path.join(path, file)))
        modification_times.append(pd.to_datetime(os.path.getmtime(os.path.join(path, file)), unit='s'))

    data = list(zip(files, file_sizes, modification_times))
    column_names = ['name','size','modification_time']
    
    return pd.DataFrame(data=data, columns=column_names)


def wait_until_stream_is_ready(query, min_batches=1):
    while len(query.recentProgress) < min_batches:
        time.sleep(5)
        
    print(f"The stream has processed {len(query.recentProgress)} batchs")


def remove_directory_tree(path: str):
    '''If it exists, remove the entire contents of a directory structure at a given 'path' parameter's location.'''
    try:
        if os.path.exists(path):
            shutil.rmtree(path)
            return f"Directory '{path}' has been removed successfully."
        else:
            return f"Directory '{path}' does not exist."
            
    except Exception as e:
        return f"An error occurred: {e}"
        

def drop_null_columns(df, threshold):
    '''Drop Columns having a percentage of NULL values that exceeds the given 'threshold' parameter value.'''
    columns_with_nulls = [col for col in df.columns if df.filter(df[col].isNull()).count() / df.count() > threshold] 
    df_dropped = df.drop(*columns_with_nulls) 
    
    return df_dropped
    
    
def get_mysql_dataframe(spark_session, sql_query : str, **args):
    '''Create a JDBC URL to the MySQL Database'''
    jdbc_url = f"jdbc:mysql://{args['host_name']}:{args['port']}/{args['db_name']}"
    
    '''Invoke the spark.read.format("jdbc") function to query the database, and fill a DataFrame.'''
    dframe = spark_session.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("driver", args['conn_props']['driver']) \
    .option("user", args['conn_props']['user']) \
    .option("password", args['conn_props']['password']) \
    .option("query", sql_query) \
    .load()
    
    return dframe
    

def get_mongo_uri(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the 'cluster_location' parameter.")
        
    if args['cluster_location'] == "atlas":
        uri = f"mongodb+srv://{args['user_name']}:{args['password']}@"
        uri += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net/"
    else:
        uri = "mongodb://localhost:27017/"

    return uri


def get_spark_conf_args(spark_jars : list, **args):
    jars = ""
    for jar in spark_jars:
        jars += f"{jar}, "
    
    sparkConf_args = {
        "app_name" : "PySpark Chinook Data Lakehouse (Medallion Architecture)",
        "worker_threads" : f"local[{int(os.cpu_count()/2)}]",
        "shuffle_partitions" : int(os.cpu_count()),
        "mongo_uri" : get_mongo_uri(**args),
        "spark_jars" : jars[0:-2],
        "database_dir" : sql_warehouse_dir
    }
    
    return sparkConf_args
    

def get_spark_conf(**args):
    sparkConf = SparkConf().setAppName(args['app_name'])\
    .setMaster(args['worker_threads']) \
    .set('spark.driver.memory', '4g') \
    .set('spark.executor.memory', '2g') \
    .set('spark.jars', args['spark_jars']) \
    .set('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
    .set('spark.mongodb.input.uri', args['mongo_uri']) \
    .set('spark.mongodb.output.uri', args['mongo_uri']) \
    .set('spark.sql.adaptive.enabled', 'false') \
    .set('spark.sql.debug.maxToStringFields', 35) \
    .set('spark.sql.shuffle.partitions', args['shuffle_partitions']) \
    .set('spark.sql.streaming.forceDeleteTempCheckpointLocation', 'true') \
    .set('spark.sql.streaming.schemaInference', 'true') \
    .set('spark.sql.warehouse.dir', args['database_dir']) \
    .set('spark.streaming.stopGracefullyOnShutdown', 'true')
    
    return sparkConf


def get_mongo_client(**args):
    '''Get MongoDB Client Connection'''
    mongo_uri = get_mongo_uri(**args)
    if args['cluster_location'] == "atlas":
        client = pymongo.MongoClient(mongo_uri, tlsCAFile=certifi.where())

    elif args['cluster_location'] == "local":
        client = pymongo.MongoClient(mongo_uri)
        
    else:
        raise Exception("A MongoDB Client could not be created.")

    return client
    
    
# TODO: Rewrite this to leverage PySpark?
def set_mongo_collections(mongo_client, db_name : str, data_directory : str, json_files : list):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r', encoding='utf-8') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()
    

def get_mongodb_dataframe(spark_session, **args):
    '''Query MongoDB, and create a DataFrame'''
    dframe = spark_session.read.format("com.mongodb.spark.sql.DefaultSource") \
        .option("database", args['db_name']) \
        .option("collection", args['collection']).load()

    '''Drop the '_id' index column to clean up the response.'''
    dframe = dframe.drop('_id')
    
    '''Call the drop_null_columns() function passing in the dataframe.'''
    dframe = drop_null_columns(dframe, args['null_column_threshold'])
    
    return dframe

### 4.0. Initialize Data Lakehouse Directory Structure
Remove the Data Lakehouse Database Directory Structure to Ensure Idempotency

In [63]:
remove_directory_tree(database_dir)

"Directory 'C:\\Users\\emili\\ds2002\\ds-2002-capstone\\spark-warehouse\\chinook_dlh.db' has been removed successfully."

### 5.0. Create a New Spark Session

In [5]:
worker_threads = f"local[{int(os.cpu_count()/2)}]"

jars = []
mysql_spark_jar = os.path.join(os.getcwd(), "mysql-connector-j-9.1.0", "mysql-connector-j-9.1.0.jar")
mssql_spark_jar = os.path.join(os.getcwd(), "sqljdbc_12.8", "enu", "jars", "mssql-jdbc-12.8.1.jre11.jar")

jars.append(mysql_spark_jar)
#jars.append(mssql_spark_jar)

sparkConf_args = get_spark_conf_args(jars, **mongodb_args)

sparkConf = get_spark_conf(**sparkConf_args)
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
spark.sparkContext.setLogLevel("OFF")
spark

### 6.0. Create a New Metadata Database.

In [6]:
spark.sql(f"DROP DATABASE IF EXISTS {dest_database} CASCADE;")

sql_create_db = f"""
    CREATE DATABASE IF NOT EXISTS {dest_database}
    COMMENT 'DS-2002 Capstone Database'
    WITH DBPROPERTIES (contains_pii = true, purpose = 'DS-2002 Capstone');
"""
spark.sql(sql_create_db)

DataFrame[]

## Section II: Populate Dimensions by Ingesting "Cold-path" Reference Data 
### 1.0. Fetch Data from the File System
#### 1.1. Verify the location of the source data files on the file system

In [7]:
get_file_info(batch_dir)

Unnamed: 0,name,size,modification_time
0,chinook_albums.csv,11366,2025-12-15 06:36:48.386648178
1,chinook_artists.csv,7434,2025-12-15 06:36:59.900135040
2,chinook_customers.json,20081,2025-12-15 06:37:49.282068253
3,chinook_genres.csv,346,2025-12-15 06:37:15.528573274
4,chinook_mediatypes.csv,146,2025-12-15 06:37:40.556010723
5,chinook_tracks.csv,70308,2025-12-15 06:36:31.412338734


#### 1.2. Populate the <span style="color:darkred">Track Dimension</span>
##### 1.2.1. Use PySpark to Read data from CSV files (track, album, artist, genre, mediatype)

In [8]:
track_csv = os.path.join(batch_dir, 'chinook_tracks.csv')
print(track_csv)

df_tracks = spark.read.format('csv').options(header='true', inferSchema='true').load(track_csv)
df_tracks.toPandas().head(2)

C:\Users\emili\ds2002\ds-2002-capstone\lab_data\chinook\batch\chinook_tracks.csv


Unnamed: 0,TrackId,Name,AlbumId,MediaTypeId,GenreId,Composer,Milliseconds,Bytes,UnitPrice
0,1,For Those About To Rock (We Salute You),1,1,1,"Angus Young, Malcolm Young, Brian Johnson",343719,11170334,0.99
1,2,Balls to the Wall,2,2,1,"U. Dirkschneider, W. Hoffmann, H. Frank, P. Ba...",342562,5510424,0.99


In [9]:
album_csv = os.path.join(batch_dir, 'chinook_albums.csv')
print(album_csv)

df_albums = spark.read.format('csv').options(header='true', inferSchema='true').load(album_csv)
df_albums.toPandas().head(2)

C:\Users\emili\ds2002\ds-2002-capstone\lab_data\chinook\batch\chinook_albums.csv


Unnamed: 0,AlbumId,Title,ArtistId
0,1,For Those About To Rock We Salute You,1
1,2,Balls to the Wall,2


In [10]:
artist_csv = os.path.join(batch_dir, 'chinook_artists.csv')
print(artist_csv)

df_artists = spark.read.format('csv').options(header='true', inferSchema='true').load(artist_csv)
df_artists.toPandas().head(2)

C:\Users\emili\ds2002\ds-2002-capstone\lab_data\chinook\batch\chinook_artists.csv


Unnamed: 0,ArtistId,Name
0,1,AC/DC
1,2,Accept


In [11]:
genre_csv = os.path.join(batch_dir, 'chinook_genres.csv')
print(genre_csv)

df_genres = spark.read.format('csv').options(header='true', inferSchema='true').load(genre_csv)
df_genres.toPandas().head(2)

C:\Users\emili\ds2002\ds-2002-capstone\lab_data\chinook\batch\chinook_genres.csv


Unnamed: 0,GenreId,Name
0,1,Rock
1,2,Jazz


In [12]:
mediatype_csv = os.path.join(batch_dir, 'chinook_mediatypes.csv')
print(mediatype_csv)

df_mediatypes = spark.read.format('csv').options(header='true', inferSchema='true').load(mediatype_csv)
df_mediatypes.toPandas().head(2)

C:\Users\emili\ds2002\ds-2002-capstone\lab_data\chinook\batch\chinook_mediatypes.csv


Unnamed: 0,MediaTypeId,Name
0,1,MPEG audio file
1,2,Protected AAC audio file


##### 1.2.2. Make Necessary Transformations to the New DataFrame

In [13]:
# ----------------------------------------------------------------------------------
# Rename columns (fix casing)
# ----------------------------------------------------------------------------------
df_tracks = (
    df_tracks
    .withColumnRenamed("TrackId", "track_id")
    .withColumnRenamed("Name", "track_name")
    .withColumnRenamed("AlbumId", "album_id")
    .withColumnRenamed("GenreId", "genre_id")
    .withColumnRenamed("MediaTypeId", "media_type_id")
    .withColumnRenamed("UnitPrice", "unit_price")
)

df_albums = (
    df_albums
    .withColumnRenamed("Title", "album_title")
    .withColumnRenamed("AlbumId", "album_id")
    .withColumnRenamed("ArtistId", "artist_id")
)

df_artists = (
    df_artists
    .withColumnRenamed("Name", "artist_name")
    .withColumnRenamed("ArtistId", "artist_id")
)

df_genres = (
    df_genres
    .withColumnRenamed("Name", "genre_name")
    .withColumnRenamed("GenreId", "genre_id")
)

df_mediatypes = (
    df_mediatypes
    .withColumnRenamed("Name", "media_type")
    .withColumnRenamed("MediaTypeId", "media_type_id")
)

In [14]:
# ----------------------------------------------------------------------------------
# Merge Tables (join tracks w/ albuns, artists, genres, mediatypes)
# ----------------------------------------------------------------------------------
df_dim_track = ( 
    df_tracks .join(df_albums, "album_id", "left") 
    .join(df_artists, "artist_id", "left") 
    .join(df_genres, "genre_id", "left")
    .join(df_mediatypes, "media_type_id", "left")
)

In [15]:
# ----------------------------------------------------------------------------------
# Add Primary Key column using SQL Windowing function: ROW_NUMBER() 
# ----------------------------------------------------------------------------------
df_dim_track.createOrReplaceTempView("tracks")

sql_dim_track = """
    SELECT *,
           ROW_NUMBER() OVER (ORDER BY track_id) AS track_key
    FROM tracks
"""

df_dim_track = spark.sql(sql_dim_track)

# ----------------------------------------------------------------------------------
# Rename/Reorder Columns and display the first two rows in a Pandas dataframe
# ----------------------------------------------------------------------------------


ordered_columns = [
    "track_key",  "track_id",  "track_name", "album_title", "artist_name", "genre_name", "media_type", "unit_price"]

df_dim_track = df_dim_track[ordered_columns]
df_dim_track.toPandas().head(2)

Unnamed: 0,track_key,track_id,track_name,album_title,artist_name,genre_name,media_type,unit_price
0,1,1,For Those About To Rock (We Salute You),For Those About To Rock We Salute You,AC/DC,Rock,MPEG audio file,0.99
1,2,2,Balls to the Wall,Balls to the Wall,Accept,Rock,Protected AAC audio file,0.99


##### 1.2.3. Save as the <span style="color:darkred">dim_track</span> table in the Data Lakehouse

In [16]:
df_dim_track.write.saveAsTable(f"{dest_database}.dim_track", mode="overwrite")

##### 1.2.4. Unit Test: Describe and Preview Table

In [17]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_track;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_track LIMIT 2").toPandas()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|           track_key|                 int|   NULL|
|            track_id|                 int|   NULL|
|          track_name|              string|   NULL|
|         album_title|              string|   NULL|
|         artist_name|              string|   NULL|
|          genre_name|              string|   NULL|
|          media_type|              string|   NULL|
|          unit_price|              double|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|         chinook_dlh|       |
|               Table|           dim_track|       |
|        Created Time|Mon Dec 15 22:31:...|       |
|         Last Access|             UNKNOWN|       |
|          Created By|         Spark 3.5.4|       |
|           

Unnamed: 0,track_key,track_id,track_name,album_title,artist_name,genre_name,media_type,unit_price
0,1,1,For Those About To Rock (We Salute You),For Those About To Rock We Salute You,AC/DC,Rock,MPEG audio file,0.99
1,2,2,Balls to the Wall,Balls to the Wall,Accept,Rock,Protected AAC audio file,0.99


### 2.0. Fetch Reference Data from a MongoDB Atlas Database
#### 2.1. Create a New MongoDB Database, and Load JSON File into a New MongoDB Collection

In [18]:
client = get_mongo_client(**mongodb_args)

json_files = {"customers" : "chinook_customers.json"}

set_mongo_collections(client, mongodb_args["db_name"], batch_dir, json_files) 

#### 2.2. Populate the <span style="color:darkred">Customers Dimension</span>
##### 2.2.1. Fetch Data from the New MongoDB <span style="color:darkred">Customers</span> Collection

In [19]:
mongodb_args["collection"] = "customers"

df_dim_customers = get_mongodb_dataframe(spark, **mongodb_args)
df_dim_customers.toPandas().head(2)

Unnamed: 0,Address,City,Country,CustomerId,Email,FirstName,LastName,Phone,PostalCode,State,SupportRepId
0,"Av. Brigadeiro Faria Lima, 2170",São José dos Campos,Brazil,1,luisg@embraer.com.br,Luís,Gonçalves,+55 (12) 3923-5555,12227-000,SP,3
1,Theodor-Heuss-Straße 34,Stuttgart,Germany,2,leonekohler@surfeu.de,Leonie,Köhler,+49 0711 2842222,70174,,5


##### 2.2.2. Make Necessary Transformations to the New Dataframe

In [20]:
# ----------------------------------------------------------------------------------
# Rename the 'id' column to 'customer_id' + fix other column names ------------------------------------------
# ----------------------------------------------------------------------------------

df_dim_customers = df_dim_customers.withColumnRenamed("id", "customer_id")

df_dim_customers = (
    df_dim_customers
    .withColumnRenamed("CustomerId", "customer_id")
    .withColumnRenamed("Address", "address")
    .withColumnRenamed("City", "city")
    .withColumnRenamed("Country", "country")
    .withColumnRenamed("Email", "email")
    .withColumnRenamed("FirstName", "first_name")
    .withColumnRenamed("LastName", "last_name")
    .withColumnRenamed("Phone", "phone")
    .withColumnRenamed("PostalCode", "postal_code")
    .withColumnRenamed("State", "state")
    .withColumnRenamed("SupportRepId", "support_rep_id")
)

# ----------------------------------------------------------------------------------
# Add Primary Key column using the SQL Windowing function: ROW_NUMBER() 
# ----------------------------------------------------------------------------------

df_dim_customers.createOrReplaceTempView("customers")
sql_customers = f"""
    SELECT *, ROW_NUMBER() OVER (ORDER BY customer_id) AS customer_key
    FROM customers;
"""
df_dim_customers = spark.sql(sql_customers)

# ----------------------------------------------------------------------------------
# Reorder Columns and display the first two rows in a Pandas dataframe
# ----------------------------------------------------------------------------------

ordered_columns = ['customer_key', 'customer_id', 'first_name', 'last_name'
                   , 'phone', 'email' , 'address', 'city', 'state', 'country'
                   , 'postal_code', 'support_rep_id']

df_dim_customers = df_dim_customers[ordered_columns]
df_dim_customers.toPandas().head(2)

Unnamed: 0,customer_key,customer_id,first_name,last_name,phone,email,address,city,state,country,postal_code,support_rep_id
0,1,1,Luís,Gonçalves,+55 (12) 3923-5555,luisg@embraer.com.br,"Av. Brigadeiro Faria Lima, 2170",São José dos Campos,SP,Brazil,12227-000,3
1,2,2,Leonie,Köhler,+49 0711 2842222,leonekohler@surfeu.de,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,5


##### 2.2.3. Save as the <span style="color:darkred">dim_customers</span> table in the Data lakehouse

In [21]:
df_dim_customers.write.saveAsTable(f"{dest_database}.dim_customers", mode="overwrite")

##### 2.2.4. Unit Test: Describe and Preview Table

In [22]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_customers;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_customers LIMIT 2").toPandas()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|        customer_key|                 int|   NULL|
|         customer_id|                 int|   NULL|
|          first_name|              string|   NULL|
|           last_name|              string|   NULL|
|               phone|              string|   NULL|
|               email|              string|   NULL|
|             address|              string|   NULL|
|                city|              string|   NULL|
|               state|              string|   NULL|
|             country|              string|   NULL|
|         postal_code|              string|   NULL|
|      support_rep_id|                 int|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|         chinook_dlh|       |
|           

Unnamed: 0,customer_key,customer_id,first_name,last_name,phone,email,address,city,state,country,postal_code,support_rep_id
0,1,1,Luís,Gonçalves,+55 (12) 3923-5555,luisg@embraer.com.br,"Av. Brigadeiro Faria Lima, 2170",São José dos Campos,SP,Brazil,12227-000,3
1,2,2,Leonie,Köhler,+49 0711 2842222,leonekohler@surfeu.de,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,5


### 3.0. Fetch Reference Data from a MySQL Database
#### 3.1. Populate the <span style="color:darkred">Date Dimension</span>
##### 3.1.1 Fetch data from the <span style="color:darkred">dim_date</span> table in MySQL

In [23]:
sql_dim_date = f"SELECT * FROM {mysql_args['db_name']}.dim_date"
df_dim_date = get_mysql_dataframe(spark, sql_dim_date, **mysql_args)

##### 3.1.2. Save as the <span style="color:darkred">dim_date</span> table in the Data Lakehouse

In [24]:
df_dim_date.write.saveAsTable(f"{dest_database}.dim_date", mode="overwrite")

##### 3.1.3. Unit Test: Describe and Preview Table

In [25]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_date;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_date LIMIT 2").toPandas()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|            date_key|      int|   NULL|
|           full_date|     date|   NULL|
|           date_name| char(11)|   NULL|
|        date_name_us| char(11)|   NULL|
|        date_name_eu| char(11)|   NULL|
|         day_of_week|  tinyint|   NULL|
|    day_name_of_week| char(10)|   NULL|
|        day_of_month|  tinyint|   NULL|
|         day_of_year|      int|   NULL|
|     weekday_weekend| char(10)|   NULL|
|        week_of_year|  tinyint|   NULL|
|          month_name| char(10)|   NULL|
|       month_of_year|  tinyint|   NULL|
|is_last_day_of_month|  char(1)|   NULL|
|    calendar_quarter|  tinyint|   NULL|
|       calendar_year|      int|   NULL|
| calendar_year_month| char(10)|   NULL|
|   calendar_year_qtr| char(10)|   NULL|
|fiscal_month_of_year|  tinyint|   NULL|
|      fiscal_quarter|  tinyint|   NULL|
+--------------------+---------+-------+
only showing top

Unnamed: 0,date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,...,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
0,20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
1,20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


#### 3.2. Populate the <span style="color:darkred">Product Dimension</span>
##### 3.2.1. Fetch data from the <span style="color:darkred">Employees</span> table in MySQL

In [26]:
sql_dim_employees = f"SELECT * FROM {mysql_args['db_name']}.employee"
df_dim_employees = get_mysql_dataframe(spark, sql_dim_employees, **mysql_args)

##### 3.2.2. Perform any Necessary Transformations

In [27]:
# ----------------------------------------------------------------------------------
# Rename the columns
# ----------------------------------------------------------------------------------

df_dim_employees = (
    df_dim_employees
    .withColumnRenamed("EmployeeId", "employee_id")
    .withColumnRenamed("LastName", "last_name")
    .withColumnRenamed("FirstName", "first_name")
    .withColumnRenamed("Title", "title")
    .withColumnRenamed("ReportsTo", "reports_to")
    .withColumnRenamed("BirthDate", "birth_date")
    .withColumnRenamed("HireDate", "hire_date")
    .withColumnRenamed("Address", "address")
    .withColumnRenamed("City", "city")
    .withColumnRenamed("State", "state")
    .withColumnRenamed("Country", "country")
    .withColumnRenamed("PostalCode", "postal_code")
    .withColumnRenamed("Phone", "phone")
    .withColumnRenamed("Fax", "fax")
    .withColumnRenamed("Email", "email")
)

df_dim_employees = (
    df_dim_employees
    .withColumn("birth_date", date_format(col("birth_date"), "yyyy-MM-dd"))
    .withColumn("hire_date", date_format(col("hire_date"), "yyyy-MM-dd"))
)

# ----------------------------------------------------------------------------------
# Add Primary Key column using the SQL Windowing function: ROW_NUMBER() 
# ----------------------------------------------------------------------------------

df_dim_employees.createOrReplaceTempView("employees")
sql_employees = f"""
    SELECT *, ROW_NUMBER() OVER (ORDER BY employee_id) AS employee_key
    FROM employees;
"""
df_dim_employees = spark.sql(sql_employees)


# ----------------------------------------------------------------------------------
# Reorder Columns and display the first two rows in a Pandas dataframe
# ----------------------------------------------------------------------------------

ordered_columns = [
    'employee_key', 'employee_id', 'first_name', 'last_name'
    , 'title', 'reports_to', 'birth_date', 'hire_date'
    ,'phone','fax', 'email', 'address'
    , 'city','state', 'country', 'postal_code'
]

df_dim_employees = df_dim_employees[ordered_columns]
df_dim_employees.toPandas().head(2)


Unnamed: 0,employee_key,employee_id,first_name,last_name,title,reports_to,birth_date,hire_date,phone,fax,email,address,city,state,country,postal_code
0,1,1,Andrew,Adams,General Manager,,1962-02-18,2002-08-14,+1 (780) 428-9482,+1 (780) 428-3457,andrew@chinookcorp.com,11120 Jasper Ave NW,Edmonton,AB,Canada,T5K 2N1
1,2,2,Nancy,Edwards,Sales Manager,1.0,1958-12-08,2002-05-01,+1 (403) 262-3443,+1 (403) 262-3322,nancy@chinookcorp.com,825 8 Ave SW,Calgary,AB,Canada,T2P 2T3


##### 3.2.3. Save as the <span style="color:darkred">dim_employees</span> table in the Data Lakehouse

In [28]:
df_dim_employees.write.saveAsTable(f"{dest_database}.dim_employees", mode="overwrite")

##### 3.2.4. Unit Test: Describe and Preview Table

In [29]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_employees;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_employees LIMIT 2").toPandas()

+--------------------+-------------+-------+
|            col_name|    data_type|comment|
+--------------------+-------------+-------+
|        employee_key|          int|   NULL|
|         employee_id|          int|   NULL|
|          first_name|  varchar(20)|   NULL|
|           last_name|  varchar(20)|   NULL|
|               title|  varchar(30)|   NULL|
|          reports_to|          int|   NULL|
|          birth_date|       string|   NULL|
|           hire_date|       string|   NULL|
|               phone|  varchar(24)|   NULL|
|                 fax|  varchar(24)|   NULL|
|               email|  varchar(60)|   NULL|
|             address|  varchar(70)|   NULL|
|                city|  varchar(40)|   NULL|
|               state|  varchar(40)|   NULL|
|             country|  varchar(40)|   NULL|
|         postal_code|  varchar(10)|   NULL|
|                    |             |       |
|# Detailed Table ...|             |       |
|             Catalog|spark_catalog|       |
|         

Unnamed: 0,employee_key,employee_id,first_name,last_name,title,reports_to,birth_date,hire_date,phone,fax,email,address,city,state,country,postal_code
0,1,1,Andrew,Adams,General Manager,,1962-02-18,2002-08-14,+1 (780) 428-9482,+1 (780) 428-3457,andrew@chinookcorp.com,11120 Jasper Ave NW,Edmonton,AB,Canada,T5K 2N1
1,2,2,Nancy,Edwards,Sales Manager,1.0,1958-12-08,2002-05-01,+1 (403) 262-3443,+1 (403) 262-3322,nancy@chinookcorp.com,825 8 Ave SW,Calgary,AB,Canada,T2P 2T3


### 4.0. Verify Dimension Tables

In [30]:
spark.sql(f"USE {dest_database};")
spark.sql("SHOW TABLES").toPandas()

Unnamed: 0,namespace,tableName,isTemporary
0,chinook_dlh,dim_customers,False
1,chinook_dlh,dim_date,False
2,chinook_dlh,dim_employees,False
3,chinook_dlh,dim_track,False
4,,customers,True
5,,employees,True
6,,tracks,True


## Section III: Integrate Reference Data with Real-Time Data
### 5.0. Use PySpark Structured Streaming to Process (Hot Path) <span style="color:darkred">Invoice</span> Fact Data  
#### 5.1. Verify the location of the source data files on the file system

In [64]:
get_file_info(invoices_stream_dir)

Unnamed: 0,name,size,modification_time
0,invoices_batch_1.json,260218,2025-12-16 04:48:57.457730055
1,invoices_batch_2.json,262881,2025-12-16 04:48:57.463805676
2,invoices_batch_3.json,263235,2025-12-16 04:48:57.471902847


#### 5.2. Create the Bronze Layer: Stage <span style="color:darkred">Invoice Fact table</span> Data
##### 5.2.1. Read "Raw" JSON file data into a Stream

In [65]:
df_invoice_sales_bronze = (
    spark.readStream \
    .option("schemaLocation", invoice_sales_output_bronze) \
    .option("maxFilesPerTrigger", 1) \
    .option("multiLine", "true") \
    .json(invoices_stream_dir)
)

df_invoice_sales_bronze.isStreaming

# should return True

True

##### 5.2.2. Write the Streaming Data to a Parquet file

In [66]:
invoice_sales_checkpoint_bronze = os.path.join(invoice_sales_output_bronze, '_checkpoint')

invoice_sales_bronze_query = (
    df_invoice_sales_bronze
    # Add Current Timestamp and Input Filename columns for Traceability
    .withColumn("receipt_time", current_timestamp())
    .withColumn("source_file", input_file_name())
    
    .writeStream \
    .format("parquet") \
    .outputMode("append") \
    .queryName("invoice_sales_bronze")
    .trigger(availableNow = True) \
    .option("checkpointLocation", invoice_sales_checkpoint_bronze) \
    .option("compression", "snappy") \
    .start(invoice_sales_output_bronze)
)

##### 5.2.3. Unit Test: Implement Query Monitoring

In [67]:
print(f"Query ID: {invoice_sales_bronze_query.id}")
print(f"Query Name: {invoice_sales_bronze_query.name}")
print(f"Query Status: {invoice_sales_bronze_query.status}")

Query ID: 945711db-5d3f-4172-aaab-e71f5946dd3e
Query Name: invoice_sales_bronze
Query Status: {'message': 'Getting offsets from FileStreamSource[file:/C:/Users/emili/ds2002/ds-2002-capstone/lab_data/chinook/streaming/invoices]', 'isDataAvailable': True, 'isTriggerActive': True}


In [68]:
invoice_sales_bronze_query.awaitTermination()

#### 5.3. Create the Silver Layer: Integrate "Cold-path" Data & Make Transformations
##### 5.3.1. Prepare Role-Playing Dimension Primary and Business Keys

In [69]:
df_dim_invoice_date = df_dim_date.select(col("date_key").alias("invoice_date_key"), col("full_date").alias("invoice_full_date"))

##### 5.3.2. Define Silver Query to Join Streaming with Batch Data

In [70]:
df_invoice_sales_silver = spark.readStream.format("parquet").load(invoice_sales_output_bronze) \
    .withColumnRenamed("InvoiceId", "invoice_id") \
    .withColumnRenamed("CustomerId", "customer_id") \
    .withColumnRenamed("BillingAddress", "billing_address") \
    .withColumnRenamed("BillingCity", "billing_city") \
    .withColumnRenamed("BillingState", "billing_state") \
    .withColumnRenamed("BillingCountry", "billing_country") \
    .withColumnRenamed("BillingPostalCode", "billing_postal_code") \
    .withColumnRenamed("Total", "invoice_total") \
    .withColumnRenamed("InvoiceLineId", "invoice_line_id") \
    .withColumnRenamed("TrackId", "track_id") \
    .withColumnRenamed("UnitPrice", "invoice_unit_price") \
    .withColumnRenamed("Quantity", "quantity") \
    .withColumn("invoice_date", date_format((col("InvoiceDate") / 1000).cast(TimestampType()), "yyyy-MM-dd")) \
    .withColumn("invoice_date_for_join", to_date((col("InvoiceDate") / 1000).cast(TimestampType()))) \
    .join(df_dim_customers, "customer_id", "inner") \
    .join(df_dim_track, "track_id", "inner") \
    .join(df_dim_invoice_date, 
          df_dim_invoice_date.invoice_full_date.cast(DateType()) == col("invoice_date_for_join").cast(DateType()), 
          "inner") \
    .withColumn("line_total", col("invoice_unit_price") * col("quantity")) \
    .select(
        col("invoice_id").cast(LongType()),
        col("invoice_line_id").cast(LongType()),
        col("invoice_date"),
        df_dim_invoice_date.invoice_date_key.cast(LongType()),
        df_dim_customers.customer_key.cast(LongType()),
        df_dim_track.track_key.cast(LongType()),
        col("quantity").cast(IntegerType()),
        col("invoice_unit_price").alias("unit_price").cast(DoubleType()),
        col("line_total").cast(DoubleType()),
        col("invoice_total").cast(DoubleType()),
        col("receipt_time"),
        col("source_file")
    )

In [71]:
df_invoice_sales_silver.isStreaming

True

In [72]:
df_invoice_sales_silver.printSchema()

root
 |-- invoice_id: long (nullable = true)
 |-- invoice_line_id: long (nullable = true)
 |-- invoice_date: string (nullable = true)
 |-- invoice_date_key: long (nullable = true)
 |-- customer_key: long (nullable = false)
 |-- track_key: long (nullable = false)
 |-- quantity: integer (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- line_total: double (nullable = true)
 |-- invoice_total: double (nullable = true)
 |-- receipt_time: timestamp (nullable = true)
 |-- source_file: string (nullable = true)



##### 5.3.3. Write the Transformed Streaming data to the Data Lakehouse

In [73]:
invoice_sales_checkpoint_silver = os.path.join(invoice_sales_output_silver, '_checkpoint')

invoice_sales_silver_query = (
    df_invoice_sales_silver.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .queryName("invoice_sales_silver")
    .trigger(availableNow = True) \
    .option("checkpointLocation", invoice_sales_checkpoint_silver) \
    .option("compression", "snappy") \
    .start(invoice_sales_output_silver)
)

##### 5.3.4. Unit Test: Implement Query Monitoring

In [74]:
print(f"Query ID: {invoice_sales_silver_query.id}")
print(f"Query Name: {invoice_sales_silver_query.name}")
print(f"Query Status: {invoice_sales_silver_query.status}")

Query ID: 47cbac32-0c2f-4559-aa70-01c5aac80bd1
Query Name: invoice_sales_silver
Query Status: {'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}


In [76]:
invoice_sales_silver_query.awaitTermination()

#### 5.4. Create Gold Layer: Perform Aggregations
##### 5.4.1. Define a Query to Create a Business Report
Create a new Gold table using the PySpark API.

In [78]:
df_invoice_sales_by_genre_gold = spark.readStream.format("parquet").load(invoice_sales_output_silver) \
.join(df_dim_track, "track_key") \
.join(df_dim_date, df_dim_date.date_key.cast(IntegerType()) == col("invoice_date_key").cast(IntegerType())) \
.groupBy("month_of_year", "genre_name", "month_name") \
.agg(count("invoice_line_id").alias("tracks_sold")) \
.orderBy(asc("month_of_year"), desc("tracks_sold"))

In [79]:
df_invoice_sales_by_genre_gold.printSchema()

root
 |-- month_of_year: byte (nullable = true)
 |-- genre_name: string (nullable = true)
 |-- month_name: string (nullable = true)
 |-- tracks_sold: long (nullable = false)



##### 5.4.2. Write the Streaming data to a Parquet File in "Complete" mode

In [80]:
invoice_sales_gold_query = (
    df_invoice_sales_by_genre_gold.writeStream \
    .format("memory") \
    .outputMode("complete") \
    .queryName("fact_invoice_sales_by_genre") \
    .start()
)

In [81]:
wait_until_stream_is_ready(invoice_sales_gold_query, 1)

The stream has processed 1 batchs


##### 5.4.3. Query the Gold Data from Memory

In [82]:
df_fact_invoice_sales_by_genre = spark.sql("SELECT * FROM fact_invoice_sales_by_genre")
df_fact_invoice_sales_by_genre.printSchema()

root
 |-- month_of_year: byte (nullable = true)
 |-- genre_name: string (nullable = true)
 |-- month_name: string (nullable = true)
 |-- tracks_sold: long (nullable = false)



##### 5.4.4 Create the Final Selection

In [83]:
df_fact_invoice_sales_by_genre_gold_final = df_fact_invoice_sales_by_genre \
.select(col("month_name").alias("Month"), \
        col("genre_name").alias("Genre"), \
        col("tracks_sold").alias("Tracks Sold"), \
        col("month_of_year")) \
.orderBy(asc("month_of_year"), desc("Tracks Sold"))

##### 5.4.5. Load the Final Results into a New Table and Display the Results

In [84]:
df_fact_invoice_sales_by_genre_gold_final.write.saveAsTable(f"{dest_database}.fact_invoice_sales_by_genre", mode="overwrite")
spark.sql(f"SELECT Month, Genre, `Tracks Sold` FROM {dest_database}.fact_invoice_sales_by_genre").toPandas()

Unnamed: 0,Month,Genre,Tracks Sold
0,July,Blues,1
1,July,Rock And Roll,1
2,August,Latin,21
3,August,Blues,13
4,August,Alternative & Punk,8
...,...,...,...
72,May,Metal,1
73,June,Rock,13
74,June,Latin,10
75,June,Jazz,7
