In [0]:
# Quick look at the raw (bronze) e-commerce events: load the table and print 5 rows.
events_df = spark.table("workspace.ecommerce.ecommerce_delta_table")
events_df.show(n=5)

In [0]:
from pyspark.sql import functions as F
# Fully qualified name of the bronze (raw events) Delta table; all downstream cells start from it.
bronze_table = "workspace.ecommerce.ecommerce_delta_table"
events_df = spark.table(bronze_table)

In [0]:
# Keep only events usable for feature engineering:
#   - attributable to a user and timestamped,
#   - carrying a known, non-negative price.
events_df = (
    events_df
    .where(F.col("user_id").isNotNull())
    .where(F.col("event_time").isNotNull())
    .where(F.col("price").isNotNull() & (F.col("price") >= 0))
)

In [0]:
# Per-user behavioural features aggregated from the cleaned event stream (one row per user_id).
is_purchase_event = F.col("event_type") == "purchase"

features_df = events_df.groupBy("user_id").agg(
    # Number of retained events of any event_type.
    F.count("*").alias("total_events"),
    F.sum(F.when(is_purchase_event, 1).otherwise(0)).alias("total_purchases"),
    # Spend must only count purchase events: non-purchase events (presumably views /
    # cart actions) also carry a price and would otherwise inflate the total.
    # otherwise(0) keeps the result non-null for users with no purchases.
    F.sum(F.when(is_purchase_event, F.col("price")).otherwise(0)).alias("total_spent"),
    # Mean price across ALL of the user's retained events, not just purchases.
    F.avg("price").alias("avg_price"),
    F.countDistinct("product_id").alias("unique_products"),
    F.max("event_time").alias("last_activity"),
)

In [0]:
# Defensive null-fill of numeric aggregates with 0.
# NOTE(review): with the upstream filter (price non-null), these columns should
# already be non-null; this is kept as a safeguard against upstream changes.
null_fill_values = {"total_spent": 0, "avg_price": 0, "total_purchases": 0}
features_df = features_df.na.fill(null_fill_values)

In [0]:
# Defensive de-duplication on the key. groupBy("user_id") upstream already yields one
# row per user, so this is expected to be a no-op.
features_df = features_df.dropDuplicates(subset=["user_id"])

In [0]:
# Uniqueness check: the feature table must contain exactly one row per user_id.
# Both counts are kept and compared explicitly: a notebook cell only displays its
# last expression, so computing them as two bare statements would discard the first.
feature_row_count = features_df.count()
distinct_user_count = features_df.select("user_id").distinct().count()
assert feature_row_count == distinct_user_count, (
    f"features_df has {feature_row_count} rows but {distinct_user_count} distinct user_ids"
)
feature_row_count, distinct_user_count

In [0]:
# Make sure the volume that stores the silver-layer Delta files exists (no-op if it already does).
spark.sql(sqlQuery="""
CREATE VOLUME IF NOT EXISTS workspace.ecommerce.silver_volume
""")

In [0]:
silver_path = "/Volumes/workspace/ecommerce/silver_volume/user_features"

# Number of files written for the silver snapshot (default 200). The table holds one
# row per user, so a smaller value may avoid producing many small files that the later
# OPTIMIZE step must compact — tune for the actual data volume.
SILVER_NUM_PARTITIONS = 200

# Overwrite the silver Delta snapshot at the volume path.
(
    features_df.repartition(SILVER_NUM_PARTITIONS)
    .write.format("delta")
    .mode("overwrite")
    .save(silver_path)
)

In [0]:
# Copy the Delta snapshot written to the volume into a table named
# workspace.ecommerce.user_features_silver (no path option, so the table is managed),
# replacing any previous contents.
(
    spark.read.format("delta").load(silver_path)
    .write.format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.ecommerce.user_features_silver")
)

In [0]:
# Compact the silver table's data files.
# NOTE(review): the next cell runs OPTIMIZE ... ZORDER BY (user_id), which rewrites the
# files again, so this plain OPTIMIZE is likely redundant — consider dropping it.
spark.sql(sqlQuery="""
OPTIMIZE workspace.ecommerce.user_features_silver
""")

In [0]:
# Compact and co-locate the silver table's rows by user_id, so that filters on
# user_id can skip unrelated files.
spark.sql(sqlQuery="""
OPTIMIZE workspace.ecommerce.user_features_silver
ZORDER BY (user_id)
""")

In [0]:
# Post-write check: the persisted silver table must have exactly one row per user_id.
# Show any offending keys for debugging, then fail loudly instead of only displaying them.
duplicate_users_df = spark.sql("""
SELECT user_id, COUNT(*) AS row_count
FROM workspace.ecommerce.user_features_silver
GROUP BY user_id
HAVING COUNT(*) > 1
""")
duplicate_users_df.show()
assert duplicate_users_df.limit(1).count() == 0, (
    "workspace.ecommerce.user_features_silver contains duplicate user_id rows"
)

In [0]:
silver_table = "workspace.ecommerce.user_features_silver"

# Null audit on the persisted silver table: the numeric features that are filled
# upstream must be fully populated. Counts are displayed and then asserted.
null_counts_df = spark.sql(f"""
SELECT
SUM(CASE WHEN total_spent IS NULL THEN 1 ELSE 0 END) AS null_spent,
SUM(CASE WHEN avg_price IS NULL THEN 1 ELSE 0 END) AS null_avg,
SUM(CASE WHEN total_purchases IS NULL THEN 1 ELSE 0 END) AS null_purchases
FROM {silver_table}
""")
null_counts_df.show()

null_counts = null_counts_df.first().asDict()
# SUM over an empty table returns NULL (None), which means "no null rows", so treat it as 0.
assert all((count or 0) == 0 for count in null_counts.values()), (
    f"Unexpected NULL feature values in {silver_table}: {null_counts}"
)

In [0]:
# Render a 10-row sample of the final silver feature table.
# NOTE(review): rows contain user_id values — check before sharing rendered outputs.
silver_sample_query = """
SELECT * 
FROM workspace.ecommerce.user_features_silver
LIMIT 10
"""
spark.sql(sqlQuery=silver_sample_query).display()