###  Ingest → Bronze (keep it raw but queryable)


In [0]:
from pyspark.sql.functions import current_timestamp, col, to_date

# Catalog/Schema setup
catalog = "hive_metastore"      # leave as is
schema  = "retail"              # custom DB name
base    = "/mnt/delta/retail"   # Delta storage
rawdir  = "/FileStore/tables/retail"  # where your uploaded CSVs live

# 1. Create the database if it doesn't exist
spark.sql(f"CREATE DATABASE IF NOT EXISTS {schema}")

# 2. Read raw CSVs (use correct filenames from Kaggle)
sales_raw = (spark.read.option("header","true").option("inferSchema","true")
             .csv(f"{rawdir}/sales.csv"))

features_raw = (spark.read.option("header","true").option("inferSchema","true")
                .csv(f"{rawdir}/features.csv"))

stores_raw = (spark.read.option("header","true").option("inferSchema","true")
              .csv(f"{rawdir}/stores.csv"))

# 3. Bronze transformations (minimal typing + ingest timestamp)
sales_bronze = (sales_raw
    .withColumn("ingest_ts", current_timestamp())
    .withColumn("Date", to_date(col("Date")))   # ensure date type
)

features_bronze = (features_raw
    .withColumn("ingest_ts", current_timestamp())
    .withColumn("Date", to_date(col("Date")))
)

stores_bronze = stores_raw.withColumn("ingest_ts", current_timestamp())

# 4. Write out to Delta Lake (overwrite safely)
sales_bronze.write.format("delta").mode("overwrite").save(f"{base}/bronze/sales")
features_bronze.write.format("delta").mode("overwrite").save(f"{base}/bronze/features")
stores_bronze.write.format("delta").mode("overwrite").save(f"{base}/bronze/stores")

# 5. Register external tables for SQL access
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {schema}.bronze_sales
USING DELTA
LOCATION '{base}/bronze/sales'
""")

spark.sql(f"""
CREATE TABLE IF NOT EXISTS {schema}.bronze_features
USING DELTA
LOCATION '{base}/bronze/features'
""")

spark.sql(f"""
CREATE TABLE IF NOT EXISTS {schema}.bronze_stores
USING DELTA
LOCATION '{base}/bronze/stores'
""")

# 6. Test by showing a sample
display(spark.table(f"{schema}.bronze_sales").limit(5))


Store,Dept,Date,Weekly_Sales,IsHoliday,ingest_ts
36,7,2011-02-18,381.79,False,2025-08-31T06:33:14.02032Z
36,7,2011-02-25,451.69,False,2025-08-31T06:33:14.02032Z
36,7,2011-03-04,290.64,False,2025-08-31T06:33:14.02032Z
36,7,2011-03-11,355.31,False,2025-08-31T06:33:14.02032Z
36,7,2011-03-18,449.41,False,2025-08-31T06:33:14.02032Z


### Transform → Silver (clean, type-safe, de-duplicated)

In [0]:
import re
from pyspark.sql import functions as F
from pyspark.sql import types as T

def snakeify(df):
    """rename columns to snake_case"""
    new = df
    for c in df.columns:
        nc = re.sub(r'[^0-9a-zA-Z]+','_', c).strip('_').lower()
        if nc != c:
            new = new.withColumnRenamed(c, nc)
    return new

# Load bronze
b_sales    = spark.table(f"{schema}.bronze_sales")
b_features = spark.table(f"{schema}.bronze_features")
b_stores   = spark.table(f"{schema}.bronze_stores")  

# 2.1 Clean SALES
s_sales = (snakeify(b_sales)
    .select(
        F.col("store").cast("int").alias("store"),
        F.col("dept").cast("int").alias("dept"),
        F.col("date").cast("date").alias("date"),
        F.col("weekly_sales").cast("double").alias("weekly_sales"),
        F.col("isholiday").cast("boolean").alias("is_holiday"),
        F.col("ingest_ts")
    )
    .dropDuplicates()
)

# 2.2 Clean FEATURES (env/price/holiday signals)
markdown_cols = [f"markdown{i}" for i in range(1,6)]
selected_markdowns = [c for c in markdown_cols if c in b_features.columns]

s_features = (
    snakeify(b_features)
    .select(
        F.col("store").cast("int").alias("store"),
        F.col("date").cast("date").alias("date"),
        F.col("temperature").cast("double").alias("temperature"),
        F.col("fuel_price").cast("double").alias("fuel_price"),
        *[F.col(c).cast("double").alias(c) for c in selected_markdowns],
        F.col("cpi").cast("double").alias("cpi"),
        F.col("unemployment").cast("double").alias("unemployment"),
        F.col("isholiday").cast("boolean").alias("is_holiday"),
        F.col("ingest_ts")
    )
    # only fill the markdowns we actually selected
    .na.fill({c: 0.0 for c in selected_markdowns})
    .dropDuplicates()
)

# 2.3 Clean STORES (dimension)
s_stores = (snakeify(b_stores)
    .select(
        F.col("store").cast("int").alias("store"),
        F.col("type").alias("type"),
        F.col("size").cast("int").alias("size"),
        F.col("ingest_ts")
    )
    .dropDuplicates(["store"])
)

# Write Silver
# 1. Write cleaned Silver tables to Delta
s_sales.write.format("delta").mode("overwrite").save(f"{base}/silver/sales")
s_features.write.format("delta").mode("overwrite").save(f"{base}/silver/features")
s_stores.write.format("delta").mode("overwrite").save(f"{base}/silver/stores")

# 2. Register them in the metastore for SQL access
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {schema}.silver_sales
USING DELTA
LOCATION '{base}/silver/sales'
""")

