In [0]:
# Start from a clean slate: remove every table this demo creates.

demo_tables = (
    "bronze_events",
    "silver_events",
    "gold_events",
    "quarantine_schema",
    "quarantine_late",
)

for table_name in demo_tables:
    drop_result = spark.sql(f"DROP TABLE IF EXISTS {table_name}")

# Leave the last DROP's (empty) result as the cell's final expression.
drop_result

DataFrame[]

In [0]:
#Load Batch 1 (Initial Schema)
from pyspark.sql import Row
from pyspark.sql import functions as F

# Batch 1: three events carrying the initial five-column schema.
batch1_columns = ["event_id", "user_id", "event_time", "ingest_time", "amount"]

batch1 = [
    Row(*event)
    for event in (
        (1, 101, "2024-03-01 10:00:00", "2024-03-01 10:05:00", 120.0),
        (2, 102, "2024-03-01 11:00:00", "2024-03-01 11:08:00", 80.0),
        (3, 103, "2024-03-01 12:00:00", "2024-03-01 12:05:00", 90.0),
    )
]

# Column names are supplied separately; the Row objects are purely positional.
batch1_df = spark.createDataFrame(batch1, schema=batch1_columns)

batch1_df.show()

+--------+-------+-------------------+-------------------+------+
|event_id|user_id|         event_time|        ingest_time|amount|
+--------+-------+-------------------+-------------------+------+
|       1|    101|2024-03-01 10:00:00|2024-03-01 10:05:00| 120.0|
|       2|    102|2024-03-01 11:00:00|2024-03-01 11:08:00|  80.0|
|       3|    103|2024-03-01 12:00:00|2024-03-01 12:05:00|  90.0|
+--------+-------+-------------------+-------------------+------+



In [0]:
# Seed both layers from Batch 1: Bronze first, then Silver, each overwritten in full.
for layer_table in ("bronze_events", "silver_events"):
    batch1_df.write.format("delta").mode("overwrite").saveAsTable(layer_table)

In [0]:
# Batch 2: introduces schema drift (new `source` column) and a late record.
batch2_columns = ["event_id", "user_id", "event_time", "ingest_time", "amount", "source"]

updated_event = (2, 102, "2024-03-01 11:00:00", "2024-03-01 12:00:00", 85.0, "mobile")  # update + new column
late_event = (4, 104, "2024-02-20 09:00:00", "2024-03-05 08:00:00", 100.0, "web")       # late record

batch2 = [updated_event, late_event]

batch2_df = spark.createDataFrame(batch2, schema=batch2_columns)

batch2_df.show()

+--------+-------+-------------------+-------------------+------+------+
|event_id|user_id|         event_time|        ingest_time|amount|source|
+--------+-------+-------------------+-------------------+------+------+
|       2|    102|2024-03-01 11:00:00|2024-03-01 12:00:00|  85.0|mobile|
|       4|    104|2024-02-20 09:00:00|2024-03-05 08:00:00| 100.0|   web|
+--------+-------+-------------------+-------------------+------+------+



In [0]:
# Schema Drift Check
# Compares the incoming batch against Bronze by column name only (types are not
# compared here).
bronze_cols = set(spark.table("bronze_events").columns)
incoming_cols = set(batch2_df.columns)

missing_cols = bronze_cols - incoming_cols   # breaking drift: Bronze columns absent from the batch
extra_cols   = incoming_cols - bronze_cols   # safe, additive drift: columns Bronze does not have yet

if missing_cols:
    print("❌ Breaking schema drift detected. Sending batch to quarantine.")
    batch2_df.write.mode("append").saveAsTable("quarantine_schema")
    # Halt the run here. Without this, the cells below would still append the
    # quarantined batch to Bronze and merge it into Silver.
    raise ValueError(
        "Batch written to quarantine_schema; it is missing Bronze columns: "
        f"{sorted(missing_cols)}"
    )
else:
    print("✔ Additive drift only (safe). Proceeding with processing.")

✔ Additive drift only (safe). Proceeding with processing.


In [0]:
# Late Arrival Check (Watermark-Style)
# Splits batch2_df into `very_late` (quarantined) and `valid_updates` (processed
# by the cells below).

# Simulated watermark cutoff (instead of current_date for reproducibility)
cutoff_date = F.to_date(F.lit("2024-03-06"))
late_threshold_days = 5

# Whole days between the event date and the cutoff.
days_behind_cutoff = F.datediff(cutoff_date, F.col("event_dt"))

batch2_processed = (
    batch2_df
    .withColumn("event_dt", F.to_date(F.col("event_time")))
    # When event_dt is NULL (e.g. event_time is missing) the comparison is NULL,
    # and a NULL flag satisfies neither filter below, so the row would vanish
    # from both outputs. Coalesce to True so it is quarantined instead.
    .withColumn(
        "too_late",
        F.coalesce(days_behind_cutoff > late_threshold_days, F.lit(True)),
    )
)

very_late = batch2_processed.filter(F.col("too_late"))
valid_updates = batch2_processed.filter(~F.col("too_late")).drop("event_dt", "too_late")

