# In [0]:
# Databricks PySpark Notebook: Simple Medallion Architecture + Unity Catalog
# -------------------------------------------------------------------
# Updated to use Unity Catalog directly (since Hive Metastore is disabled).
# This simplified notebook shows the basics of:
# 1. Bronze → Silver → Gold layers (medallion architecture)
# 2. explode + aggregation
# 3. Read/write to Catalog schema
# 4. Performance tips (broadcast join, partitioning, Z-ORDER, query comparison)
# 5. Short interview notes as comments

from pyspark.sql import functions as F, types as T

# -------------------------------------------------------------------
# SETUP (Unity Catalog)
# -------------------------------------------------------------------
CATALOG = "diggibyte"   # replace with the catalog name you have access to
SCHEMA = "poc"

# Bootstrap statements, run in order: make sure the schema exists, then make
# the catalog and the schema the session defaults for unqualified names.
for _setup_stmt in (
    f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}",
    f"USE CATALOG {CATALOG}",
    f"USE {SCHEMA}",
):
    spark.sql(_setup_stmt)

# -------------------------------------------------------------------
# BRONZE: Raw ingest using simplified JSON
# -------------------------------------------------------------------
# Sample raw orders. Tuple layout follows the column order in `schema` below:
# (order_id, customer_id, order_ts, items_json, promotions, shipping_city, shipping_country)
# `items_json` is a JSON array kept as a raw string; `promotions` is a
# ';'-separated list of codes (empty string when the order has none).
bronze_data = [
    (
        1001, "C-001", "2025-08-01T10:03:22Z",
        '[{"sku":"SKU-1","qty":2,"price":19.99}, {"sku":"SKU-2","qty":1,"price":5.25}]',
        "WELCOME10;FREESHIP", "Bengaluru", "IN",
    ),
    (
        1002, "C-002", "2025-08-01T11:15:05Z",
        '[{"sku":"SKU-2","qty":4,"price":5.00}, {"sku":"SKU-3","qty":1,"price":99.00}]',
        "LOYAL5", "Mumbai", "IN",
    ),
    (
        1003, "C-001", "2025-08-02T09:01:11Z",
        '[{"sku":"SKU-1","qty":1,"price":19.99}, {"sku":"SKU-4","qty":6,"price":2.00}]',
        "", "Delhi", "IN",
    ),
]

# DDL-style schema string, one column per entry for readability.
schema = ", ".join([
    "order_id INT",
    "customer_id STRING",
    "order_ts STRING",
    "items_json STRING",
    "promotions STRING",
    "shipping_city STRING",
    "shipping_country STRING",
])

bronze_df = spark.createDataFrame(data=bronze_data, schema=schema)
bronze_tbl = ".".join([CATALOG, SCHEMA, "bronze_orders"])

# Persist the raw rows as a Delta table, replacing any previous contents.
bronze_df.write.saveAsTable(bronze_tbl, format="delta", mode="overwrite")

# -------------------------------------------------------------------
# SILVER: Parse + explode items and promotions
# -------------------------------------------------------------------
# Shape of the `items_json` payload: an array of {sku, qty, price} structs.
item_schema = T.ArrayType(
    T.StructType()
    .add("sku", T.StringType())
    .add("qty", T.IntegerType())
    .add("price", T.DoubleType())
)

b = spark.table(bronze_tbl)

# Item grain: exactly one row per order line, BEFORE the promotion fan-out.
# Revenue totals (customer / city / daily) must be computed at this grain.
order_items_df = (b
    .withColumn("order_ts", F.to_timestamp("order_ts"))
    .withColumn("items", F.from_json("items_json", item_schema))
    .withColumn("item", F.explode("items"))
    .select("order_id", "customer_id", "order_ts", "shipping_city", "shipping_country",
            "promotions",
            F.col("item.sku").alias("sku"),
            F.col("item.qty").alias("qty"),
            F.col("item.price").alias("price"),
            (F.col("item.qty")*F.col("item.price")).alias("item_total"))
)

# Item x promotion grain: every order line is repeated once per promotion code.
# Orders without a promotion keep a single row with promotion = NULL instead of
# being dropped (explode_outer over a one-element [NULL] array).
silver_df = (order_items_df
    .withColumn("promotions_arr", F.when(F.length("promotions")>0, F.split("promotions", ";"))
                                   .otherwise(F.array(F.lit(None).cast("string"))))
    .select("order_id", "customer_id", "order_ts", "shipping_city", "shipping_country",
            "sku", "qty", "price", "item_total",
            F.explode_outer("promotions_arr").alias("promotion"))
)

silver_tbl = f"{CATALOG}.{SCHEMA}.silver_order_items"
(silver_df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("shipping_country")  # Partition by country for scale
    .saveAsTable(silver_tbl))

# -------------------------------------------------------------------
# GOLD: Aggregations
# -------------------------------------------------------------------
# Grain matters here: silver_df repeats each order line once per promotion
# code, so summing item_total over it double-counts every order that carries
# more than one promotion (order 1001 has two). Only the promotion table is
# meant to attribute the same revenue to several codes; every other total is
# aggregated from the item-grain frame.


def _write_gold(df, table_name, zorder_col):
    """Overwrite the Gold Delta table `table_name` with `df`, then OPTIMIZE it.

    `zorder_col` is the column passed to ZORDER BY.
    """
    df.write.format("delta").mode("overwrite").saveAsTable(table_name)
    spark.sql(f"OPTIMIZE {table_name} ZORDER BY ({zorder_col})")


# Customer-level revenue
customer_gold_df = (order_items_df
    .groupBy("customer_id")
    .agg(F.sum("item_total").alias("total_spent"))
)

customer_gold_tbl = f"{CATALOG}.{SCHEMA}.gold_customer_revenue"
_write_gold(customer_gold_df, customer_gold_tbl, "customer_id")

# Promotion-level revenue (item x promotion grain on purpose: an order line
# bought under two codes counts towards both; rows without a code are excluded)
promotion_gold_df = (silver_df
    .groupBy("promotion")
    .agg(F.sum("item_total").alias("total_spent"))
    .filter(F.col("promotion").isNotNull())
)

promotion_gold_tbl = f"{CATALOG}.{SCHEMA}.gold_promotion_revenue"
_write_gold(promotion_gold_df, promotion_gold_tbl, "promotion")

# City-level revenue
city_gold_df = (order_items_df
    .groupBy("shipping_city")
    .agg(F.sum("item_total").alias("total_spent"))
)

city_gold_tbl = f"{CATALOG}.{SCHEMA}.gold_city_revenue"
_write_gold(city_gold_df, city_gold_tbl, "shipping_city")

# Daily revenue (time-based aggregation)
daily_gold_df = (order_items_df
    .groupBy(F.to_date("order_ts").alias("order_date"))
    .agg(F.sum("item_total").alias("daily_revenue"))
)

daily_gold_tbl = f"{CATALOG}.{SCHEMA}.gold_daily_revenue"
_write_gold(daily_gold_df, daily_gold_tbl, "order_date")

# -------------------------------------------------------------------
# PERFORMANCE TIP (broadcast join)
# -------------------------------------------------------------------
# Tiny SKU -> category lookup; small enough to ship to every executor.
small_dim = spark.createDataFrame(
    [
        ("SKU-1", "Clothes"),
        ("SKU-2", "Shoes"),
        ("SKU-3", "Electronics"),
        ("SKU-4", "Snacks"),
    ],
    ["sku", "category"],
)

# Left join keeps every silver row; the broadcast hint is applied to the
# lookup side of the join.
joined = silver_df.join(F.broadcast(small_dim), on="sku", how="left")
joined.show()

# -------------------------------------------------------------------
# PERFORMANCE COMPARISON: With vs Without Z-ORDER
# -------------------------------------------------------------------
import time


def _timed_customer_lookup(table_ref):
    """Run the C-001 point lookup against `table_ref`; return elapsed seconds."""
    started = time.perf_counter()
    spark.sql(f"SELECT * FROM {table_ref} WHERE customer_id = 'C-001'").collect()
    return time.perf_counter() - started


# The gold table has already been OPTIMIZEd by the time this runs, so timing
# the same query twice against the current table would only compare a cold run
# with a warm one. Use Delta time travel to read the snapshot that existed
# right before the OPTIMIZE commit instead.
_history = (spark.sql(f"DESCRIBE HISTORY {customer_gold_tbl}")
    .select("version", "operation")
    .collect())
_latest_commit = max(_history, key=lambda row: row["version"])

# Without Z-ORDER (snapshot before the OPTIMIZE commit)
if _latest_commit["operation"] == "OPTIMIZE" and _latest_commit["version"] > 0:
    _pre_optimize_ref = f"{customer_gold_tbl} VERSION AS OF {_latest_commit['version'] - 1}"
    _timed_customer_lookup(_pre_optimize_ref)  # warm-up so both sides are measured warm
    print("Query time without Z-ORDER:", _timed_customer_lookup(_pre_optimize_ref), "seconds")
else:
    # OPTIMIZE can be a no-op (nothing to rewrite); then there is nothing to compare against.
    print("Query time without Z-ORDER: n/a (latest commit is not an OPTIMIZE, no pre-OPTIMIZE snapshot)")

# With Z-ORDER (current, optimized table)
_timed_customer_lookup(customer_gold_tbl)  # warm-up
print("Query time with Z-ORDER:", _timed_customer_lookup(customer_gold_tbl), "seconds")

# -------------------------------------------------------------------
# INTERVIEW NOTES (in comments)
# -------------------------------------------------------------------
# - Bronze: raw, append-only in production (this demo writes it with overwrite so reruns do not duplicate rows)
# - Silver: cleaned, normalized (explode, types)
# - Gold: aggregated, BI-ready
#   - Customer revenue: spend per customer
#   - Promotion revenue: spend per promotion code
#   - City revenue: spend per city (geo insight)
#   - Daily revenue: spend per date (temporal insight)
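# - Explode: normalizes nested arrays into rows (items, promotions); exploding two arrays
#   multiplies rows (items x promotions), so sum revenue at item grain to avoid double counting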
# - Aggregations: groupBy + sum, avg, etc.
# - Catalog: use fully qualified names (catalog.schema.table) with Unity Catalog
# - Performance:
#   - broadcast small tables
#   - partitionBy on write for large datasets
#   - OPTIMIZE with ZORDER on high-cardinality query columns
#   - Benchmark queries before/after OPTIMIZE to demonstrate improvements
# - Delta: ACID, schema evolution, time travel