spark.sql(f"""
CREATE TABLE IF NOT EXISTS {schema}.silver_features
USING DELTA
LOCATION '{base}/silver/features'
""")

spark.sql(f"""
CREATE TABLE IF NOT EXISTS {schema}.silver_stores
USING DELTA
LOCATION '{base}/silver/stores'
""")


display(spark.table(f"{schema}.silver_sales").limit(5))



store,dept,date,weekly_sales,is_holiday,ingest_ts
1,5,2010-10-22,19525.85,False,2025-08-31T06:33:14.02032Z
1,6,2010-05-28,6165.59,False,2025-08-31T06:33:14.02032Z
1,6,2010-12-31,5878.25,True,2025-08-31T06:33:14.02032Z
1,6,2011-08-26,3296.22,False,2025-08-31T06:33:14.02032Z
1,6,2011-10-21,5111.85,False,2025-08-31T06:33:14.02032Z


### Model → Gold (Star Schema)

In [0]:
from pyspark.sql.functions import year, month, weekofyear, quarter, dayofweek, col

# Load Silver tables
sales    = spark.table(f"{schema}.silver_sales")
features = spark.table(f"{schema}.silver_features")
stores   = spark.table(f"{schema}.silver_stores")

# ============================================================
# 3.1 DIMENSIONS
# ============================================================

# Store dimension
dim_store = stores.select(
    col("store").alias("store_key"),
    col("type"),
    col("size")
).dropDuplicates()

# Dept dimension (since 'dept' is like a product category)
dim_dept = sales.select(
    col("dept").alias("dept_key")
).distinct()

# Date dimension (from distinct sales dates)
dim_date = (sales
    .select(col("date").alias("date_key")).distinct()
    .withColumn("year", year("date_key"))
    .withColumn("month", month("date_key"))
    .withColumn("quarter", quarter("date_key"))
    .withColumn("week_of_year", weekofyear("date_key"))
    .withColumn("day_of_week", dayofweek("date_key"))
)

# ============================================================
# 3.2 FACT TABLE
# ============================================================

fact_sales = (sales.alias("s")
    .join(features.alias("f"), on=["store","date"], how="left")
    .select(
        col("s.store").alias("store_key"),
        col("s.dept").alias("dept_key"),
        col("s.date").alias("date_key"),
        col("s.weekly_sales"),
        col("s.is_holiday").alias("is_holiday"),
        col("f.temperature"),
        col("f.fuel_price"),
        col("f.cpi"),
        col("f.unemployment")
        # markdowns removed because not present in silver_features
    )
)


# ============================================================
# 3.3 WRITE GOLD LAYER (Delta Lake)
# ============================================================

(dim_store.write
    .format("delta").mode("overwrite")
    .save(f"{base}/gold/dim_store"))

(dim_dept.write
    .format("delta").mode("overwrite")
    .save(f"{base}/gold/dim_dept"))

