# DS-2002 — Data Project 2 (Capstone)

## Project Overview (Local PySpark Lakehouse)

This notebook implements the Project 2 capstone **locally** using **PySpark** and implements the “Bronze → Silver → Gold” lakehouse architecture with both **batch** and **streaming** ingestion.

### Business process modeled
- **Retail sales transactions** (Fact table grain: one row per order line)

### Dimensional model (Gold)
- **Dimensions**:
  - `dim_date` (from local CSV)
  - `dim_customer` (from MySQL `src_customers`)
  - `dim_product` (from MySQL `src_products`)
  - `dim_category` (derived from product attributes)
- **Facts**:
  - `fact_sales` (streaming-conformed fact)
  - `fact_sales_margin` (gross margin metric derived using MongoDB costs)

### Required source systems (explicitly demonstrated)
- **Relational (SQL)**: **MySQL** accessed via **Spark JDBC** (`src_customers`, `src_products`)
- **NoSQL**: **MongoDB (local or Atlas)** accessed via `pymongo` (`product_costs`)
- **File system**: local CSVs for dates and sales; local JSON files for streaming intervals

### Required processing behaviors (local equivalents)
- **Batch execution + incremental load**: initial batch load plus an explicit incremental batch append (row counts increase)
- **Streaming mini-batches (3 intervals)**: sales fact data is segmented into **3 JSON drops** and ingested using **Structured Streaming** (local equivalent of Databricks AutoLoader)
- **Silver integration**: streaming facts are joined to static dimensions in the **Silver** layer prior to publishing Gold tables

### Outputs
- Lakehouse tables are written under `project2_lakehouse/{bronze,silver,gold}/`.
- Streaming inputs/checkpoints are under `project2_stream/`.

---



In [25]:
import os
import json
import shutil
from pathlib import Path

import pandas as pd
import pymongo
import certifi

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window


In [26]:
BASE_DIR = Path.cwd()
DATA_DIR = BASE_DIR / "data"

LAKEHOUSE_DIR = BASE_DIR / "project2_lakehouse"
BRONZE_DIR = LAKEHOUSE_DIR / "bronze"
SILVER_DIR = LAKEHOUSE_DIR / "silver"
GOLD_DIR   = LAKEHOUSE_DIR / "gold"

STREAM_DIR = BASE_DIR / "project2_stream"
INCOMING_DIR = STREAM_DIR / "incoming"
CHECKPOINT_DIR = STREAM_DIR / "checkpoints" / "bronze_sales"

# Source files reused from Project 1
CUSTOMERS_CSV = DATA_DIR / "customers.csv"
PRODUCTS_CSV  = DATA_DIR / "products.csv"
DATES_CSV     = DATA_DIR / "dates.csv"
SALES_CSV     = DATA_DIR / "sales.csv"
PRODUCT_COSTS_JSON = DATA_DIR / "product_costs.json"

USE_MYSQL = True
USE_MONGO = True

STRICT_SOURCES = True

MYSQL = {
    "host": "localhost",
    "port": 3306,
    "database": "northwind_dw2",
    "user": "root",
    "password": "thevidu",
    "customers_table": "src_customers",
    "products_table": "src_products",
}


MONGO = {
    "uri": "mongodb://localhost:27017",
    "database": "northwind_purchasing",
    "collection": "product_costs",
}

JDBC_URL = f"jdbc:mysql://{MYSQL['host']}:{MYSQL['port']}/{MYSQL['database']}"


## 1) Start Spark (local)

This uses a local Spark session. Storage format is **Parquet** to keep setup simple and aligned with Lab 6.


In [27]:
import shutil
import subprocess

try:
    spark
    spark.stop()
except Exception:
    pass

try:
    from pyspark import SparkContext

    if SparkContext._active_spark_context is not None:
        SparkContext._active_spark_context.stop()

    # Clear stale gateway references so getOrCreate can launch a fresh JVM
    SparkContext._active_spark_context = None
    SparkContext._gateway = None
    SparkContext._jvm = None
except Exception:
    pass

builder = (
    SparkSession.builder
    .appName("DS-2002-Project-2-Local-PySpark")
    .master("local[*]")
    .config("spark.sql.warehouse.dir", str((BASE_DIR / "spark-warehouse").resolve()))
    .config("spark.sql.shuffle.partitions", "4")
)

builder = builder.config("spark.jars.packages", "com.mysql:mysql-connector-j:9.1.0")
print("Using spark.jars.packages for MySQL JDBC: com.mysql:mysql-connector-j:9.1.0")

spark = builder.getOrCreate()

spark.sparkContext.setLogLevel("WARN")
print("Spark version:", spark.version)
print("java:", java_path)


Using spark.jars.packages for MySQL JDBC: com.mysql:mysql-connector-j:9.1.0
:: loading settings :: url = jar:file:/Users/collintogher/Documents/ds-2002-proj-1/.venv311/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/collintogher/.ivy2/cache
The jars for the packages stored in: /Users/collintogher/.ivy2/jars
com.mysql#mysql-connector-j added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-64079151-1937-43bc-ad06-ef35d4126f18;1.0
	confs: [default]
	found com.mysql#mysql-connector-j;9.1.0 in central
	found com.google.protobuf#protobuf-java;4.26.1 in central
