In [0]:
# Seed the raw landing folder with two newline-delimited JSON files.

# Batch 1: records carry only user_id, event and ts.
data1 = "\n".join([
    '{"user_id": 1, "event": "login", "ts": "2025-06-30T10:00:00Z"}',
    '{"user_id": 2, "event": "logout", "ts": "2025-06-30T10:05:00Z"}',
])

# Batch 2: same record shape plus an extra `device` field.
data2 = "\n".join([
    '{"user_id": 3, "event": "purchase", "ts": "2025-06-30T10:10:00Z", "device": "mobile"}',
    '{"user_id": 4, "event": "login", "ts": "2025-06-30T10:15:00Z", "device": "desktop"}',
])

# overwrite=True replaces any existing file at the same path on re-run.
for raw_path, payload in (
    ("dbfs:/mnt/raw/events/batch1.json", data1),
    ("dbfs:/mnt/raw/events/batch2.json", data2),
):
    dbutils.fs.put(raw_path, payload, overwrite=True)


In [0]:
from pyspark.sql.functions import *

# --- Bronze: ingest raw JSON events into a Delta table with Auto Loader ---
bronze_input = "dbfs:/mnt/raw/events"
bronze_output = "dbfs:/mnt/bronze/events"
schema_loc = "dbfs:/mnt/schemas/events"

# Streaming source over the raw folder. The schema is tracked under
# `schema_loc`; every row is stamped with the time it was ingested.
df_bronze = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", schema_loc)
    .load(bronze_input)
    .withColumn("ingestion_time", current_timestamp())
)

# `mergeSchema` belongs on the Delta *writer*: that is what lets newly
# discovered columns (e.g. `device`) be added to the bronze table. Setting it
# on the Auto Loader reader has no effect for a JSON source.
#
# `availableNow` processes every file currently in the source and then stops,
# so the stream behaves like an incremental batch job. Keeping the query handle
# and blocking on it guarantees the bronze table exists and is fully written
# before the next cell reads it (required for Restart -> Run All to work).
bronze_query = (
    df_bronze.writeStream
    .format("delta")
    .option("checkpointLocation", f"{bronze_output}/_checkpoint")
    .option("mergeSchema", "true")
    .outputMode("append")
    .trigger(availableNow=True)
    .start(bronze_output)
)
bronze_query.awaitTermination()


In [0]:
# --- Silver: drop incomplete events and derive a date column ---
bronze_df = spark.read.format("delta").load(bronze_output)

# Keep only rows that have both an event name and a user id, then add
# `event_date` computed from the `ts` column.
silver_df = (
    bronze_df.filter("event is not null and user_id is not null")
    .withColumn("event_date", to_date("ts"))
)

silver_output = "dbfs:/mnt/silver/events_clean"

# The table is fully rebuilt on every run. `overwriteSchema` lets the rebuild
# also replace the table schema; without it, a re-run after bronze has gained
# a new column fails Delta's schema enforcement.
(
    silver_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(silver_output)
)


In [0]:
# --- Gold: distinct users per (event_date, event) ---

# Read the silver table back from storage rather than reusing an in-memory
# DataFrame, so this cell depends only on `silver_output`.
silver_df = spark.read.load(silver_output, format="delta")

# Aggregate expression: number of distinct user_id values in each group.
unique_users_col = countDistinct("user_id").alias("unique_users")
gold_df = silver_df.groupBy("event_date", "event").agg(unique_users_col)

gold_output = "dbfs:/mnt/gold/event_summary"

# Replace the previous summary with the freshly computed one.
gold_df.write.save(gold_output, format="delta", mode="overwrite")


In [0]:
# Render the gold summary by reading it back from its Delta location.
gold_summary_df = spark.read.load(gold_output, format="delta")
display(gold_summary_df)
