# DS 2002 Final Project/Capstone
## Using Juptyer Notebook integrate adventureWorks sample data into Pyspark Environment
## Data: Adventure Works

#### Directions/Objectives
1. The Date dimension, and one other dimension of your choosing must be extracted from the MySQL adventureworks database.
2. Data from at least one other dimension must be exported from the MySQL adventureworks database as a JSON file, which must then be uploaded to MongoDB to create a new database/collection.  Your final project notebook must then extract the data from that MongoDB database/collection to create a new dimension in your data lakehouse.
3. Data from one at least one other dimension must be exported from the MySQL adventureworks database as a CSV file, which must then be read from your local hard disk to create a new dimension in your data lakehouse.
4. Data from your Fact Table must be exported from the MySQL adventureworks database into at least three (3) separate JSON files, which will then be read into a series of PySpark Streaming Dataframes (e.g., Bronze, Silver and Gold) to complete the Lambda architecture of your Data Lakehouse. 

## Section I: Prerequisites

### 1.0. Import Required Libraries

In [1]:
import findspark
findspark.init()
print(findspark.find())

import os
import sys
import json
import time
import pymongo
import certifi
import shutil
import pandas as pd

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window as W

import numpy
import datetime
import pandas as pd
import sqlalchemy
from sqlalchemy import create_engine, text

C:\spark-3.5.4-bin-hadoop3


In [2]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 2.0.37
Running PyMongo Version: 4.11.3


### 2.0. Instantiate Global Variables

In [3]:
# --------------------------------------------------------------------------------
# Specify MySQL Server Connection Information
# --------------------------------------------------------------------------------
uid = "BryanTang"
pwd = "pAssword123"
hostname = "localhost"
dbname = "adventureworks"
src_dbname = "adventureworks"
dst_dbname = "adventureworks_dw"

mysql_args = {
    "host_name": "localhost",
    "port": "3306",
    "db_name": "adventureworks",         # Overwrite with the intended db
    "src_dbname": "adventureworks",      # Added from first block
    "dst_dbname": "adventureworks_dw",   # Added from first block
    "conn_props": {
        "user": "BryanTang",
        "password": "pAssword123",
        "driver": "com.mysql.cj.jdbc.Driver"
    }
}


# --------------------------------------------------------------------------------
# Specify MongoDB Cluster Connection Information
# --------------------------------------------------------------------------------
mongodb_args = {
    "cluster_location" : "atlas", # "atlas"
    "user_name" : "tangbryan8",
    "password" : "bryan817510",
    "cluster_name" : "sandbox",
    "cluster_subnet" : "zdqab",
    "db_name" : "adventureworks",
    "collection" : "",
    "null_column_threshold" : 0.5
}


# --------------------------------------------------------------------------------
# Specify Directory Structure for Source Data
# --------------------------------------------------------------------------------

base_dir = os.path.join(os.getcwd(), 'proj_data')
data_dir = os.path.join(base_dir, 'adventureworks')
batch_dir = os.path.join(data_dir, 'batch')
stream_dir = os.path.join(data_dir, 'streaming')

fact_orders_stream_dir = os.path.join(stream_dir, 'fact_orders')

# --------------------------------------------------------------------------------
# Create Directory Structure for Data Lakehouse Files
# --------------------------------------------------------------------------------
dest_database = "adventureworks_dlh"
sql_warehouse_dir = os.path.abspath('spark-warehouse')
dest_database_dir = f"{dest_database}.db"
database_dir = os.path.join(sql_warehouse_dir, dest_database_dir)

fact_orders_output_bronze = os.path.join(database_dir, 'fact_orders', 'bronze')
fact_orders_output_silver = os.path.join(database_dir, 'fact_orders', 'silver')
fact_orders_output_gold = os.path.join(database_dir, 'fact_orders', 'gold')


### 3.0. Define Global Functions

In [4]:
def get_file_info(path: str):
    file_sizes = []
    modification_times = []

    '''Fetch each item in the directory, and filter out any directories.'''
    items = os.listdir(path)
    files = sorted([item for item in items if os.path.isfile(os.path.join(path, item))])

    '''Populate lists with the Size and Last Modification DateTime for each file in the directory.'''
    for file in files:
        file_sizes.append(os.path.getsize(os.path.join(path, file)))
        modification_times.append(pd.to_datetime(os.path.getmtime(os.path.join(path, file)), unit='s'))

    data = list(zip(files, file_sizes, modification_times))
    column_names = ['name','size','modification_time']
    
    return pd.DataFrame(data=data, columns=column_names)


def wait_until_stream_is_ready(query, min_batches=1):
    while len(query.recentProgress) < min_batches:
        time.sleep(5)
        
    print(f"The stream has processed {len(query.recentProgress)} batchs")