:: resolution report :: resolve 82ms :: artifacts dl 2ms
	:: modules in use:
	com.google.protobuf#protobuf-java;4.26.1 from central in [default]
	com.mysql#mysql-connector-j;9.1.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	--------------------------

Spark version: 3.5.4
java: /usr/bin/java


## 1b) Source system setup (MySQL + MongoDB)

Reads from:
- **MySQL** (SQL) for `src_customers` and `src_products`
- **MongoDB** (NoSQL) for `product_costs`
- **Local CSV files** for date and fact data


In [28]:
mysql_props = {
    "user": MYSQL["user"],
    "password": MYSQL["password"],
    "driver": "com.mysql.cj.jdbc.Driver",
}

customers_seed = [
    {"customer_id": "CUST-001", "first_name": "Ava", "last_name": "Nguyen", "email": "ava.nguyen@example.com", "city": "Austin", "state": "TX", "country": "USA", "phone": "512-555-1001"},
    {"customer_id": "CUST-002", "first_name": "Liam", "last_name": "Patel", "email": "liam.patel@example.com", "city": "Seattle", "state": "WA", "country": "USA", "phone": "206-555-1002"},
    {"customer_id": "CUST-003", "first_name": "Sophia", "last_name": "Kim", "email": "sophia.kim@example.com", "city": "New York", "state": "NY", "country": "USA", "phone": "212-555-1003"},
    {"customer_id": "CUST-004", "first_name": "Noah", "last_name": "Garcia", "email": "noah.garcia@example.com", "city": "Miami", "state": "FL", "country": "USA", "phone": "305-555-1004"},
    {"customer_id": "CUST-005", "first_name": "Mia", "last_name": "Johnson", "email": "mia.johnson@example.com", "city": "Chicago", "state": "IL", "country": "USA", "phone": "773-555-1005"},
]

products_seed = [
    {"product_id": "SKU-100", "product_name": "Wireless Mouse", "category": "Electronics", "sub_category": "Accessories", "list_price": 19.99, "supplier_code": "SUP-01"},
    {"product_id": "SKU-101", "product_name": "Mechanical Keyboard", "category": "Electronics", "sub_category": "Accessories", "list_price": 79.50, "supplier_code": "SUP-01"},
    {"product_id": "SKU-102", "product_name": "USB-C Charger", "category": "Electronics", "sub_category": "Power", "list_price": 24.00, "supplier_code": "SUP-02"},
    {"product_id": "SKU-200", "product_name": "Water Bottle 1L", "category": "Home & Kitchen", "sub_category": "Drinkware", "list_price": 12.75, "supplier_code": "SUP-03"},
    {"product_id": "SKU-300", "product_name": "Notebook Set (3)", "category": "Office", "sub_category": "Stationery", "list_price": 9.99, "supplier_code": "SUP-03"},
]

spark.createDataFrame(customers_seed).write.jdbc(
    url=JDBC_URL,
    table=MYSQL["customers_table"],
    mode="overwrite",
    properties=mysql_props,
)

spark.createDataFrame(products_seed).write.jdbc(
    url=JDBC_URL,
    table=MYSQL["products_table"],
    mode="overwrite",
    properties=mysql_props,
)

print("Seeded MySQL tables:", MYSQL["customers_table"], ",", MYSQL["products_table"])


                                                                                

Seeded MySQL tables: src_customers , src_products


In [29]:
product_costs_seed = [
    {"ProductID": "SKU-100", "UnitCost": 12.00},
    {"ProductID": "SKU-101", "UnitCost": 55.00},
    {"ProductID": "SKU-102", "UnitCost": 14.00},
    {"ProductID": "SKU-200", "UnitCost": 7.25},
    {"ProductID": "SKU-300", "UnitCost": 4.50},
]

if MONGO["uri"].startswith("mongodb+srv://"):
    mongo_client = pymongo.MongoClient(MONGO["uri"], tlsCAFile=certifi.where())
else:
    mongo_client = pymongo.MongoClient(MONGO["uri"])

db = mongo_client[MONGO["database"]]
db.drop_collection(MONGO["collection"])
db[MONGO["collection"]].insert_many(product_costs_seed)
mongo_client.close()

print("Seeded MongoDB:", MONGO["database"], ".", MONGO["collection"])


Seeded MongoDB: northwind_purchasing . product_costs


In [30]:
# Connectivity checks (prints counts so it's obvious which sources are being used)

# 1) SQL (MySQL) check
test_customers = (
    spark.read.format("jdbc")
    .option("url", JDBC_URL)
    .option("dbtable", MYSQL["customers_table"])
    .option("user", MYSQL["user"])
    .option("password", MYSQL["password"])
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .load()
)
print("MySQL customers rows:", test_customers.count())

# 2) NoSQL (MongoDB Atlas/local) check
if MONGO["uri"].startswith("mongodb+srv://"):
    mongo_client = pymongo.MongoClient(MONGO["uri"], tlsCAFile=certifi.where())
else:
    mongo_client = pymongo.MongoClient(MONGO["uri"])

n_docs = mongo_client[MONGO["database"]][MONGO["collection"]].count_documents({})
mongo_client.close()
print("MongoDB product_costs docs:", n_docs)


