
### 📓 Notebook Summary

This notebook implements a **robust ETL pipeline** for train and test datasets, including cleaning, deduplication, and silver-layer table creation. 

It enriches the data by joining **stores, holidays, and oil prices**, adds temporal and salary-day features, and encodes categorical columns using **FeatureHasher and a StringIndexer-style lookup-table encoding**. 

The workflow ensures **consistency, scalability, and Spark Connect–friendly operations**, avoiding large model serialization issues while keeping the data ready for ML modeling.


In [0]:
import yaml
import logging
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType, StructField, IntegerType, StringType, 
    DateType, BooleanType, DoubleType
)

In [0]:
# Setup logging: configure the root logger at INFO level and create a
# logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def load_config(config_path="../config.yaml"):
    """Load the pipeline configuration from a YAML file.

    Args:
        config_path: Path to the YAML file; defaults to ``../config.yaml``.

    Returns:
        The parsed document as produced by ``yaml.safe_load``.

    Raises:
        OSError: If the file cannot be opened.
        yaml.YAMLError: If the content is not valid YAML.
    """
    # Pin the encoding so parsing does not depend on the platform locale.
    with open(config_path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)

# Load config and echo it as the cell output for inspection
config = load_config("../config.yaml")
config

In [0]:
# Unity Catalog location, taken from the `databricks` section of the config.
CATALOG_NAME = config['databricks']['catalog']
SCHEMA_NAME = config['databricks']['schema']

VOLUME_ROOT_PATH = config['databricks']['volume_path'] #"/Volumes/cscie103_catalog/final_project/data"
# place where raw csvs land after download
# NOTE(review): this currently resolves to the same value as VOLUME_ROOT_PATH;
# the trailing comment suggests a "/raw" subfolder was once intended — confirm.
VOLUME_TARGET_DIR = config['databricks']['volume_path'] # f"{VOLUME_ROOT_PATH}/raw"

# Make the configured catalog/schema the default for unqualified table names.
spark.sql(f"USE {CATALOG_NAME}.{SCHEMA_NAME}")

class DataframeNames:
    """Base (tier-less) names of the tables handled by this notebook.

    The names are combined with a tier prefix (see ``DataTier``) to form
    full table names such as ``bronze_stores`` or ``silver_training``.
    """

    HOLIDAYS = "holidays"
    OIL = "oil"
    STORES = "stores"
    TEST = "test"
    TRAIN = "train"
    TRANSACTIONS = "transactions"
    # Enriched (joined + feature-engineered) variants of train/test.
    TRAINING = "training"
    TESTING = "testing"

    # Every known name. TESTING is appended last so the positions of the
    # previously listed entries are unchanged.
    ALL = [HOLIDAYS, OIL, STORES, TEST, TRAIN, TRANSACTIONS, TRAINING, TESTING]

class DataTier:
    """Medallion tier prefixes and helpers that build tier-qualified table names."""

    BRONZE = "bronze"
    SILVER = "silver"
    GOLD = "gold"

    # The helpers are static so they work both as ``DataTier.getX(name)``
    # (the form used throughout this notebook) and on an instance.

    @staticmethod
    def getBronzeName(tablename):
        """Return ``"bronze_<tablename>"``."""
        return DataTier.BRONZE + "_" + tablename

    @staticmethod
    def getSilverName(tablename):
        """Return ``"silver_<tablename>"``."""
        return DataTier.SILVER + "_" + tablename

    @staticmethod
    def getGoldName(tablename):
        """Return ``"gold_<tablename>"``."""
        return DataTier.GOLD + "_" + tablename

In [0]:
def extractTransformLoad(bronze_tablename, silver_tablename, transform, create_table):
    """
    Batch-load a bronze Delta table into a silver table, printing row counts.

    NOTE: later cells in this notebook define `extractTransformLoad` again,
    so this version is replaced once those cells run.

    :param: bronze_tablename - bronze UC table name to read
    :param: silver_tablename - silver UC table name to overwrite
    :param: transform - function applied to the bronze DataFrame; returns the transformed DataFrame
    :param: create_table - zero-argument callable invoked before the write

    :return: the transformed DataFrame
    """

    print("Reading from bronze table: " + bronze_tablename)
    bronze_df = spark.read.format("delta").table(bronze_tablename)
    bronze_rows = bronze_df.count()
    print(f"Read {bronze_rows} records from bronze table.")

    print("Applying transformation(s)...")
    silver_df = transform(bronze_df)
    silver_rows = silver_df.count()
    print(f"After transformation, there are {silver_rows} records.")

    print(f"Creating {silver_tablename} table.")
    create_table()

    print("Writing to silver table: " + silver_tablename + "...")
    delta_writer = silver_df.write.mode("overwrite").format("delta")
    delta_writer.saveAsTable(silver_tablename)

    print(f"Silver table {silver_tablename} is written. Schema: ")
    silver_df.printSchema()

    return silver_df

