In [None]:
# Cell 1: Load Bronze files from Lakehouse
# multiLine=True lets each JSON record span several lines (e.g. pretty-printed
# objects or a top-level array) instead of Spark's default JSON Lines layout.
df_raw = spark.read.json("Files/Bronze/", multiLine=True)
display(df_raw.limit(10))


In [None]:
# Cell 2: Select + Clean Schema
# Project the raw Bronze frame down to the four event columns with explicit
# types, then discard every row that has no user_id or no event_time.
# (Nothing is flattened here: the columns are read as top-level fields.)
from pyspark.sql.functions import col

df_cleaned = (
    df_raw
    .select(
        # NOTE(review): if JSON inference types user_id as a 64-bit integer,
        # ids outside the 32-bit range may not survive the int cast — confirm.
        col("user_id").cast("int"),
        col("event_time").cast("timestamp"),
        col("event_type").cast("string"),
        # Might be null for older files. NOTE(review): if NO Bronze file has
        # this field, the inferred schema lacks the column and select() fails.
        col("device_type").cast("string"),
    )
    .dropna(subset=["user_id", "event_time"])
)


In [None]:
# Cell 3: Deduplicate Based on Row Content
from pyspark.sql.functions import sha2, concat_ws

# Columns that together define a distinct event row.
DEDUP_COLUMNS = ["user_id", "event_time", "event_type", "device_type"]

# dedup_key is still attached as a row fingerprint (it is written to Silver in
# the next cell). It is computed exactly as before, so keys stay comparable
# with rows already stored.
#
# It is NOT used to decide duplicates: concat_ws skips NULL inputs entirely.
# For example, (event_type=NULL, device_type="web") and
# (event_type="web", device_type=NULL) both produce the same key, so
# deduplicating on the key would silently drop a real event.
# dropDuplicates on the columns themselves compares each position separately
# and treats NULL == NULL.
#
# NOTE(review): event_time is rendered to text using the session time zone,
# so dedup_key values depend on spark.sql.session.timeZone — keep it fixed.
df_deduped = (
    df_cleaned
    .withColumn("dedup_key", sha2(concat_ws("||", *DEDUP_COLUMNS), 256))
    .dropDuplicates(DEDUP_COLUMNS)
)


In [None]:
# Cell 4: Write to Silver Delta Table (with schema evolution)
SILVER_TABLE = "silver_user_events"

# Cell 1 re-reads every file under Files/Bronze/ on each run. A blind append
# would therefore duplicate all rows loaded by earlier runs (e.g. on Restart &
# Run All), unless Bronze files are removed after processing, which is not
# visible in this notebook. To avoid that, append only rows whose dedup_key
# is not already present in the Silver table.
# Assumes the existing table was written by this notebook and therefore has
# a dedup_key column. spark.catalog.tableExists requires Spark 3.3+.
if spark.catalog.tableExists(SILVER_TABLE):
    existing_keys = spark.table(SILVER_TABLE).select("dedup_key").distinct()
    df_new_rows = (
        df_deduped
        .join(existing_keys, on="dedup_key", how="left_anti")
        # A USING-style join moves the key column first; restore the order.
        .select(df_deduped.columns)
    )
else:
    df_new_rows = df_deduped

(
    df_new_rows.write
    .mode("append")
    .option("mergeSchema", "true")  # allow new Bronze columns to evolve the table
    .format("delta")
    .saveAsTable(SILVER_TABLE)
)