MySQL customers rows: 5
MongoDB product_costs docs: 5


## 2) Build dimensions (Batch)

**Local-source mapping (REQUIRED: SQL + NoSQL + file system)**
- **DimDate**: file system (`data/dates.csv`)
- **DimCustomer**: **MySQL (JDBC)** (`src_customers`) — required
- **DimProduct**: **MySQL (JDBC)** (`src_products`) — required
- **DimCategory**: derived from products (**3rd additional dimension**)


In [31]:
# Read source data (SQL + NoSQL + file system)

# 1) File system (CSV): DimDate from file
df_dates_src = spark.read.option("header", True).csv(str(DATES_CSV))

# 2) SQL (MySQL): customers/products via Spark JDBC

def read_mysql_table(table: str):
    return (
        spark.read.format("jdbc")
        .option("url", JDBC_URL)
        .option("dbtable", table)
        .option("user", MYSQL["user"])
        .option("password", MYSQL["password"])
        .option("driver", "com.mysql.cj.jdbc.Driver")
        .load()
    )

if USE_MYSQL:
    try:
        df_customers_src = (
            read_mysql_table(MYSQL["customers_table"])
            .selectExpr(
                "customer_id as CustomerID",
                "first_name as FirstName",
                "last_name as LastName",
                "email as Email",
                "city as City",
                "state as State",
                "country as Country"
            )
        )

        df_products_src = (
            read_mysql_table(MYSQL["products_table"])
            .selectExpr(
                "product_id as ProductID",
                "product_name as ProductName",
                "category as Category",
                "sub_category as SubCategory",
                "CAST(list_price AS DOUBLE) as UnitPrice"
            )
        )

        print("Source for customers/products: MySQL (JDBC)")
    except Exception as e:
        print("MySQL read failed — MySQL is REQUIRED for this assignment.")
        print("Error:", str(e)[:400])
        raise
else:
    raise RuntimeError("USE_MYSQL must be True for this assignment.")

# Basic typing

dim_date = (
    df_dates_src
    .withColumn("DateKey", F.col("DateKey").cast("int"))
    .withColumn("Year", F.col("Year").cast("int"))
    .withColumn("Quarter", F.col("Quarter").cast("int"))
    .withColumn("Month", F.col("Month").cast("int"))
    .withColumn("Day", F.col("Day").cast("int"))
)

dim_customer = (
    df_customers_src
    .select(
        "CustomerID", "FirstName", "LastName", "Email", "City", "State", "Country"
    )
    .withColumn(
        "CustomerKey",
        F.row_number().over(Window.orderBy(F.col("CustomerID")))
    )
)

# Category dimension derived from products
dim_category = (
    df_products_src
    .select("Category")
    .dropna()
    .dropDuplicates(["Category"])
    .withColumn(
        "CategoryKey",
        F.row_number().over(Window.orderBy(F.col("Category")))
    )
)

# Product dimension (with CategoryKey FK)

dim_product = (
    df_products_src
    .select("ProductID", "ProductName", "Category", "SubCategory", "UnitPrice")
    .withColumn("UnitPrice", F.col("UnitPrice").cast("double"))
    .join(dim_category, on="Category", how="left")
    .withColumn(
        "ProductKey",
        F.row_number().over(Window.orderBy(F.col("ProductID")))
    )
    .select("ProductKey", "ProductID", "ProductName", "CategoryKey", "Category", "SubCategory", "UnitPrice")
)

print("Dim rows:")
print("- dim_date:", dim_date.count())
print("- dim_customer:", dim_customer.count())
print("- dim_category:", dim_category.count())
print("- dim_product:", dim_product.count())


Source for customers/products: MySQL (JDBC)
Dim rows:
- dim_date: 10
- dim_customer: 5
- dim_category: 3
- dim_product: 5


## 3) Batch load: Bronze → Silver → Gold

This satisfies the **batch execution** requirement and creates a baseline lakehouse state before streaming increments.


In [32]:
def write_parquet(df, path: Path, mode: str = "overwrite"):
    df.write.mode(mode).parquet(str(path))

def read_parquet(path: Path):
    return spark.read.parquet(str(path))

# Where each table lives
paths = {
    "bronze_dim_customer": BRONZE_DIR / "dim_customer",
    "bronze_dim_product":  BRONZE_DIR / "dim_product",
    "bronze_dim_category": BRONZE_DIR / "dim_category",
    "bronze_dim_date":     BRONZE_DIR / "dim_date",
    "bronze_sales_batch":  BRONZE_DIR / "sales_batch",
    "bronze_sales_stream": BRONZE_DIR / "sales_stream",

    "silver_fact_sales":   SILVER_DIR / "fact_sales",

    "gold_dim_customer":   GOLD_DIR / "dim_customer",
    "gold_dim_product":    GOLD_DIR / "dim_product",
    "gold_dim_category":   GOLD_DIR / "dim_category",
    "gold_dim_date":       GOLD_DIR / "dim_date",
    "gold_fact_sales":     GOLD_DIR / "fact_sales",
    "gold_fact_sales_margin": GOLD_DIR / "fact_sales_margin",
}

