#Day 07 of 14 Days Challenge

In [0]:
from pyspark.sql import functions as F
from datetime import datetime

def log(msg: object) -> None:
    """Print *msg* to stdout prefixed with the current local timestamp.

    Args:
        msg: The message to emit; it is interpolated into an f-string, so any
            object with a string representation is accepted.
    """
    # The marker was stored as mojibake (UTF-8 bytes decoded with the wrong
    # codec); it is restored here to the intended small blue diamond.
    print(f"[{datetime.now()}] 🔹 {msg}")


In [0]:
# Reset the notebook's widget panel before (re)declaring the inputs below.
# NOTE(review): Databricks' widget documentation warns that widgets created
# in the same cell as removeAll() may not be created reliably; consider moving
# this call into its own cell. Confirm against the runtime in use.
dbutils.widgets.removeAll()

# Free-text input: location of the raw CSV file to ingest.
dbutils.widgets.text("source_path", "/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv", "Source File Path")

# Drop-down input: which medallion layer this run should process.
dbutils.widgets.dropdown("layer", "bronze", ["bronze", "silver", "gold"], "Select Processing Layer")

# Snapshot the current widget values into module-level names used by the
# processing functions and the dispatcher further down.
source_file = dbutils.widgets.get("source_path")
active_layer = dbutils.widgets.get("layer")

# Announce the run configuration.
log(f"Job started for layer: {active_layer}")
log(f"Processing source: {source_file}")


In [0]:
def process_bronze():
    """Ingest the raw CSV into the ``bronze_events`` Delta table.

    Reads the file at the module-level ``source_file`` path (header row,
    inferred schema), adds an ``ingestion_ts`` column holding the current
    timestamp, and overwrites the ``bronze_events`` table with the result.

    Returns:
        The status string ``"Bronze Success"``.
    """
    log("Running BRONZE layer")

    raw_df = spark.read.csv(source_file, header=True, inferSchema=True)

    # count() is a Spark action, so this line scans the CSV once on its own,
    # separately from the write below.
    log(f"Raw records read: {raw_df.count():,}")

    # Stamp every row with the time it was ingested.
    bronze_df = raw_df.withColumn("ingestion_ts", F.current_timestamp())

    (
        bronze_df.write.format("delta")
        .mode("overwrite")
        .saveAsTable("bronze_events")
    )

    # The arrow in this message was mojibake in the original literal; it is
    # restored to the intended character.
    log("BRONZE write completed → table: bronze_events")
    return "Bronze Success"


In [0]:
def process_silver(source_table="bronze_events"):
    """Clean the bronze data and write it to the ``silver_events`` Delta table.

    Cleaning steps, in order:
      * cast ``price`` to double with ``try_cast`` and drop rows where the
        cast yields null or the price is not strictly positive;
      * drop duplicate rows sharing the same ``user_session`` and
        ``event_time``;
      * derive ``product_name`` as the last dot-separated segment of
        ``category_code``, falling back to ``"Other"`` when that is null;
      * replace the original ``price`` column with the cleaned double value.

    Args:
        source_table: Name of the table to read the bronze data from.
            Defaults to ``"bronze_events"``, the table written by
            ``process_bronze``.

    Returns:
        The status string ``"Silver Success"``.
    """
    log("Running SILVER layer")

    # Fix: the original passed the CSV *file path* (source_file) to
    # spark.read.table, which expects a table name. Silver must read the
    # table produced by the bronze step.
    bronze_df = spark.read.table(source_table)
    log(f"Bronze records loaded: {bronze_df.count():,}")

    # Last dot-separated segment of category_code; null input yields null,
    # which the coalesce turns into "Other".
    product_name = F.coalesce(
        F.element_at(F.split(F.col("category_code"), r"\."), -1),
        F.lit("Other"),
    )

    silver_df = (
        bronze_df
        .withColumn("price_clean", F.expr("try_cast(price as double)"))
        .filter(F.col("price_clean").isNotNull())
        .filter(F.col("price_clean") > 0)
        .dropDuplicates(["user_session", "event_time"])
        .withColumn("product_name", product_name)
        # Swap the raw price column for the cleaned double-typed one.
        .drop("price")
        .withColumnRenamed("price_clean", "price")
    )

    log(f"Silver records after cleaning: {silver_df.count():,}")

    (
        silver_df.write.format("delta")
        .mode("overwrite")
        .saveAsTable("silver_events")
    )

    # The arrow in this message was mojibake in the original literal; it is
    # restored to the intended character.
    log("SILVER write completed → table: silver_events")
    return "Silver Success"


In [0]:
def process_gold(source_table="silver_events"):
    """Aggregate silver data into the ``gold_product_revenue`` Delta table.

    Groups the input by ``product_id`` and ``product_name`` and sums
    ``price`` into a ``total_revenue`` column, then overwrites the
    ``gold_product_revenue`` table with the result.

    Args:
        source_table: Name of the table to read the silver data from.
            Defaults to ``"silver_events"``, the table written by
            ``process_silver``.

    Returns:
        The status string ``"Gold Success"``.
    """
    log("Running GOLD layer")

    # Fix: the original passed the CSV *file path* (source_file) to
    # spark.read.table, which expects a table name. Gold must read the
    # table produced by the silver step.
    silver_df = spark.read.table(source_table)
    log(f"Silver records loaded: {silver_df.count():,}")

    gold_df = (
        silver_df
        .groupBy("product_id", "product_name")
        .agg(F.sum("price").alias("total_revenue"))
    )

    log(f"Gold records generated: {gold_df.count():,}")

    (
        gold_df.write.format("delta")
        .mode("overwrite")
        .saveAsTable("gold_product_revenue")
    )

    # The arrow in this message was mojibake in the original literal; it is
    # restored to the intended character.
    log("GOLD write completed → table: gold_product_revenue")
    return "Gold Success"


In [0]:
# Map each value of the "layer" widget to the function that processes it.
_LAYER_HANDLERS = {
    "bronze": process_bronze,
    "silver": process_silver,
    "gold": process_gold,
}

try:
    handler = _LAYER_HANDLERS.get(active_layer)
    if handler is None:
        # The original if/elif chain fell through silently for an unknown
        # layer and then failed with a confusing NameError on `result`.
        # Fail with an explicit, actionable message instead.
        raise ValueError(
            f"Unknown layer {active_layer!r}; "
            f"expected one of {sorted(_LAYER_HANDLERS)}"
        )

    result = handler()

    log(f"{active_layer.upper()} layer finished successfully")
    log(f"Result: {result}")

except Exception as e:
    # Top-level boundary: log the failure for the run output, then re-raise
    # so the notebook/job run is still marked as failed.
    # The cross mark was mojibake in the original literal; it is restored to
    # the intended character.
    log(f"❌ ERROR in {active_layer.upper()} layer")
    log(str(e))
    raise