# limit(1) is enough to learn whether anything needs quarantining.
if very_late.limit(1).count() > 0:
    # The helper columns event_dt / too_late are kept in the quarantine table.
    very_late.write.mode("append").saveAsTable("quarantine_late")
    print("⚠ Some late events quarantined.")
else:
    print("✔ No excessively late events detected.")

⚠ Some late events quarantined.


In [0]:
# Land the on-time rows in Bronze, with schema merging enabled on the append.
bronze_writer = (
    valid_updates.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
)
bronze_writer.saveAsTable("bronze_events")

In [0]:
# Merge Updates into Silver (Upsert), customised for Databricks Free Edition
#
# Steps:
#   1. Add to Silver any column Bronze has gained, using Bronze's data type.
#   2. MERGE valid_updates into Silver on event_id: a matched row is updated only
#      when the incoming ingest_time is newer; an unmatched row is inserted.

from delta.tables import DeltaTable

bronze_schema = spark.table("bronze_events").schema
bronze_cols = bronze_schema.fieldNames()
silver_cols = set(spark.table("silver_events").columns)

# Fields present in Bronze but not yet in Silver.
new_fields = [field for field in bronze_schema.fields if field.name not in silver_cols]

for field in new_fields:
    # Carry over the column's real Bronze type instead of assuming STRING, and
    # backtick-quote the name so it is always read as an identifier.
    spark.sql(
        "ALTER TABLE silver_events "
        f"ADD COLUMN (`{field.name}` {field.dataType.simpleString()})"
    )

silver = DeltaTable.forName(spark, "silver_events")

# Build the assignments from the columns the source batch actually provides;
# referencing a Bronze column the batch lacks would fail to resolve `b.<col>`.
update_assignments = {c: f"b.{c}" for c in valid_updates.columns}

# NOTE(review): assumes valid_updates holds at most one row per event_id;
# MERGE rejects several source rows matching the same target row — confirm upstream.
silver.alias("s").merge(
    valid_updates.alias("b"),
    "s.event_id = b.event_id"
).whenMatchedUpdate(
    condition="b.ingest_time > s.ingest_time",
    set=update_assignments
).whenNotMatchedInsertAll().execute()


print("Silver updated successfully via MERGE.")

Silver updated successfully via MERGE.


In [0]:
"""
===========================================================
OPTIONAL — ENTERPRISE DATABRICKS ONLY
-----------------------------------------------------------
This block demonstrates advanced Delta Lake features:

  • spark.databricks.delta.schema.autoMerge.enabled = true
  • whenMatchedUpdateAll() for full schema evolution
  • Automatic column propagation into Silver

Notes:
- NOT supported in Databricks Community Edition
- Running these commands in CE will raise:
    CONFIG_NOT_AVAILABLE or FEATURE_NOT_SUPPORTED errors
- Uncomment the whole block ONLY on paid Databricks clusters
===========================================================

# Enable auto-merge of evolving schemas
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# Load Silver as DeltaTable
silver_tbl = DeltaTable.forName(spark, "silver_events")

(
    silver_tbl.alias("s")
    .merge(
        valid_updates.alias("b"),
        "s.event_id = b.event_id"
    )
    # Automatically update all columns (including new schema)
    .whenMatchedUpdateAll(
        condition="b.ingest_time > s.ingest_time"
    )
    # Insert when new key found
    .whenNotMatchedInsertAll()
    .execute()
)
"""



In [0]:
# Build Gold Aggregates: one row per user, computed from the Silver table.
silver_df = spark.table("silver_events")

# Minutes elapsed between the event happening and it being ingested.
ingest_delay_minutes = (
    F.unix_timestamp(F.col("ingest_time")) - F.unix_timestamp(F.col("event_time"))
) / 60

silver_with_latency = silver_df.withColumn("latency_min", ingest_delay_minutes)

# 1 when an event took more than 10 minutes to be ingested, otherwise 0.
late_flag = F.when(F.col("latency_min") > 10, 1).otherwise(0)

gold_df = silver_with_latency.groupBy("user_id").agg(
    F.count("*").alias("total_events"),
    F.sum("amount").alias("total_amount"),
    F.sum(late_flag).alias("late_events"),
)

gold_df.write.mode("overwrite").saveAsTable("gold_events")
gold_df.show()

+-------+------------+------------+-----------+
|user_id|total_events|total_amount|late_events|
+-------+------------+------------+-----------+
|    101|           1|       120.0|          0|
|    103|           1|        90.0|          0|
|    102|           1|        85.0|          1|
+-------+------------+------------+-----------+



In [0]:
%sql
-- Inspect the rows set aside by the late-arrival check
SELECT *
FROM quarantine_late;

event_id,user_id,event_time,ingest_time,amount,source,event_dt,too_late
4,104,2024-02-20 09:00:00,2024-03-05 08:00:00,100.0,web,2024-02-20,True


In [0]:
%sql
-- Inspect the Silver table after the merge
SELECT *
FROM silver_events;

event_id,user_id,event_time,ingest_time,amount,source
1,101,2024-03-01 10:00:00,2024-03-01 10:05:00,120.0,
3,103,2024-03-01 12:00:00,2024-03-01 12:05:00,90.0,
2,102,2024-03-01 11:00:00,2024-03-01 12:00:00,85.0,mobile