# 3.1 Bronze: write dims + batch sales
write_parquet(dim_customer, paths["bronze_dim_customer"], mode="overwrite")
write_parquet(dim_product,  paths["bronze_dim_product"],  mode="overwrite")
write_parquet(dim_category, paths["bronze_dim_category"], mode="overwrite")
write_parquet(dim_date,     paths["bronze_dim_date"],     mode="overwrite")

sales_batch = (
    spark.read.option("header", True).csv(str(SALES_CSV))
    .withColumn("DateKey", F.col("DateKey").cast("int"))
    .withColumn("Quantity", F.col("Quantity").cast("double"))
    .withColumn("UnitPrice", F.col("UnitPrice").cast("double"))
    .withColumn("TotalAmount", F.col("TotalAmount").cast("double"))
)
write_parquet(sales_batch, paths["bronze_sales_batch"], mode="overwrite")

print("Bronze written:")
for k in [
    "bronze_dim_customer","bronze_dim_product","bronze_dim_category","bronze_dim_date","bronze_sales_batch"
]:
    print(f"- {k}: {paths[k]}")


25/12/19 21:25:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/19 21:25:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/19 21:25:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/19 21:25:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/19 21:25:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/19 21:25:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/19 2

Bronze written:
- bronze_dim_customer: /Users/collintogher/Documents/ds-2002-proj-1/project2_lakehouse/bronze/dim_customer
- bronze_dim_product: /Users/collintogher/Documents/ds-2002-proj-1/project2_lakehouse/bronze/dim_product
- bronze_dim_category: /Users/collintogher/Documents/ds-2002-proj-1/project2_lakehouse/bronze/dim_category
- bronze_dim_date: /Users/collintogher/Documents/ds-2002-proj-1/project2_lakehouse/bronze/dim_date
- bronze_sales_batch: /Users/collintogher/Documents/ds-2002-proj-1/project2_lakehouse/bronze/sales_batch


### 3.1b) Incremental batch load

“Batch execution + incremental load” for PySpark


In [33]:
before_cnt = read_parquet(paths["bronze_sales_batch"]).count()

inc_rows = [
    {
        "DateKey": 20250904,
        "CustomerID": "CUST-002",
        "ProductID": "SKU-101",
        "OrderID": "ORD-99001",
        "Quantity": 1,
        "UnitPrice": 79.50,
        "TotalAmount": 79.50,
    },
    {
        "DateKey": 20250906,
        "CustomerID": "CUST-003",
        "ProductID": "SKU-200",
        "OrderID": "ORD-99002",
        "Quantity": 2,
        "UnitPrice": 12.75,
        "TotalAmount": 25.50,
    },
]

inc_pd = pd.DataFrame(inc_rows)
inc_path = BASE_DIR / "project2_incremental" / "sales_incremental.csv"
inc_path.parent.mkdir(parents=True, exist_ok=True)
inc_pd.to_csv(inc_path, index=False)

inc_df = (
    spark.read.option("header", True).csv(str(inc_path))
    .withColumn("DateKey", F.col("DateKey").cast("int"))
    .withColumn("Quantity", F.col("Quantity").cast("double"))
    .withColumn("UnitPrice", F.col("UnitPrice").cast("double"))
    .withColumn("TotalAmount", F.col("TotalAmount").cast("double"))
)

# Append to Bronze
inc_df.write.mode("append").parquet(str(paths["bronze_sales_batch"]))

after_cnt = read_parquet(paths["bronze_sales_batch"]).count()
print("Bronze sales_batch rows before:", before_cnt)
print("Bronze sales_batch rows after: ", after_cnt)
print("Incremental batch file:", inc_path)


Bronze sales_batch rows before: 10
Bronze sales_batch rows after:  12
Incremental batch file: /Users/collintogher/Documents/ds-2002-proj-1/project2_incremental/sales_incremental.csv


In [34]:
# 3.2 Silver: conform fact to dims + compute margin

# Read dims from Bronze so downstream stages are explicitly layered
b_dim_customer = read_parquet(paths["bronze_dim_customer"]).select("CustomerKey","CustomerID")
b_dim_product  = read_parquet(paths["bronze_dim_product"]).select("ProductKey","ProductID","CategoryKey")
b_dim_date     = read_parquet(paths["bronze_dim_date"]).select("DateKey","Date")

# Costs (NoSQL): MongoDB Atlas/local via pymongo 

def load_costs_from_mongo() -> list[dict]:
    if MONGO["uri"].startswith("mongodb+srv://"):
        client = pymongo.MongoClient(MONGO["uri"], tlsCAFile=certifi.where())
    else:
        client = pymongo.MongoClient(MONGO["uri"])

    docs = list(client[MONGO["database"]][MONGO["collection"]].find({}))
    client.close()

    # Drop Mongo's _id field for Spark
    for d in docs:
        d.pop("_id", None)
    return docs

costs_payload = load_costs_from_mongo()
print("Source for product_costs: MongoDB")

costs_pd = pd.DataFrame(costs_payload)
dim_costs = spark.createDataFrame(costs_pd).withColumn("UnitCost", F.col("UnitCost").cast("double"))

# Fact source for batch baseline
b_sales = read_parquet(paths["bronze_sales_batch"]).select(
    "DateKey","CustomerID","ProductID","OrderID","Quantity","UnitPrice","TotalAmount"
)

