In [1]:
# using python to perform ETL

In [3]:
%%python -m pip install pymongo
!pip install sqlalchemy



In [4]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

In [8]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 2.0.34
Running PyMongo Version: 4.11.2


In [10]:
# MongoDB: Declare and Assign Connection Variables for the MongoDB Server 
mysql_args = {
    "uid" : "root",
    "pwd" : "Strawberrylime",
    "hostname" : "localhost",
    "dbname" : "classicmodels_dw"
}
conn_str = f"mysql+pymysql://{mysql_args['uid']}:{mysql_args['pwd']}@{mysql_args['hostname']}/{mysql_args['dbname']}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

# The 'cluster_location' must either be "atlas" or "local".
mongodb_args = {
    "user_name" : "aryaa_desh",
    "password" : "Strawberrylime",
    "cluster_name" : "Cluster0",
    "cluster_subnet" : "xxxxx",
    "cluster_location" : "local", # "local"
    "db_name" : "classicmodels_dw" 
}

In [12]:
# Declare & Assign Connection Variables for the MySQL Server & Databases with which You'll be Working
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "Strawberrylime"

src_dbname = "classicmodels"
dst_dbname = "classicmodels_dw"

In [14]:
# Define Functions for Getting Data From and Setting Data into Database for MongoDB
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(text(sql_query), connection);
    connection.close()
    
    return dframe
    

def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for collection_name in json_files:
        json_path = os.path.join(data_directory, json_files[collection_name])
        records = []
        
        with open(json_path, 'r') as f:
            for i, line in enumerate(f, start=1):
                line = line.strip()
                if not line:
                    continue  # skip blank lines
                try:
                    records.append(json.loads(line))
                except json.JSONDecodeError as e:
                    print(f"Skipping line {i} due to error: {e}")
        
        if records:
            result = db[collection_name].insert_many(records)
            print(f"✅ Inserted {len(result.inserted_ids)} records into '{collection_name}'")
        else:
            print(f"⚠️ No valid records found for '{collection_name}'")


In [16]:
# Define CSV File Path (Ensure the file is in the correct directory)
data_dir = os.getcwd()  # Adjust if necessary
data_file = os.path.join(data_dir, "orders.csv")

# Read CSV into Pandas DataFrame
df = pd.read_csv(data_file)

# Load Data into MySQL
table_name = "orders"  # Ensure this matches your MySQL table name
df.to_sql(name=table_name, con=sqlEngine, if_exists="replace", index=False)

print(f"Data successfully inserted into `{table_name}` table in `{mysql_args['dbname']}` database.")

Data successfully inserted into `orders` table in `classicmodels_dw` database.


In [18]:
# Populate MongoDB with Source Data

In [20]:
!nslookup cluster_name.xxxxx.mongodb.net

Server:		1.1.1.1
Address:	1.1.1.1#53

** server can't find cluster_name.xxxxx.mongodb.net: NXDOMAIN



In [22]:
client = get_mongo_client(**mongodb_args)

# Gets the path of the Current Working Directory for this Notebook,
# and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')
print(f"Current working directory: {os.getcwd()}")

json_files = {"payments" : 'payments.json'}

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)      

Current working directory: /Users/aryaadeshpande/Desktop
Skipping line 1 due to error: Expecting value: line 1 column 2 (char 1)
Skipping line 2 due to error: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)
Skipping line 3 due to error: Extra data: line 1 column 17 (char 16)
Skipping line 4 due to error: Extra data: line 1 column 14 (char 13)
Skipping line 5 due to error: Extra data: line 1 column 14 (char 13)
Skipping line 6 due to error: Extra data: line 1 column 9 (char 8)
Skipping line 7 due to error: Expecting value: line 1 column 1 (char 0)
Skipping line 8 due to error: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)
Skipping line 9 due to error: Extra data: line 1 column 17 (char 16)
Skipping line 10 due to error: Extra data: line 1 column 14 (char 13)
Skipping line 11 due to error: Extra data: line 1 column 14 (char 13)
Skipping line 12 due to error: Extra data: line 1 column 9 (char 8)
Skipping line 13 due to error: Expec

In [24]:
# Define your query and collection first
query = {}  # get all documents
collection = "payments"

# Extract Data from the Source MongoDV Collections into DataFrames
client = pymongo.MongoClient("mongodb://localhost:27017/")  # Adjust the URI as needed
db = client["db_name"]

dframe = pd.DataFrame(list(db[collection].find(query)))

client = get_mongo_client(**mongodb_args)

# query = {} # Select all elements (columns), and all documents (rows).
# collection = "payments"

df_payments = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_payments.head(2)

Unnamed: 0,customerNumber,checkNumber,paymentDate,amount
0,103,HQ336336,2004-10-19,6066.78
1,103,JM555205,2003-06-05,14571.44


In [26]:
# At this point, run the [DS2002 Midterm dim_date SQL file]

In [29]:
# Look up the payments_date keys from the date dimension table
sql_dim_date = "SELECT date_key, full_date FROM classicmodels_dw.dim_date;"
df_dim_date = get_sql_dataframe(sql_dim_date, **mysql_args)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(10)


Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02
2,20000103,2000-01-03
3,20000104,2000-01-04
4,20000105,2000-01-05
5,20000106,2000-01-06
6,20000107,2000-01-07
7,20000108,2000-01-08
8,20000109,2000-01-09
9,20000110,2000-01-10


In [31]:
# Look up the Surrogate Primary Key (date_key) that corresponds to the paymentDate column

df_dim_paymentDate = df_dim_date.rename(columns={"date_key" : "paymentDate_key", "full_date" : "paymentDate"})
df_payments.paymentDate = df_payments.paymentDate.astype('datetime64[ns]').dt.date
df_payments = pd.merge(df_payments, df_dim_paymentDate, on='paymentDate', how='left')
df_payments.drop(['paymentDate'], axis=1, inplace=True)
df_payments.head(2)

Unnamed: 0,customerNumber,checkNumber,amount,paymentDate_key
0,103,HQ336336,6066.78,20041019
1,103,JM555205,14571.44,20030605


In [33]:
# Perform Any Necessary Transformations to the DataFrames

# 3. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_payments.drop(['checkNumber'], axis=1, inplace=True)
df_payments.insert(0, "payment_key", range(1, df_payments.shape[0]+1))
df_payments.head(2)

Unnamed: 0,payment_key,customerNumber,amount,paymentDate_key
0,1,103,6066.78,20041019
1,2,103,14571.44,20030605


In [35]:
# Load the transformed payments DataFrames into the New Data Warehouse by creating new tables

