In [0]:
from pyspark.sql import functions as F

# Notebook parameters: declare the widgets that let a caller (or a job) choose
# which CSV file to ingest and which pipeline layer to execute.
dbutils.widgets.text(
    "source_path",
    "/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv",
)
dbutils.widgets.dropdown(
    "layer",
    "bronze",
    ["bronze", "silver", "gold"],
)

In [0]:
# Capture the current widget values once so the cells below use ordinary
# variables instead of querying the widgets again.
layer = dbutils.widgets.get("layer")
source = dbutils.widgets.get("source_path")

In [0]:
def run_bronze(
    source_path: str,
    target_path: str = "/Volumes/workspace/ecommerce/ecommerce_data/bronze/events_v1",
):
    """Ingest a raw CSV file into the bronze Delta table.

    Reads ``source_path`` as CSV (first row is the header, column types are
    inferred), adds an ``ingestion_ts`` column holding the current timestamp,
    and overwrites the Delta table stored at ``target_path``.

    Args:
        source_path: Location of the CSV file to ingest.
        target_path: Location the bronze Delta table is written to. The
            default is the path this pipeline has always used, so existing
            callers that pass only ``source_path`` behave as before.
    """
    # NOTE: inferSchema=True makes Spark take an extra pass over the input to
    # work out column types; supply an explicit schema if that becomes costly.
    raw = spark.read.csv(
        source_path,
        header=True,
        inferSchema=True
    )

    (
        raw.withColumn("ingestion_ts", F.current_timestamp())
           .write
           .format("delta")
           .mode("overwrite")  # each run fully replaces the previous bronze data
           .save(target_path)
    )

    print("✅ Bronze layer completed successfully")


In [0]:
def run_silver(
    bronze_path: str = "/Volumes/workspace/ecommerce/ecommerce_data/bronze/events_v1",
    silver_path: str = "/Volumes/workspace/ecommerce/ecommerce_data/silver/events_v1",
):
    """Clean the bronze events and write them to the silver Delta table.

    Steps applied to the bronze data:
      * keep rows whose ``price`` is strictly between 0 and 10000,
      * keep a single row per (``user_session``, ``event_time``) pair,
      * add ``event_date`` (the date part of ``event_time``),
      * add ``price_tier``: "budget" below 10, "mid" below 50, else "premium".

    Args:
        bronze_path: Location of the bronze Delta table to read.
        silver_path: Location the silver Delta table is written to.

    Both defaults are the paths this pipeline has always used, so calling
    ``run_silver()`` with no arguments behaves as before.
    """
    bronze = spark.read.format("delta").load(bronze_path)

    silver = (
        bronze
        # Rows with a null price fail both comparisons and are dropped as well.
        .filter(F.col("price") > 0)
        .filter(F.col("price") < 10000)
        # Which of several duplicate rows survives is not specified by Spark.
        .dropDuplicates(["user_session", "event_time"])
        .withColumn("event_date", F.to_date("event_time"))
        .withColumn(
            "price_tier",
            F.when(F.col("price") < 10, "budget")
             .when(F.col("price") < 50, "mid")
             .otherwise("premium")
        )
    )

    (
        silver.write
              .format("delta")
              .mode("overwrite")  # each run fully replaces the previous silver data
              .save(silver_path)
    )

    print("✅ Silver layer completed successfully")


In [0]:
def run_gold(
    silver_path: str = "/Volumes/workspace/ecommerce/ecommerce_data/silver/events_v1",
    gold_path: str = "/Volumes/workspace/ecommerce/ecommerce_data/gold/products_v1",
):
    """Aggregate silver events into per-product performance metrics.

    For every (``product_id``, ``brand``) pair the output contains:
      * ``views``: number of distinct ``user_id`` values with a "view" event,
      * ``purchases``: number of distinct ``user_id`` values with a "purchase"
        event,
      * ``revenue``: sum of ``price`` (as double) over "purchase" events; null
        when the product has no purchase events,
      * ``conversion_rate``: ``purchases / views * 100``; null when ``views``
        is 0.

    Args:
        silver_path: Location of the silver Delta table to read.
        gold_path: Location the gold Delta table is written to.

    Both defaults are the paths this pipeline has always used, so calling
    ``run_gold()`` with no arguments behaves as before.
    """
    silver = spark.read.format("delta").load(silver_path)

    product_perf = (
        silver.groupBy("product_id", "brand")
        .agg(
            # F.when without .otherwise yields null for non-matching rows, and
            # the aggregates below skip nulls, so each one only sees the rows
            # of its own event type.
            F.countDistinct(
                F.when(F.col("event_type") == "view", F.col("user_id"))
            ).alias("views"),
            F.countDistinct(
                F.when(F.col("event_type") == "purchase", F.col("user_id"))
            ).alias("purchases"),
            F.sum(
                F.when(
                    F.col("event_type") == "purchase",
                    F.col("price").cast("double")
                )
            ).alias("revenue")
        )
        .withColumn(
            "conversion_rate",
            # Guard against division by zero: no views -> null conversion rate.
            F.when(
                F.col("views") != 0,
                (F.col("purchases") / F.col("views")) * 100
            )
        )
    )

    (
        product_perf.write
                     .format("delta")
                     .mode("overwrite")  # each run fully replaces the previous gold data
                     .save(gold_path)
    )

    print("✅ Gold layer completed successfully")


In [0]:
def run_layer(layer_name: str, source_path: str):
    """Run the pipeline stage selected by ``layer_name``.

    The name is matched case-insensitively against "bronze", "silver" and
    "gold". Only the bronze stage uses ``source_path``; the other stages are
    invoked without arguments.

    Args:
        layer_name: Which stage to execute.
        source_path: CSV location handed to the bronze stage.

    Raises:
        ValueError: If ``layer_name`` is not one of the three known stages.
    """
    requested = layer_name.lower()

    # Reject unknown names first so the rest can assume a valid stage.
    if requested not in ("bronze", "silver", "gold"):
        raise ValueError(f"❌ Invalid layer provided: {requested}")

    if requested == "bronze":
        run_bronze(source_path)
        return

    if requested == "silver":
        run_silver()
        return

    # Only "gold" can reach this point.
    run_gold()


In [0]:
# Entry point: execute the stage chosen through the notebook widgets.
run_layer(layer_name=layer, source_path=source)


✅ Bronze layer completed successfully