def remove_directory_tree(path: str):
    '''If it exists, remove the entire contents of a directory structure at a given 'path' parameter's location.'''
    try:
        if os.path.exists(path):
            shutil.rmtree(path)
            return f"Directory '{path}' has been removed successfully."
        else:
            return f"Directory '{path}' does not exist."
            
    except Exception as e:
        return f"An error occurred: {e}"
        

def drop_null_columns(df, threshold):
    '''Drop Columns having a percentage of NULL values that exceeds the given 'threshold' parameter value.'''
    columns_with_nulls = [col for col in df.columns if df.filter(df[col].isNull()).count() / df.count() > threshold] 
    df_dropped = df.drop(*columns_with_nulls) 
    
    return df_dropped
    
    
def get_mysql_dataframe(spark_session, sql_query : str, **args):
    '''Create a JDBC URL to the MySQL Database'''
    jdbc_url = f"jdbc:mysql://{args['host_name']}:{args['port']}/{args['db_name']}"
    
    '''Invoke the spark.read.format("jdbc") function to query the database, and fill a DataFrame.'''
    dframe = spark_session.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("driver", args['conn_props']['driver']) \
    .option("user", args['conn_props']['user']) \
    .option("password", args['conn_props']['password']) \
    .option("query", sql_query) \
    .load()
    
    return dframe
    