fact_silver = (
    b_sales
    .join(b_dim_customer, on="CustomerID", how="left")
    .join(b_dim_product,  on="ProductID",  how="left")
    .withColumn(
        "SalesKey",
        F.row_number().over(Window.orderBy(F.col("OrderID"), F.col("CustomerID"), F.col("ProductID")))
    )
    .select(
        "SalesKey",
        "DateKey",
        "CustomerKey",
        "ProductKey",
        "OrderID",
        "Quantity",
        "UnitPrice",
        "TotalAmount"
    )
)

# Enrich with margin
fact_silver_margin = (
    fact_silver
    .join(b_dim_product.select("ProductKey","ProductID"), on="ProductKey", how="left")
    .join(dim_costs, on="ProductID", how="left")
    .withColumn("UnitCost", F.coalesce(F.col("UnitCost"), F.lit(0.0)))
    .withColumn("GrossMarginAmount", F.col("TotalAmount") - (F.col("UnitCost") * F.col("Quantity")))
    .select("SalesKey","GrossMarginAmount")
)

write_parquet(fact_silver, paths["silver_fact_sales"], mode="overwrite")
write_parquet(fact_silver_margin, paths["gold_fact_sales_margin"], mode="overwrite")

print("Silver fact rows:", fact_silver.count())
print("Gold margin rows:", fact_silver_margin.count())


Source for product_costs: MongoDB


25/12/19 21:25:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/19 21:25:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/19 21:25:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/19 21:25:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/19 21:25:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/19 21:25:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/19 2

Silver fact rows: 12
Gold margin rows: 12


In [35]:
# 3.3 Gold: publish star schema tables

write_parquet(read_parquet(paths["bronze_dim_customer"]), paths["gold_dim_customer"], mode="overwrite")
write_parquet(read_parquet(paths["bronze_dim_product"]),  paths["gold_dim_product"],  mode="overwrite")
write_parquet(read_parquet(paths["bronze_dim_category"]), paths["gold_dim_category"], mode="overwrite")
write_parquet(read_parquet(paths["bronze_dim_date"]),     paths["gold_dim_date"],     mode="overwrite")

# Gold fact = silver fact (serving layer)
write_parquet(read_parquet(paths["silver_fact_sales"]), paths["gold_fact_sales"], mode="overwrite")

print("Gold tables written.")


Gold tables written.


## 4) Streaming requirement: 3 mini-batches (local file stream)

**Requirement mapping**
- Mimics Databricks AutoLoader using **Structured Streaming** reading from a local folder.
- Uses **3 intervals** by dropping 3 JSON files into `project2_stream/incoming/`.
- Uses **checkpointing** so each run only processes new files.

Implementation approach (simple + deterministic):
- Split `data/sales.csv` into 3 parts.
- For each part: write a JSON file into `incoming/`, run a streaming query with `trigger(once=True)`, and stop.
- Checkpoint state guarantees exactly-once file processing across the 3 intervals.


In [36]:
# 4.1 Create 3 streaming JSON drops from the same Project-1 sales.csv

sales_pd = pd.read_csv(SALES_CSV)

# Split into 3 approximately-equal parts (deterministic order)
sales_pd = sales_pd.sort_values(["DateKey","OrderID","CustomerID","ProductID"]).reset_index(drop=True)
parts = [
    sales_pd.iloc[0:4].copy(),
    sales_pd.iloc[4:8].copy(),
    sales_pd.iloc[8:].copy(),
]

def write_drop(df_part: pd.DataFrame, drop_idx: int) -> Path:
    out_path = INCOMING_DIR / f"sales_drop_{drop_idx:02d}.json"
    records = df_part.to_dict(orient="records")

    with open(out_path, "w") as f:
        for rec in records:
            f.write(json.dumps(rec) + "\n")

    return out_path

[len(p) for p in parts]


[4, 4, 2]

In [37]:
# 4.2 Streaming ingestion (Bronze): read JSON drops from incoming/ and append to bronze/sales_stream
# This is the local equivalent of Databricks AutoLoader ingestion.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

sales_schema = StructType([
    StructField("DateKey", IntegerType(), True),
    StructField("CustomerID", StringType(), True),
    StructField("ProductID", StringType(), True),
    StructField("OrderID", StringType(), True),
    StructField("Quantity", DoubleType(), True),
    StructField("UnitPrice", DoubleType(), True),
    StructField("TotalAmount", DoubleType(), True),
])

bronze_sales_stream_path = paths["bronze_sales_stream"]

def run_stream_once() -> None:
    stream_df = (
        spark.readStream
        .schema(sales_schema)
        # Tolerate either NDJSON (preferred) or multi-line JSON arrays
        .option("multiLine", "true")
        .json(str(INCOMING_DIR))
        .withColumn("ingest_ts", F.current_timestamp())
    )

    q = (
        stream_df.writeStream
        .format("parquet")
        .outputMode("append")
        .option("checkpointLocation", str(CHECKPOINT_DIR))
        .trigger(once=True)
        .start(str(bronze_sales_stream_path))
    )
    q.awaitTermination()

# Interval 1
p1 = write_drop(parts[0], 1)
run_stream_once()
print("After interval 1, bronze/sales_stream rows:", read_parquet(bronze_sales_stream_path).count())