In [38]:
# # Define Functions for Getting Data From and Setting Data Into Databases
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()

    # Debugging the query and connection
    print(f"Inserting into {table_name} with primary key {pk_column}")

    # Safely insert or update data
    if db_operation == "insert":
        try:
            df.to_sql(table_name, con=connection, index=False, if_exists='append')  # Append data without replacing
            print("Data inserted successfully.")
        except Exception as e:
            print(f"Error inserting data into {table_name}: {e}")

        # Ensure the primary key column exists with AUTO_INCREMENT before inserting
        try:
            # Check if the column exists
            result = connection.execute(f"SHOW COLUMNS FROM {table_name} LIKE '{pk_column}';")
            if not result.fetchone():
                print(f"Adding primary key column {pk_column} as AUTO_INCREMENT.")
                # Add primary key if it doesn't exist
                connection.execute(f"ALTER TABLE {table_name} ADD COLUMN {pk_column} INT AUTO_INCREMENT PRIMARY KEY;")
        except Exception as e:
            print(f"Error adding primary key to {table_name}: {e}")

    elif db_operation == "replace":
        try:
            df.to_sql(table_name, con=connection, index=False, if_exists='replace')  # ⚠️ Use this carefully!
            connection.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            print("Data replaced successfully.")
        except Exception as e:
            print(f"Error replacing data into {table_name}: {e}")

    connection.close()


In [40]:
# TODO: Upload the "payments" dataframe to create the new "dim_payments" dimension table
# Check for duplicates in the payment_key column

dataframe = df_payments
table_name = 'dim_payments'
primary_key = 'payment_key'
db_operation = "update"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

Inserting into dim_payments with primary key payment_key


In [42]:
# fetching data for dimensions table

In [44]:
# fetching data from customers
sql_customers = "SELECT * FROM classicmodels.customers;"
df_customers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customers)
df_customers.head(2)

Unnamed: 0,customerNumber,customerName,contactLastName,contactFirstName,phone,addressLine1,addressLine2,city,state,postalCode,country,salesRepEmployeeNumber,creditLimit
0,103,Atelier graphique,Schmitt,Carine,40.32.2555,"54, rue Royale",,Nantes,,44000,France,1370.0,21000.0
1,112,Signal Gift Stores,King,Jean,7025551838,8489 Strong St.,,Las Vegas,NV,83030,USA,1166.0,71800.0


In [46]:
# fetching data from orders
sql_orders = "SELECT * FROM classicmodels_dw.orders"
df_orders = get_dataframe(user_id, pwd, host_name, dst_dbname, sql_orders)
df_orders.head(2)

Unnamed: 0,OrderID,OrderDate,RequiredDate,ShippedDate,Status,Comments,CustomerID
0,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,363
1,10101,2003-01-09,2003-01-18,2003-01-11,Shipped,Check on availability.,128


In [48]:
# fetching data from products
sql_products = "SELECT * FROM classicmodels.products;"
df_products = get_dataframe(user_id, pwd, host_name, src_dbname, sql_products)
df_products.head(2)

Unnamed: 0,productCode,productName,productLine,productScale,productVendor,productDescription,quantityInStock,buyPrice,MSRP
0,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,1:10,Min Lin Diecast,"This replica features working kickstand, front...",7933,48.81,95.7
1,S10_1949,1952 Alpine Renault 1300,Classic Cars,1:10,Classic Metal Creations,Turnable front wheels; steering function; deta...,7305,98.58,214.3


In [50]:
# Perform any necessary transformations

In [52]:
# CUSTOMERS
# 1. Create a List that enumerates the names of each column you wish to remove (drop) from the Pandas DataFrame
drop_cols = ['contactLastName','contactFirstName','phone','addressLine2','salesRepEmployeeNumber','creditLimit']
df_customers.drop(drop_cols, axis=1, inplace=True)

# 2. Rename the "customerNumber" column to "customer_id" to reflect the entity as it will serve as the business key for lookup operations
df_customers.rename(columns={"customerNumber":"customer_id"}, inplace=True)

# 3. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_customers.insert(0, "customer_key", range(1, df_customers.shape[0]+1))

# 4. Display the first 2 rows of the dataframe to validate your work
df_customers.head(2)

Unnamed: 0,customer_key,customer_id,customerName,addressLine1,city,state,postalCode,country
0,1,103,Atelier graphique,"54, rue Royale",Nantes,,44000,France
1,2,112,Signal Gift Stores,8489 Strong St.,Las Vegas,NV,83030,USA


In [54]:
# PRODUCTS
# 1. Create a List that enumerates the names of each column you wish to remove (drop) from the Pandas DataFrame
drop_cols = ['productScale','productVendor','productDescription','quantityInStock']
df_products.drop(drop_cols, axis=1, inplace=True)

# 2. Rename the "productCode" column to "product_id" to reflect the entity as it will serve as the business key for lookup operations
df_products.rename(columns={"productCode":"product_id"}, inplace=True)

# 3. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_products.insert(0, "product_key", range(1, df_products.shape[0]+1))

# 4. Display the first 2 rows of the dataframe to validate your work
df_products.head(2)

Unnamed: 0,product_key,product_id,productName,productLine,buyPrice,MSRP
0,1,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,48.81,95.7
1,2,S10_1949,1952 Alpine Renault 1300,Classic Cars,98.58,214.3


In [56]:
sql_check_table = "SHOW TABLES;"
df_tables = get_dataframe(user_id, pwd, host_name, dst_dbname, sql_check_table)
print(df_tables)

  Tables_in_classicmodels_dw
0              dim_customers
1                   dim_date
2               dim_payments
3               dim_products
4                     orders
5                 sales_fact


In [58]:
# load the Transformed DataFrames into the New Data Warehouse by Creating New Tables

In [60]:
db_operation = "update"

tables = [('dim_customers', df_customers, 'customer_key'),
          ('dim_products', df_products, 'product_key')]

In [62]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)


Inserting into dim_customers with primary key customer_key
Inserting into dim_products with primary key product_key


In [64]:
# creating and populating the fact table called sales_fact

In [66]:
sql_check_table = "SHOW TABLES;"
df_tables = get_dataframe(user_id, pwd, host_name, dst_dbname, sql_check_table)
print(df_tables)

  Tables_in_classicmodels_dw
0              dim_customers
1                   dim_date
2               dim_payments
3               dim_products
4                     orders
5                 sales_fact


In [68]:
sql_sales_fact = """
SELECT 
    c.customer_key, 
    py.payment_key, 
    py.amount AS total_price,
    py.paymentDate_key
FROM dim_payments py
JOIN dim_customers c ON py.customerNumber = c.customer_key


"""

df_sales_fact = get_dataframe(user_id, pwd, host_name, dst_dbname, sql_sales_fact)
print(df_sales_fact.shape)  # Should be a non-zero shape if data exists
print(df_sales_fact.head(2))  # Check the first 2 rows to ensure data is fetched


(17, 4)
   customer_key  payment_key  total_price  paymentDate_key
0           103            1      6066.78         20041019
1           103            2     14571.44         20030605


In [70]:
# 3. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_sales_fact.insert(0, "sales_fact_key", range(1, df_sales_fact.shape[0] + 1))
# df_sales_fact.drop(columns=["sales_fact_key"], errors="ignore", inplace=True)

# 4. Display the first 2 rows of the dataframe to validate your work
df_sales_fact.head(2)

Unnamed: 0,sales_fact_key,customer_key,payment_key,total_price,paymentDate_key
0,1,103,1,6066.78,20041019
1,2,103,2,14571.44,20030605


In [72]:
# Insert data into the sales_fact table

df_sales_fact.to_sql('sales_fact', con=sqlEngine, if_exists='replace', index=False)


print("Data successfully inserted into the sales_fact table.")
df_sales_fact.head(5)

Data successfully inserted into the sales_fact table.