In [0]:



def extractTransformLoad(bronze_tablename, silver_tablename, transform, create_table):
    """
    Read a bronze Delta table, transform it, and overwrite the silver table.

    NOTE: a later cell in this notebook defines `extractTransformLoad` again,
    so this version is replaced once that cell runs.

    Returns the transformed DataFrame.
    """

    print(f"Reading from bronze table: {bronze_tablename}")
    bronze_df = spark.read.format("delta").table(bronze_tablename)

    print("Applying transformation(s)...")
    silver_df = transform(bronze_df)

    print(f"Creating silver table if needed: {silver_tablename}")
    create_table()

    print(f"Writing to silver table: {silver_tablename}")
    delta_writer = silver_df.write.mode("overwrite").format("delta")
    delta_writer.saveAsTable(silver_tablename)

    print(f"Silver table {silver_tablename} written successfully.")
    silver_df.printSchema()

    return silver_df


In [0]:
from pyspark.sql import functions as F

def extractTransformLoad(bronze_tablename, silver_tablename, transform, create_table):
    """
    Run one bronze -> silver batch load and print before/after metrics.

    :param bronze_tablename: name of the Delta table to read
    :param silver_tablename: name of the table overwritten with the result
    :param transform: callable taking the bronze DataFrame and returning the transformed one
    :param create_table: zero-argument callable invoked before the write
    :return: the transformed DataFrame
    """

    print(f"Reading from bronze table: {bronze_tablename}")
    source_df = spark.read.format("delta").table(bronze_tablename)

    # Metrics before the transformation
    rows_before = source_df.count()
    print(f"Pre-transform row count: {rows_before}")
    if "sales" in source_df.columns:
        sales_before = source_df.agg(F.sum("sales")).collect()[0][0]
        print(f"Pre-transform total sales: {sales_before}")

    print("Applying transformation(s)...")
    result_df = transform(source_df)

    # Metrics after the transformation
    rows_after = result_df.count()
    print(f"Post-transform row count: {rows_after}")
    if "sales" in result_df.columns:
        sales_after = result_df.agg(F.sum("sales")).collect()[0][0]
        print(f"Post-transform total sales: {sales_after}")

    print(f"Creating {silver_tablename} table.")
    create_table()

    print(f"Writing to silver table: {silver_tablename}...")
    delta_writer = result_df.write.mode("overwrite").format("delta")
    delta_writer.saveAsTable(silver_tablename)

    print(f"Silver table {silver_tablename} is written. Schema:")
    result_df.printSchema()

    return result_df


## Building Out Silver Data

### Stores

In [0]:
from pyspark.sql import functions as F

# Stores Dataframe: resolve the bronze (source) and silver (target) table names
bronze_tablename_stores = DataTier.getBronzeName(DataframeNames.STORES)
silver_tablename_stores = DataTier.getSilverName(DataframeNames.STORES)


# Transformation logic for stores
def transform(df):
    """Keep the store dimension columns, drop rows without a store number,
    and retain a single row per `store_nbr`."""
    store_columns = ["store_nbr", "city", "state", "type", "cluster"]
    projected = df.select(*store_columns)
    keyed = projected.filter(F.col("store_nbr").isNotNull())
    return keyed.dropDuplicates(["store_nbr"])

# Create table safely (idempotent)
def create_table():
    """Issue the CREATE TABLE IF NOT EXISTS statement for the silver stores table."""
    stores_ddl = f"""
        CREATE TABLE IF NOT EXISTS {silver_tablename_stores} (
            store_nbr INTEGER NOT NULL,
            city STRING,
            state STRING,
            type STRING,
            cluster INTEGER,
            CONSTRAINT pk_silver_store_nbr PRIMARY KEY (store_nbr)
        )
        USING DELTA
        COMMENT 'Cleaned store dimension table'
    """
    spark.sql(stores_ddl)

