In [0]:
# Expose `process_date` as a notebook text widget; the empty default means
# "no date supplied".
dbutils.widgets.text("process_date", "")

# Read the widget value. Surrounding whitespace is trimmed so that a blank
# entry is treated the same as an empty one.
process_date = dbutils.widgets.get("process_date").strip()

from datetime import datetime
from pyspark.sql.functions import current_date, date_sub

if process_date == "":
    # No date passed: default to yesterday, as computed by Spark.
    # The collected value is a date object rather than a string.
    process_date = spark.sql("SELECT date_sub(current_date(),1)").collect()[0][0]
else:
    # A date was passed: this value is later interpolated into a SQL filter
    # string, so reject anything that is not a real YYYY-MM-DD calendar date
    # instead of letting it silently match nothing (or inject SQL text).
    # The value stays a string, normalized to zero-padded ISO form.
    try:
        process_date = datetime.strptime(process_date, "%Y-%m-%d").date().isoformat()
    except ValueError as exc:
        raise ValueError(
            f"process_date must be a valid date in YYYY-MM-DD format, got {process_date!r}"
        ) from exc

print(f"Running pipeline for the processing date: {process_date}")

In [0]:
from pyspark.sql import functions as F

def build_user_features(events_df):
    """Aggregate raw events into one feature row per ``user_id``.

    Args:
        events_df: DataFrame of events. The aggregation reads the columns
            ``user_id``, ``event_type``, ``price`` and ``event_time``.

    Returns:
        DataFrame grouped by ``user_id`` with the columns ``total_events``,
        ``purchase_count``, ``cart_count``, ``view_count``, ``total_spent``,
        ``avg_purchase_price``, ``first_event_time`` and ``last_event_time``.
    """
    event_type = F.col("event_type")

    def count_where(kind, alias):
        # `when` without `otherwise` yields NULL for non-matching rows, and
        # `count` skips NULLs, so this counts only rows of the given type.
        return F.count(F.when(event_type == kind, True)).alias(alias)

    aggregations = [
        F.count("*").alias("total_events"),
        count_where("purchase", "purchase_count"),
        count_where("cart", "cart_count"),
        count_where("view", "view_count"),
        # NOTE(review): sums `price` over every event type, not only
        # purchases -- confirm that is the intended meaning of "total_spent".
        F.sum("price").alias("total_spent"),
        # Average price over purchase rows only; NULL when a user has none.
        F.avg(F.when(event_type == "purchase", F.col("price"))).alias("avg_purchase_price"),
        F.min("event_time").alias("first_event_time"),
        F.max("event_time").alias("last_event_time"),
    ]

    return events_df.groupBy("user_id").agg(*aggregations)

In [0]:
# Read Bronze events for the processing date, then build the Silver features.

# Keep only events whose event_time falls on `process_date`.
bronze_df = (
    spark.read
    .format("delta")
    .table("workspace.ecommerce.events_delta")
    .filter(f"date(event_time) = '{process_date}'")
)

# Aggregate the day's events into per-user Silver features.
silver_df = build_user_features(bronze_df)

In [0]:
# Persist this run's Silver features by appending to the Silver Delta table.
# NOTE(review): append mode adds rows on every execution, so re-running the
# notebook for the same process_date writes the same users again -- confirm
# whether reruns are expected to be idempotent.
(
    silver_df.write
    .format("delta")
    .mode("append")
    .saveAsTable("workspace.ecommerce.user_features_silver")
)

In [0]:
# Load the Silver table (no filter applied) as the input for the Gold layer.
# This rebinds `silver_df` from the in-memory features to the stored table.
silver_df = (
    spark.read
    .format("delta")
    .table("workspace.ecommerce.user_features_silver")
)

In [0]:
from pyspark.sql import functions as F

# Total spend above which a user is flagged as high value.
HIGH_VALUE_SPEND_THRESHOLD = 1000

# Build the Gold frame: Silver features plus derived spend metrics.
gold_df = (
    silver_df
    # Replace missing aggregates with 0 *before* deriving columns, so that
    # avg_spend_per_event is computed from the filled total_spent instead of
    # staying NULL. The average-price column written by build_user_features
    # is named `avg_purchase_price`; filling `avg_price` targeted a column
    # that function never produces.
    .fillna({
        "total_spent": 0,
        "avg_purchase_price": 0,
    })
    # Mean spend across all of the user's events.
    .withColumn(
        "avg_spend_per_event",
        F.col("total_spent") / F.col("total_events"),
    )
    # 1 when total spend exceeds the threshold, otherwise 0.
    .withColumn(
        "is_high_value_user",
        F.when(F.col("total_spent") > HIGH_VALUE_SPEND_THRESHOLD, 1).otherwise(0),
    )
)

In [0]:
from pyspark.sql import functions as F

# NOTE(review): this cell appears to repeat the preceding Gold cell and
# rebinds `gold_df`; consider removing one of the two.

# Total spend above which a user is flagged as high value.
HIGH_VALUE_SPEND_THRESHOLD = 1000

# Build the Gold frame: Silver features plus derived spend metrics.
gold_df = (
    silver_df
    # Fill missing aggregates with 0 first so the derived columns below are
    # computed from the filled values. `avg_purchase_price` is the column
    # name emitted by build_user_features; `avg_price` is not among its
    # outputs, so filling it had no useful effect.
    .fillna({
        "total_spent": 0,
        "avg_purchase_price": 0,
    })
    # Mean spend across all of the user's events.
    .withColumn(
        "avg_spend_per_event",
        F.col("total_spent") / F.col("total_events"),
    )
    # 1 when total spend exceeds the threshold, otherwise 0.
    .withColumn(
        "is_high_value_user",
        F.when(F.col("total_spent") > HIGH_VALUE_SPEND_THRESHOLD, 1).otherwise(0),
    )
)