Unnamed: 0,sales_fact_key,customer_key,payment_key,total_price,paymentDate_key
0,1,103,1,6066.78,20041019
1,2,103,2,14571.44,20030605
2,3,103,3,1676.14,20041218
3,4,112,4,14191.12,20041217
4,5,112,5,32641.98,20030606


In [74]:
# demonstrate that the new data warehouse exists and contains the correct data

In [77]:
#This query retrieves total sales data for each customer, including total transactions, revenue, and average payment amount.

sql_query = """
SELECT 
    c.customer_key, 
    c.customerName, 
    COUNT(sf.sales_fact_key) AS total_sales, 
    SUM(py.amount) AS total_revenue, 
    AVG(py.amount) AS avg_payment_amount
FROM sales_fact sf
JOIN dim_payments py ON sf.payment_key = py.payment_key
JOIN dim_customers c ON py.customerNumber = c.customer_key
GROUP BY c.customer_key, c.customerName
ORDER BY total_revenue DESC;
"""

# Execute the SQL query and load the results into a Pandas DataFrame
df_total_sales_by_customer = pd.read_sql(sql_query, sqlEngine)

# Display the first 2 rows to check the output
print(df_total_sales_by_customer.head(2))

   customer_key              customerName  total_sales  total_revenue  \
0           114       Mit Vergnügen & Co.            4      180585.07   
1           119  Signal Collectibles Ltd.            3      116949.68   

   avg_payment_amount  
0        45146.267500  
1        38983.226667  


In [79]:
#End of midterm, start of captstone

In [81]:
!{sys.executable} -m pip install pymongo
!pip install pymongo certifi pandas findspark