# ETL execution
# NOTE: despite its name, `stores_streaming_query` holds the transformed
# DataFrame returned by extractTransformLoad, not a streaming query.
stores_streaming_query = extractTransformLoad(
    bronze_tablename_stores,
    silver_tablename_stores,
    transform,
    create_table
)

print("Stores silver table written.")

### Holidays

In [0]:
# Holidays Dataframe
bronze_tablename_holidays = DataTier.getBronzeName(DataframeNames.HOLIDAYS)
silver_tablename_holidays = DataTier.getSilverName(DataframeNames.HOLIDAYS)

# Preload reference data OUTSIDE the transform

# All available states (distinct `state` values of the bronze stores table)
bronze_stores = spark.read.table(
    DataTier.getBronzeName(DataframeNames.STORES)
).select("state").distinct()

# Get the continuous range of dates
# NOTE(review): despite the name `train_dates_df`, the dates are read from the
# bronze HOLIDAYS table, so the spine spans the holiday calendar's date range.
train_dates_df = spark.read.table(
    DataTier.getBronzeName(DataframeNames.HOLIDAYS)
).select("date")

# Collect the earliest and latest date to the driver
date_bounds = train_dates_df.agg(
    F.min("date").alias("min_date"),
    F.max("date").alias("max_date")
).collect()[0]

min_date = date_bounds["min_date"]
max_date = date_bounds["max_date"]

print(f"Date bounds: {min_date} - {max_date}")

# Build a single-column DataFrame of dates from min_date to max_date
# by exploding a generated sequence into rows.
date_spine = (
    spark
    .range(1)
    .select(
        F.explode(
            F.sequence(F.lit(min_date), F.lit(max_date))
        ).alias("date")
    )
)


In [0]:
# Preview the raw bronze holidays table before transforming it
df = spark.read.format("delta").table(bronze_tablename_holidays)
display(df)

In [0]:
def transform(holidays_events_df):
    """
    Build a per-(date, state) holiday calendar with a binary `is_holiday` flag.

    Steps:

    1. Drop transferred holidays
       - Keep only rows where `transferred` is not 'true'

    2. Expand holidays to state level
       a. Nationwide holidays (`locale_name = 'Ecuador'`) are cross joined
          with `bronze_stores` to get one row per state
       b. All other holidays use `locale_name` as the state
          (NOTE(review): this assumes non-national `locale_name` values are
          state names — confirm against the bronze data)

    3. Set `is_holiday = 1` on every expanded row, giving
       (date, state, is_holiday)

    4. Deduplicate so there is a single row per (date, state)

    5. Build the full calendar by cross joining `date_spine`
       with `bronze_stores`

    6. Left join the holidays onto the calendar and fill unmatched rows
       with `is_holiday = 0`
    """

    holiday_flag = F.lit(1).alias("is_holiday")

    # 1. Remove transferred holidays
    active_holidays = holidays_events_df.filter(F.col("transferred") != "true")

    # 2a. Nationwide holidays -> one row per state
    national_rows = active_holidays.filter(F.col("locale_name") == "Ecuador")
    nationwide = national_rows.crossJoin(bronze_stores).select(
        "date",
        F.col("state"),
        holiday_flag,
    )

    # 2b. State-specific holidays
    regional_rows = active_holidays.filter(F.col("locale_name") != "Ecuador")
    regional = regional_rows.select(
        "date",
        F.col("locale_name").alias("state"),
        holiday_flag,
    )

    # 3 & 4. Combine and keep one row per (date, state)
    combined = nationwide.unionByName(regional)
    holidays_expanded = combined.dropDuplicates(["date", "state"])

    # 5 & 6. Full calendar, with non-holidays marked as 0
    full_calendar = date_spine.crossJoin(bronze_stores)
    flagged_calendar = full_calendar.join(
        holidays_expanded,
        on=["date", "state"],
        how="left",
    )
    return flagged_calendar.fillna(0, subset=["is_holiday"])


