# Spark Data Processing (Optional Scaling Layer)

This notebook demonstrates an optional Spark-based ETL layer for the SunnyBest pipeline.
The goal is to show how the project can scale beyond pandas when data volumes grow.

> **Note on Spark:**  
> This project does not strictly require Spark at its current scale. I included Spark as an optional processing layer to demonstrate how the pipeline could evolve in production as data volumes grow. The core modelling remains in pandas to support faster iteration during development.


## What this notebook covers

- Create a local Spark session (or validate Spark availability)
- Load SunnyBest raw CSVs using Spark
- Run basic aggregations (revenue trends, category performance)
- Write a feature-ready aggregated table to `data/processed/`
- Outline how Spark output can feed a warehouse (e.g., Snowflake)


In [1]:
import os
import pandas as pd


In [12]:
# Spark availability check
def spark_available():
    """Return True if pyspark can be imported in this environment."""
    try:
        import pyspark  # noqa: F401
        return True
    except ImportError:
        return False

spark_available()


False
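`spark_available()` answers the question by actually importing `pyspark`. A lighter variant asks Python's import system whether the package can be located without executing it, using `importlib.util.find_spec` from the standard library. The helper name `module_available` below is hypothetical; this is a sketch, not a replacement the notebook depends on.

```python
import importlib.util


def module_available(name: str) -> bool:
    """Return True if the top-level module `name` can be found, without importing it."""
    # find_spec returns None when no installed finder can locate the module.
    return importlib.util.find_spec(name) is not None


print("pyspark available:", module_available("pyspark"))
```

Unlike the import-based check, this skips `pyspark`'s package initialisation, but it also will not notice an installation that is present yet broken.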

## Start a Spark session

If `pyspark` is not installed locally, this notebook still documents the intended Spark workflow.


In [13]:
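# Create a local Spark session (runs only if pyspark is available)
if spark_available():
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .master("local[*]")  # use all local cores; no cluster required
        .appName("SunnyBest-Spark-ETL")
        .getOrCreate()
    )
    # A bare `spark` expression inside an if-block is not displayed by Jupyter, so print instead
    print("Spark version:", spark.version)
else:
    print("pyspark is not installed in this environment. Spark steps will be documented but not executed.")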


pyspark is not installed in this environment. Spark steps will be documented but not executed.


## Load raw SunnyBest datasets

We load the same raw CSVs used in the pandas notebooks, this time through Spark's DataFrame reader (`spark.read.csv`).


In [14]:
# Load the raw CSVs (inferSchema=True costs one extra pass over each file to detect column types)
RAW_DIR = "../data/raw"

if spark_available():
    sales_spark = spark.read.csv(f"{RAW_DIR}/sunnybest_sales.csv", header=True, inferSchema=True)
    products_spark = spark.read.csv(f"{RAW_DIR}/sunnybest_products.csv", header=True, inferSchema=True)
    stores_spark = spark.read.csv(f"{RAW_DIR}/sunnybest_stores.csv", header=True, inferSchema=True)
    calendar_spark = spark.read.csv(f"{RAW_DIR}/sunnybest_calendar.csv", header=True, inferSchema=True)

    print("Sales rows:", sales_spark.count())
    sales_spark.printSchema()
else:
    print("Skipping Spark CSV load (pyspark not available).")


Skipping Spark CSV load (pyspark not available).
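Every later cell depends on a handful of columns: the join keys `product_id`, `store_id` and `date`, plus `category` and `revenue`. A cheap header check before handing the files to Spark can catch a renamed or missing column early. The sketch below uses only the standard library; the helper names are hypothetical, and the required-column sets are taken from the columns this notebook references rather than from a documented schema.

```python
import csv
from pathlib import Path

# Columns that later cells reference: join keys plus the aggregated fields.
REQUIRED_COLUMNS = {
    "sunnybest_sales.csv": {"date", "store_id", "product_id", "revenue"},
    "sunnybest_products.csv": {"product_id", "category"},
    "sunnybest_stores.csv": {"store_id"},
    "sunnybest_calendar.csv": {"date"},
}


def missing_columns(csv_path, required):
    """Return the required column names absent from the CSV's header row, sorted."""
    # utf-8-sig also strips a byte-order mark, which spreadsheet exports sometimes add.
    with open(csv_path, newline="", encoding="utf-8-sig") as f:
        header = next(csv.reader(f), [])
    present = {name.strip() for name in header}
    return sorted(set(required) - present)


def check_raw_headers(raw_dir):
    """Map each raw file with a problem to its missing columns; {} means all checks passed."""
    problems = {}
    for filename, required in REQUIRED_COLUMNS.items():
        path = Path(raw_dir) / filename
        if not path.exists():
            problems[filename] = ["<file not found>"]
            continue
        missing = missing_columns(path, required)
        if missing:
            problems[filename] = missing
    return problems
```

`check_raw_headers("../data/raw")` returns `{}` when every file exists with the expected columns; without the check, Spark would only fail once a join or `select` referenced the missing name.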


## Join datasets (Spark version of the pandas merge)

We join sales with product, store and calendar attributes to build an enriched table suitable for aggregation.


In [6]:
# Enrich sales with product, store and calendar attributes (left joins keep every sales row)
if spark_available():
    from pyspark.sql.functions import col

    df_spark = (
        sales_spark
        .join(products_spark, on="product_id", how="left")
        .join(stores_spark, on="store_id", how="left")
        .join(calendar_spark, on="date", how="left")
    )

    print("Joined rows:", df_spark.count())
    df_spark.select("date", "store_id", "product_id", "category", "revenue").show(5)
else:
    print("Skipping Spark joins (pyspark not available).")


Skipping Spark joins (pyspark not available).
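All three joins use `how="left"`, so every sales row survives. The joined row count printed above should therefore equal the sales row count, unless a key is duplicated in a dimension table: a product listed twice doubles that product's sales rows and inflates revenue. The plain-Python sketch below (toy data, hypothetical helper names) mirrors the left-join semantics to show the effect; in Spark, a comparable check is `products_spark.groupBy("product_id").count().filter("count > 1")`.

```python
from collections import Counter


def duplicate_keys(rows, key):
    """Return key values that occur more than once in a list of dict rows."""
    counts = Counter(row[key] for row in rows)
    return sorted(k for k, n in counts.items() if n > 1)


def left_join(left, right, key):
    """Plain-Python left join on lists of dicts, mirroring how="left".

    Every left row is kept. Unmatched rows simply lack the right-hand columns
    (Spark would fill them with nulls). Each match yields one output row, so a
    duplicated key on the right multiplies the matching left rows.
    """
    index = {}
    for row in right:
        index.setdefault(row[key], []).append(row)
    joined = []
    for row in left:
        matches = index.get(row[key], [])
        if not matches:
            joined.append(dict(row))
        for match in matches:
            # Left-hand values win on name clashes (Spark would keep both columns).
            joined.append({**match, **row})
    return joined


# Toy data: product 1 appears twice in products_dup; product 3 has no match.
sales = [
    {"product_id": 1, "revenue": 100.0},
    {"product_id": 2, "revenue": 50.0},
    {"product_id": 3, "revenue": 25.0},
]
products = [{"product_id": 1, "category": "A"}, {"product_id": 2, "category": "B"}]
products_dup = products + [{"product_id": 1, "category": "A"}]

for table in (products, products_dup):
    joined = left_join(sales, table, "product_id")
    print(len(sales), "sales rows ->", len(joined), "joined rows; revenue",
          sum(r["revenue"] for r in joined),
          "; duplicate keys:", duplicate_keys(table, "product_id"))
```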


## Aggregations for reporting

Spark is often used upstream to generate:
- daily revenue by store
- daily revenue by category
- promo-period performance tables

These aggregated outputs can feed dashboards and warehouses.
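The first of these, daily revenue by store, would be `df_spark.groupBy("date", "store_id").agg(spark_sum("revenue"))` in Spark. Because pyspark is not installed here, the sketch below shows the same grouping in plain Python so the expected output shape is concrete. The function name, rows and store IDs are made up for illustration.

```python
from collections import defaultdict


def daily_revenue_by_store(rows):
    """Sum revenue per (date, store_id), ordered by date then store.

    Plain-Python counterpart of
    df.groupBy("date", "store_id").agg(sum("revenue")).orderBy("date", "store_id").
    """
    totals = defaultdict(float)
    for row in rows:
        totals[(row["date"], row["store_id"])] += row["revenue"]
    return [
        {"date": day, "store_id": store, "daily_revenue": total}
        for (day, store), total in sorted(totals.items())
    ]


# Made-up rows; real data would come from the joined sales table.
rows = [
    {"date": "2024-01-02", "store_id": "S1", "revenue": 10.0},
    {"date": "2024-01-01", "store_id": "S2", "revenue": 5.0},
    {"date": "2024-01-01", "store_id": "S1", "revenue": 7.5},
    {"date": "2024-01-01", "store_id": "S1", "revenue": 2.5},
]
for record in daily_revenue_by_store(rows):
    print(record)
```

ISO-formatted date strings sort chronologically, which is why sorting the string keys is enough here.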


In [15]:
# Revenue by category (Spark)
if spark_available():
    from pyspark.sql.functions import col, sum as spark_sum

    revenue_by_category = (
        df_spark
        .groupBy("category")
        .agg(spark_sum(col("revenue")).alias("total_revenue"))
        .orderBy(col("total_revenue").desc())
    )

    revenue_by_category.show(20, truncate=False)
else:
    print("Skipping Spark aggregation (pyspark not available).")


Skipping Spark aggregation (pyspark not available).
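Because products are joined with `how="left"`, any sales row whose `product_id` is missing from the products file ends up with a null `category`, and `groupBy("category")` reports that revenue under a null key. Labelling it explicitly, for example with `coalesce(col("category"), lit("UNKNOWN"))` from `pyspark.sql.functions` before grouping, keeps it visible in reports. The plain-Python sketch below mirrors that logic; the function name, the `"UNKNOWN"` label and the sample rows are assumptions.

```python
from collections import defaultdict


def category_revenue_totals(rows, unknown_label="UNKNOWN"):
    """Total revenue per category, largest first.

    A missing or None category is grouped under `unknown_label`, mirroring
    coalesce(col("category"), lit("UNKNOWN")) before groupBy("category") in Spark.
    """
    totals = defaultdict(float)
    for row in rows:
        category = row.get("category")
        if category is None:
            category = unknown_label
        totals[category] += row["revenue"]
    # Descending by total, like orderBy(col("total_revenue").desc()).
    return sorted(totals.items(), key=lambda item: item[1], reverse=True)


rows = [
    {"category": "A", "revenue": 300.0},
    {"category": None, "revenue": 40.0},  # sales row with no matching product
    {"category": "B", "revenue": 120.0},
    {"category": "A", "revenue": 50.0},
]
print(category_revenue_totals(rows))
```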


In [8]:
# Daily revenue trend (Spark)
if spark_available():
    from pyspark.sql.functions import col, sum as spark_sum, to_date

    daily_revenue = (
        df_spark
        .withColumn("date", to_date(col("date")))
        .groupBy("date")
        .agg(spark_sum(col("revenue")).alias("daily_revenue"))
        .orderBy("date")
    )

    daily_revenue.show(10)
else:
    print("Skipping Spark daily aggregation (pyspark not available).")


Skipping Spark daily aggregation (pyspark not available).
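`to_date(col("date"))` relies on Spark's default date casting, and strings it cannot parse may come back as null (or raise an error if ANSI mode is enabled). Null dates would form their own group, which an ascending `orderBy("date")` places first. Passing the expected pattern explicitly, as in `to_date(col("date"), "yyyy-MM-dd")`, and counting unparseable values makes such problems visible. The sketch below assumes ISO `YYYY-MM-DD` dates (the actual format of the raw files is not shown here), uses hypothetical function names, and does the check in plain Python.

```python
from datetime import date, datetime


def parse_iso_date(value):
    """Return a date for 'YYYY-MM-DD' input, or None if it cannot be parsed."""
    if isinstance(value, datetime):  # check datetime first: it subclasses date
        return value.date()
    if isinstance(value, date):
        return value
    try:
        return datetime.strptime(str(value).strip(), "%Y-%m-%d").date()
    except ValueError:
        return None


def daily_totals_with_bad_dates(rows):
    """Sum revenue per parsed date (sorted) and count rows whose date did not parse.

    Rows that Spark's to_date turns into null would form a null group there;
    here they are counted instead.
    """
    totals = {}
    bad_rows = 0
    for row in rows:
        day = parse_iso_date(row["date"])
        if day is None:
            bad_rows += 1
            continue
        totals[day] = totals.get(day, 0.0) + row["revenue"]
    return sorted(totals.items()), bad_rows


rows = [
    {"date": "2024-01-02", "revenue": 5.0},
    {"date": "2024-01-01", "revenue": 3.0},
    {"date": "2024-01-01", "revenue": 2.0},
    {"date": "01/02/2024", "revenue": 9.0},  # not ISO: does not parse
]
series, bad = daily_totals_with_bad_dates(rows)
print(series)
print("rows with unparseable dates:", bad)
```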


## Write aggregated tables to data/processed/

In a production pipeline, Spark writes curated tables to object storage or a warehouse.
Here we write to `data/processed/` to keep the project self-contained.


In [10]:
# Write aggregated output to data/processed/ (the directory is created even without Spark)
PROCESSED_DIR = "../data/processed"
os.makedirs(PROCESSED_DIR, exist_ok=True)

if spark_available():
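    # The category aggregate is small, so toPandas() is cheap and yields a single CSV file
    # (Spark's own writer would produce a directory of part files instead)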
    revenue_cat_pd = revenue_by_category.toPandas()
    revenue_cat_pd.to_csv(f"{PROCESSED_DIR}/spark_revenue_by_category.csv", index=False)

    print("Saved:", f"{PROCESSED_DIR}/spark_revenue_by_category.csv")
else:
    print("Skipping write (pyspark not available).")


Skipping write (pyspark not available).
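`toPandas()` is reasonable here because the category table is tiny. For larger outputs, Spark's own writer (for example `revenue_by_category.coalesce(1).write.mode("overwrite").option("header", True).csv(path)`) produces a directory containing a part file rather than a single named CSV. Either way, a run that fails midway can leave a half-written file behind. The standard-library sketch below, with a hypothetical helper name, writes to a temporary file in the same directory and then renames it into place.

```python
import csv
import os
import tempfile


def write_csv_atomic(rows, path, fieldnames):
    """Write dict rows to `path` via a temp file in the same directory, then rename.

    os.replace swaps the file in a single step when source and target share a
    filesystem (atomic on POSIX), so readers never see a truncated CSV at `path`.
    """
    directory = os.path.dirname(os.path.abspath(path))
    os.makedirs(directory, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(rows)
        os.replace(tmp_path, path)
    except BaseException:
        # Remove the partial temp file, then re-raise the original error.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```

In this notebook it could be called as `write_csv_atomic(revenue_cat_pd.to_dict("records"), path, list(revenue_cat_pd.columns))`.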


## How Spark would connect to a warehouse (Snowflake)

In a production environment, Spark outputs would be loaded into a warehouse such as Snowflake:

1. **Raw data** lands in object storage (e.g., S3)
2. Spark performs joins and feature engineering
3. Curated outputs are written as **staging tables**
4. Snowflake transforms staging tables into **analytics marts**
5. Models and dashboards query marts for consistent business logic

This project includes example SQL files under:
`src/warehouse/`
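Steps 3 to 5 can be sketched locally with SQLite (Python's built-in `sqlite3`) standing in for Snowflake. The table names `stg_daily_store_revenue` and `mart_store_revenue`, the helper `build_store_mart` and the toy rows are hypothetical and do not correspond to the files in `src/warehouse/`; the SQL is generic and would need adapting to Snowflake.

```python
import sqlite3


def build_store_mart(conn):
    """Create a mart table (total and average daily revenue per store) from staging."""
    conn.executescript(
        """
        DROP TABLE IF EXISTS mart_store_revenue;
        CREATE TABLE mart_store_revenue AS
        SELECT
            store_id,
            SUM(daily_revenue) AS total_revenue,
            AVG(daily_revenue) AS avg_daily_revenue,
            COUNT(*)           AS trading_days
        FROM stg_daily_store_revenue
        GROUP BY store_id;
        """
    )


conn = sqlite3.connect(":memory:")
# Step 3 (staging): rows shaped like a Spark output such as daily revenue by store.
conn.execute("CREATE TABLE stg_daily_store_revenue (date TEXT, store_id TEXT, daily_revenue REAL)")
conn.executemany(
    "INSERT INTO stg_daily_store_revenue VALUES (?, ?, ?)",
    [("2024-01-01", "S1", 10.0), ("2024-01-02", "S1", 20.0), ("2024-01-01", "S2", 5.0)],
)
# Step 4 (mart): the warehouse transforms staging into an analytics table.
build_store_mart(conn)
# Step 5 (consumption): dashboards and models query the mart.
for row in conn.execute("SELECT * FROM mart_store_revenue ORDER BY store_id"):
    print(row)
```

Keeping the business logic in the mart SQL, rather than in each dashboard, is what gives consumers the consistent definitions described in step 5.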
