In [0]:
from pyspark.sql import functions as F
# Run parameters, exposed as Databricks widgets so they can be set per run:
# the CSV file to ingest and which pipeline layer to execute.
_LAYER_CHOICES = ["bronze", "silver", "gold"]

dbutils.widgets.text("source_path", "/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv")
dbutils.widgets.dropdown("layer", _LAYER_CHOICES[0], _LAYER_CHOICES)

# Read back the current value of each widget.
source, layer = (dbutils.widgets.get(widget_name) for widget_name in ("source_path", "layer"))

In [0]:
# Delta table locations for each pipeline layer, all under one volume root.
_VOLUME_ROOT = "/Volumes/workspace/ecommerce"

BRONZE_PATH = f"{_VOLUME_ROOT}/bronze/events/"
# NOTE(review): unlike the other two, this is the bare layer directory
# (no table subfolder, no trailing slash) — confirm that is intended.
SILVER_PATH = f"{_VOLUME_ROOT}/silver"
GOLD_PATH = f"{_VOLUME_ROOT}/gold/products/"

In [0]:
def run_bronze(source_path=None, target_path=None):
    """Ingest the raw CSV into the bronze Delta table.

    Reads the CSV with a header row and Spark schema inference, adds an
    ``ingestion_ts`` column holding the current timestamp, and overwrites the
    Delta table at the target location.

    Args:
        source_path: CSV location to read. Defaults to the notebook-level
            ``source`` variable (the ``source_path`` widget), looked up at call
            time, so existing ``run_bronze()`` calls behave exactly as before.
        target_path: Delta location to write. Defaults to ``BRONZE_PATH``.
    """
    if source_path is None:
        source_path = source
    if target_path is None:
        target_path = BRONZE_PATH

    raw_df = (
        spark.read
             .option("header", True)
             # Schema inference costs an extra pass over the input file.
             .option("inferSchema", True)
             .csv(source_path)
    )

    bronze_df = raw_df.withColumn("ingestion_ts", F.current_timestamp())

    (
        bronze_df.write
                 .format("delta")
                 .mode("overwrite")   # append if streaming / incremental
                 .save(target_path)
    )

In [0]:
def run_silver(source_path=None, target_path=None, key_col="event_id"):
    """Clean the bronze events and overwrite the silver Delta table.

    Drops rows whose key column is null, then keeps one row per distinct key
    value. No ordering is imposed, so which duplicate survives is not controlled.

    Args:
        source_path: Delta location to read. Defaults to ``BRONZE_PATH``.
        target_path: Delta location to write. Defaults to ``SILVER_PATH``.
        key_col: Column that identifies an event. Defaults to ``"event_id"``
            (the previously hard-coded column).
    """
    if source_path is None:
        source_path = BRONZE_PATH
    if target_path is None:
        target_path = SILVER_PATH

    bronze_df = spark.read.format("delta").load(source_path)

    # Example cleaning logic.
    # Null keys are filtered out *before* deduplicating so they never enter the
    # dedup shuffle; the surviving rows match the old dedup-then-filter order.
    # NOTE(review): bronze columns come straight from the CSV header — confirm
    # the source file actually has the key column (`event_id` by default).
    silver_df = (
        bronze_df.filter(F.col(key_col).isNotNull())
                 .dropDuplicates([key_col])
    )

    (
        silver_df.write
                 .format("delta")
                 .mode("overwrite")
                 .save(target_path)
    )

In [0]:
def run_gold(source_path=None, target_path=None):
    """Aggregate silver events per ``event_type`` into the gold Delta table.

    Produces one row per ``event_type`` with ``event_count`` (number of rows)
    and ``last_event_time`` (maximum ``event_time``), overwriting the target.

    Args:
        source_path: Delta location to read. Defaults to ``SILVER_PATH``.
        target_path: Delta location to write. Defaults to ``GOLD_PATH``.
    """
    if source_path is None:
        source_path = SILVER_PATH
    if target_path is None:
        target_path = GOLD_PATH

    silver_df = spark.read.format("delta").load(source_path)

    # Example aggregation.
    # NOTE(review): GOLD_PATH ends in "products/" but this table is keyed by
    # event_type — confirm the default target location is intended.
    gold_df = (
        silver_df.groupBy("event_type")
                 .agg(
                     F.count("*").alias("event_count"),
                     F.max("event_time").alias("last_event_time"),
                 )
    )

    (
        gold_df.write
               .format("delta")
               .mode("overwrite")
               .save(target_path)
    )

In [0]:
def run_layer(layer_name):
    """Dispatch to the runner for one pipeline layer.

    Args:
        layer_name: One of ``"bronze"``, ``"silver"`` or ``"gold"``.

    Raises:
        ValueError: If ``layer_name`` is not one of the known layers.
    """
    if layer_name == "bronze":
        return run_bronze()
    if layer_name == "silver":
        return run_silver()
    if layer_name == "gold":
        return run_gold()
    raise ValueError(f"Unknown layer: {layer_name}")