def create_table():
    """Drop any existing silver holidays table, then (re)create it with its
    (date, state) primary key."""
    drop_statement = f"DROP TABLE IF EXISTS {silver_tablename_holidays}"
    create_statement = f"""
        CREATE OR REPLACE TABLE {silver_tablename_holidays} (
            date DATE NOT NULL,
            state STRING NOT NULL,
            is_holiday INTEGER NOT NULL,
            
            CONSTRAINT pk_silver_holiday 
            PRIMARY KEY (date, state)
        )
        USING DELTA
        COMMENT 'Processed holiday calendar exploded by state with binary holiday flag. Includes all training dates with is_holiday=0 for non-holidays.';
    """
    spark.sql(drop_statement)
    spark.sql(create_statement)


# Run the bronze -> silver load for holidays using the transform and
# create_table defined in this cell.
extractTransformLoad(
    bronze_tablename_holidays,
    silver_tablename_holidays,
    transform,
    create_table
)
print("Holidays silver table written.")

In [0]:
# To visualize the holidays: pull the silver calendar into pandas and pivot
# it so rows are dates, columns are states, and cells are the is_holiday flag.
# NOTE: the table name is hard-coded here rather than taken from
# `silver_tablename_holidays`.
pdf = spark.sql("""
    SELECT date, state, is_holiday
    FROM silver_holidays
    ORDER BY date, state
""").toPandas()

pivot_df = pdf.pivot(
    index="date",
    columns="state",
    values="is_holiday"
)
pivot_df


### Oil

In [0]:
# Oil Dataframe
bronze_tablename_oil = DataTier.getBronzeName(DataframeNames.OIL)
silver_tablename_oil = DataTier.getSilverName(DataframeNames.OIL)

# Moves I/O outside transform
oil_raw = spark.read.table(bronze_tablename_oil).select("date")

# Collect the first and last date present in the bronze oil table
bounds = oil_raw.agg(
    F.min("date").alias("min_date"),
    F.max("date").alias("max_date")
).collect()[0]

# NOTE: these reassign the module-level min_date/max_date that the holidays
# cell set earlier.
min_date = bounds["min_date"]
max_date = bounds["max_date"]
print(f"Oil data covers dates {min_date} to {max_date}")

# This guarantees one row per calendar day.
oil_date_spine = (
    spark
    .range(1)
    .select(
        F.explode(
            F.sequence(F.lit(min_date), F.lit(max_date))
        ).alias("date")
    )
)


def transform(oil_df):
    """
    Produce a gap-free daily oil price series.

    The bronze rows are typed (`date` as date, `dcoilwtico` as double),
    left-joined onto `oil_date_spine`, forward-filled, then backward-filled
    for any leading gaps. Rows that still have no price are dropped.
    """
    import pyspark.sql.functions as F
    from pyspark.sql.window import Window

    price = "dcoilwtico"

    typed_prices = (
        oil_df
        .select("date", price)
        .withColumn("date", F.to_date("date"))
        .withColumn(price, F.col(price).cast("double"))
    )

    # One row per spine date; dates without a quote get a null price
    daily = oil_date_spine.join(typed_prices, on="date", how="left")

    # Forward fill: latest non-null price at or before each date
    up_to_current = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, 0)
    daily = daily.withColumn(price, F.last(price, ignorenulls=True).over(up_to_current))

    # Backward fill: covers nulls before the first available price
    from_current = Window.orderBy("date").rowsBetween(0, Window.unboundedFollowing)
    daily = daily.withColumn(price, F.first(price, ignorenulls=True).over(from_current))

    # Final safety: enforce NOT NULL contract
    return daily.filter(F.col(price).isNotNull())

def create_table():
    """Issue the CREATE TABLE IF NOT EXISTS statement for the silver oil table."""
    oil_ddl = f"""
        CREATE TABLE IF NOT EXISTS {silver_tablename_oil} (
            date DATE NOT NULL,
            dcoilwtico DOUBLE NOT NULL,
            CONSTRAINT pk_silver_oil PRIMARY KEY (date)
        )
        USING DELTA
        COMMENT 'Daily WTI Crude Oil prices with continuous dates and forward-filled values'
    """
    spark.sql(oil_ddl)


# Run the bronze -> silver load for oil using the transform and
# create_table defined in this cell.
extractTransformLoad(
    bronze_tablename_oil,
    silver_tablename_oil,
    transform,
    create_table
)
print("Oil silver table written.")

### Train