# Interval 2
p2 = write_drop(parts[1], 2)
run_stream_once()
print("After interval 2, bronze/sales_stream rows:", read_parquet(bronze_sales_stream_path).count())

# Interval 3
p3 = write_drop(parts[2], 3)
run_stream_once()
print("After interval 3, bronze/sales_stream rows:", read_parquet(bronze_sales_stream_path).count())

print("Drops written:", p1.name, p2.name, p3.name)


25/12/19 21:25:33 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


After interval 1, bronze/sales_stream rows: 3
After interval 2, bronze/sales_stream rows: 3


25/12/19 21:25:33 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/12/19 21:25:33 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


After interval 3, bronze/sales_stream rows: 3
Drops written: sales_drop_01.json sales_drop_02.json sales_drop_03.json


In [38]:
# Sanity check: if this is 0, downstream Silver/Gold/queries will be empty.

bronze_cnt = read_parquet(paths["bronze_sales_stream"]).count() if paths["bronze_sales_stream"].exists() else 0
print("Bronze stream row count:", bronze_cnt)

if bronze_cnt > 0:
    read_parquet(paths["bronze_sales_stream"]).orderBy("ingest_ts").show(20, truncate=False)


Bronze stream row count: 3
+--------+----------+---------+---------+--------+---------+-----------+-----------------------+
|DateKey |CustomerID|ProductID|OrderID  |Quantity|UnitPrice|TotalAmount|ingest_ts              |
+--------+----------+---------+---------+--------+---------+-----------+-----------------------+
|20250901|CUST-001  |SKU-100  |ORD-90001|2.0     |19.99    |39.98      |2025-12-19 14:19:25.569|
|20250905|CUST-005  |SKU-300  |ORD-90005|4.0     |9.99     |39.96      |2025-12-19 14:19:26.089|
|20250910|CUST-004  |SKU-300  |ORD-90009|2.0     |9.99     |19.98      |2025-12-19 14:19:26.451|
+--------+----------+---------+---------+--------+---------+-----------+-----------------------+



## 5) Silver: join streaming facts to static dimensions (required)

This section satisfies the Project 2 requirement to **illustrate relationships** between near real-time fact data and static reference data by joining at the Silver layer.


In [39]:
# Build a conformed fact table from the streaming bronze output

stream_bronze = read_parquet(paths["bronze_sales_stream"]).select(
    "DateKey","CustomerID","ProductID","OrderID","Quantity","UnitPrice","TotalAmount","ingest_ts"
)

# Join to static dims (from Bronze) to get surrogate keys
conformed_stream = (
    stream_bronze
    .join(b_dim_customer, on="CustomerID", how="left")
    .join(b_dim_product,  on="ProductID",  how="left")
)

# Create a deterministic SalesKey for this streaming dataset
conformed_stream = conformed_stream.withColumn(
    "SalesKey",
    F.row_number().over(Window.orderBy(F.col("ingest_ts"), F.col("OrderID"), F.col("CustomerID"), F.col("ProductID")))
)

silver_stream_fact = conformed_stream.select(
    "SalesKey",
    "DateKey",
    "CustomerKey",
    "ProductKey",
    "OrderID",
    "Quantity",
    "UnitPrice",
    "TotalAmount",
    "ingest_ts"
)

# Overwrite Silver fact with the full conformed stream result (simple replay semantics)
write_parquet(silver_stream_fact, paths["silver_fact_sales"], mode="overwrite")

print("Silver (from stream) fact rows:", silver_stream_fact.count())


25/12/19 21:25:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/19 21:25:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/19 21:25:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/19 21:25:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/19 21:25:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/19 21:25:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/19 2

Silver (from stream) fact rows: 3


### 5b) Publish Gold fact from streaming + recompute margin


In [40]:
# Overwrite Gold fact with the conformed streaming fact
write_parquet(read_parquet(paths["silver_fact_sales"]), paths["gold_fact_sales"], mode="overwrite")

# Recompute margin aligned to streaming SalesKey
# Join ProductKey -> ProductID -> UnitCost, then compute GrossMarginAmount.

gold_fact = read_parquet(paths["gold_fact_sales"]).select("SalesKey","ProductKey","Quantity","TotalAmount")
prod_key_to_id = read_parquet(paths["gold_dim_product"]).select("ProductKey","ProductID")

fact_for_margin = gold_fact.join(prod_key_to_id, on="ProductKey", how="left")

fact_margin_stream = (
    fact_for_margin
    .join(dim_costs, on="ProductID", how="left")
    .withColumn("UnitCost", F.coalesce(F.col("UnitCost"), F.lit(0.0)))
    .withColumn("GrossMarginAmount", F.col("TotalAmount") - (F.col("UnitCost") * F.col("Quantity")))
    .select("SalesKey","GrossMarginAmount")
)

write_parquet(fact_margin_stream, paths["gold_fact_sales_margin"], mode="overwrite")

print("Gold fact rows:", read_parquet(paths["gold_fact_sales"]).count())
print("Gold margin rows:", read_parquet(paths["gold_fact_sales_margin"]).count())


Gold fact rows: 3
Gold margin rows: 3