def get_mongo_uri(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the 'cluster_location' parameter.")
        
    if args['cluster_location'] == "atlas":
        uri = f"mongodb+srv://{args['user_name']}:{args['password']}@"
        uri += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net/"
    else:
        uri = "mongodb://localhost:27017/"

    return uri


def get_spark_conf_args(spark_jars : list, **args):
    jars = ""
    for jar in spark_jars:
        jars += f"{jar}, "
    
    sparkConf_args = {
        "app_name" : "PySpark Northwind Data Lakehouse (Medallion Architecture)",
        "worker_threads" : f"local[{int(os.cpu_count()/2)}]",
        "shuffle_partitions" : int(os.cpu_count()),
        "mongo_uri" : get_mongo_uri(**args),
        "spark_jars" : jars[0:-2],
        "database_dir" : sql_warehouse_dir
    }
    
    return sparkConf_args
    

def get_spark_conf(**args):
    sparkConf = SparkConf().setAppName(args['app_name'])\
    .setMaster(args['worker_threads']) \
    .set('spark.driver.memory', '4g') \
    .set('spark.executor.memory', '2g') \
    .set('spark.jars', args['spark_jars']) \
    .set('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
    .set('spark.mongodb.input.uri', args['mongo_uri']) \
    .set('spark.mongodb.output.uri', args['mongo_uri']) \
    .set('spark.sql.adaptive.enabled', 'false') \
    .set('spark.sql.debug.maxToStringFields', 35) \
    .set('spark.sql.shuffle.partitions', args['shuffle_partitions']) \
    .set('spark.sql.streaming.forceDeleteTempCheckpointLocation', 'true') \
    .set('spark.sql.streaming.schemaInference', 'true') \
    .set('spark.sql.warehouse.dir', args['database_dir']) \
    .set('spark.streaming.stopGracefullyOnShutdown', 'true')
    
    return sparkConf


def get_mongo_client(**args):
    '''Get MongoDB Client Connection'''
    mongo_uri = get_mongo_uri(**args)
    if args['cluster_location'] == "atlas":
        client = pymongo.MongoClient(mongo_uri, tlsCAFile=certifi.where())

    elif args['cluster_location'] == "local":
        client = pymongo.MongoClient(mongo_uri)
        
    else:
        raise Exception("A MongoDB Client could not be created.")

    return client
    
    
# TODO: Rewrite this to leverage PySpark?
def set_mongo_collections(mongo_client, db_name : str, data_directory : str, json_files : list):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()
    

def get_mongodb_dataframe(spark_session, **args):
    '''Query MongoDB, and create a DataFrame'''
    dframe = spark_session.read.format("com.mongodb.spark.sql.DefaultSource") \
        .option("database", args['db_name']) \
        .option("collection", args['collection']).load()

    '''Drop the '_id' index column to clean up the response.'''
    dframe = dframe.drop('_id')
    
    '''Call the drop_null_columns() function passing in the dataframe.'''
    dframe = drop_null_columns(dframe, args['null_column_threshold'])
    
    return dframe

def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe

    
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    host = args["host_name"]
    port = args.get("port", "3306")
    db = args["db_name"]

    conn_props = args["conn_props"]
    user = conn_props["user"]
    pwd = conn_props["password"]

    conn_str = f"mysql+pymysql://{user}:{pwd}@{host}:{port}/{db}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    dframe = pd.read_sql(text(sql_query), connection)
    connection.close()
    
    return dframe

    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['user']}:{args['password']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()



### 4.0. Initialize Data Lakehouse Directory Structure
Remove the Data Lakehouse Database Directory Structure to Ensure Idempotency

In [5]:
remove_directory_tree(database_dir)

"Directory 'C:\\Users\\tangb\\DS-2002-main\\Projects\\spark-warehouse\\adventureworks_dlh.db' does not exist."

### 5.0. Create a New Spark Session

In [6]:
worker_threads = f"local[{int(os.cpu_count()/2)}]"

jars = []
mysql_spark_jar = os.path.join(os.getcwd(), "mysql-connector-j-9.1.0", "mysql-connector-j-9.1.0.jar")
mssql_spark_jar = os.path.join(os.getcwd(), "sqljdbc_12.8", "enu", "jars", "mssql-jdbc-12.8.1.jre11.jar")

jars.append(mysql_spark_jar)
#jars.append(mssql_spark_jar)

sparkConf_args = get_spark_conf_args(jars, **mongodb_args)

sparkConf_args["spark.master"] = worker_threads
sparkConf = get_spark_conf(**sparkConf_args)
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
spark.sparkContext.setLogLevel("OFF")
print("Loaded JARs:", spark.sparkContext._conf.get("spark.jars"))
spark

Loaded JARs: C:\Users\tangb\DS-2002-main\Projects\mysql-connector-j-9.1.0\mysql-connector-j-9.1.0.jar


### 6.0. Create a New Metadata Database.

In [7]:
spark.sql(f"DROP DATABASE IF EXISTS {dest_database} CASCADE;")

sql_create_db = f"""
    CREATE DATABASE IF NOT EXISTS {dest_database}
    COMMENT 'DS-2002 Lab 06 Database'
    WITH DBPROPERTIES (contains_pii = true, purpose = 'DS-2002 Lab 6.0');
"""
spark.sql(sql_create_db)

DataFrame[]

## Section II: Populate Dimensions by Ingesting "Cold-path" Reference Data 
### 1.0. Fetch Data from the File System
#### 1.1. Verify the location of the source data files on the file system

In [8]:
get_file_info(batch_dir)

Unnamed: 0,name,size,modification_time
0,dim_products.csv,83024,2025-03-17 20:31:00.838961124
1,dim_vendors.json,39705,2025-03-17 19:39:53.109505415


### 1.2 Extract Dataframe from MySQL Adventureworks
- Dimdate
- Employee

In [9]:
sql_dim_date = "SELECT date_key, full_date FROM adventureworks.dim_date"
df_dim_date = get_mysql_dataframe(spark, sql_dim_date, **mysql_args)
df_dim_date.show(2)
df_dim_date.write.saveAsTable(f"{dest_database}.dim_date", mode="overwrite")
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_date;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_date LIMIT 2").toPandas()

+--------+----------+
|date_key| full_date|
+--------+----------+
|20000101|2000-01-01|
|20000102|2000-01-02|
+--------+----------+
only showing top 2 rows

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|            date_key|                 int|   NULL|
|           full_date|                date|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|  adventureworks_dlh|       |
|               Table|            dim_date|       |
|        Created Time|Wed May 07 17:54:...|       |
|         Last Access|             UNKNOWN|       |
|          Created By|         Spark 3.5.4|       |
|                Type|             MANAGED|       |
|            Provider|             parquet|       |
|            Location|file:/C:/Users/ta...|       |
+----------

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


In [10]:
sql_dim_employee = "SELECT * FROM adventureworks.employee"
df_dim_employee = get_mysql_dataframe(spark, sql_dim_employee, **mysql_args)
df_dim_employee.show(2)
df_dim_employee.write.saveAsTable(f"{dest_database}.dim_employee", mode="overwrite")
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_employee;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_employee LIMIT 2").toPandas()

+----------+----------------+---------+--------------------+---------+--------------------+-------------------+-------------+------+-------------------+------------+-------------+--------------+-----------+--------------------+-------------------+
|EmployeeID|NationalIDNumber|ContactID|             LoginID|ManagerID|               Title|          BirthDate|MaritalStatus|Gender|           HireDate|SalariedFlag|VacationHours|SickLeaveHours|CurrentFlag|             rowguid|       ModifiedDate|
+----------+----------------+---------+--------------------+---------+--------------------+-------------------+-------------+------+-------------------+------------+-------------+--------------+-----------+--------------------+-------------------+
|         1|        14417807|     1209|adventure-works\guy1|       16|Production Techni...|1972-05-15 00:00:00|            M|     M|1996-07-31 00:00:00|       false|           21|            30|       true|[4A D0 E1 AA 37 C...|2004-07-31 00:00:00|
|       

Unnamed: 0,EmployeeID,NationalIDNumber,ContactID,LoginID,ManagerID,Title,BirthDate,MaritalStatus,Gender,HireDate,SalariedFlag,VacationHours,SickLeaveHours,CurrentFlag,rowguid,ModifiedDate
0,1,14417807,1209,adventure-works\guy1,16,Production Technician - WC60,1972-05-15,M,M,1996-07-31,False,21,30,True,"[74, 208, 225, 170, 55, 194, 116, 73, 180, 213...",2004-07-31
1,2,253022876,1030,adventure-works\kevin0,6,Marketing Assistant,1977-06-03,S,M,1997-02-26,False,42,41,True,"[64, 2, 72, 27, 192, 149, 15, 65, 167, 23, 235...",2004-07-31


### 1.3 Extract MySQL Query into jSON file for mongodb upload
    - vendors (json) extracted to mongodb, then read back to spark from mongodb

In [11]:
client = get_mongo_client(**mongodb_args)

json_files = {"vendors" : "dim_vendors.json"}

set_mongo_collections(client, mongodb_args["db_name"], batch_dir, json_files) 

In [12]:
mongodb_args["collection"] = "vendors"

df_dim_vendors = get_mongodb_dataframe(spark, **mongodb_args)

ordered_columns = ['VendorID', 'AccountNumber', 'ActiveFlag', 'AddressLine1', 'AddressType'
                   , 'City', 'CreditRating', 'Name', 'PostalCode'
                   , 'PreferredVendorStatus', 'StateProvinceCode', 'State_Province']
df_dim_vendors = df_dim_vendors[ordered_columns]
df_dim_vendors.toPandas().head(2)

Unnamed: 0,VendorID,AccountNumber,ActiveFlag,AddressLine1,AddressType,City,CreditRating,Name,PostalCode,PreferredVendorStatus,StateProvinceCode,State_Province
0,1,INTERNAT0001,1,683 Larch Ct.,Main Office,Salt Lake City,1,International,84101,1,UT,Utah
1,2,ELECTRON0002,1,8547 Catherine Way,Main Office,Tacoma,1,Electronic Bike Repair & Supplies,98403,1,WA,Washington


In [13]:
df_dim_vendors.write.saveAsTable(f"{dest_database}.dim_vendors", mode="overwrite")

In [14]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_vendors;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_vendors LIMIT 2").toPandas()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|            VendorID|                 int|   NULL|
|       AccountNumber|              string|   NULL|
|          ActiveFlag|              string|   NULL|
|        AddressLine1|              string|   NULL|
|         AddressType|              string|   NULL|
|                City|              string|   NULL|
|        CreditRating|                 int|   NULL|
|                Name|              string|   NULL|
|          PostalCode|              string|   NULL|
|PreferredVendorSt...|              string|   NULL|
|   StateProvinceCode|              string|   NULL|
|      State_Province|              string|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|  adventureworks_dlh|       |
|           

Unnamed: 0,VendorID,AccountNumber,ActiveFlag,AddressLine1,AddressType,City,CreditRating,Name,PostalCode,PreferredVendorStatus,StateProvinceCode,State_Province
0,1,INTERNAT0001,1,683 Larch Ct.,Main Office,Salt Lake City,1,International,84101,1,UT,Utah
1,2,ELECTRON0002,1,8547 Catherine Way,Main Office,Tacoma,1,Electronic Bike Repair & Supplies,98403,1,WA,Washington


### 1.4 Extract CSV from local into pyspark
- products (CSV) was locally extracted and read into pyspark 

In [15]:
products_csv = os.path.join(batch_dir, 'dim_products.csv')
print(products_csv)

df_dim_products = spark.read.format('csv').options(header='true', inferSchema='true').load(products_csv)
df_dim_products.toPandas().head(2)

C:\Users\tangb\DS-2002-main\Projects\proj_data\adventureworks\batch\dim_products.csv


Unnamed: 0,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,...,DaysToManufacture,ProductLine,Class,Style,ProductCategory,ProductSubcategory,ProductModel,SellStartDate,SellEndDate,DiscontinuedDate
0,1,Adjustable Race,AR-5381,0,0,,1000,750,0.0,0.0,...,0,,,,,,,1998-06-01,,
1,2,Bearing Ball,BA-8327,0,0,,1000,750,0.0,0.0,...,0,,,,,,,1998-06-01,,


In [16]:
df_dim_products.write.saveAsTable(f"{dest_database}.dim_products", mode="overwrite")

In [17]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_products;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_products LIMIT 2").toPandas()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|           ProductID|      int|   NULL|
|                Name|   string|   NULL|
|       ProductNumber|   string|   NULL|
|            MakeFlag|      int|   NULL|
|   FinishedGoodsFlag|      int|   NULL|
|               Color|   string|   NULL|
|    SafetyStockLevel|      int|   NULL|
|        ReorderPoint|      int|   NULL|
|        StandardCost|   double|   NULL|
|           ListPrice|   double|   NULL|
|                Size|   string|   NULL|
| SizeUnitMeasureCode|   string|   NULL|
|WeightUnitMeasure...|   string|   NULL|
|              Weight|   string|   NULL|
|   DaysToManufacture|      int|   NULL|
|         ProductLine|   string|   NULL|
|               Class|   string|   NULL|
|               Style|   string|   NULL|
|     ProductCategory|   string|   NULL|
|  ProductSubcategory|   string|   NULL|
+--------------------+---------+-------+
only showing top

Unnamed: 0,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,...,DaysToManufacture,ProductLine,Class,Style,ProductCategory,ProductSubcategory,ProductModel,SellStartDate,SellEndDate,DiscontinuedDate
0,1,Adjustable Race,AR-5381,0,0,,1000,750,0.0,0.0,...,0,,,,,,,1998-06-01,,
1,2,Bearing Ball,BA-8327,0,0,,1000,750,0.0,0.0,...,0,,,,,,,1998-06-01,,


### 1.5 add surgate primary key for each table

In [18]:
df_dim_products.createOrReplaceTempView("products")
sql_products = f"""
    SELECT *, ROW_NUMBER() OVER (PARTITION BY ProductID ORDER BY ProductID) AS ProductKey
    FROM products;
"""
df_dim_products = spark.sql(sql_products)

cols = df_dim_products.columns

# Move 'ProductKey' to the front
reordered_cols = ['ProductKey'] + [col for col in cols if col != 'ProductKey']

# Apply the new column order
df_dim_products = df_dim_products.select(reordered_cols)

df_dim_products.toPandas().head(2)

Unnamed: 0,ProductKey,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,...,DaysToManufacture,ProductLine,Class,Style,ProductCategory,ProductSubcategory,ProductModel,SellStartDate,SellEndDate,DiscontinuedDate
0,1,317,LL Crankarm,CA-5965,0,0,Black,500,375,0.0,...,0,,L,,,,,1998-06-01,,
1,1,343,Flat Washer 2,FW-1400,0,0,,1000,750,0.0,...,0,,,,,,,1998-06-01,,


In [19]:
df_dim_employee.createOrReplaceTempView("employees")
sql_employees = f"""
    SELECT *, ROW_NUMBER() OVER (ORDER BY EmployeeId) AS EmployeeKey
    FROM employees
"""
df_dim_employee = spark.sql(sql_employees)

cols = df_dim_employee.columns
reordered_cols = ['EmployeeKey'] + [col for col in cols if col != 'EmployeeKey']
df_dim_employee = df_dim_employee.select(reordered_cols)

# Option 1: Drop problematic timestamp columns
# Identify TimestampType columns in df_dim_employee
from pyspark.sql.types import TimestampType

timestamp_cols = [f.name for f in df_dim_employee.schema.fields if isinstance(f.dataType, TimestampType)]
print("Timestamp columns:", timestamp_cols)

df_dim_employee_cleaned = df_dim_employee.drop(*timestamp_cols)

# Option 2: Cast timestamp columns to string instead
from pyspark.sql.functions import col
for ts_col in timestamp_cols:
    df_dim_employee = df_dim_employee.withColumn(ts_col, col(ts_col).cast("string"))
df_dim_employee.toPandas().head(2)

Timestamp columns: ['BirthDate', 'HireDate', 'ModifiedDate']


Unnamed: 0,EmployeeKey,EmployeeID,NationalIDNumber,ContactID,LoginID,ManagerID,Title,BirthDate,MaritalStatus,Gender,HireDate,SalariedFlag,VacationHours,SickLeaveHours,CurrentFlag,rowguid,ModifiedDate
0,1,1,14417807,1209,adventure-works\guy1,16.0,Production Technician - WC60,1972-05-15 00:00:00,M,M,1996-07-31 00:00:00,False,21,30,True,"[74, 208, 225, 170, 55, 194, 116, 73, 180, 213...",2004-07-31 00:00:00
1,2,2,253022876,1030,adventure-works\kevin0,6.0,Marketing Assistant,1977-06-03 00:00:00,S,M,1997-02-26 00:00:00,False,42,41,True,"[64, 2, 72, 27, 192, 149, 15, 65, 167, 23, 235...",2004-07-31 00:00:00


In [20]:
df_dim_vendors.createOrReplaceTempView("vendors")
sql_vendors = f"""
    SELECT *, ROW_NUMBER() OVER (ORDER BY VendorId) AS VendorKey
    FROM vendors
"""
df_dim_vendors = spark.sql(sql_vendors)

cols = df_dim_vendors.columns
reordered_cols = ['VendorKey'] + [col for col in cols if col != 'VendorKey']
df_dim_vendors = df_dim_vendors.select(reordered_cols)
df_dim_vendors.toPandas().head(2)

Unnamed: 0,VendorKey,VendorID,AccountNumber,ActiveFlag,AddressLine1,AddressType,City,CreditRating,Name,PostalCode,PreferredVendorStatus,StateProvinceCode,State_Province
0,1,1,INTERNAT0001,1,683 Larch Ct.,Main Office,Salt Lake City,1,International,84101,1,UT,Utah
1,2,2,ELECTRON0002,1,8547 Catherine Way,Main Office,Tacoma,1,Electronic Bike Repair & Supplies,98403,1,WA,Washington


### 1.5 Verify that pyspark as all tables
- df_dim_date
- df_dim_employee
- df_dim_vendors
- df_dim_products

In [21]:
spark.sql(f"USE {dest_database};")
spark.sql("SHOW TABLES").toPandas()

Unnamed: 0,namespace,tableName,isTemporary
0,adventureworks_dlh,dim_date,False
1,adventureworks_dlh,dim_employee,False
2,adventureworks_dlh,dim_products,False
3,adventureworks_dlh,dim_vendors,False
4,,employees,True
5,,products,True
6,,vendors,True


### 2.0 Pyspark streaming with fact_orders
#### 2.1 Verify the location of the source data files on the file system

In [22]:
get_file_info(fact_orders_stream_dir)

Unnamed: 0,name,size,modification_time
0,adventureworks_fact_orders_1.json,1041129,2025-05-02 23:28:32.406136036
1,adventureworks_fact_orders_2.json,1049473,2025-05-02 23:28:32.422135115
2,adventureworks_fact_orders_3.json,1045456,2025-05-02 23:28:32.433131218


#### 2.2 Create the Bronze Layer: Stage <span style="color:darkred">Fact Orders table</span> Data
##### 2.2.1 read in json file into stream

In [23]:
df_fact_orders_bronze = (
    spark.readStream \
    .option("schemaLocation", fact_orders_output_bronze) \
    .option("maxFilesPerTrigger", 1) \
    .option("multiLine", "true") \
    .json(fact_orders_stream_dir)
)

df_fact_orders_bronze.isStreaming

True

##### 3.2.2. Write the Streaming Data to a Parquet file

In [24]:
fact_orders_checkpoint_bronze = os.path.join(fact_orders_output_bronze, '_checkpoint')

fact_orders_bronze_query = (
    df_fact_orders_bronze
    # Add Current Timestamp and Input Filename columns for Traceability
    .withColumn("receipt_time", current_timestamp())
    .withColumn("source_file", input_file_name())
    
    .writeStream \
    .format("parquet") \
    .outputMode("append") \
    .queryName("fact_orders_bronze")
    .trigger(availableNow = True) \
    .option("checkpointLocation", fact_orders_checkpoint_bronze) \
    .option("compression", "snappy") \
    .start(fact_orders_output_bronze)
)

##### 2.2.3. Unit Test: Implement Query Monitoring

In [25]:
print(f"Query ID: {fact_orders_bronze_query.id}")
print(f"Query Name: {fact_orders_bronze_query.name}")
print(f"Query Status: {fact_orders_bronze_query.status}")

Query ID: 72a64501-e85b-4437-8d34-57c79a798ff2
Query Name: fact_orders_bronze
Query Status: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}


In [26]:
fact_orders_bronze_query.awaitTermination()

#### 2.3. Create the Silver Layer: Integrate "Cold-path" Data & Make Transformations
##### 2.3.1. Prepare Role-Playing Dimension Primary and Business Keys

In [27]:
df_dim_order_date = df_dim_date.select(col("date_key").alias("order_date_key"), col("full_date").alias("fact_order_full_date"))


In [28]:
spark.readStream.format("parquet").load(fact_orders_output_bronze).printSchema()

root
 |-- DueDate: long (nullable = true)
 |-- EmployeeID: long (nullable = true)
 |-- Freight: double (nullable = true)
 |-- LineTotal: double (nullable = true)
 |-- OrderDate: long (nullable = true)
 |-- OrderQty: long (nullable = true)
 |-- ProductID: long (nullable = true)
 |-- PurchaseOrderID: long (nullable = true)
 |-- ReceivedQty: double (nullable = true)
 |-- ShipDate: long (nullable = true)
 |-- Status: long (nullable = true)
 |-- StockedQty: double (nullable = true)
 |-- SubTotal: double (nullable = true)
 |-- TaxAmt: double (nullable = true)
 |-- TotalDue: double (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- VendorID: long (nullable = true)
 |-- fact_purchase_orders_key: long (nullable = true)
 |-- receipt_time: timestamp (nullable = true)
 |-- source_file: string (nullable = true)



##### 2.3.2. Define Silver Query to Join Streaming with Batch Data

In [29]:
from pyspark.sql.functions import col
from pyspark.sql.types import LongType

# Load the bronze data (without CustomerID, as you've removed it)
df_bronze = spark.readStream.format("parquet").load(fact_orders_output_bronze)

# Perform the necessary joins
df_fact_orders_silver = df_bronze \
    .join(df_dim_employee, "EmployeeID") \
    .join(df_dim_products, "ProductID") \
    .join(df_dim_vendors, "VendorID") \
    .select(
        df_dim_employee.EmployeeKey.cast(LongType()).alias("EmployeeKey"),
        df_dim_products.ProductKey.cast(LongType()).alias("ProductKey"),
        df_dim_vendors.VendorKey.cast(LongType()).alias("VendorKey"),
        df_bronze.DueDate,
        df_bronze.Freight,
        df_bronze.LineTotal,
        df_bronze.OrderDate,
        df_bronze.OrderQty,
        df_bronze.ReceivedQty,
        df_bronze.ShipDate,
        df_bronze.Status,
        df_bronze.StockedQty,
        df_bronze.ShipDate,
        df_bronze.Status,
        df_bronze.TotalDue,
        df_bronze.UnitPrice
    )

# Define the streaming output path for the silver layer
df_fact_orders_silver.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .start("path/to/output/dir")


<pyspark.sql.streaming.query.StreamingQuery at 0x1506d4c5f40>

In [30]:
df_fact_orders_silver.isStreaming

True

In [31]:
df_fact_orders_silver.printSchema()

root
 |-- EmployeeKey: long (nullable = false)
 |-- ProductKey: long (nullable = false)
 |-- VendorKey: long (nullable = false)
 |-- DueDate: long (nullable = true)
 |-- Freight: double (nullable = true)
 |-- LineTotal: double (nullable = true)
 |-- OrderDate: long (nullable = true)
 |-- OrderQty: long (nullable = true)
 |-- ReceivedQty: double (nullable = true)
 |-- ShipDate: long (nullable = true)
 |-- Status: long (nullable = true)
 |-- StockedQty: double (nullable = true)
 |-- ShipDate: long (nullable = true)
 |-- Status: long (nullable = true)
 |-- TotalDue: double (nullable = true)
 |-- UnitPrice: double (nullable = true)



##### 2.3.3. Write the Transformed Streaming data to the Data Lakehouse

In [32]:
fact_orders_checkpoint_silver = os.path.join(fact_orders_output_silver, '_checkpoint')

fact_orders_silver_query = (
    df_fact_orders_silver.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .queryName("orders_silver")
    .trigger(availableNow = True) \
    .option("checkpointLocation", fact_orders_checkpoint_silver) \
    .option("compression", "snappy") \
    .start(fact_orders_output_silver)
)

##### 2.3.4. Unit Test: Implement Query Monitoring

In [33]:
print(f"Query ID: {fact_orders_silver_query.id}")
print(f"Query Name: {fact_orders_silver_query.name}")
print(f"Query Status: {fact_orders_silver_query.status}")

Query ID: 20d6a4f3-7146-4dd4-b8f7-d64842ce9eac
Query Name: orders_silver
Query Status: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}


In [34]:
fact_orders_silver_query.awaitTermination()

#### 2.4. Create Gold Layer: Perform Aggregations
##### 2.4.1. Define a Query to Create a Business Report

In [35]:
df_dim_products.toPandas().head(10)

Unnamed: 0,ProductKey,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,...,DaysToManufacture,ProductLine,Class,Style,ProductCategory,ProductSubcategory,ProductModel,SellStartDate,SellEndDate,DiscontinuedDate
0,1,317,LL Crankarm,CA-5965,0,0,Black,500,375,0.0,...,0,,L,,,,,1998-06-01,,
1,1,343,Flat Washer 2,FW-1400,0,0,,1000,750,0.0,...,0,,,,,,,1998-06-01,,
2,1,412,Internal Lock Washer 3,LI-1000,0,0,,1000,750,0.0,...,0,,,,,,,1998-06-01,,
3,1,434,Thin-Jam Lock Nut 7,LJ-7161,0,0,,1000,750,0.0,...,0,,,,,,,1998-06-01,,
4,1,439,Lock Nut 6,LN-1032,0,0,,1000,750,0.0,...,0,,,,,,,1998-06-01,,
5,1,443,Lock Nut 8,LN-1420,0,0,,1000,750,0.0,...,0,,,,,,,1998-06-01,,
6,1,452,Lock Nut 2,LN-5400,0,0,,1000,750,0.0,...,0,,,,,,,1998-06-01,,
7,1,471,Lock Washer 12,LW-5800,0,0,,1000,750,0.0,...,0,,,,,,,1998-06-01,,
8,1,475,Lock Washer 11,LW-9160,0,0,,1000,750,0.0,...,0,,,,,,,1998-06-01,,
9,1,496,Paint - Yellow,PA-823Y,0,0,,60,45,0.0,...,0,,,,,,,1998-06-01,,


In [36]:
from pyspark.sql.functions import month, date_format

df_dim_date = df_dim_date \
    .withColumn("month_of_year", month("full_date")) \
    .withColumn("month_name", date_format("full_date", "MMMM"))
df_dim_date.printSchema()


root
 |-- date_key: integer (nullable = true)
 |-- full_date: date (nullable = true)
 |-- month_of_year: integer (nullable = true)
 |-- month_name: string (nullable = true)



In [37]:
from pyspark.sql.functions import count, asc, desc, to_date
from pyspark.sql.types import IntegerType

# Convert OrderDate (assumed to be long or timestamp) to a comparable format (e.g., date)
df_fact_orders_by_product_category_gold = (
    spark.readStream.format("parquet").load(fact_orders_output_silver)
    .withColumn("order_date_parsed", to_date(col("OrderDate").cast("timestamp")))  # Parse OrderDate
    .join(df_dim_products, "ProductKey")
    .join(
        df_dim_date,
        df_dim_date.full_date == col("order_date_parsed")  # Adjust "full_date" to your actual column name
    )
    .groupBy("month_of_year", "Name", "month_name")
    .agg(count("StandardCost").alias("Standard Cost"))
    .orderBy(asc("month_of_year"), desc("Standard Cost"))
)


In [38]:
df_fact_orders_by_product_category_gold.printSchema()

root
 |-- month_of_year: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- month_name: string (nullable = true)
 |-- Standard Cost: long (nullable = false)



##### 3.4.2. Write the Streaming data to a Parquet File in "Complete" mode

In [39]:
fact_orders_gold_query = (
    df_fact_orders_by_product_category_gold.writeStream \
    .format("memory") \
    .outputMode("complete") \
    .queryName("fact_orders_by_product_category")
    .start()
)

In [40]:
df_dim_products.printSchema()

root
 |-- ProductKey: integer (nullable = false)
 |-- ProductID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- ProductNumber: string (nullable = true)
 |-- MakeFlag: integer (nullable = true)
 |-- FinishedGoodsFlag: integer (nullable = true)
 |-- Color: string (nullable = true)
 |-- SafetyStockLevel: integer (nullable = true)
 |-- ReorderPoint: integer (nullable = true)
 |-- StandardCost: double (nullable = true)
 |-- ListPrice: double (nullable = true)
 |-- Size: string (nullable = true)
 |-- SizeUnitMeasureCode: string (nullable = true)
 |-- WeightUnitMeasureCode: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- DaysToManufacture: integer (nullable = true)
 |-- ProductLine: string (nullable = true)
 |-- Class: string (nullable = true)
 |-- Style: string (nullable = true)
 |-- ProductCategory: string (nullable = true)
 |-- ProductSubcategory: string (nullable = true)
 |-- ProductModel: string (nullable = true)
 |-- SellStartDate: timestamp (null

In [41]:
df_fact_orders_by_product_category = spark.sql("SELECT * FROM fact_orders_by_product_category")
df_fact_orders_by_product_category.printSchema()

root
 |-- month_of_year: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- month_name: string (nullable = true)
 |-- Standard Cost: long (nullable = false)



In [45]:
df_fact_orders_by_product_category_gold_final = df_fact_orders_by_product_category \
.select(col("month_name").alias("Month"), \
        col("Name").alias("Product Name"), \
        col("Standard Cost").alias("Standard Cost")) \
.orderBy(asc("month_of_year"), desc("Standard Cost"))

In [46]:
df_fact_orders_by_product_category_gold_final.write.saveAsTable(f"{dest_database}.fact_orders_by_product_category", mode="overwrite")
spark.sql(f"SELECT * FROM {dest_database}.fact_orders_by_product_category").toPandas()

Unnamed: 0,Month,Product Name,Standard Cost


In [47]:
spark.stop()