# Retail Data Analytics Platform — Simplified Databricks Version

This notebook builds a Bronze → Silver → Gold pipeline from the source files already uploaded to `/FileStore/tables/` (`customers.csv`, `orders_day1.csv`, `orders_day2.csv`, and `products.json`).

Tables are registered under the schema **`retail_demo`**.

In [None]:
# Paths and setup
#
# Every path below is passed to Spark APIs (DataFrame reader/writer, SQL
# LOCATION clauses, DeltaTable.forPath), so each must be a DBFS path or URI.
# The '/dbfs/...' form is the local FUSE mount intended for non-Spark file
# APIs; when handed to Spark it resolves to 'dbfs:/dbfs/...', which nests the
# Delta data under a stray top-level 'dbfs' directory instead of FileStore.
# DELTA_BASE therefore uses an explicit 'dbfs:/' URI.
# NOTE: data written by earlier runs under the old '/dbfs/FileStore/...'
# location is not moved or deleted by this notebook.
RAW_PATH = '/FileStore/tables/'                        # uploaded source CSV/JSON files
DELTA_BASE = 'dbfs:/FileStore/delta/retail_project/'   # root of all Delta output
BRONZE_PATH = DELTA_BASE + 'bronze/'
SILVER_PATH = DELTA_BASE + 'silver/'
GOLD_PATH = DELTA_BASE + 'gold/'

# Schema under which the notebook registers its tables.
spark.sql('CREATE DATABASE IF NOT EXISTS retail_demo')

## 1. Bronze Layer — Data Ingestion

In [None]:
# Read the raw source files unchanged. The CSVs are read with a header row and
# no explicit schema; the products file is read as multi-line JSON.
customers_df = (
    spark.read
    .option('header', True)
    .csv(RAW_PATH + 'customers.csv')
)
orders_day1_df = (
    spark.read
    .option('header', True)
    .csv(RAW_PATH + 'orders_day1.csv')
)
products_df = (
    spark.read
    .option('multiLine', True)
    .json(RAW_PATH + 'products.json')
)

# Bronze dataset name -> raw DataFrame. The name doubles as the folder under
# BRONZE_PATH and as the suffix of the registered table (bronze_<name>).
bronze_frames = {
    'customers': customers_df,
    'orders': orders_day1_df,
    'products': products_df,
}

# Persist every raw frame as a Delta dataset, replacing any previous contents.
for bronze_name, raw_frame in bronze_frames.items():
    raw_frame.write.format('delta').mode('overwrite').save(BRONZE_PATH + bronze_name)

# (Re)register each dataset as a table pointing at its Delta location.
for bronze_name in bronze_frames:
    spark.sql(f"DROP TABLE IF EXISTS retail_demo.bronze_{bronze_name}")
    spark.sql(
        f"CREATE TABLE retail_demo.bronze_{bronze_name} "
        f"USING DELTA LOCATION '{BRONZE_PATH}{bronze_name}'"
    )

## 2. Silver Layer — Cleansing & Transformation

In [None]:
from pyspark.sql.functions import col

# Load the Bronze datasets written by the ingestion step.
customers = spark.read.format('delta').load(BRONZE_PATH + 'customers')
products = spark.read.format('delta').load(BRONZE_PATH + 'products')

# Cleanse orders: cast the numeric columns, keep only completed orders, and
# derive the line total from quantity and price.
orders = (
    spark.read.format('delta').load(BRONZE_PATH + 'orders')
    .withColumn('quantity', col('quantity').cast('int'))
    .withColumn('price', col('price').cast('double'))
    .filter(col('status') == 'Completed')
    .withColumn('total_amount', col('quantity') * col('price'))
)

# Columns kept in the Silver table, in output order.
# NOTE(review): these are selected by bare name after two joins; this assumes
# none of them (e.g. 'price') exists in more than one input — confirm against
# the products and customers schemas.
silver_columns = [
    'order_id', 'customer_id', 'name', 'region', 'email',
    'product', 'quantity', 'price', 'total_amount', 'status',
    'order_date', 'product_id', 'category',
]

# Left joins keep every completed order even when the customer or product
# lookup finds no match.
orders_with_customers = orders.join(customers, 'customer_id', 'left')
product_match = orders['product'] == products['product_name']
silver = orders_with_customers.join(products, product_match, 'left').select(silver_columns)

# Persist the result and (re)register it as retail_demo.silver_orders.
silver.write.format('delta').mode('overwrite').save(SILVER_PATH + 'orders')
spark.sql("DROP TABLE IF EXISTS retail_demo.silver_orders")
spark.sql(
    f"CREATE TABLE retail_demo.silver_orders USING DELTA LOCATION '{SILVER_PATH}orders'"
)