In [0]:
# Train Dataframe: resolve the bronze (source) and silver (target) table names
bronze_tablename_train = DataTier.getBronzeName(DataframeNames.TRAIN)
silver_tablename_train = DataTier.getSilverName(DataframeNames.TRAIN)
# Earlier, simpler version kept for reference (superseded by transform below):
# transform = lambda df: df.dropna()

from pyspark.sql import functions as F

def transform(df):
    """
    Train data cleaning:

    1. Cast `sales` to double, `onpromotion` to integer and `date` to date
    2. Keep only rows where all mandatory columns are non-null
    3. Deduplicate by primary key (`id`)

    The casts run BEFORE the null filter: a value that cannot be converted
    may come back as NULL from the cast (outside ANSI mode), and filtering
    afterwards keeps such rows out of the NOT NULL silver columns.

    :param df: bronze train DataFrame with columns id, date, store_nbr,
               family, sales and onpromotion
    :return: cleaned DataFrame with the same columns
    """
    typed = (
        df.withColumn("sales", F.col("sales").cast("double"))
          .withColumn("onpromotion", F.col("onpromotion").cast("integer"))
          .withColumn("date", F.to_date("date"))
    )
    df_clean = (
        typed.filter(
            F.col("id").isNotNull() &
            F.col("date").isNotNull() &
            F.col("store_nbr").isNotNull() &
            F.col("family").isNotNull() &
            F.col("sales").isNotNull() &
            F.col("onpromotion").isNotNull()
        )
        .dropDuplicates(["id"])
    )
    return df_clean



def create_table():
    """Issue the CREATE TABLE IF NOT EXISTS statement for the silver train
    table (generated year/month partition columns, FK to the silver stores table)."""
    train_ddl = f"""
        CREATE TABLE IF NOT EXISTS {silver_tablename_train} (
            id INTEGER NOT NULL,
            date DATE NOT NULL,
            store_nbr INTEGER NOT NULL,
            family STRING NOT NULL,
            sales DOUBLE NOT NULL,
            onpromotion INTEGER NOT NULL,

            year INTEGER GENERATED ALWAYS AS (YEAR(date)),
            month INTEGER GENERATED ALWAYS AS (MONTH(date)),

            CONSTRAINT pk_train PRIMARY KEY (id),
            CONSTRAINT fk_train_store FOREIGN KEY (store_nbr) REFERENCES {silver_tablename_stores} (store_nbr)
        )
        USING DELTA
        PARTITIONED BY (year, month)
        COMMENT 'Silver layer train data - historical sales transactions with store and product family information. Partitioned by year/month for efficient time-based queries.'
    """
    spark.sql(train_ddl)



# Run the bronze -> silver load for train.
# NOTE: despite its name, `train_streaming_query` holds the transformed
# DataFrame returned by extractTransformLoad, not a streaming query.
train_streaming_query = extractTransformLoad(
    bronze_tablename_train,
    silver_tablename_train,
    transform,
    create_table
)

print("Train silver table written.")

In [0]:
# Test Dataframe: resolve the bronze (source) and silver (target) table names
bronze_tablename_test = DataTier.getBronzeName(DataframeNames.TEST)
silver_tablename_test = DataTier.getSilverName(DataframeNames.TEST)

from pyspark.sql import functions as F

def transform_test(df):
    """
    Test data cleaning:

    1. Cast `onpromotion` to integer and `date` to date
    2. Keep only rows where all mandatory columns are non-null
    3. Deduplicate by primary key (`id`)

    The casts run BEFORE the null filter: a value that cannot be converted
    may come back as NULL from the cast (outside ANSI mode), and filtering
    afterwards keeps such rows out of the NOT NULL silver columns.

    :param df: bronze test DataFrame with columns id, date, store_nbr,
               family and onpromotion
    :return: cleaned DataFrame with the same columns
    """
    typed = (
        df.withColumn("onpromotion", F.col("onpromotion").cast("integer"))
          .withColumn("date", F.to_date("date"))
    )
    df_clean = (
        typed.filter(
            F.col("id").isNotNull() &
            F.col("date").isNotNull() &
            F.col("store_nbr").isNotNull() &
            F.col("family").isNotNull() &
            F.col("onpromotion").isNotNull()
        )
        .dropDuplicates(["id"])
    )
    return df_clean