## 6) Source from MySQL and/or MongoDB


In [41]:
def ensure_gold_views() -> None:
    view_to_gold_path_key = {
        "dim_date": "gold_dim_date",
        "dim_customer": "gold_dim_customer",
        "dim_product": "gold_dim_product",
        "dim_category": "gold_dim_category",
        "fact_sales": "gold_fact_sales",
        "fact_sales_margin": "gold_fact_sales_margin",
    }

    for view_name, path_key in view_to_gold_path_key.items():
        if not spark.catalog.tableExists(view_name):
            read_parquet(paths[path_key]).createOrReplaceTempView(view_name)

ensure_gold_views()

checks = {
    "dim_date": spark.table("dim_date").count(),
    "dim_product": spark.table("dim_product").count(),
    "dim_category": spark.table("dim_category").count(),
    "fact_sales": spark.table("fact_sales").count(),
    "fact_sales_margin": spark.table("fact_sales_margin").count(),
}
print(checks)

# How many fact rows survive each join?
print("fact + product:", spark.sql("""
SELECT COUNT(*) AS cnt
FROM fact_sales f
JOIN dim_product p ON f.ProductKey = p.ProductKey
""").collect()[0][0])

print("fact + product + category:", spark.sql("""
SELECT COUNT(*) AS cnt
FROM fact_sales f
JOIN dim_product p  ON f.ProductKey = p.ProductKey
JOIN dim_category c ON p.CategoryKey = c.CategoryKey
""").collect()[0][0])

print("fact + margin:", spark.sql("""
SELECT COUNT(*) AS cnt
FROM fact_sales f
JOIN fact_sales_margin m ON f.SalesKey = m.SalesKey
""").collect()[0][0])

print("fact + product + category + margin:", spark.sql("""
SELECT COUNT(*) AS cnt
FROM fact_sales f
JOIN dim_product p        ON f.ProductKey = p.ProductKey
JOIN dim_category c       ON p.CategoryKey = c.CategoryKey
JOIN fact_sales_margin m  ON f.SalesKey = m.SalesKey
""").collect()[0][0])


{'dim_date': 10, 'dim_product': 5, 'dim_category': 3, 'fact_sales': 3, 'fact_sales_margin': 3}
fact + product: 3
fact + product + category: 3
fact + margin: 3
fact + product + category + margin: 3


## 7) Gold analytics + queries (business value)

All queries below read from the **Gold** star schema tables and demonstrate aggregation across **Fact + 2+ dimensions**.


In [42]:
# Register Gold tables as temp views for Spark SQL queries

g_dim_date = read_parquet(paths["gold_dim_date"])
g_dim_customer = read_parquet(paths["gold_dim_customer"])
g_dim_product = read_parquet(paths["gold_dim_product"])
g_dim_category = read_parquet(paths["gold_dim_category"])

g_fact_sales = read_parquet(paths["gold_fact_sales"])  # from Silver (stream)
g_fact_margin = read_parquet(paths["gold_fact_sales_margin"])  # computed earlier


# Publish views
for name, df in {
    "dim_date": g_dim_date,
    "dim_customer": g_dim_customer,
    "dim_product": g_dim_product,
    "dim_category": g_dim_category,
    "fact_sales": g_fact_sales,
    "fact_sales_margin": g_fact_margin,
}.items():
    df.createOrReplaceTempView(name)

print("Views created:", "dim_date, dim_customer, dim_product, dim_category, fact_sales, fact_sales_margin")


Views created: dim_date, dim_customer, dim_product, dim_category, fact_sales, fact_sales_margin


In [43]:
# Query 1: Daily revenue + gross margin (Fact + DimDate + Margin)

q1 = """
SELECT
  d.Date,
  ROUND(SUM(f.TotalAmount), 2) AS Revenue,
  ROUND(SUM(m.GrossMarginAmount), 2) AS GrossMargin
FROM fact_sales f
JOIN dim_date d ON f.DateKey = d.DateKey
JOIN fact_sales_margin m ON f.SalesKey = m.SalesKey
GROUP BY d.Date
ORDER BY d.Date
"""

spark.sql(q1).show(truncate=False)


+----------+-------+-----------+
|Date      |Revenue|GrossMargin|
+----------+-------+-----------+
|2025-09-01|39.98  |15.98      |
|2025-09-05|39.96  |21.96      |
|2025-09-10|19.98  |10.98      |
+----------+-------+-----------+



In [44]:
# Query 2: Revenue + margin by Date + Category (Fact + DimDate + DimProduct + DimCategory)

q2 = """
SELECT
  d.Date,
  c.Category,
  ROUND(SUM(f.TotalAmount), 2) AS Revenue,
  ROUND(SUM(m.GrossMarginAmount), 2) AS GrossMargin
FROM fact_sales f
JOIN dim_date d      ON f.DateKey = d.DateKey
JOIN dim_product p   ON f.ProductKey = p.ProductKey
JOIN dim_category c  ON p.CategoryKey = c.CategoryKey
JOIN fact_sales_margin m ON f.SalesKey = m.SalesKey
GROUP BY d.Date, c.Category
ORDER BY d.Date, c.Category
"""

spark.sql(q2).show(truncate=False)


