# DS-2002 Final Capstone Project 
Restaurant Data Lakehouse (Local Implementation)

## Project Overview
For this project I built a complete data lakehouse modeled on a single restaurant's point-of-sale (POS) system, inspired by Square, a POS system I have worked with a lot at my job. The goal is to analyze historical and simulated real-time transactions to understand what was sold, when, and by whom.

The project uses Apache Spark, Delta Lake, and Structured Streaming, following the bronze-to-silver-to-gold design pattern from the class labs.

## Architecture Summary

Data sources (3 required)
- MySQL (relational/SQL)
    - Customers
    - Date Dimension
- MongoDB Atlas (NoSQL)
    - Menu Items
- Local CSV files (file-based source)
    - Employees
    - Order Types
- Streaming JSON files (local file stream)
    - Order events with nested line items, simulating real-time POS transactions


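## Lakehouse Design
- Bronze Layer
    - raw order events streamed from JSON files with Spark Structured Streaming
    - stored as Delta tables
- Silver Layer (fact table)
    - one fact record per order line
    - joined with the customer, employee, order type, and menu item dimensions, and keyed to the date dimension through `date_id`
- Gold Layer
    - aggregated tables and queries that support the analysis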

## Date Dimension
- Created with the SQL script from class and loaded into MySQL.
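The class SQL script is not reproduced here, but the key it produces is what ties the date dimension to the fact table: `date_key` is the calendar date written as a `yyyyMMdd` integer, the same format the silver layer derives from `order_ts`. Below is a minimal Python sketch of a few of those columns, assuming MySQL's `DAYOFWEEK` numbering (1 = Sunday), which matches the extracted rows where 2024-01-01 (a Monday) has `day_of_week` 2. `date_row` and `date_rows` are hypothetical helpers, not part of the project.

```python
import datetime as dt


def date_row(d: dt.date) -> dict:
    """Build a small, illustrative subset of dim_date-style attributes for one day."""
    return {
        # yyyyMMdd as an integer, the same key format the silver layer builds from order_ts
        "date_key": int(d.strftime("%Y%m%d")),
        "full_date": d,
        "day_name_of_week": d.strftime("%A"),
        # assumed MySQL DAYOFWEEK convention: 1 = Sunday ... 7 = Saturday
        "day_of_week": d.isoweekday() % 7 + 1,
        "weekday_weekend": "Weekend" if d.weekday() >= 5 else "Weekday",
        "calendar_quarter": (d.month - 1) // 3 + 1,
    }


def date_rows(start: dt.date, end: dt.date) -> list[dict]:
    """Return one row per calendar day from start to end, inclusive."""
    n_days = (end - start).days + 1
    return [date_row(start + dt.timedelta(days=i)) for i in range(n_days)]
```

For example, `date_rows(dt.date(2024, 1, 1), dt.date(2026, 12, 31))` returns 1,096 rows, since 2024 is a leap year.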

## Other
- Deployed entirely locally
- Azure was not used (the TA confirmed this was allowed)
- Spark runs locally with Delta Lake


# Environment Setup and Directory Config

In [9]:
import os, sys
import json
from pyspark.sql import functions as F
from pyspark.sql.types import *
import pandas as pd
from sqlalchemy import create_engine, text
from pathlib import Path

PROJECT_ROOT = Path(".").resolve()
WAREHOUSE_DIR = PROJECT_ROOT / "warehouse"
DATA_DIR = PROJECT_ROOT / "data"

WAREHOUSE_DIR.mkdir(parents=True, exist_ok=True)
DATA_DIR.mkdir(parents=True, exist_ok=True)

print("DATA_DIR:", DATA_DIR)
print("WAREHOUSE_DIR:", WAREHOUSE_DIR)


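# Windows only: Spark expects HADOOP_HOME to point at a Hadoop build whose bin folder holds winutils.exe
os.environ["HADOOP_HOME"] = r"C:\hadoop-3.3.6"
os.environ["PATH"] = r"C:\hadoop-3.3.6\bin" + os.pathsep + os.environ["PATH"]

# make the Spark driver and workers use this notebook's Python interpreter
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable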

print("HADOOP_HOME:", os.environ["HADOOP_HOME"])
print("Python:", sys.executable)

DATA_DIR: C:\Users\evasb_eqbyhhj\Documents\DS-2002-Capstone\notebooks\data
WAREHOUSE_DIR: C:\Users\evasb_eqbyhhj\Documents\DS-2002-Capstone\notebooks\warehouse
HADOOP_HOME: C:\hadoop-3.3.6
Python: c:\Users\evasb_eqbyhhj\anaconda3\envs\ds2002spark\python.exe


In [12]:
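import os
from urllib.parse import quote_plus

import pymongo
import certifi

# read the password from an environment variable instead of hardcoding it in the notebook
# (MONGO_PWD is an assumed variable name); quote_plus escapes any special characters
MONGO_URI = (
    f"mongodb+srv://sae9fp_db_user:{quote_plus(os.environ['MONGO_PWD'])}"
    "@restaurant-cluster.p6k0yes.mongodb.net/"
    "?appName=restaurant-cluster"
)

client = pymongo.MongoClient(MONGO_URI, tlsCAFile=certifi.where())

# test connection
client.admin.command("ping")
print("MongoDB connection successful")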


MongoDB connection successful


Spark session initialization with Delta Lake support

In [2]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = (
    SparkSession.builder
    .appName("restaurant-lakehouse")
    .master("local[*]")
    .config("spark.pyspark.python", sys.executable)
    .config("spark.pyspark.driver.python", sys.executable)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()
spark.sparkContext.setLogLevel("WARN")


In [4]:
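mysql_args = {
    "uid": "root",
    # read the password from an environment variable rather than storing it in the notebook
    # (MYSQL_PWD is an assumed variable name)
    "pwd": os.environ["MYSQL_PWD"],
    "hostname": "localhost",
    "port": 3306,
    "dbname": "restaurant_oltp"
}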

Helper function for relational data extraction (MySQL)

In [5]:

def get_sql_dataframe(sql_query: str, **args) -> pd.DataFrame:
    """
    Extracts data from a MySQL database using SQLAlchemy
    and returns a Pandas DataFrame.
    """
    conn_str = (
        f"mysql+pymysql://{args['uid']}:{args['pwd']}@"
        f"{args['hostname']}:{args['port']}/{args['dbname']}"
    )
    engine = create_engine(conn_str, pool_recycle=3600)

    with engine.connect() as conn:
        return pd.read_sql(text(sql_query), conn)
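Because the helper splices `args['pwd']` directly into the URL, a password containing `@`, `:` or `/` would be parsed as part of the host or path. Below is a minimal sketch, assuming the same keys as `mysql_args`, that percent-encodes the credentials with the standard library first. `build_mysql_url` is a hypothetical name.

```python
from urllib.parse import quote_plus


def build_mysql_url(uid: str, pwd: str, hostname: str, port: int, dbname: str) -> str:
    """Return a SQLAlchemy-style MySQL URL with the username and password percent-encoded."""
    # quote_plus turns reserved characters such as '@', ':' and '/' into %XX escapes
    return (
        f"mysql+pymysql://{quote_plus(uid)}:{quote_plus(pwd)}@"
        f"{hostname}:{port}/{dbname}"
    )
```

Inside `get_sql_dataframe`, `create_engine(build_mysql_url(**args), pool_recycle=3600)` could replace the f-string. SQLAlchemy's `URL.create(...)` is an alternative that avoids manual escaping, because it takes the password as a separate argument.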


Employee dimension - CSV source

In [18]:
employees_path = DATA_DIR / "employees.csv"
assert employees_path.exists(), f"Missing {employees_path}"

employees_sdf = (
    spark.read
         .option("header", "true")
         .csv(str(employees_path))
         .withColumn("employee_id", F.col("employee_id").cast("int"))
         .withColumn("hire_date", F.to_date("hire_date"))
         .withColumn("created_at", F.current_timestamp())
)

dim_employee_path = (WAREHOUSE_DIR / "dim_employee").resolve().as_posix()

(employees_sdf
    .write
    .format("delta")
    .mode("overwrite")
    .save(dim_employee_path)
)

employees_sdf.printSchema()
employees_sdf.show(5, truncate=False)

root
 |-- employee_id: integer (nullable = true)
 |-- employee_code: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- role: string (nullable = true)
 |-- hire_date: date (nullable = true)
 |-- status: string (nullable = true)
 |-- created_at: timestamp (nullable = false)

+-----------+-------------+----------+---------+---------+----------+------+--------------------------+
|employee_id|employee_code|first_name|last_name|role     |hire_date |status|created_at                |
+-----------+-------------+----------+---------+---------+----------+------+--------------------------+
|1          |EMP-001      |Mia       |Lopez    |Server   |2022-05-10|Active|2025-12-14 09:53:36.228236|
|2          |EMP-002      |Ethan     |Clark    |Server   |2023-02-18|Active|2025-12-14 09:53:36.228236|
|3          |EMP-003      |Sofia     |Nguyen   |Bartender|2021-09-01|Active|2025-12-14 09:53:36.228236|
|4          |EMP-004      |Jack      |R

Order type dimension - CSV source

In [19]:
order_types_path = DATA_DIR / "order_types.csv"
assert order_types_path.exists(), f"Missing {order_types_path}"

order_types_sdf = (
    spark.read
         .option("header", "true")
         .csv(str(order_types_path))
         .withColumn("order_type_id", F.col("order_type_id").cast("int"))
)

dim_order_type_path = (WAREHOUSE_DIR / "dim_order_type").resolve().as_posix()

(order_types_sdf
    .write
    .format("delta")
    .mode("overwrite")
    .save(dim_order_type_path)
)

order_types_sdf.printSchema()
order_types_sdf.show(truncate=False)


root
 |-- order_type_id: integer (nullable = true)
 |-- order_type_key: string (nullable = true)
 |-- order_type_name: string (nullable = true)
 |-- channel: string (nullable = true)

+-------------+--------------+---------------+-----------+
|order_type_id|order_type_key|order_type_name|channel    |
+-------------+--------------+---------------+-----------+
|1            |DINE_IN       |Dine-in        |On-premise |
|2            |TAKEOUT       |Takeout        |Off-premise|
|3            |DELIVERY      |Delivery       |Off-premise|
+-------------+--------------+---------------+-----------+



In [6]:
customers_pdf = get_sql_dataframe(
    "SELECT * FROM customers",
    **mysql_args
)

print(customers_pdf.shape)
customers_pdf.head()

(5, 9)


|   | customer_id | customer_code | first_name | last_name | email | phone | loyalty_tier | signup_date | birthdate |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | CUST-001 | Emma | Johnson | emma.j@example.com | 555-1111 | Gold | 2023-01-10 | 1998-04-12 |
| 1 | 2 | CUST-002 | Liam | Smith | liam.s@example.com | 555-2222 | Silver | 2023-03-15 | 1995-09-22 |
| 2 | 3 | CUST-003 | Olivia | Brown | olivia.b@example.com | 555-3333 | Bronze | 2024-01-02 | 2000-07-18 |
| 3 | 4 | CUST-004 | Noah | Davis | noah.d@example.com | 555-4444 | Gold | 2022-11-08 | 1992-02-05 |
| 4 | 5 | CUST-005 | Ava | Wilson | ava.w@example.com | 555-5555 | Silver | 2023-06-21 | 1999-11-30 |


Customer dimension - MySQL source

In [7]:
customers_sdf = (
    spark.createDataFrame(customers_pdf)
         .withColumn("customer_id", F.col("customer_id").cast("int"))
         .withColumn("signup_date", F.to_date("signup_date"))
         .withColumn("birthdate", F.to_date("birthdate"))
         .withColumn("created_at", F.current_timestamp())
)

customers_sdf.show(5, truncate=False)


+-----------+-------------+----------+---------+--------------------+--------+------------+-----------+----------+--------------------------+
|customer_id|customer_code|first_name|last_name|email               |phone   |loyalty_tier|signup_date|birthdate |created_at                |
+-----------+-------------+----------+---------+--------------------+--------+------------+-----------+----------+--------------------------+
|1          |CUST-001     |Emma      |Johnson  |emma.j@example.com  |555-1111|Gold        |2023-01-10 |1998-04-12|2025-12-14 09:08:29.984037|
|2          |CUST-002     |Liam      |Smith    |liam.s@example.com  |555-2222|Silver      |2023-03-15 |1995-09-22|2025-12-14 09:08:29.984037|
|3          |CUST-003     |Olivia    |Brown    |olivia.b@example.com|555-3333|Bronze      |2024-01-02 |2000-07-18|2025-12-14 09:08:29.984037|
|4          |CUST-004     |Noah      |Davis    |noah.d@example.com  |555-4444|Gold        |2022-11-08 |1992-02-05|2025-12-14 09:08:29.984037|
|5    

In [8]:
dim_customer_path = (WAREHOUSE_DIR / "dim_customer").resolve().as_posix()

(customers_sdf
 .write.format("delta")
 .mode("overwrite")
 .save(dim_customer_path)
)

spark.read.format("delta").load(dim_customer_path).show(5, truncate=False)

+-----------+-------------+----------+---------+--------------------+--------+------------+-----------+----------+-------------------------+
|customer_id|customer_code|first_name|last_name|email               |phone   |loyalty_tier|signup_date|birthdate |created_at               |
+-----------+-------------+----------+---------+--------------------+--------+------------+-----------+----------+-------------------------+
|3          |CUST-003     |Olivia    |Brown    |olivia.b@example.com|555-3333|Bronze      |2024-01-02 |2000-07-18|2025-12-14 09:08:56.76229|
|2          |CUST-002     |Liam      |Smith    |liam.s@example.com  |555-2222|Silver      |2023-03-15 |1995-09-22|2025-12-14 09:08:56.76229|
|1          |CUST-001     |Emma      |Johnson  |emma.j@example.com  |555-1111|Gold        |2023-01-10 |1998-04-12|2025-12-14 09:08:56.76229|
|5          |CUST-005     |Ava       |Wilson   |ava.w@example.com   |555-5555|Silver      |2023-06-21 |1999-11-30|2025-12-14 09:08:56.76229|
|4          |

Menu item dimension - MongoDB (NoSQL) source

In [14]:
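# menu_coll is not defined in the cells shown; the database and collection names below are
# hypothetical placeholders, so point this at wherever the menu item documents were loaded
menu_coll = client["restaurant"]["menu_items"]

docs = list(menu_coll.find({}))

# ObjectId is not a Spark type, so convert _id to a plain string before building the DataFrame
for d in docs:
    d["_id"] = str(d["_id"])

menu_sdf = spark.createDataFrame(docs)
menu_sdf.show(5, truncate=False)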

+------------------------+----------+--------+-------------+---------+---------+----------------+------------+------------+
|_id                     |base_price|category|dietary_flags|is_active|item_code|item_name       |menu_item_id|sub_category|
+------------------------+----------+--------+-------------+---------+---------+----------------+------------+------------+
|693ecb5dc3b15cd27611d255|13.99     |Entree  |             |true     |ITEM-001 |Cheeseburger    |1           |Burgers     |
|693ecb5dc3b15cd27611d256|14.49     |Entree  |V            |true     |ITEM-002 |Veggie Burger   |2           |Burgers     |
|693ecb5dc3b15cd27611d257|12.5      |Entree  |             |true     |ITEM-003 |Caesar Salad    |3           |Salads      |
|693ecb5dc3b15cd27611d258|4.5       |Side    |V            |true     |ITEM-004 |Fries           |4           |Potatoes    |
|693ecb5dc3b15cd27611d259|15.0      |Entree  |V            |true     |ITEM-005 |Margherita Pizza|5           |Pizza       |
+-------

Menu item dimension transformation and Delta storage

In [15]:
menu_sdf = (
    menu_sdf
      .withColumn("menu_item_id", F.col("menu_item_id").cast("int"))
      .withColumn("base_price", F.col("base_price").cast("double"))
      .withColumn("is_active", F.col("is_active").cast("boolean"))
      .withColumn("created_at", F.current_timestamp())
      .select(
          "menu_item_id",
          "item_code",
          "item_name",
          "category",
          "sub_category",
          "base_price",
          "dietary_flags",
          "is_active",
          "created_at"
      )
)

dim_menu_item_path = (WAREHOUSE_DIR / "dim_menu_item").resolve().as_posix()

menu_sdf.write.format("delta").mode("overwrite").save(dim_menu_item_path)

spark.read.format("delta").load(dim_menu_item_path).show(10, truncate=False)


+------------+---------+----------------+--------+-------------+----------+-------------+---------+--------------------------+
|menu_item_id|item_code|item_name       |category|sub_category |base_price|dietary_flags|is_active|created_at                |
+------------+---------+----------------+--------+-------------+----------+-------------+---------+--------------------------+
|5           |ITEM-005 |Margherita Pizza|Entree  |Pizza        |15.0      |V            |true     |2025-12-14 09:38:53.492071|
|7           |ITEM-007 |Iced Tea        |Drink   |Non-alcoholic|3.25      |V            |true     |2025-12-14 09:38:53.492071|
|2           |ITEM-002 |Veggie Burger   |Entree  |Burgers      |14.49     |V            |true     |2025-12-14 09:38:53.492071|
|9           |ITEM-009 |Chocolate Cake  |Dessert |Cake         |7.25      |V            |true     |2025-12-14 09:38:53.492071|
|6           |ITEM-006 |Pepperoni Pizza |Entree  |Pizza        |16.5      |             |true     |2025-12-14 0

Lakehouse directory structure and configuration

In [23]:
import os
import shutil  # used by remove_directory_tree below

base_dir = os.path.join(os.getcwd(), 'data')
# landing directory the streaming source watches for new JSON drops
orders_stream_dir = os.path.join(os.getcwd(), "data", "orders_stream")

dest_database = "restaurant_dlh"
sql_warehouse_dir = os.path.abspath('spark-warehouse')
database_dir = os.path.join(sql_warehouse_dir, dest_database)

# one Delta table directory per medallion layer
orders_output_bronze = os.path.join(database_dir, 'orders_bronze')
orders_output_silver = os.path.join(database_dir, 'orders_silver')
orders_output_gold   = os.path.join(database_dir, 'orders_gold')

# each streaming query stores its progress in a _checkpoint folder inside its output directory
orders_checkpoint_bronze = os.path.join(orders_output_bronze, '_checkpoint')
orders_checkpoint_silver = os.path.join(orders_output_silver, '_checkpoint')
orders_checkpoint_gold   = os.path.join(orders_output_gold, '_checkpoint')

os.makedirs(orders_stream_dir, exist_ok=True)
os.makedirs(database_dir, exist_ok=True)

def remove_directory_tree(path: str):
    """Delete a directory tree (e.g. a layer's output and its checkpoint) and report the result."""
    try:
        if os.path.exists(path):
            shutil.rmtree(path)
            return f"Directory '{path}' removed."
        return f"Directory '{path}' does not exist."
    except Exception as e:
        return f"Error: {e}"
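`remove_directory_tree` is the reset step for rerunning the streams. Since each `_checkpoint` folder sits inside its layer's output directory, deleting the three layer directories clears both the Delta data and the record of which input files each query has already processed; deleting only a table while keeping a separate checkpoint would make the stream treat those files as already processed on the next run. Below is a minimal sketch that resets several layers at once. `reset_layers` is a hypothetical helper.

```python
import os
import shutil


def reset_layers(*paths: str) -> list[str]:
    """Delete each layer directory, including the _checkpoint folder nested inside it.

    Returns the paths that existed and were removed; missing paths are skipped.
    """
    removed = []
    for path in paths:
        if os.path.isdir(path):
            shutil.rmtree(path)
            removed.append(path)
    return removed
```

Usage in this notebook would be `reset_layers(orders_output_bronze, orders_output_silver, orders_output_gold)` before rerunning the bronze, silver, and gold cells.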

In [24]:
import json, random, datetime as dt, os

dim_customer_path   = (WAREHOUSE_DIR / "dim_customer").resolve().as_posix()
dim_employee_path   = (WAREHOUSE_DIR / "dim_employee").resolve().as_posix()
dim_order_type_path = (WAREHOUSE_DIR / "dim_order_type").resolve().as_posix()
dim_menu_item_path  = (WAREHOUSE_DIR / "dim_menu_item").resolve().as_posix()

# collect the valid keys from each dimension so simulated orders only reference existing rows
customers   = [r["customer_id"] for r in spark.read.format("delta").load(dim_customer_path).select("customer_id").limit(500).collect()]
employees   = [r["employee_id"] for r in spark.read.format("delta").load(dim_employee_path).select("employee_id").collect()]
order_types = [r["order_type_id"] for r in spark.read.format("delta").load(dim_order_type_path).select("order_type_id").collect()]
menu_items  = [r["menu_item_id"] for r in spark.read.format("delta").load(dim_menu_item_path).select("menu_item_id").collect()]

Simulated streaming order event generator

In [25]:
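def write_order_drop(drop_idx: int, n_orders: int = 15):
    """Write n_orders simulated orders (one JSON object per line, 1-5 line items each) to orders_stream_dir."""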
    now = dt.datetime.now()
    out_path = os.path.join(orders_stream_dir, f"orders_drop_{drop_idx:02d}.json")

    with open(out_path, "w", encoding="utf-8") as f:
        for i in range(n_orders):
            order_id = drop_idx * 100000 + i + 1

            lines = []
            for _ in range(random.randint(1, 5)):
                lines.append({
                    "menu_item_id": int(random.choice(menu_items)),
                    "quantity": int(random.randint(1, 3))
                })

            order = {
                "order_id": int(order_id),
                "customer_id": int(random.choice(customers)),
                "employee_id": int(random.choice(employees)),
                "order_type_id": int(random.choice(order_types)),
                "order_ts": (now - dt.timedelta(minutes=random.randint(0, 240))).isoformat(),
                "lines": lines
            }

            f.write(json.dumps(order) + "\n")

    return out_path
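The Structured Streaming guide notes that files must be placed in the watched directory atomically, which most file systems achieve with a move. Here the drops are written before the `availableNow` query starts, so this is not an issue, but with a continuously running stream `write_order_drop` could be read while a file is still being written. Below is a minimal sketch of a write-then-move variant. `publish_jsonl` and its `staging_dir` argument are hypothetical, and the staging directory is assumed to be on the same file system as `orders_stream_dir` but outside it.

```python
import json
import os


def publish_jsonl(records, target_dir: str, file_name: str, staging_dir: str) -> str:
    """Write records as JSON Lines into staging_dir, then move the finished file into target_dir.

    staging_dir should share a file system with target_dir (so os.replace is a rename)
    and must not be the directory Spark is watching.
    """
    os.makedirs(staging_dir, exist_ok=True)
    os.makedirs(target_dir, exist_ok=True)
    tmp_path = os.path.join(staging_dir, file_name)
    with open(tmp_path, "w", encoding="utf-8") as f:
        for rec in records:
            f.write(json.dumps(rec) + "\n")
    final_path = os.path.join(target_dir, file_name)
    # the file only appears in target_dir once it has been fully written
    os.replace(tmp_path, final_path)
    return final_path
```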


Simulate order drops

In [26]:
print(write_order_drop(1, 15))
print(write_order_drop(2, 15))
print(write_order_drop(3, 15))

os.listdir(orders_stream_dir)


c:\Users\evasb_eqbyhhj\Documents\DS-2002-Capstone\notebooks\data\orders_stream\orders_drop_01.json
c:\Users\evasb_eqbyhhj\Documents\DS-2002-Capstone\notebooks\data\orders_stream\orders_drop_02.json
c:\Users\evasb_eqbyhhj\Documents\DS-2002-Capstone\notebooks\data\orders_stream\orders_drop_03.json


['orders_drop_01.json', 'orders_drop_02.json', 'orders_drop_03.json']

Register dimension tables as temp views for SQL queries

In [27]:
spark.read.format("delta").load((WAREHOUSE_DIR/"dim_customer").resolve().as_posix()).createOrReplaceTempView("dim_customer")
spark.read.format("delta").load((WAREHOUSE_DIR/"dim_employee").resolve().as_posix()).createOrReplaceTempView("dim_employee")
spark.read.format("delta").load((WAREHOUSE_DIR/"dim_order_type").resolve().as_posix()).createOrReplaceTempView("dim_order_type")
spark.read.format("delta").load((WAREHOUSE_DIR/"dim_menu_item").resolve().as_posix()).createOrReplaceTempView("dim_menu_item")


In [31]:
# infer the schema from one sample file; streaming file sources need an explicit schema by default
sample_file = os.path.join(orders_stream_dir, os.listdir(orders_stream_dir)[0])

df_one = spark.read.json(sample_file)
orders_schema = df_one.schema

df_one.printSchema()
df_one.show(3, truncate=False)


root
 |-- customer_id: long (nullable = true)
 |-- employee_id: long (nullable = true)
 |-- lines: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- menu_item_id: long (nullable = true)
 |    |    |-- quantity: long (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_ts: string (nullable = true)
 |-- order_type_id: long (nullable = true)

+-----------+-----------+------------------------------------------+--------+--------------------------+-------------+
|customer_id|employee_id|lines                                     |order_id|order_ts                  |order_type_id|
+-----------+-----------+------------------------------------------+--------+--------------------------+-------------+
|4          |6          |[{9, 1}, {8, 1}, {10, 2}, {10, 3}, {5, 1}]|100001  |2025-12-14T09:26:35.004248|1            |
|5          |13         |[{3, 1}, {6, 1}, {6, 1}, {9, 1}, {6, 3}]  |100002  |2025-12-14T09:57:35.004248|2            |
|1          

In [32]:
df_orders_bronze = (
    spark.readStream
        .schema(orders_schema)
        # read at most one JSON drop per micro-batch
        .option("maxFilesPerTrigger", 1)
        .json(orders_stream_dir)
)

df_orders_bronze.isStreaming
df_orders_bronze.printSchema()


root
 |-- customer_id: long (nullable = true)
 |-- employee_id: long (nullable = true)
 |-- lines: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- menu_item_id: long (nullable = true)
 |    |    |-- quantity: long (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_ts: string (nullable = true)
 |-- order_type_id: long (nullable = true)



BRONZE LAYER - streaming order ingestion

In [33]:
orders_bronze_query = (
    df_orders_bronze
      .writeStream
      .format("delta")
      .outputMode("append")
      .queryName("orders_bronze")
      .option("checkpointLocation", orders_checkpoint_bronze)
      # process all files available when the query starts, then stop
      .trigger(availableNow=True)
      .start(orders_output_bronze)
)

orders_bronze_query.awaitTermination()


In [34]:
import os

print("orders_output_bronze exists:", os.path.exists(orders_output_bronze))
print("_delta_log exists:", os.path.exists(os.path.join(orders_output_bronze, "_delta_log")))

spark.read.format("delta").load(orders_output_bronze).show(5, truncate=False)


orders_output_bronze exists: True
_delta_log exists: True
+-----------+-----------+------------------------------------------+--------+--------------------------+-------------+
|customer_id|employee_id|lines                                     |order_id|order_ts                  |order_type_id|
+-----------+-----------+------------------------------------------+--------+--------------------------+-------------+
|4          |6          |[{9, 1}, {8, 1}, {10, 2}, {10, 3}, {5, 1}]|100001  |2025-12-14T09:26:35.004248|1            |
|5          |13         |[{3, 1}, {6, 1}, {6, 1}, {9, 1}, {6, 3}]  |100002  |2025-12-14T09:57:35.004248|2            |
|1          |3          |[{8, 1}, {7, 1}]                          |100003  |2025-12-14T07:58:35.004248|2            |
|5          |20         |[{1, 1}, {1, 1}, {3, 2}]                  |100004  |2025-12-14T09:02:35.004248|2            |
|3          |17         |[{10, 2}, {6, 1}]                         |100005  |2025-12-14T07:12:35.004248|1    

In [39]:
(spark.readStream
    .format("delta")
    .load(orders_output_bronze)
    .createOrReplaceTempView("orders_bronze_stream")
)


SILVER LAYER - Fact Table Creation

In [42]:
sql_silver_temp = """
CREATE OR REPLACE TEMP VIEW fact_order_line_temp AS
SELECT
    x.order_id,
    x.customer_id,
    x.employee_id,
    x.order_type_id,
    x.order_ts,
    x.date_id,

    x.menu_item_id,
    x.quantity,

    current_timestamp() AS processed_time,

    -- descriptive attributes from the joined dimension tables
    c.first_name AS customer_first_name,
    c.last_name  AS customer_last_name,
    e.role       AS employee_role,
    ot.order_type_name,
    mi.item_name,
    mi.category,
    mi.sub_category,
    mi.base_price

FROM (
    SELECT
        b.order_id,
        b.customer_id,
        b.employee_id,
        b.order_type_id,
        to_timestamp(b.order_ts) AS order_ts,
        CAST(date_format(to_timestamp(b.order_ts), 'yyyyMMdd') AS INT) AS date_id,
        line.menu_item_id AS menu_item_id,
        CAST(line.quantity AS INT) AS quantity
    FROM orders_bronze_stream b
    LATERAL VIEW explode(b.lines) lv AS line
) x
JOIN dim_customer   c  ON x.customer_id = c.customer_id
JOIN dim_employee   e  ON x.employee_id = e.employee_id
JOIN dim_order_type ot ON x.order_type_id = ot.order_type_id
JOIN dim_menu_item  mi ON x.menu_item_id = mi.menu_item_id
WHERE x.order_id IS NOT NULL
"""
spark.sql(sql_silver_temp)


DataFrame[]
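The silver query turns each order into one row per line item (`LATERAL VIEW explode`) and then inner-joins the dimensions, so an order with an empty or null `lines` array produces no rows, and a line whose ID has no match in a dimension table is dropped. Below is a plain-Python sketch of those two steps on toy data; the dictionaries stand in for Spark tables, only the menu item join is shown, and `explode_order` and `join_menu` are hypothetical helpers.

```python
def explode_order(order: dict) -> list[dict]:
    """Return one row per element of order['lines'], carrying the order-level keys along."""
    keys = ("order_id", "customer_id", "employee_id", "order_type_id", "order_ts")
    return [
        {**{k: order[k] for k in keys},
         "menu_item_id": line["menu_item_id"],
         "quantity": int(line["quantity"])}
        # like explode, an empty or missing lines array yields no rows
        for line in order.get("lines") or []
    ]


def join_menu(rows: list[dict], dim_menu_item: dict) -> list[dict]:
    """Inner join on menu_item_id: rows without a matching dimension entry are dropped."""
    return [
        {**row, **dim_menu_item[row["menu_item_id"]]}
        for row in rows
        if row["menu_item_id"] in dim_menu_item
    ]
```

The customer, employee, and order type joins follow the same inner-join pattern on their own keys.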

In [43]:
orders_silver_query = (
    spark.table("fact_order_line_temp")
      .writeStream
      .format("delta")
      .outputMode("append")
      .queryName("orders_silver_fact_order_line")
      .option("checkpointLocation", orders_checkpoint_silver)
      .trigger(availableNow=True)
      .start(orders_output_silver)
)

orders_silver_query.awaitTermination()


In [44]:
import os

print("orders_output_silver exists:", os.path.exists(orders_output_silver))
print("_delta_log exists:", os.path.exists(os.path.join(orders_output_silver, "_delta_log")))

spark.read.format("delta").load(orders_output_silver).show(10, truncate=False)


orders_output_silver exists: True
_delta_log exists: True
+--------+-----------+-----------+-------------+--------------------------+--------+------------+--------+-----------------------+-------------------+------------------+-------------+---------------+----------------+--------+------------+----------+
|order_id|customer_id|employee_id|order_type_id|order_ts                  |date_id |menu_item_id|quantity|processed_time         |customer_first_name|customer_last_name|employee_role|order_type_name|item_name       |category|sub_category|base_price|
+--------+-----------+-----------+-------------+--------------------------+--------+------------+--------+-----------------------+-------------------+------------------+-------------+---------------+----------------+--------+------------+----------+
|200001  |2          |3          |3            |2025-12-14 07:27:35.006244|20251214|1           |2       |2025-12-14 10:31:44.897|Liam               |Smith             |Bartender    |Delivery 

In [45]:
(spark.readStream
    .format("delta")
    .load(orders_output_silver)
    .createOrReplaceTempView("orders_silver_stream")
)


Gold Layer - aggregated sales metrics

In [46]:
sql_gold_temp = """
CREATE OR REPLACE TEMP VIEW gold_daily_item_sales_temp AS
SELECT
    date_id,
    category,
    item_name,
    SUM(quantity) AS units_sold,
    ROUND(SUM(quantity * base_price), 2) AS gross_sales
FROM orders_silver_stream
GROUP BY date_id, category, item_name
"""
spark.sql(sql_gold_temp)


DataFrame[]
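The gold view groups silver rows by day, category, and item, then rounds the summed revenue. Because it is a streaming aggregation, the running totals are kept as state in the checkpoint, and the `complete` output mode rewrites the entire gold Delta table on each trigger. Below is a plain-Python sketch of the same arithmetic on toy rows; `daily_item_sales` is hypothetical. Spark's `ROUND` rounds half up while Python's `round` rounds half to even, so a total landing exactly on a half cent could differ.

```python
from collections import defaultdict


def daily_item_sales(rows):
    """Group silver-style rows by (date_id, category, item_name) into (units_sold, gross_sales)."""
    units = defaultdict(int)
    gross = defaultdict(float)
    for r in rows:
        key = (r["date_id"], r["category"], r["item_name"])
        units[key] += r["quantity"]
        gross[key] += r["quantity"] * r["base_price"]
    # as in ROUND(SUM(quantity * base_price), 2), rounding is applied to the summed total
    return {k: (units[k], round(gross[k], 2)) for k in units}
```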

In [47]:
orders_gold_query = (
    spark.table("gold_daily_item_sales_temp")
      .writeStream
      .format("delta")
      .outputMode("complete")
      .queryName("orders_gold_daily_item_sales")
      .option("checkpointLocation", orders_checkpoint_gold)
      .trigger(availableNow=True)
      .start(orders_output_gold)
)

orders_gold_query.awaitTermination()


In [48]:
spark.read.format("delta").load(orders_output_gold) \
     .orderBy("gross_sales", ascending=False) \
     .show(20, truncate=False)


+--------+--------+----------------+----------+-----------+
|date_id |category|item_name       |units_sold|gross_sales|
+--------+--------+----------------+----------+-----------+
|20251214|Entree  |Salmon Bowl     |32        |624.0      |
|20251214|Entree  |Cheeseburger    |43        |601.57     |
|20251214|Entree  |Pepperoni Pizza |32        |528.0      |
|20251214|Entree  |Veggie Burger   |33        |478.17     |
|20251214|Entree  |Margherita Pizza|31        |465.0      |
|20251214|Entree  |Caesar Salad    |31        |387.5      |
|20251214|Dessert |Chocolate Cake  |13        |94.25      |
|20251214|Side    |Fries           |16        |72.0       |
|20251214|Drink   |Latte           |13        |61.75      |
|20251214|Drink   |Iced Tea        |9         |29.25      |
+--------+--------+----------------+----------+-----------+



First analytics query - top customers by total spend

In [49]:
sql_top_customers = f"""
SELECT
    customer_id,
    customer_first_name,
    customer_last_name,
    ROUND(SUM(quantity * base_price), 2) AS total_spend,
    SUM(quantity) AS total_items
FROM delta.`{orders_output_silver}`
GROUP BY customer_id, customer_first_name, customer_last_name
ORDER BY total_spend DESC
LIMIT 10
"""
spark.sql(sql_top_customers).show(truncate=False)


+-----------+-------------------+------------------+-----------+-----------+
|customer_id|customer_first_name|customer_last_name|total_spend|total_items|
+-----------+-------------------+------------------+-----------+-----------+
|1          |Emma               |Johnson           |825.24     |65         |
|4          |Noah               |Davis             |728.36     |56         |
|2          |Liam               |Smith             |681.87     |47         |
|5          |Ava                |Wilson            |582.64     |45         |
|3          |Olivia             |Brown             |523.38     |40         |
+-----------+-------------------+------------------+-----------+-----------+



Second analytics query - sales by order type and category over time 

In [50]:
sql_order_type_category = f"""
SELECT
    date_id,
    order_type_name,
    category,
    SUM(quantity) AS units_sold,
    ROUND(SUM(quantity * base_price), 2) AS gross_sales
FROM delta.`{orders_output_silver}`
GROUP BY date_id, order_type_name, category
ORDER BY date_id DESC, gross_sales DESC
"""
spark.sql(sql_order_type_category).show(50, truncate=False)


+--------+---------------+--------+----------+-----------+
|date_id |order_type_name|category|units_sold|gross_sales|
+--------+---------------+--------+----------+-----------+
|20251214|Delivery       |Entree  |94        |1412.06    |
|20251214|Takeout        |Entree  |66        |1022.85    |
|20251214|Dine-in        |Entree  |42        |649.33     |
|20251214|Delivery       |Dessert |6         |43.5       |
|20251214|Takeout        |Drink   |9         |41.25      |
|20251214|Takeout        |Dessert |5         |36.25      |
|20251214|Delivery       |Side    |7         |31.5       |
|20251214|Delivery       |Drink   |8         |27.5       |
|20251214|Dine-in        |Side    |5         |22.5       |
|20251214|Dine-in        |Drink   |5         |22.25      |
|20251214|Takeout        |Side    |4         |18.0       |
|20251214|Dine-in        |Dessert |2         |14.5       |
+--------+---------------+--------+----------+-----------+



Third analytics query - employee sales performance

In [51]:
sql_employee_perf = f"""
SELECT
    employee_id,
    employee_role,
    ROUND(SUM(quantity * base_price), 2) AS revenue,
    COUNT(DISTINCT order_id) AS orders_handled,
    SUM(quantity) AS items_sold
FROM delta.`{orders_output_silver}`
GROUP BY employee_id, employee_role
ORDER BY revenue DESC
"""
spark.sql(sql_employee_perf).show(truncate=False)


+-----------+-------------+-------+--------------+----------+
|employee_id|employee_role|revenue|orders_handled|items_sold|
+-----------+-------------+-------+--------------+----------+
|3          |Bartender    |459.44 |6             |37        |
|17         |Bartender    |391.1  |5             |28        |
|4          |Manager      |333.89 |5             |29        |
|19         |Server       |318.98 |3             |22        |
|8          |Server       |254.94 |4             |19        |
|12         |Dishwasher   |219.72 |4             |21        |
|6          |Server       |218.96 |3             |15        |
|5          |Host         |203.67 |2             |14        |
|14         |Server       |194.7  |2             |19        |
|18         |Server       |164.95 |1             |11        |
|20         |Host         |139.46 |3             |9         |
|9          |Bartender    |131.98 |1             |9         |
|13         |Host         |102.25 |1             |7         |
|10     
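`orders_handled` uses `COUNT(DISTINCT order_id)` because the silver table holds one row per line item: a plain `COUNT(*)` would count an order with three lines three times. Below is a small plain-Python sketch of the same per-employee metrics on toy rows; `employee_performance` is hypothetical.

```python
def employee_performance(rows):
    """Per employee: revenue, distinct orders handled, and items sold from line-level rows."""
    stats = {}
    for r in rows:
        s = stats.setdefault(r["employee_id"], {"revenue": 0.0, "orders": set(), "items": 0})
        s["revenue"] += r["quantity"] * r["base_price"]
        # a set, so an order with several line items is counted once
        s["orders"].add(r["order_id"])
        s["items"] += r["quantity"]
    return {
        emp: {
            "revenue": round(s["revenue"], 2),
            "orders_handled": len(s["orders"]),
            "items_sold": s["items"],
        }
        for emp, s in stats.items()
    }
```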

Date dimension (generated in MySQL with the script from class; this cell only extracts it)

In [57]:
dim_date_pdf = get_sql_dataframe("SELECT * FROM dim_date", **mysql_args)
print(dim_date_pdf.shape)
dim_date_pdf.head()


(1096, 23)


|   | date_key | full_date | date_name | date_name_us | date_name_eu | day_of_week | day_name_of_week | day_of_month | day_of_year | weekday_weekend | ... | is_last_day_of_month | calendar_quarter | calendar_year | calendar_year_month | calendar_year_qtr | fiscal_month_of_year | fiscal_quarter | fiscal_year | fiscal_year_month | fiscal_year_qtr |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 20240101 | 2024-01-01 | 2024/01/01 | 01/01/2024 | 01/01/2024 | 2 | Monday | 1 | 1 | Weekday | ... | N | 1 | 2024 | 2024-01 | 2024Q1 | 7 | 3 | 2024 | 2024-07 | 2024Q3 |
| 1 | 20240102 | 2024-01-02 | 2024/01/02 | 01/02/2024 | 02/01/2024 | 3 | Tuesday | 2 | 2 | Weekday | ... | N | 1 | 2024 | 2024-01 | 2024Q1 | 7 | 3 | 2024 | 2024-07 | 2024Q3 |
| 2 | 20240103 | 2024-01-03 | 2024/01/03 | 01/03/2024 | 03/01/2024 | 4 | Wednesday | 3 | 3 | Weekday | ... | N | 1 | 2024 | 2024-01 | 2024Q1 | 7 | 3 | 2024 | 2024-07 | 2024Q3 |
| 3 | 20240104 | 2024-01-04 | 2024/01/04 | 01/04/2024 | 04/01/2024 | 5 | Thursday | 4 | 4 | Weekday | ... | N | 1 | 2024 | 2024-01 | 2024Q1 | 7 | 3 | 2024 | 2024-07 | 2024Q3 |
| 4 | 20240105 | 2024-01-05 | 2024/01/05 | 01/05/2024 | 05/01/2024 | 6 | Friday | 5 | 5 | Weekday | ... | N | 1 | 2024 | 2024-01 | 2024Q1 | 7 | 3 | 2024 | 2024-07 | 2024Q3 |


In [58]:

dim_date_sdf = (
    spark.createDataFrame(dim_date_pdf)
         .withColumn("date_key", F.col("date_key").cast("int"))
         .withColumnRenamed("date_key", "date_id")
)

dim_date_sdf.printSchema()
dim_date_sdf.show(5, truncate=False)


root
 |-- date_id: integer (nullable = true)
 |-- full_date: date (nullable = true)
 |-- date_name: string (nullable = true)
 |-- date_name_us: string (nullable = true)
 |-- date_name_eu: string (nullable = true)
 |-- day_of_week: long (nullable = true)
 |-- day_name_of_week: string (nullable = true)
 |-- day_of_month: long (nullable = true)
 |-- day_of_year: long (nullable = true)
 |-- weekday_weekend: string (nullable = true)
 |-- week_of_year: long (nullable = true)
 |-- month_name: string (nullable = true)
 |-- month_of_year: long (nullable = true)
 |-- is_last_day_of_month: string (nullable = true)
 |-- calendar_quarter: long (nullable = true)
 |-- calendar_year: long (nullable = true)
 |-- calendar_year_month: string (nullable = true)
 |-- calendar_year_qtr: string (nullable = true)
 |-- fiscal_month_of_year: long (nullable = true)
 |-- fiscal_quarter: long (nullable = true)
 |-- fiscal_year: long (nullable = true)
 |-- fiscal_year_month: string (nullable = true)
 |-- fiscal_year

Write the date dimension to Delta Lake storage

In [59]:
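# dim_date_path was not defined in an earlier cell; build it like the other dimension paths
dim_date_path = (WAREHOUSE_DIR / "dim_date").resolve().as_posix()

(dim_date_sdf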
    .write
    .format("delta")
    .mode("overwrite")
    .save(dim_date_path)
)

print("Wrote dim_date to:", dim_date_path)


Wrote dim_date to: C:/Users/evasb_eqbyhhj/Documents/DS-2002-Capstone/notebooks/warehouse/dim_date


In [60]:
spark.read.format("delta").load(dim_date_path).show(5, truncate=False)
spark.read.format("delta").load(dim_date_path).createOrReplaceTempView("dim_date")


+--------+----------+----------+------------+------------+-----------+----------------+------------+-----------+---------------+------------+----------+-------------+--------------------+----------------+-------------+-------------------+-----------------+--------------------+--------------+-----------+-----------------+---------------+
|date_id |full_date |date_name |date_name_us|date_name_eu|day_of_week|day_name_of_week|day_of_month|day_of_year|weekday_weekend|week_of_year|month_name|month_of_year|is_last_day_of_month|calendar_quarter|calendar_year|calendar_year_month|calendar_year_qtr|fiscal_month_of_year|fiscal_quarter|fiscal_year|fiscal_year_month|fiscal_year_qtr|
+--------+----------+----------+------------+------------+-----------+----------------+------------+-----------+---------------+------------+----------+-------------+--------------------+----------------+-------------+-------------------+-----------------+--------------------+--------------+-----------+-----------------+