def create_test_table():
    """Issue the CREATE TABLE IF NOT EXISTS statement for the silver test
    table (generated year/month partition columns, FK to the silver stores table)."""
    test_ddl = f"""
        CREATE TABLE IF NOT EXISTS {silver_tablename_test} (
            id INTEGER NOT NULL,
            date DATE NOT NULL,
            store_nbr INTEGER NOT NULL,
            family STRING NOT NULL,
            onpromotion INTEGER NOT NULL,

            year INTEGER GENERATED ALWAYS AS (YEAR(date)),
            month INTEGER GENERATED ALWAYS AS (MONTH(date)),

            CONSTRAINT pk_test PRIMARY KEY (id),
            CONSTRAINT fk_test_store FOREIGN KEY (store_nbr) REFERENCES {silver_tablename_stores} (store_nbr)
        )
        USING DELTA
        PARTITIONED BY (year, month)
        COMMENT 'Silver layer test data - store and product family information, partitioned by year/month.'
    """
    spark.sql(test_ddl)

# Run ETL
# NOTE: despite its name, `test_streaming_query` holds the transformed
# DataFrame returned by extractTransformLoad, not a streaming query.
test_streaming_query = extractTransformLoad(
    bronze_tablename_test,
    silver_tablename_test,
    transform_test,
    create_test_table
)

print("Test silver table written.")


## The `enrichTrainTest` function
Adds store, oil, and holiday data to both the train and test datasets.

In [0]:
from pyspark.sql import functions as F
from pyspark.ml.feature import FeatureHasher, StringIndexer

def _string_encode(df, colname, indexer_models):
    """Add an integer index column ``strIndxer_<colname>`` for one categorical column.

    If ``indexer_models`` already holds a ``{value: index}`` mapping for
    ``colname`` it is reused, so a later dataset (e.g. test) gets the same
    indices as the dataset the mapping was built from (e.g. train).
    Otherwise the mapping is built from the distinct values of ``df``,
    stored in ``indexer_models`` and saved as the Delta table
    ``strIndxer_<colname>``.

    Values missing from a reused mapping get the index ``len(mapping)``
    instead of having their rows dropped by the join.
    """
    index_col = 'strIndxer_' + colname
    mapping = indexer_models.get(colname)
    fitted_here = mapping is None

    if fitted_here:
        distinct_rows = df.select(colname).distinct().collect()
        # Sort so the value -> index assignment is reproducible across runs.
        ordered_values = sorted(row[0] for row in distinct_rows)
        mapping = {value: idx for idx, value in enumerate(ordered_values)}
        indexer_models[colname] = mapping
        print(f"Found {len(mapping)} unique values for {colname}:")
    else:
        print(f"Reusing existing encoding with {len(mapping)} values for {colname}.")

    encoding_df = spark.createDataFrame(
        list(mapping.items()),
        schema=[colname, index_col]
    )
    print(f"Encoding table {index_col} created.")

    if fitted_here:
        # Only persist a freshly built mapping; a reused one is already saved
        # and must not be overwritten by the second dataset.
        encoding_df.write.mode("overwrite").format("delta").saveAsTable(index_col)
        print(f"Encoding table {index_col} saved.")

    df = (
        df.join(encoding_df, on=colname, how="left")
          .fillna({index_col: len(mapping)})
    )
    print(f"Encoding joined with source table on {colname}, new column {index_col} added.")
    return df