## 3. Gold Layer — Aggregations

In [None]:
from pyspark.sql.functions import sum as _sum, row_number, desc
from pyspark.sql.window import Window

# Gold aggregates are computed from the Silver orders table as it exists when
# this cell runs.
silver_df = spark.read.format('delta').load(SILVER_PATH + 'orders')

# Total revenue per region.
revenue_by_region = silver_df.groupBy('region').agg(_sum('total_amount').alias('total_revenue'))
revenue_by_region.write.format('delta').mode('overwrite').save(GOLD_PATH + 'revenue_by_region')

# Revenue per product, ranked from highest to lowest.
product_sales = silver_df.groupBy('product','product_id','category').agg(_sum('total_amount').alias('revenue'))

# row_number() hands out arbitrary ranks to rows that tie on the ordering
# expression, so the grouping keys are appended as tie-breakers to make the
# ranking reproducible between runs. The window is intentionally unpartitioned:
# the rank is global across all products.
revenue_rank_window = Window.orderBy(desc('revenue'), 'product', 'product_id', 'category')
ranked = product_sales.withColumn('rank', row_number().over(revenue_rank_window))
ranked.write.format('delta').mode('overwrite').save(GOLD_PATH + 'sales_summary')

# Register BOTH Gold datasets in retail_demo. Previously only sales_summary
# was registered, leaving revenue_by_region reachable by path only.
for gold_name in ('revenue_by_region', 'sales_summary'):
    spark.sql(f"DROP TABLE IF EXISTS retail_demo.gold_{gold_name}")
    spark.sql(f"CREATE TABLE retail_demo.gold_{gold_name} USING DELTA LOCATION '{GOLD_PATH}{gold_name}'")

## 4. Incremental Load (MERGE)

In [None]:
from delta.tables import DeltaTable

# This cell reuses `col`, `customers` and `products` from the Silver cell, so
# that cell must have been run first.

# Read the day-2 orders and apply the same cleansing as the Silver load:
# cast numerics, keep completed orders only, derive the line total.
day2_raw = spark.read.option('header', True).csv(RAW_PATH + 'orders_day2.csv')
orders_day2 = (
    day2_raw
    .withColumn('quantity', col('quantity').cast('int'))
    .withColumn('price', col('price').cast('double'))
    .filter(col('status') == 'Completed')
    .withColumn('total_amount', col('quantity') * col('price'))
)

# Shape the increment exactly like the Silver table (same columns, same order).
incoming_columns = [
    'order_id', 'customer_id', 'name', 'region', 'email',
    'product', 'quantity', 'price', 'total_amount', 'status',
    'order_date', 'product_id', 'category',
]
day2_with_customers = orders_day2.join(customers, 'customer_id', 'left')
day2_product_match = orders_day2['product'] == products['product_name']
incoming = day2_with_customers.join(products, day2_product_match, 'left').select(incoming_columns)

# Upsert into Silver keyed on order_id: matching orders are overwritten with
# the incoming row, unseen orders are inserted.
# NOTE(review): assumes `incoming` holds at most one row per order_id; a Delta
# MERGE errors out when several source rows match one target row — confirm the
# day-2 file and the lookup joins cannot produce duplicates.
silver_delta = DeltaTable.forPath(spark, SILVER_PATH + 'orders')
upsert = silver_delta.alias('t').merge(
    incoming.alias('s'),
    't.order_id = s.order_id',
)
upsert.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

## 5. Time Travel & VACUUM

In [None]:
from delta.tables import DeltaTable

# Show the commit history of the Gold sales summary table.
dt = DeltaTable.forPath(spark, GOLD_PATH + 'sales_summary')
history_df = dt.history()
display(history_df)

# Time travel: read the table as of version 0 and show it.
version_zero_reader = spark.read.format('delta').option('versionAsOf', 0)
old_version = version_zero_reader.load(GOLD_PATH + 'sales_summary')
display(old_version)

# VACUUM (for demo only)
# Left disabled on purpose: the first statement switches off Delta's
# retention-duration safety check and the second vacuums with a zero-hour
# retention. NOTE(review): this is expected to remove the files that older
# versions depend on, which would break the version-0 read above — confirm
# before enabling.
# spark.sql('SET spark.databricks.delta.retentionDurationCheck.enabled = false')
# spark.sql(f"VACUUM delta.`{GOLD_PATH}sales_summary` RETAIN 0 HOURS")