+----------+-----------+-------+-----------+
|Date      |Category   |Revenue|GrossMargin|
+----------+-----------+-------+-----------+
|2025-09-01|Electronics|39.98  |15.98      |
|2025-09-05|Office     |39.96  |21.96      |
|2025-09-10|Office     |19.98  |10.98      |
+----------+-----------+-------+-----------+



In [45]:
# Query 3: Top customers by gross margin
q3 = """
SELECT
  c.CustomerID,
  CONCAT(c.FirstName, ' ', c.LastName) AS CustomerName,
  ROUND(SUM(f.TotalAmount), 2) AS Revenue,
  ROUND(SUM(m.GrossMarginAmount), 2) AS GrossMargin,
  COUNT(DISTINCT f.OrderID) AS OrderCount
FROM fact_sales f
JOIN dim_customer c       ON f.CustomerKey = c.CustomerKey
JOIN fact_sales_margin m  ON f.SalesKey = m.SalesKey
GROUP BY c.CustomerID, CustomerName
ORDER BY GrossMargin DESC, Revenue DESC
LIMIT 10
"""

print("Query 3 — Top customers by margin")
spark.sql(q3).show(truncate=False)

Query 3 — Top customers by margin
+----------+------------+-------+-----------+----------+
|CustomerID|CustomerName|Revenue|GrossMargin|OrderCount|
+----------+------------+-------+-----------+----------+
|CUST-005  |Mia Johnson |39.96  |21.96      |1         |
|CUST-001  |Ava Nguyen  |39.98  |15.98      |1         |
|CUST-004  |Noah Garcia |19.98  |10.98      |1         |
+----------+------------+-------+-----------+----------+



In [46]:
# Query 4: Product leaderboard (units, revenue, margin)
q4 = """
SELECT
  p.ProductID,
  p.ProductName,
  c.Category,
  ROUND(SUM(f.Quantity), 2) AS UnitsSold,
  ROUND(SUM(f.TotalAmount), 2) AS Revenue,
  ROUND(SUM(m.GrossMarginAmount), 2) AS GrossMargin
FROM fact_sales f
JOIN dim_product p        ON f.ProductKey = p.ProductKey
JOIN dim_category c       ON p.CategoryKey = c.CategoryKey
JOIN fact_sales_margin m  ON f.SalesKey = m.SalesKey
GROUP BY p.ProductID, p.ProductName, c.Category
ORDER BY GrossMargin DESC, Revenue DESC, UnitsSold DESC
"""

print("Query 4 — Product leaderboard")
spark.sql(q4).show(truncate=False)

Query 4 — Product leaderboard
+---------+----------------+-----------+---------+-------+-----------+
|ProductID|ProductName     |Category   |UnitsSold|Revenue|GrossMargin|
+---------+----------------+-----------+---------+-------+-----------+
|SKU-300  |Notebook Set (3)|Office     |6.0      |59.94  |32.94      |
|SKU-100  |Wireless Mouse  |Electronics|2.0      |39.98  |15.98      |
+---------+----------------+-----------+---------+-------+-----------+



In [47]:
# Query 5: Average order value (AOV) + average order margin (AOM) by weekday
q5 = """
SELECT
  d.DayOfWeek,
  ROUND(SUM(f.TotalAmount) / COUNT(DISTINCT f.OrderID), 2) AS AvgOrderValue,
  ROUND(SUM(m.GrossMarginAmount) / COUNT(DISTINCT f.OrderID), 2) AS AvgOrderMargin,
  COUNT(DISTINCT f.OrderID) AS Orders
FROM fact_sales f
JOIN dim_date d          ON f.DateKey = d.DateKey
JOIN fact_sales_margin m ON f.SalesKey = m.SalesKey
GROUP BY d.DayOfWeek
ORDER BY Orders DESC, AvgOrderMargin DESC
"""

print("Query 5 — AOV/AOM by weekday")
spark.sql(q5).show(truncate=False)

Query 5 — AOV/AOM by weekday
+---------+-------------+--------------+------+
|DayOfWeek|AvgOrderValue|AvgOrderMargin|Orders|
+---------+-------------+--------------+------+
|Friday   |39.96        |21.96         |1     |
|Monday   |39.98        |15.98         |1     |
|Wednesday|19.98        |10.98         |1     |
+---------+-------------+--------------+------+



In [48]:
# Query 6: Monthly revenue + margin rollup
q6 = """
SELECT
  d.Year,
  d.Month,
  ROUND(SUM(f.TotalAmount), 2) AS Revenue,
  ROUND(SUM(m.GrossMarginAmount), 2) AS GrossMargin,
  COUNT(DISTINCT f.OrderID) AS Orders
FROM fact_sales f
JOIN dim_date d          ON f.DateKey = d.DateKey
JOIN fact_sales_margin m ON f.SalesKey = m.SalesKey
GROUP BY d.Year, d.Month
ORDER BY d.Year, d.Month
"""

print("Query 6 — Monthly rollup")
spark.sql(q6).show(truncate=False)

Query 6 — Monthly rollup
+----+-----+-------+-----------+------+
|Year|Month|Revenue|GrossMargin|Orders|
+----+-----+-------+-----------+------+
|2025|9    |99.92  |48.92      |3     |
+----+-----+-------+-----------+------+

