In [0]:
# Bronze layer: the managed Delta table produced on Day 1.
events = spark.table("workspace.ecommerce.events_delta")

# Inspect column names and types before transforming anything.
events.printSchema()

# count() is a Spark action, so this line triggers a job over the table.
bronze_row_count = events.count()
print("Total rows in Bronze table:", bronze_row_count)

In [0]:
# Compare the raw row count with the count of fully distinct rows;
# the difference between the two is the number of exact-duplicate rows.
total_values = events.count()
total_unique_values = (
    events
    .distinct()
    .count()
)
print(f"Total Value: {total_values} Total Unique Values: {total_unique_values}")

In [0]:
# Keep a single copy of each row; with no column subset given, rows are
# compared across all columns.
events_nodup = events.drop_duplicates()

In [0]:
# Row count after de-duplication. As the last expression in the cell, the
# value is shown as the cell's output.
events_nodup.count()

In [0]:
from pyspark.sql.functions import col, to_timestamp
# Cleaning steps, applied in this order:
#   1. discard events that have no user_id
#   2. convert event_time to a timestamp and price to double
#   3. discard events whose event_time is null after the conversion
clean_events = (
    events_nodup
    .where(col("user_id").isNotNull())
    .withColumn("event_time", to_timestamp(col("event_time")))
    .withColumn("price", col("price").cast("double"))
    .where(col("event_time").isNotNull())
)

In [0]:
# build user-level features
from pyspark.sql.functions import count, sum, max, when, avg, round

# One output row per user_id.
# Note: count / sum / max / avg / round used here are the
# pyspark.sql.functions versions imported in this cell, not the Python builtins.
user_features_df = (
    clean_events
    .groupBy("user_id")
    .agg(
        # Every event recorded for the user, regardless of event_type.
        count("*").alias("total_events"),
        # when() without otherwise() yields NULL for non-purchase rows, and
        # count() skips NULLs, so only purchase events are counted.
        count(when(col("event_type") == "purchase", True)).alias("total_purchases"),
        # Non-purchase rows contribute 0, so users without purchases get 0.
        round(sum(when(col("event_type") == "purchase", col("price")).otherwise(0)), 2).alias("total_spent"),
        # Average price per purchase. There is deliberately no otherwise(0)
        # here: adding it would put every non-purchase event into the average
        # as a 0 and turn this into total_spent / total_events.
        round(avg(when(col("event_type") == "purchase", col("price"))), 2).alias("avg_spent"),
        # Most recent event of any type for the user.
        max("event_time").alias("last_activity_time"),
    )
    # avg() over zero purchase rows is NULL; store 0 instead so users without
    # purchases keep a numeric avg_spent, matching total_spent.
    .fillna(0.0, subset=["avg_spent"])
)

In [0]:
# Render the per-user feature table for a visual check before it is saved.
display(user_features_df)

In [0]:
# Persist the per-user features as a managed table; "overwrite" mode replaces
# the table's existing contents on every run.
(
    user_features_df.write
    .mode("overwrite")
    .saveAsTable("workspace.ecommerce.silver_user_features")
)