zsh:1: parse error near `-m'


In [83]:
import findspark
findspark.init()
print(findspark.find())

import os
import sys
import json
import time
import pymongo
import certifi
import shutil
import pandas as pd

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window as W
import os
print(os.getcwd())
print(os.listdir())  

/opt/anaconda3/lib/python3.12/site-packages/pyspark
/Users/aryaadeshpande/Desktop
['Screenshot 2025-05-04 at 7.07.26\u202fPM.png', 'sqljdbc_12.10.0.0_enu.zip', 'Visual Studio Code.app', 'Screenshot 2025-05-07 at 10.54.04\u202fPM.png', 'Lab_4_Unsupervised_Learning.pdf', 'orders.csv', '.DS_Store', 'DS 2002', 'Lab_02c_Create_Populate_Dim_Date (1).sql', 'lab_data', '[DS2002] 1Midterm.html', 'Design Portfolio Link.pdf', 'Lab-06-PySpark-Data-Lakehouse-Structured-Streaming.ipynb', '6.0-Lab-DataLakehouse-Structured-Streaming.ipynb', '.localized', 'Screenshot 2025-04-21 at 3.38.30\u202fPM.png', 'DS2002_Final_Project (1) (1).ipynb', 'RESUME - Aryaa Deshpande (8).pdf', 'DS-2002-main', 'mysql-connector-j-9.1.0', 'mysql-connector-j-9.1.0.jar', 'classic_model_products.json', 'Demo-05-PySpark-Database-Connectivity.ipynb', 'DS2002_Final_Project.ipynb', 'DS2002_Final_Project (1) (3).ipynb', 'mysql-connector-j-9.3.0.jar', 'DS2002_Final_Project (1).ipynb', 'DS 3001', 'classic_orders_2c.json', 'spark-ware

In [85]:

# --------------------------------------------------------------------------------
# Specify MongoDB Cluster Connection Information
# --------------------------------------------------------------------------------

mongodb_args = {
    "user_name" : "aryaa_desh",
    "password" : "Strawberrylime",
    "cluster_name" : "Cluster0",
    "cluster_subnet" : "xxxxx",
    "cluster_location" : "local", # "local"
    "db_name" : "classicmodels_dw"
}

# --------------------------------------------------------------------------------
# Specify Directory Structure for Source Data
# --------------------------------------------------------------------------------
base_dir = os.path.join(os.getcwd(), 'lab_data')
data_dir = os.path.join(base_dir, 'classic_models')
batch_dir = os.path.join(data_dir, 'batch')
stream_dir = os.path.join(data_dir, 'streaming')

orders_stream_dir = os.path.join(stream_dir, 'orders')
purchase_orders_stream_dir = os.path.join(stream_dir, 'customers')
inventory_trans_stream_dir = os.path.join(stream_dir, 'inventory_transactions')

# --------------------------------------------------------------------------------
# Create Directory Structure for Data Lakehouse Files
# --------------------------------------------------------------------------------
dest_database = "classic_models_dlh"
sql_warehouse_dir = os.path.abspath('spark-warehouse')
dest_database_dir = f"{dest_database}.db"
database_dir = os.path.join(sql_warehouse_dir, dest_database_dir)

orders_output_bronze = os.path.join(database_dir, 'fact_orders', 'bronze')
orders_output_silver = os.path.join(database_dir, 'fact_orders', 'silver')
orders_output_gold = os.path.join(database_dir, 'fact_orders', 'gold')

customers_output_bronze = os.path.join(database_dir, 'dim_customers', 'bronze')
customers_output_silver = os.path.join(database_dir, 'dim_customers', 'silver')
customers_output_gold = os.path.join(database_dir, 'dim_customers', 'gold')

inventory_trans_output_bronze = os.path.join(database_dir, 'fact_inventory_transactions', 'bronze')
inventory_trans_output_silver = os.path.join(database_dir, 'fact_inventory_transactions', 'silver')
inventory_trans_output_gold = os.path.join(database_dir, 'fact_inventory_transactions', 'gold') 

In [87]:
# Define Global Functions
def get_file_info(path: str):
    file_sizes = []
    modification_times = []

    '''Fetch each item in the directory, and filter out any directories.'''
    items = os.listdir(path)
    files = sorted([item for item in items if os.path.isfile(os.path.join(path, item))])

    '''Populate lists with the Size and Last Modification DateTime for each file in the directory.'''
    for file in files:
        file_sizes.append(os.path.getsize(os.path.join(path, file)))
        modification_times.append(pd.to_datetime(os.path.getmtime(os.path.join(path, file)), unit='s'))

    data = list(zip(files, file_sizes, modification_times))
    column_names = ['name','size','modification_time']
    
    return pd.DataFrame(data=data, columns=column_names)


def wait_until_stream_is_ready(query, min_batches=1):
    while len(query.recentProgress) < min_batches:
        time.sleep(5)
        
    print(f"The stream has processed {len(query.recentProgress)} batchs")


def remove_directory_tree(path: str):
    '''If it exists, remove the entire contents of a directory structure at a given 'path' parameter's location.'''
    try:
        if os.path.exists(path):
            shutil.rmtree(path)
            return f"Directory '{path}' has been removed successfully."
        else:
            return f"Directory '{path}' does not exist."
            
    except Exception as e:
        return f"An error occurred: {e}"
        

def drop_null_columns(df, threshold):
    '''Drop Columns having a percentage of NULL values that exceeds the given 'threshold' parameter value.'''
    columns_with_nulls = [col for col in df.columns if df.filter(df[col].isNull()).count() / df.count() > threshold] 
    df_dropped = df.drop(*columns_with_nulls) 
    
    return df_dropped
    
    
def get_mysql_dataframe(spark_session, sql_query : str, **args):
    '''Create a JDBC URL to the MySQL Database'''
    jdbc_url = f"jdbc:mysql://{args['host_name']}:{args['port']}/{args['db_name']}"
    
    '''Invoke the spark.read.format("jdbc") function to query the database, and fill a DataFrame.'''
    dframe = spark_session.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("driver", args['conn_props']['driver']) \
    .option("user", args['conn_props']['user']) \
    .option("password", args['conn_props']['password']) \
    .option("query", sql_query) \
    .load()
    
    return dframe
    

def get_mongo_uri(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the 'cluster_location' parameter.")
        
    if args['cluster_location'] == "atlas":
        uri = f"mongodb+srv://{args['user_name']}:{args['password']}@"
        uri += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net/"
    else:
        uri = "mongodb://localhost:27017/"

    return uri


def get_spark_conf_args(spark_jars : list, **args):
    jars = ""
    for jar in spark_jars:
        jars += f"{jar}, "
    
    sparkConf_args = {
        "app_name" : "PySpark C Data Lakehouse (Medallion Architecture)",
        "worker_threads" : f"local[{int(os.cpu_count()/2)}]",
        "shuffle_partitions" : int(os.cpu_count()),
        "mongo_uri" : get_mongo_uri(**args),
        "spark_jars" : jars[0:-2],
        "database_dir" : sql_warehouse_dir
    }
    
    return sparkConf_args
    

def get_spark_conf(**args):
    sparkConf = SparkConf().setAppName(args['app_name'])\
    .setMaster(args['worker_threads']) \
    .set('spark.driver.memory', '4g') \
    .set('spark.executor.memory', '2g') \
    .set('spark.jars', args['spark_jars']) \
    .set('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
    .set('spark.mongodb.input.uri', args['mongo_uri']) \
    .set('spark.mongodb.output.uri', args['mongo_uri']) \
    .set('spark.sql.adaptive.enabled', 'false') \
    .set('spark.sql.debug.maxToStringFields', 35) \
    .set('spark.sql.shuffle.partitions', args['shuffle_partitions']) \
    .set('spark.sql.streaming.forceDeleteTempCheckpointLocation', 'true') \
    .set('spark.sql.streaming.schemaInference', 'true') \
    .set('spark.sql.warehouse.dir', args['database_dir']) \
    .set('spark.streaming.stopGracefullyOnShutdown', 'true')
    
    return sparkConf


def get_mongo_client(**args):
    '''Get MongoDB Client Connection'''
    mongo_uri = get_mongo_uri(**args)
    if args['cluster_location'] == "atlas":
        client = pymongo.MongoClient(mongo_uri, tlsCAFile=certifi.where())

    elif args['cluster_location'] == "local":
        client = pymongo.MongoClient(mongo_uri)
        
    else:
        raise Exception("A MongoDB Client could not be created.")

    return client
    
    
# TODO: Rewrite this to leverage PySpark?
def set_mongo_collections(mongo_client, db_name : str, data_directory : str, json_files : list):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = [json.loads(line) for line in openfile if line.strip()]
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

mongodb_args["null_column_threshold"] = 0.5  # or another threshold value

    

def get_mongodb_dataframe(spark_session, **args):
    '''Query MongoDB, and create a DataFrame'''
    dframe = spark_session.read.format("com.mongodb.spark.sql.DefaultSource") \
        .option("database", args['db_name']) \
        .option("collection", args['collection']).load()

    '''Drop the '_id' index column to clean up the response.'''
    dframe = dframe.drop('_id')
    
    '''Call the drop_null_columns() function passing in the dataframe.'''
    dframe = drop_null_columns(dframe, args['null_column_threshold'])
    
    return dframe

In [89]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

In [91]:
# Initialize Data Lakehouse Directory Structure

In [93]:
#Remove the data lakehouse database directory structure to ensure idempotency
remove_directory_tree(database_dir)

"Directory '/Users/aryaadeshpande/Desktop/spark-warehouse/classic_models_dlh.db' has been removed successfully."

In [95]:
# Create A New Spark Session
worker_threads = f"local[{int(os.cpu_count()/2)}]"

jars = []
mysql_spark_jar = "/Users/aryaadeshpande/Desktop/mysql-connector-j-9.1.0.jar"
# mysql_spark_jar = os.path.join(os.getcwd(), "mysql-connector-j-9.1.0", "mysql-connector-j-9.1.0.jar")

mssql_spark_jar = os.path.join(os.getcwd(), "sqljdbc_12.8", "enu", "jars", "mssql-jdbc-12.8.1.jre11.jar")

jars.append(mysql_spark_jar)
#jars.append(mssql_spark_jar)

sparkConf_args = get_spark_conf_args(jars, **mongodb_args)

sparkConf = get_spark_conf(**sparkConf_args)
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
spark.sparkContext.setLogLevel("OFF")
spark

25/05/08 00:12:06 WARN Utils: Your hostname, Aryaas-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.41.1.51 instead (on interface en0)
25/05/08 00:12:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/anaconda3/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/aryaadeshpande/.ivy2/cache
The jars for the packages stored in: /Users/aryaadeshpande/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2572d880-d8c7-4526-8b2f-6148be106ad4;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
:: resolution report :: resolve 298ms :: artifacts dl 11ms
	:: modules in use:
	org.mongodb#bson;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-sync;4.0.5 from central in [default]
	org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules        

In [97]:
# Create a New Metadata Database

In [100]:
print(spark.sparkContext.getConf().get("spark.jars"))
import os
mysql_jar_path = "/Users/aryaadeshpande/Desktop/mysql-connector-j-9.1.0.jar"
print(os.path.exists(mysql_jar_path))


/Users/aryaadeshpande/Desktop/mysql-connector-j-9.1.0.jar
True


In [102]:
spark.sql(f"DROP DATABASE IF EXISTS {dest_database} CASCADE;")

sql_create_db = f"""
    CREATE DATABASE IF NOT EXISTS {dest_database}
    COMMENT 'Capstone Database'
    WITH DBPROPERTIES (contains_pii = true, purpose = 'DS-2002 Capstone');
"""
spark.sql(sql_create_db)

DataFrame[]

In [104]:
# Section II: Populate Dimensions by Ingesting "Cold-path" Reference Data 
#Fetch data from the file system 
#Verify the location of the source data files on the file system 

In [106]:
get_file_info(batch_dir)

Unnamed: 0,name,size,modification_time
0,.DS_Store,6148,2025-05-08 04:04:11.139058828
1,classic_models_customers.csv,5473,2025-05-06 21:07:13.551294327
2,classic_models_employees.csv,1781,2025-05-06 21:03:53.490443945
3,classic_models_orders.csv,3389,2025-05-06 21:09:11.481813908
4,classic_models_productlines.csv,3446,2025-05-06 20:57:26.006496668
5,classic_models_productlines.json,3886,2025-05-07 22:06:31.654823780
6,classic_models_products.csv,2011,2025-05-07 01:56:58.468213081
7,classic_models_products.json,4686,2025-05-08 03:16:45.490648985


In [108]:
# Populate employees dimension 
# Use pyspark to read from a CSV file 
employee_csv = os.path.join(batch_dir, 'classic_models_employees.csv')
print(employee_csv)

df_dim_employees = spark.read.format('csv').options(header='true', inferSchema='true').load(employee_csv)
df_dim_employees.toPandas().head(2)


/Users/aryaadeshpande/Desktop/lab_data/classic_models/batch/classic_models_employees.csv


                                                                                

Unnamed: 0,employeeNumber,lastName,firstName,extension,email,officeCode,reportsTo,jobTitle
0,1002,Murphy,Diane,x5800,dmurphy@classicmodelcars.com,1,,President
1,1056,Patterson,Mary,x4611,mpatterso@classicmodelcars.com,1,1002.0,VP Sales


In [109]:
# Make neccessary transformations to employees dataframe
# rename 'id' column to 'employee_id'
df_dim_employees = df_dim_employees.withColumnRenamed("employeeNumber", "employee_id")
# Add primary key column 
df_dim_employees.createOrReplaceTempView("employees")
sql_employees = f"""
    SELECT *, ROW_NUMBER() OVER (ORDER BY employee_id) AS employee_key
    FROM employees;
"""
df_dim_employees = spark.sql(sql_employees)
df_dim_employees.toPandas().head(2)

Unnamed: 0,employee_id,lastName,firstName,extension,email,officeCode,reportsTo,jobTitle,employee_key
0,1002,Murphy,Diane,x5800,dmurphy@classicmodelcars.com,1,,President,1
1,1056,Patterson,Mary,x4611,mpatterso@classicmodelcars.com,1,1002.0,VP Sales,2


In [112]:

# save as the dim_employees in the data lakehouse

import shutil

shutil.rmtree("/Users/aryaadeshpande/Desktop/spark-warehouse/classic_models_dlh.db/dim_employees", ignore_errors=True)


df_dim_employees.write.saveAsTable(f"{dest_database}.dim_employees", mode="overwrite")



                                                                                

In [114]:
# Unit test: describe and preview the table
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_employees;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_employees LIMIT 2").toPandas()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|         employee_id|                 int|   NULL|
|            lastName|              string|   NULL|
|           firstName|              string|   NULL|
|           extension|              string|   NULL|
|               email|              string|   NULL|
|          officeCode|                 int|   NULL|
|           reportsTo|              double|   NULL|
|            jobTitle|              string|   NULL|
|        employee_key|                 int|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|  classic_models_dlh|       |
|               Table|       dim_employees|       |
|        Created Time|Thu May 08 00:12:...|       |
|         Last Access|             UNKNOWN|       |
|          C

Unnamed: 0,employee_id,lastName,firstName,extension,email,officeCode,reportsTo,jobTitle,employee_key
0,1002,Murphy,Diane,x5800,dmurphy@classicmodelcars.com,1,,President,1
1,1056,Patterson,Mary,x4611,mpatterso@classicmodelcars.com,1,1002.0,VP Sales,2


In [116]:
# Populate productlines dimension
# Use pyspark to read from a CSV file 
productlines_csv = os.path.join(batch_dir, 'classic_models_productlines.csv')
print(productlines_csv)

df_dim_productlines = spark.read.format('csv').options(header='true', inferSchema='true').load(productlines_csv)
df_dim_productlines.toPandas().head(2)

/Users/aryaadeshpande/Desktop/lab_data/classic_models/batch/classic_models_productlines.csv


Unnamed: 0,productLine,textDescription,htmlDescription,image
0,Classic Cars,Attention car enthusiasts: Make your wildest c...,,
1,Motorcycles,Our motorcycles are state of the art replicas ...,,


In [117]:
# save as the dim_productlines in the data lakehouse
import shutil

shutil.rmtree("/Users/aryaadeshpande/Desktop/spark-warehouse/classic_models_dlh.db/dim_productlines", ignore_errors=True)

spark.sql("DROP TABLE IF EXISTS classic_models_dlh.dim_productlines")
df_dim_productlines.write.saveAsTable(f"{dest_database}.dim_productlines", mode="overwrite")

In [119]:
# Populate customers dimension 
# Use pyspark to read from a CSV file 
customers_csv = os.path.join(batch_dir, 'classic_models_customers.csv')
print(customers_csv)

df_dim_customers = spark.read.format('csv').options(header='true', inferSchema='true').load(customers_csv)
df_dim_customers.toPandas().head(2)

/Users/aryaadeshpande/Desktop/lab_data/classic_models/batch/classic_models_customers.csv


Unnamed: 0,customerNumber,customerName,contactLastName,contactFirstName,phone,addressLine1,addressLine2,city,state,postalCode,country,salesRepEmployeeNumber,creditLimit
0,103,Atelier graphique,Schmitt,Carine,40.32.2555,"54, rue Royale",,Nantes,,44000,France,1370.0,21000.0
1,112,Signal Gift Stores,King,Jean,7025551838,8489 Strong St.,,Las Vegas,NV,83030,USA,1166.0,71800.0


In [121]:
# Make neccessary transformations to customers dataframe
# rename 'customerNumber' column to 'customer_id'
customers_csv = os.path.join(batch_dir, 'classic_models_customers.csv')
df_dim_customers = spark.read.format('csv').options(header='true', inferSchema=True).load(customers_csv)

df_dim_customers = df_dim_customers.withColumnRenamed("customerNumber", "customer_id")
# Add order primary key column 
df_dim_customers.createOrReplaceTempView("customers")
sql_customers = f"""
    SELECT *, ROW_NUMBER() OVER (ORDER BY customer_id) AS customer_key
    FROM customers;
"""
df_dim_customer = spark.sql(sql_customers)
df_dim_customer.toPandas().head(2)

Unnamed: 0,customer_id,customerName,contactLastName,contactFirstName,phone,addressLine1,addressLine2,city,state,postalCode,country,salesRepEmployeeNumber,creditLimit,customer_key
0,103,Atelier graphique,Schmitt,Carine,40.32.2555,"54, rue Royale",,Nantes,,44000,France,1370.0,21000.0,1
1,112,Signal Gift Stores,King,Jean,7025551838,8489 Strong St.,,Las Vegas,NV,83030,USA,1166.0,71800.0,2


In [122]:
# save as the dim_customers in the data lakehouse
import shutil

shutil.rmtree("/Users/aryaadeshpande/Desktop/spark-warehouse/classic_models_dlh.db/dim_customers", ignore_errors=True)

df_dim_customers.write.saveAsTable(f"{dest_database}.dim_customers", mode="overwrite")

In [132]:
# Populate the Date Dimension

In [134]:
# Fetch data from dim_date table in MySQL

In [136]:
import os
print(os.path.exists("mysql-connector-j-9.1.0.jar"))
print(spark.sparkContext.getConf().get("spark.jars"))

True
/Users/aryaadeshpande/Desktop/mysql-connector-j-9.1.0.jar


In [138]:
mysql_args = {
    "host_name": "localhost",         # or the remote host address
    "port": "3306",
    "db_name": "classicmodels_dw",  # must match the key used in get_mysql_dataframe
    "conn_props": {
        "user": "root",
        "password": "Strawberrylime",
        "driver": "com.mysql.cj.jdbc.Driver"
    }
}


In [140]:
sql_test = f"""
SELECT TABLE_NAME 
FROM INFORMATION_SCHEMA.TABLES 
WHERE TABLE_SCHEMA = '{mysql_args['db_name']}'
"""
df_test = get_mysql_dataframe(spark, sql_test, **mysql_args)
df_test.show()


+-------------+
|   TABLE_NAME|
+-------------+
|dim_customers|
|     dim_date|
| dim_payments|
| dim_products|
|       orders|
|   sales_fact|
+-------------+



In [141]:
sql_dim_date = f"SELECT * FROM {mysql_args['db_name']}.dim_date"
df_dim_date = get_mysql_dataframe(spark, sql_dim_date, **mysql_args)

In [142]:
# Save dim_date table in data lakehouse
df_dim_date.write.saveAsTable(f"{dest_database}.dim_date", mode="overwrite")

                                                                                

In [144]:
### Fetch Reference Data from a MongoDB Atlas Database
####Create a New MongoDB Database, and Load Each JSON File into a New MongoDB Collection

In [146]:
spark.catalog.clearCache()


In [147]:
client = get_mongo_client(**mongodb_args)

json_files = {"products" : "classic_models_products.json"}

set_mongo_collections(client, mongodb_args["db_name"], batch_dir, json_files) 

In [152]:
####Populate the products Dimension 
##### 2.2.1. Fetch Data from the New MongoDB <span style="color:darkred">Customers</span> Collection
mongodb_args["collection"] = "products"
mongodb_args["null_column_threshold"] = 0.5

df_dim_products = get_mongodb_dataframe(spark, **mongodb_args)
df_dim_products.toPandas().head(2)

Unnamed: 0,MSRP,buyPrice,productCode,productDescription,productLine,productName,productScale,productVendor,quantityInStock
0,95.7,48.81,S10_1678,"This replica features working kickstand, front...",Motorcycles,1969 Harley Davidson Ultimate Chopper,1:10,Min Lin Diecast,7933
1,214.3,98.58,S10_1949,Turnable front wheels; steering function; deta...,Classic Cars,1952 Alpine Renault 1300,1:10,Classic Metal Creations,7305


In [153]:
df_dim_products.printSchema()


root
 |-- MSRP: double (nullable = true)
 |-- buyPrice: double (nullable = true)
 |-- productCode: string (nullable = true)
 |-- productDescription: string (nullable = true)
 |-- productLine: string (nullable = true)
 |-- productName: string (nullable = true)
 |-- productScale: string (nullable = true)
 |-- productVendor: string (nullable = true)
 |-- quantityInStock: integer (nullable = true)



In [156]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number


# ----------------------------------------------------------------------------------
# Rename the 'productCode' column to 'product_id' ------------------------------------------
# ----------------------------------------------------------------------------------
df_dim_products = df_dim_products.withColumnRenamed("productCode", "product_id")


# ----------------------------------------------------------------------------------
# Add Primary Key column using the SQL Windowing function: ROW_NUMBER() 
# ----------------------------------------------------------------------------------

window_spec = Window.orderBy("product_id")
df_dim_products = df_dim_products.withColumn("product_key", row_number().over(window_spec))
# ----------------------------------------------------------------------------------
# Reorder Columns and display the first two rows in a Pandas dataframe
# ----------------------------------------------------------------------------------
df_dim_products = df_dim_products.select(
    "product_key",
    "product_id",
    "productName",
    "productLine",
    "productDescription",
    "productScale",
    "productVendor",
    "quantityInStock",
    "buyPrice",
    "MSRP"
)

# Show first 2 rows for verification
df_dim_products.toPandas().head(2)





Unnamed: 0,product_key,product_id,productName,productLine,productDescription,productScale,productVendor,quantityInStock,buyPrice,MSRP
0,1,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,"This replica features working kickstand, front...",1:10,Min Lin Diecast,7933,48.81,95.7
1,2,S10_1949,1952 Alpine Renault 1300,Classic Cars,Turnable front wheels; steering function; deta...,1:10,Classic Metal Creations,7305,98.58,214.3


In [158]:
# Save as dim_products to data lakehouse
df_dim_products.write.saveAsTable(f"{dest_database}.dim_products", mode="overwrite")

In [160]:
# unit test: describe and preview the table
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_date;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_date LIMIT 2").toPandas()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|            date_key|      int|   NULL|
|           full_date|     date|   NULL|
|           date_name| char(11)|   NULL|
|        date_name_us| char(11)|   NULL|
|        date_name_eu| char(11)|   NULL|
|         day_of_week|  tinyint|   NULL|
|    day_name_of_week| char(10)|   NULL|
|        day_of_month|  tinyint|   NULL|
|         day_of_year|      int|   NULL|
|     weekday_weekend| char(10)|   NULL|
|        week_of_year|  tinyint|   NULL|
|          month_name| char(10)|   NULL|
|       month_of_year|  tinyint|   NULL|
|is_last_day_of_month|  char(1)|   NULL|
|    calendar_quarter|  tinyint|   NULL|
|       calendar_year|      int|   NULL|
| calendar_year_month| char(10)|   NULL|
|   calendar_year_qtr| char(10)|   NULL|
|fiscal_month_of_year|  tinyint|   NULL|
|      fiscal_quarter|  tinyint|   NULL|
+--------------------+---------+-------+
only showing top

Unnamed: 0,date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,...,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
0,20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
1,20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


In [162]:
# Verify the dimension tables
spark.sql(f"USE {dest_database};")
spark.sql("SHOW TABLES").toPandas()

Unnamed: 0,namespace,tableName,isTemporary
0,classic_models_dlh,dim_customers,False
1,classic_models_dlh,dim_date,False
2,classic_models_dlh,dim_employees,False
3,classic_models_dlh,dim_productlines,False
4,classic_models_dlh,dim_products,False
5,,customers,True
6,,employees,True


In [164]:
#III. Integrate reference data with real time data 

In [166]:
import shutil
shutil.rmtree("/Users/aryaadeshpande/Desktop/lab_data/checkpoints/orders_silver", ignore_errors=True)



In [168]:
# Delete existing bronze files
import os
import glob

bronze_dir = "/Users/aryaadeshpande/Desktop/spark-warehouse/classic_models_dlh.db/fact_orders/bronze"
for file in glob.glob(os.path.join(bronze_dir, "*.parquet")):
    os.remove(file)



In [170]:
# Use PySpark Structured Streaming to Process (Hot Path) to process orders fact data 
get_file_info(orders_stream_dir)

Unnamed: 0,name,size,modification_time
0,.DS_Store,6148,2025-05-08 04:05:41.836149693
1,classic_orders_1c.json,2068,2025-05-06 00:58:43.275743246
2,classic_orders_2c.json,2071,2025-05-08 04:05:16.723586798
3,classic_orders_3c.json,2155,2025-05-08 04:05:21.069364786


In [172]:
from pyspark.sql.types import *

orders_schema = StructType([
    StructField("orderNumber", LongType(), True),
    StructField("orderDate", StringType(), True),
    StructField("requiredDate", StringType(), True),
    StructField("shippedDate", StringType(), True),
    StructField("status", StringType(), True),
    StructField("comments", StringType(), True),
    StructField("customerNumber", LongType(), True),
    StructField("productCode", StringType(), True),
    StructField("quantityOrdered", IntegerType(), True),
    StructField("priceEach", DoubleType(), True),
    StructField("orderLineNumber", IntegerType(), True),
    StructField("employeeNumber", LongType(), True)
])


In [174]:
# Create the Bronze Layer: Stage Orders Fact table data 
# Read "raw" JSON file in to a stream
df_orders_bronze = (
    spark.readStream
    .option("maxFilesPerTrigger", 3)
    .option("multiLine", "true")
    .option("basePath", orders_stream_dir) \
    .schema(orders_schema)
    .json(orders_stream_dir)
)
orders_checkpoint_bronze = os.path.join(orders_output_bronze, '_checkpoint_bronze')

bronze_query = df_orders_bronze.writeStream \
    .format("parquet") \
    .option("basePath", orders_output_bronze) \
    .outputMode("append") \
    .option("checkpointLocation", orders_checkpoint_bronze) \
    .start(orders_output_bronze)

df_orders_bronze.isStreaming

True

In [176]:
# Write the streaming data to a parquet file

from pyspark.sql.functions import current_timestamp, input_file_name

orders_checkpoint_bronze = os.path.join(orders_output_bronze, '_checkpoint_bronze')



orders_bronze_query = (
    df_orders_bronze
    # Add Current Timestamp and Input Filename columns for Traceability
    .withColumn("receipt_time", current_timestamp())
    .withColumn("source_file", input_file_name())
    
    .writeStream \
    .format("parquet") \
    .outputMode("append") \
    .queryName("orders_bronze")
    .trigger(availableNow = True) \
    .option("checkpointLocation", orders_checkpoint_bronze) \
    .option("compression", "snappy") \
    .start(orders_output_bronze)
)

In [178]:
import os

bronze_path = "/Users/aryaadeshpande/Desktop/spark-warehouse/classic_models_dlh.db/fact_orders/bronze"
os.listdir(bronze_path)


['_checkpoint_bronze', '_spark_metadata']

In [180]:
from pyspark.sql.functions import to_date, month, year

df_bronze_check = spark.read.format("parquet").load(orders_output_bronze)

# Convert orderDate to proper DateType if needed
df_bronze_check = df_bronze_check.withColumn("orderDateParsed", to_date("orderDate", "yyyy-MM-dd"))

# Get distinct months and years
df_bronze_check.select(month("orderDateParsed").alias("month"), year("orderDateParsed").alias("year")) \
    .distinct() \
    .orderBy("year", "month") \
    .show(20)


+-----+----+
|month|year|
+-----+----+
|    1|2023|
|    5|2023|
|   11|2023|
+-----+----+



In [184]:
# Unit Test: Implement Query Monitoring
print(f"Query ID: {orders_bronze_query.id}")
print(f"Query Name: {orders_bronze_query.name}")
print(f"Query Status: {orders_bronze_query.status}")

Query ID: a74169a4-45c2-4da7-bcb9-6c908af8644d
Query Name: orders_bronze
Query Status: {'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}


In [188]:
orders_bronze_query.awaitTermination()

In [190]:
from pyspark.sql.functions import expr
from pyspark.sql.types import DateType
from datetime import date, timedelta
from pyspark.sql.functions import col
# Create a pandas range from 2023-01-01 to 2023-12-31
start_date = date(2023, 1, 1)
end_date = date(2023, 12, 31)
date_list = [(start_date + timedelta(days=i)).isoformat() for i in range((end_date - start_date).days + 1)]

df_dim_date = spark.createDataFrame([(d,) for d in date_list], ["order_full_date"]) \
    .withColumn("order_full_date", col("order_full_date").cast(DateType())) \
    .withColumn("order_date_key", expr("CAST(date_format(order_full_date, 'yyyyMMdd') AS BIGINT)"))

# Save it to your warehouse
df_dim_date.select("order_date_key", "order_full_date").write.mode("overwrite").saveAsTable("classic_models_dlh.dim_order_date")


                                                                                

In [191]:
df_dim_order_date = spark.table("classic_models_dlh.dim_order_date")

# # Confirm your date range now includes 2023
# df_dim_order_date.filter("full_date = '2023-05-01'").show()

In [194]:
from pyspark.sql.functions import col

# No need to alias 'order_date_key' again — it's already correctly named
df_dim_order_date = df_dim_date.select(
    col("order_date_key"), 
    col("order_full_date")
)
df_dim_order_date.write.mode("overwrite").saveAsTable("classic_models_dlh.dim_order_date")

# Use correct column names for paid_date and shipped_date
df_dim_paid_date = df_dim_date.select(
    col("order_date_key").alias("paid_date_key"), 
    col("order_full_date").alias("paid_full_date")
)
df_dim_shipped_date = df_dim_date.select(
    col("order_date_key").alias("shipped_date_key"), 
    col("order_full_date").alias("shipped_full_date")
)

# Reload other dimension tables
df_dim_products = spark.table("classic_models_dlh.dim_products")


In [198]:
df_orders_bronze.printSchema()


root
 |-- orderNumber: long (nullable = true)
 |-- orderDate: string (nullable = true)
 |-- requiredDate: string (nullable = true)
 |-- shippedDate: string (nullable = true)
 |-- status: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- customerNumber: long (nullable = true)
 |-- productCode: string (nullable = true)
 |-- quantityOrdered: integer (nullable = true)
 |-- priceEach: double (nullable = true)
 |-- orderLineNumber: integer (nullable = true)
 |-- employeeNumber: long (nullable = true)



In [200]:
from pyspark.sql.functions import broadcast, col, to_date
from pyspark.sql.types import LongType
import os
from pyspark.sql.functions import upper

# -----------------------------------------------------
# STEP 1: Cast join key columns in dimension tables
# -----------------------------------------------------
df_dim_customers = df_dim_customers.withColumn("customer_id", col("customer_id").cast(LongType()))
df_dim_employees = df_dim_employees \
    .withColumn("employee_id", col("employee_id").cast(LongType())) \
    .withColumn("employee_key", col("employee_key").cast(LongType()))
df_dim_products = df_dim_products.withColumn("product_key", col("product_key").cast(LongType()))
# df_dim_order_date is already fine (order_full_date is DateType, order_date_key is LongType)

# -----------------------------------------------------
# STEP 2: Transform the streaming bronze input
# -----------------------------------------------------
df_orders_bronze_transformed = df_orders_bronze.withColumn(
    "orderDateParsed", to_date(col("orderDate"), "yyyy-MM-dd")
)

# -----------------------------------------------------
# STEP 3: Join with all dimension tables using correct keys
# -----------------------------------------------------
# Normalize product codes to uppercase for case-insensitive join
df_orders_silver = df_orders_bronze_transformed \
    .withColumn("productCode_upper", upper(col("productCode"))) \
    .join(broadcast(df_dim_customers), col("customerNumber") == col("customer_id"), "left") \
    .join(broadcast(df_dim_employees), col("employeeNumber") == col("employee_id"), "left") \
    .join(
        broadcast(df_dim_products.withColumn("product_id_upper", upper(col("product_id")))),
        col("productCode_upper") == col("product_id_upper"),
        "left"
    ) \
    .join(broadcast(df_dim_order_date), col("orderDateParsed") == col("order_full_date"), "left") \
    .select(
        col("orderNumber").alias("order_id").cast(LongType()),
        col("orderNumber").alias("order_detail_id").cast(LongType()),
        col("customer_id").cast(LongType()),
        col("employee_key").cast(LongType()),
        col("product_key").cast(LongType()),
        col("order_date_key").cast(LongType()),
        col("orderDateParsed"),  
        col("quantityOrdered").alias("quantity"),
        col("priceEach").alias("unit_price")
    ) \
    .filter(col("product_key").isNotNull())  # Still filters invalid products, but now case-insensitively

# -----------------------------------------------------
# STEP 4: Define output and checkpoint directories
# -----------------------------------------------------
orders_checkpoint_silver = os.path.join(orders_output_silver, "_checkpoint_silver")

# -----------------------------------------------------
# STEP 5: Write Silver output with streaming trigger
# -----------------------------------------------------
orders_silver_query = (
    df_orders_silver.writeStream
    .format("parquet")
    .outputMode("append")
    .option("checkpointLocation", orders_checkpoint_silver)
    .option("compression", "snappy")
    .trigger(availableNow=True)  
    .start(orders_output_silver)
)


In [202]:
df_orders_silver = spark.readStream \
    .format("parquet") \
    .option("basePath", orders_output_bronze) \
    .load(orders_output_bronze)


In [204]:
df_orders_silver.isStreaming

True

In [206]:
df_orders_silver.printSchema()

root
 |-- orderNumber: long (nullable = true)
 |-- orderDate: string (nullable = true)
 |-- requiredDate: string (nullable = true)
 |-- shippedDate: string (nullable = true)
 |-- status: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- customerNumber: long (nullable = true)
 |-- productCode: string (nullable = true)
 |-- quantityOrdered: integer (nullable = true)
 |-- priceEach: double (nullable = true)
 |-- orderLineNumber: integer (nullable = true)
 |-- employeeNumber: long (nullable = true)
 |-- receipt_time: timestamp (nullable = true)
 |-- source_file: string (nullable = true)



In [212]:
df_silver_check = spark.read.format("parquet").load(orders_output_silver)
df_silver_check.count(), df_silver_check.show(truncate=False)


+--------+---------------+-----------+------------+-----------+--------------+---------------+--------+----------+
|order_id|order_detail_id|customer_id|employee_key|product_key|order_date_key|orderDateParsed|quantity|unit_price|
+--------+---------------+-----------+------------+-----------+--------------+---------------+--------+----------+
|10120   |10120          |112        |17          |7          |20231115      |2023-11-15     |9       |291.35    |
|10110   |10110          |119        |16          |7          |20230115      |2023-01-15     |7       |144.57    |
|10100   |10100          |119        |16          |2          |20230501      |2023-05-01     |2       |53.59     |
+--------+---------------+-----------+------------+-----------+--------------+---------------+--------+----------+



(3, None)

In [213]:
# Write the Transformed Streaming data to the Data Lakehouse

# import shutil
# shutil.rmtree(orders_checkpoint_silver, ignore_errors=True)
import time

orders_checkpoint_silver = os.path.join(orders_output_silver, f'_checkpoint_{int(time.time())}')


orders_silver_query = (
    df_orders_silver.writeStream
    .format("parquet")
    .outputMode("append")
    .option("checkpointLocation", orders_checkpoint_silver)
    .option("compression", "snappy")
    .trigger(availableNow=True)
    .start(orders_output_silver)
)

# Wait for at least one batch to process

while len(orders_silver_query.recentProgress) == 0:
    time.sleep(5)

In [215]:
print(orders_silver_query.status)


{'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}


In [216]:
df_orders_silver = spark.readStream.format("parquet").load(orders_output_bronze)


In [217]:
print(f"Query ID: {orders_silver_query.id}")
print(f"Query Name: {orders_silver_query.name}")
print(f"Query Status: {orders_silver_query.status}")

Query ID: 57ef4f0d-8368-4e0c-918a-c786fb6091c8
Query Name: None
Query Status: {'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}


In [222]:
orders_silver_query.awaitTermination()

In [228]:
# Create Gold Layer: Perform Aggregations 

In [230]:
# Define a Query to Create a Business Report 
# Create a new Gold table using the Pyspark API. This code reads streaming order data from a Parquet source and joins it with product and date dimension tables to enrich the dataset. It then aggregates the number of products ordered per month and product line, producing a monthly summary sorted by month and product popularity.

In [286]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, asc, desc, count, month, date_format

df_orders_by_product_category_gold = spark.readStream.format("parquet").load(orders_output_silver) \
    .join(df_dim_products, "product_key") \
    .withColumn("month_of_year", month("orderDateParsed")) \
    .withColumn("month_name", date_format("orderDateParsed", "MMMM")) \
    .groupBy("month_name", "month_of_year", "productLine") \
    .agg(count("*").alias("product_count")) \
    .orderBy("month_of_year", "productLine")



In [288]:
spark.read.parquet(orders_output_silver).count()


3

In [290]:
df_orders_by_product_category_gold.printSchema()

root
 |-- month_name: string (nullable = true)
 |-- month_of_year: integer (nullable = true)
 |-- productLine: string (nullable = true)
 |-- product_count: long (nullable = false)



In [292]:
# Write the streaming data to a parquet file in "Complete" mode
# Stop the existing query if it's still running
for q in spark.streams.active:
    if q.name == "fact_orders_by_product_category":
        q.stop()
orders_gold_query = (
    df_orders_by_product_category_gold.writeStream \
    .format("memory") \
    .outputMode("complete") \
    .queryName("fact_orders_by_product_category")
    .start()
)

In [294]:
import time

while len(orders_gold_query.recentProgress) < 1:
    time.sleep(5)

orders_gold_query.processAllAvailable()


In [295]:
# Query the gold data from memory 
df_fact_orders_by_product_category = spark.sql("SELECT * FROM fact_orders_by_product_category")
df_fact_orders_by_product_category.printSchema()

root
 |-- month_name: string (nullable = true)
 |-- month_of_year: integer (nullable = true)
 |-- productLine: string (nullable = true)
 |-- product_count: long (nullable = false)



In [298]:
# Create the final selection
df_fact_orders_by_product_category_gold_final = df_fact_orders_by_product_category \
    .select(
        col("month_name").alias("Month"),
        col("productLine").alias("Product Category"),
        col("product_count").alias("Product Count")
    ) \
    .orderBy(asc("month_of_year"), desc("Product Count"))


In [300]:
# Load the Final Results into a New Table and Display the Results
# Drop the table if it exists
spark.sql(f"DROP TABLE IF EXISTS {dest_database}.fact_orders_by_product_category")

# Delete the associated directory manually
import shutil
import os

table_path = f"/Users/aryaadeshpande/Desktop/spark-warehouse/{dest_database}.db/fact_orders_by_product_category"
if os.path.exists(table_path):
    shutil.rmtree(table_path)
df_fact_orders_by_product_category_gold_final.write.saveAsTable(f"{dest_database}.fact_orders_by_product_category", mode="overwrite")
spark.sql(f"SELECT * FROM {dest_database}.fact_orders_by_product_category").toPandas()

Unnamed: 0,Month,Product Category,Product Count
0,November,Classic Cars,1
1,January,Classic Cars,1
2,May,Classic Cars,1
