### **DAY 7 (15/01/26) – Workflows & Job Orchestration**

### Learn:

- Databricks Jobs vs notebooks
- Multi-task workflows
- Parameters & scheduling
- Error handling

### 🛠️ Tasks:

1. Add parameter widgets to notebooks
2. Create multi-task job (Bronze→Silver→Gold)
3. Set up dependencies
4. Schedule execution

In [0]:
dbutils.widgets.text("task", "bronze", "Pipeline Task")
task = dbutils.widgets.get("task")
print("Running task:", task)


Running task: bronze
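A job parameter is only a string, so a typo such as `task = sliver` would make every `if task == ...` cell below skip silently while the run still reports success. A minimal sketch of a guard for that case, assuming the three task names used in this notebook. `VALID_TASKS` and `validate_task` are hypothetical names introduced here for illustration, not part of any Databricks API:

```python
# Hypothetical helper: these names are illustrative, not a Databricks API.
VALID_TASKS = ("bronze", "silver", "gold")


def validate_task(task: str) -> str:
    """Return the normalized task name, or raise so the job task fails loudly."""
    normalized = task.strip().lower()
    if normalized not in VALID_TASKS:
        raise ValueError(
            f"Unknown task {task!r}; expected one of {', '.join(VALID_TASKS)}"
        )
    return normalized


# In the notebook this would wrap the widget value, for example:
#   task = validate_task(dbutils.widgets.get("task"))
print(validate_task(" Bronze "))
```

An uncaught exception in a notebook cell marks that job task as failed, so with default dependency settings the downstream tasks would not run on a bad parameter.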


In [0]:
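if task == "bronze":
    from pyspark.sql import functions as F

    # Read the raw November 2019 events file. inferSchema costs an extra pass over the data.
    df = spark.read.csv(
        "/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv",
        header=True,
        inferSchema=True
    )

    # Stamp every row with the load time.
    events_bronze = df.withColumn("ingestion_ts", F.current_timestamp())

    # Full refresh: "overwrite" replaces the table contents on every run.
    events_bronze.write.format("delta").mode("overwrite").saveAsTable("events_bronze")

    print("Bronze stage completed")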


In [0]:
if task == "silver":
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    bronze = spark.table("events_bronze")

    # Rows sharing session, timestamp, and product are treated as duplicates; keep the latest ingested.
    w = Window.partitionBy("user_session", "event_time", "product_id").orderBy(F.col("ingestion_ts").desc())

    silver = (
        bronze
        # Data quality rules: required fields present, sane price range, known event types.
        .filter(F.col("event_time").isNotNull())
        .filter(F.col("product_id").isNotNull())
        .filter(F.col("user_id").isNotNull())
        .filter(F.col("price").isNotNull())
        .filter((F.col("price") >= 0) & (F.col("price") < 10000))
        .filter(F.col("event_type").isin("view", "cart", "purchase"))
        # Deduplication: keep the first row of each window partition.
        .withColumn("rn", F.row_number().over(w))
        .filter(F.col("rn") == 1)
        .drop("rn")
        # Enrichment: calendar date and a coarse price bucket.
        .withColumn("event_date", F.to_date("event_time"))
        .withColumn(
            "price_tier",
            F.when(F.col("price") < 10, "budget")
             .when(F.col("price") < 50, "mid")
             .otherwise("premium")
        )
        .drop("ingestion_ts")
    )

    silver.write.format("delta").mode("overwrite").saveAsTable("events_silver")

    print("Silver stage completed")
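The deduplication step in the Silver cell (`row_number()` over a window, then `rn == 1`) keeps one row per `(user_session, event_time, product_id)` key, preferring the largest `ingestion_ts`. A plain-Python sketch of the same rule, with made-up sample rows and a hypothetical `dedupe_latest` helper, may make the logic easier to follow. One caveat: `current_timestamp()` returns the same value for every row in a query, so when Bronze is written in a single overwrite all rows tie on `ingestion_ts`, and which duplicate survives is arbitrary.

```python
from typing import Dict, List, Tuple


def dedupe_latest(rows: List[dict]) -> List[dict]:
    """Keep the row with the greatest ingestion_ts for each dedup key."""
    latest: Dict[Tuple, dict] = {}
    for row in rows:
        key = (row["user_session"], row["event_time"], row["product_id"])
        kept = latest.get(key)
        # Replace only when strictly newer, so the first-seen row wins a tie.
        if kept is None or row["ingestion_ts"] > kept["ingestion_ts"]:
            latest[key] = row
    return list(latest.values())


# Made-up sample rows, for illustration only.
sample_rows = [
    {"user_session": "s1", "event_time": "2019-11-01 00:00:01", "product_id": 1001, "ingestion_ts": 1},
    {"user_session": "s1", "event_time": "2019-11-01 00:00:01", "product_id": 1001, "ingestion_ts": 2},
    {"user_session": "s2", "event_time": "2019-11-01 00:00:05", "product_id": 1002, "ingestion_ts": 1},
]

deduped = dedupe_latest(sample_rows)
print(len(deduped))
```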


In [0]:
if task == "gold":
    from pyspark.sql import functions as F

    silver = spark.table("events_silver")

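    # One row per product. Each count is the number of distinct users with that event type.
    gold = (
        silver.groupBy("product_id", "brand", "category_code")
        .agg(
            F.countDistinct(F.when(F.col("event_type") == "view", F.col("user_id"))).alias("views"),
            F.countDistinct(F.when(F.col("event_type") == "cart", F.col("user_id"))).alias("carts"),
            F.countDistinct(F.when(F.col("event_type") == "purchase", F.col("user_id"))).alias("purchases"),
            # Revenue sums the price of purchase events only; it is null when there are none.
            F.sum(F.when(F.col("event_type") == "purchase", F.col("price"))).alias("revenue")
        )
        # Guard the divisions: a zero denominator gives null, or raises an error when ANSI mode is on.
        .withColumn(
            "conversion_rate_pct",
            F.when(
                F.col("views") > 0,
                F.round(F.col("purchases") / F.col("views") * 100, 2)
            )
        )
        .withColumn(
            "avg_order_value",
            F.when(
                F.col("purchases") > 0,
                F.round(F.col("revenue") / F.col("purchases"), 2)
            )
        )
    )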

    gold.write.format("delta").mode("overwrite").saveAsTable("events_gold_products")

    print("Gold stage completed")
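The two ratio columns in the Gold cell reduce to simple arithmetic, sketched below in plain Python with made-up numbers. Because `views` and `purchases` are distinct-user counts, the conversion rate is user-level, and `avg_order_value` is really revenue per purchasing user rather than per order. The function names mirror the column names; returning `None` for a zero denominator is a choice made in this sketch.

```python
from typing import Optional


def conversion_rate_pct(purchases: int, views: int) -> Optional[float]:
    """Share of viewing users who purchased, as a percentage."""
    if views == 0:
        return None  # undefined when nobody viewed the product
    return round(purchases / views * 100, 2)


def avg_order_value(revenue: Optional[float], purchases: int) -> Optional[float]:
    """Revenue divided by the number of distinct purchasing users."""
    if revenue is None or purchases == 0:
        return None  # no purchases, so there is nothing to average
    return round(revenue / purchases, 2)


# Made-up example: 200 viewing users, 3 purchasing users, 450.0 in revenue.
print(conversion_rate_pct(3, 200))  # 1.5
print(avg_order_value(450.0, 3))    # 150.0
```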


## Databricks Pipeline Execution Logic

This notebook is designed to run as part of a Databricks **multi-task job**.

The behavior of the notebook is controlled by the `task` parameter:

- `task = bronze`  
  Reads the raw CSV file (`2019-Nov.csv`), adds an ingestion timestamp, and overwrites the Delta Bronze table (`events_bronze`).

- `task = silver`  
  Reads from `events_bronze`, applies data quality rules, deduplication, and enrichment, and writes the cleaned data into `events_silver`.

- `task = gold`  
  Reads from `events_silver` and computes business aggregates, writing results into `events_gold_products`.

The Databricks Job runs this same notebook three times per execution, once for each `task` value, in the order Bronze → Silver → Gold.
Task dependencies enforce that order, and the job schedule triggers the run automatically each day.
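The job described above can also be written down as a settings payload instead of being clicked together in the UI. The sketch below builds such a payload as a plain Python dictionary. The field names (`tasks`, `task_key`, `depends_on`, `notebook_task`, `base_parameters`, `schedule`) follow the Databricks Jobs API 2.1 as I understand it and should be checked against the current reference. The notebook path, job name, and cron time are placeholders, not values from this project, and compute settings are left out.

```python
import json
from typing import Dict, List, Optional

# Placeholder path: replace with the real location of this notebook.
NOTEBOOK_PATH = "/Workspace/Users/someone@example.com/day7_pipeline"


def make_task(task_key: str, depends_on: Optional[List[str]] = None) -> Dict:
    """Build one notebook task that passes its own key as the `task` widget value."""
    task: Dict = {
        "task_key": task_key,
        "notebook_task": {
            "notebook_path": NOTEBOOK_PATH,
            "base_parameters": {"task": task_key},
        },
    }
    if depends_on:
        # Each upstream task must finish before this one starts.
        task["depends_on"] = [{"task_key": key} for key in depends_on]
    return task


job_settings = {
    "name": "ecommerce_medallion_daily",  # hypothetical job name
    "tasks": [
        make_task("bronze"),
        make_task("silver", depends_on=["bronze"]),
        make_task("gold", depends_on=["silver"]),
    ],
    "schedule": {
        # Quartz cron: second minute hour day-of-month month day-of-week.
        "quartz_cron_expression": "0 0 6 * * ?",  # every day at 06:00
        "timezone_id": "UTC",
        "pause_status": "UNPAUSED",
    },
}

print(json.dumps(job_settings, indent=2))
```

Passing the task key as the `task` base parameter is what lets one notebook serve all three stages: each job task sets the widget to its own name, and only the matching cell runs.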