def enrichTrainTest(df, indexer_models=None, include_features=True):
    """Join stores, holidays and oil onto ``df`` and optionally add features.

    :param df: DataFrame with at least ``date``, ``store_nbr`` and ``family``
    :param indexer_models: optional dict of ``{column: {value: index}}``
        mappings from a previous call. Pass the dict returned for the train
        set when enriching the test set so both share the same encoding.
        Missing columns are fitted on ``df`` and added to the dict.
    :param include_features: when True, add hashed features, string-index
        columns, temporal columns and the salary-day flag
    :return: tuple ``(enriched_df, indexer_models)``
    """
    if indexer_models is None:
        indexer_models = {}

    # Merge with stores
    stores_df = spark.read.format("delta").table(DataTier.getSilverName(DataframeNames.STORES))
    df = df.join(stores_df, on="store_nbr", how="left")

    # Merge with holidays; (date, state) pairs missing from the calendar count as non-holidays
    holidays_df = spark.read.format("delta").table(DataTier.getSilverName(DataframeNames.HOLIDAYS))
    df = df.join(holidays_df, on=["date", "state"], how="left").fillna({"is_holiday": 0})

    # Merge with oil
    oil_df = spark.read.format("delta").table(DataTier.getSilverName(DataframeNames.OIL))
    df = df.join(oil_df, on="date", how="left")

    # Drop remaining nulls
    # NOTE(review): this also removes rows whose date has no oil price or whose
    # store is unknown — confirm that is acceptable for the test set.
    df = df.dropna()

    if include_features:
        categorical_columns = ["family", "city", "state", "type"]

        # FeatureHasher
        hasher = FeatureHasher(inputCols=categorical_columns, outputCol="hash_features", numFeatures=1024)
        df = hasher.transform(df)

        # Encode categorical features with lookup tables shared via indexer_models
        for colname in categorical_columns:
            df = _string_encode(df, colname, indexer_models)

        # Temporal features
        df = df.withColumn("day_of_week", F.dayofweek("date")) \
               .withColumn("day_of_month", F.dayofmonth("date")) \
               .withColumn("month", F.month("date")) \
               .withColumn("year", F.year("date"))

        # Salary day
        # NOTE(review): flags the 15th and the 30th only; if the intent is
        # "15th and last day of the month", months not ending on the 30th are
        # handled incorrectly — confirm the business rule.
        df = df.withColumn("is_salary_day",
                           F.when((F.dayofmonth("date") == 15) | (F.dayofmonth("date") == 30), 1).otherwise(0))

    return df, indexer_models


In [0]:
%sql
-- Drop the enriched training/testing tables so the following cells recreate them.
-- NOTE(review): the catalog and schema are hard-coded here, whereas the Python
-- cells take them from config.yaml — confirm they refer to the same location.
DROP TABLE IF EXISTS portfolio_catalog.databricks_pipeline.silver_training;
DROP TABLE IF EXISTS portfolio_catalog.databricks_pipeline.silver_testing;

In [0]:
# Read the silver train table (model-relevant columns only), enrich it, and
# save the result as the silver training table. `indexer_models` is kept so
# the test cell can pass it back into enrichTrainTest.
train_df = spark.read.format("delta") \
    .table(DataTier.getSilverName(DataframeNames.TRAIN)) \
    .select("date", "store_nbr", "family", "sales", "onpromotion")

train_df, indexer_models = enrichTrainTest(train_df)
train_df.write.mode("overwrite").option("mergeSchema", "true") \
    .format("delta").saveAsTable(DataTier.getSilverName(DataframeNames.TRAINING))


In [0]:
# Read the silver test table, enrich it with the `indexer_models` returned by
# the training cell, and save the result as the silver testing table.
test_df = spark.read.format("delta") \
    .table(DataTier.getSilverName(DataframeNames.TEST)) \
    .select("date", "store_nbr", "family", "onpromotion")  # no sales

test_df, _ = enrichTrainTest(test_df, indexer_models=indexer_models)
test_df.write.mode("overwrite").option("mergeSchema", "true") \
    .format("delta").saveAsTable(DataTier.getSilverName(DataframeNames.TESTING))


In [0]:
%sql
-- Spot-check two rows of the enriched training table
SELECT * from silver_training LIMIT 2;

In [0]:
%sql
-- Spot-check two rows of the enriched testing table
SELECT * from silver_testing LIMIT 2;

In [0]:
# Show the resolved name of the enriched training table as the cell output
DataTier.getSilverName(DataframeNames.TRAINING)

In [0]:
# Show the volume path the training parquet export is written to
VOLUME_TARGET_DIR + '/silver_parquet'

In [0]:
# Export the enriched training table to parquet under the volume directory
# (overwriting any previous export) and display it.
silver_df = spark.table("silver_training")
silver_df.write.format("parquet").mode("overwrite").save(VOLUME_TARGET_DIR + "/silver_parquet")
display(silver_df)

In [0]:
# Export the enriched testing table to parquet under the volume directory
# (overwriting any previous export) and display it.
# NOTE: this reuses the `silver_df` name from the training export cell.
silver_df = spark.table("silver_testing")
silver_df.write.format("parquet").mode("overwrite").save(VOLUME_TARGET_DIR + "/silver_test_parquet")
display(silver_df)