(dim_date.write
    .format("delta").mode("overwrite")
    .partitionBy("year")
    .save(f"{base}/gold/dim_date"))

(fact_sales
    .withColumn("year", year("date_key"))
    .write.format("delta").mode("overwrite")
    .partitionBy("year")
    .save(f"{base}/gold/fact_sales"))

# ============================================================
# 3.4 REGISTER GOLD TABLES
# ============================================================

spark.sql(f"CREATE TABLE IF NOT EXISTS {schema}.dim_store   USING DELTA LOCATION '{base}/gold/dim_store'")
spark.sql(f"CREATE TABLE IF NOT EXISTS {schema}.dim_dept    USING DELTA LOCATION '{base}/gold/dim_dept'")
spark.sql(f"CREATE TABLE IF NOT EXISTS {schema}.dim_date    USING DELTA LOCATION '{base}/gold/dim_date'")
spark.sql(f"CREATE TABLE IF NOT EXISTS {schema}.fact_sales  USING DELTA LOCATION '{base}/gold/fact_sales'")

# ============================================================
# 3.5 DELTA MAINTENANCE
# ============================================================

spark.sql("OPTIMIZE retail.fact_sales ZORDER BY (store_key, date_key)")


DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,

### Analytics (SQL KPIs + dashboard-ready)

_Top departments by revenue_

In [0]:
%sql
USE retail;

SELECT d.dept_key AS dept,
       ROUND(SUM(f.weekly_sales), 2) AS revenue
FROM fact_sales f
JOIN dim_dept d ON f.dept_key = d.dept_key
GROUP BY d.dept_key
ORDER BY revenue DESC
LIMIT 15;


dept,revenue
92,483943341.87
95,449320162.52
38,393118136.92
72,305725152.21
90,291068463.68
40,288936022.05
2,280611174.43
91,216781705.73
13,197321569.95
8,194280780.73


_Store revenue trend (weekly)_

In [0]:
%sql
SELECT f.store_key AS store,
       d.year, d.week_of_year,
       SUM(f.weekly_sales) AS weekly_revenue
FROM fact_sales f
JOIN dim_date d ON f.date_key = d.date_key
GROUP BY store, d.year, d.week_of_year
ORDER BY d.year, d.week_of_year, store;


store,year,week_of_year,weekly_revenue
1,2010,5,1643690.9000000004
2,2010,5,2136989.4599999995
3,2010,5,461622.22
4,2010,5,2135143.87
5,2010,5,317173.10000000003
6,2010,5,1652635.1
7,2010,5,496725.44
8,2010,5,1004137.0900000004
9,2010,5,549505.55
10,2010,5,2193048.749999999


_Holiday impact_

In [0]:
%sql
SELECT CASE WHEN f.is_holiday THEN 'Holiday Week' ELSE 'Non-Holiday' END AS period,
       ROUND(AVG(f.weekly_sales),2) AS avg_sales
FROM fact_sales f
GROUP BY f.is_holiday
ORDER BY avg_sales DESC;


period,avg_sales
Holiday Week,17035.82
Non-Holiday,15901.45


_Store type & size performance_

In [0]:
%sql
SELECT s.type,
       APPROX_PERCENTILE(s.size, 0.5) AS median_size,
       ROUND(SUM(f.weekly_sales),2)   AS total_sales
FROM fact_sales f
JOIN dim_store s ON f.store_key = s.store_key
GROUP BY s.type
ORDER BY total_sales DESC;


type,median_size,total_sales
A,202505,4331014722.75
B,114533,2000700736.82
C,39910,405503527.54


_Quick sanity checks_

In [0]:
%sql
-- row counts
SELECT 'sales' AS table, COUNT(*) FROM silver_sales
UNION ALL SELECT 'features', COUNT(*) FROM silver_features
UNION ALL SELECT 'stores', COUNT(*) FROM silver_stores
UNION ALL SELECT 'fact_sales', COUNT(*) FROM fact_sales;

-- key distributions
SELECT MIN(date_key), MAX(date_key) FROM fact_sales;
SELECT COUNT(DISTINCT store_key) AS stores, COUNT(DISTINCT dept_key) AS depts FROM fact_sales;


stores,depts
45,81
