In [0]:
from pyspark.sql.functions import col, current_timestamp, to_date, count, desc

# --- CONFIGURATION ---
catalog = "azure_databricks_personal"
schema = "default"
volume_name = "volume_checkpoints"

# 1. Create the volume that stores Auto Loader schema and streaming checkpoint
#    state. IF NOT EXISTS makes this a no-op on re-runs.
spark.sql(f"CREATE VOLUME IF NOT EXISTS {catalog}.{schema}.{volume_name}")

# 2. Define paths
source_path = "/databricks-datasets/structured-streaming/events/"
# No trailing slash: sub-paths are built as f"{checkpoint_base}/<name>", and a
# trailing slash here would produce "//<name>" in every derived path.
checkpoint_base = f"/Volumes/{catalog}/{schema}/{volume_name}"

# --- 1. BRONZE LAYER (Ingest Raw Data) ---
# Ingest the raw JSON files under `source_path` with Auto Loader ("cloudFiles").
# Each row is stamped with an ingestion time and appended to the Bronze Delta
# table. The inferred schema and the stream's progress are persisted under
# `checkpoint_base`.
print("Starting Bronze Load...")

bronze_query = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    # Where Auto Loader keeps the inferred schema between runs.
    .option("cloudFiles.schemaLocation", f"{checkpoint_base}/bronze_schema")
    .load(source_path)
    .withColumn("ingestion_time", current_timestamp())
    .writeStream
    .format("delta")
    .option("checkpointLocation", f"{checkpoint_base}/bronze_adf")
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .toTable(f"{catalog}.{schema}.bronze_events_adf"))

# toTable() only *starts* the streaming query and returns immediately.
# Block until the availableNow run has processed the available input and
# stopped, so that:
#   - the message below is true,
#   - the Silver stream reads a fully loaded table,
#   - any failure of the query is re-raised here instead of being lost.
bronze_query.awaitTermination()

print("✅ Bronze Load Complete.")

# --- 2. SILVER LAYER ---
# Stream rows from the Bronze table, derive a typed event timestamp and its
# calendar date, drop rows without an action, and append the result to the
# Silver Delta table.
print("Starting Silver Load...")

# One shared expression for both event_time and event_date (previously the cast
# was written out twice).
# NOTE(review): if `time` is numeric, Spark interprets the cast as seconds since
# the Unix epoch — confirm the column's type/unit.
event_ts = col("time").cast("timestamp")

silver_query = (spark.readStream
    .table(f"{catalog}.{schema}.bronze_events_adf")
    .select(
        event_ts.alias("event_time"),
        col("action"),
        to_date(event_ts).alias("event_date"),
        # Presumably Auto Loader's rescued-data column from the Bronze ingest
        # (values that did not fit the inferred schema).
        col("_rescued_data")
    )
    .where("action IS NOT NULL")
    .writeStream
    .format("delta")
    .option("checkpointLocation", f"{checkpoint_base}/silver_adf")
    .trigger(availableNow=True)
    .toTable(f"{catalog}.{schema}.silver_events_adf"))

# toTable() returns as soon as the query starts. Wait for the availableNow run
# to finish, and surface any failure, before the Gold batch job reads Silver.
silver_query.awaitTermination()

print("✅ Silver Load Complete.")

# --- 3. GOLD LAYER ---
# Batch (non-streaming) rebuild of the daily per-action summary from Silver.
# The Gold table is fully replaced on every run (overwrite mode).
print("Starting Gold Load...")

silver_table = f"{catalog}.{schema}.silver_events_adf"
gold_table = f"{catalog}.{schema}.gold_daily_summary_adf"

df_silver = spark.table(silver_table)

# One output row per (event_date, action), holding the number of Silver rows in
# that group, sorted newest date first.
per_day_action = df_silver.groupBy("event_date", "action")
df_gold = per_day_action.agg(count("*").alias("action_count")).orderBy(desc("event_date"))

(df_gold.write
    .mode("overwrite")
    .saveAsTable(gold_table))

print("✅ Pipeline Finished Successfully!")

In [0]:
%sh
# Push the local `main` branch of this workspace repo to origin.
# Abort if the repo directory is missing; otherwise the push below would run
# in whatever directory the shell started in.
cd "/Workspace/Repos/tithirbakshi@outlook.com/azure-databricks-retail-lakehouse" || exit 1

# Rewrites origin/main to match the local branch. --force-with-lease (instead of
# -f) refuses the push if origin/main has moved since it was last fetched. This
# keeps commits pushed by someone else from being silently discarded.
git push --force-with-lease origin main