In [0]:
# ============================================================
# PROJECT: Late-arriving Events Pipeline (Bronze → Silver → Gold)
# PLATFORM: Databricks Free Edition
# GOAL:
#   - Handle late-arriving data
#   - Avoid full recomputation of Gold
#   - Be interview-explainable
# ============================================================

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# ------------------------------------------------------------
# STEP 0: Simulated ingestion batch
# Each tuple is (event_id, user_id, event_time, ingest_time, amount);
# timestamps arrive as strings and are parsed below.
# ------------------------------------------------------------

data = [
    (1, "u1", "2024-03-01 10:00:00", "2024-03-01 10:01:00", 100),
    (2, "u1", "2024-03-01 11:00:00", "2024-03-01 11:05:00", 50),
    (3, "u2", "2024-03-02 09:00:00", "2024-03-02 09:02:00", 70),
    # Late arrival: happened on 2024-03-01 but was ingested on 2024-03-05.
    (4, "u1", "2024-03-01 09:30:00", "2024-03-05 12:00:00", 30),
]

columns = ["event_id", "user_id", "event_time", "ingest_time", "amount"]

raw_df = (
    spark.createDataFrame(data, columns)
    .withColumn("event_time", F.to_timestamp("event_time"))
    .withColumn("ingest_time", F.to_timestamp("ingest_time"))
    # Calendar day the event HAPPENED (not when it arrived); Gold is
    # partitioned and rebuilt by this column.
    .withColumn("event_date", F.to_date("event_time"))
)

raw_df.show()

# ============================================================
# BRONZE LAYER
# - Stores the ingested rows as-is: no cleaning or dedup happens here
#   (event_date was already derived from event_time in STEP 0).
# - Overwritten (not appended) so this demo notebook can be re-run from
#   scratch. A production Bronze table would be append-only, leaving
#   duplicates / corrected versions for Silver to resolve.
# ============================================================

(
    raw_df.write
    .mode("overwrite")
    .saveAsTable("bronze_events")
)

# ------------------------------------------------------------
# Silver Layer
# One row per event_id: when Bronze holds several versions of the
# same event, the most recently ingested version wins.
# ------------------------------------------------------------

bronze_df = spark.table("bronze_events")

# Newest ingest first within each event_id.
# NOTE: two versions with an identical ingest_time tie, and row_number()
# then keeps an arbitrary one of them.
dedup_window = (
    Window
    .partitionBy("event_id")
    .orderBy(F.desc("ingest_time"))
)

silver_df = (
    bronze_df
    .withColumn("rnk", F.row_number().over(dedup_window))
    .where(F.col("rnk") == 1)
    .drop("rnk")
)

(
    silver_df.write
    .mode("overwrite")
    .saveAsTable("silver_events")
)

# ============================================================
# GOLD LAYER (SELECTIVE REBUILD)
# ============================================================

# Ingest-time high-water mark of the previous Gold build.
#   None  -> full build: every event_date present in Silver is rebuilt.
#   value -> only event_dates that received events ingested AFTER this
#            timestamp are rebuilt. For example, "2024-03-02 23:59:59"
#            selects only the late event 4, i.e. just 2024-03-01.
# In a scheduled job this would be persisted from the previous run
# (e.g. the max ingest_time it processed) instead of being hard-coded.
GOLD_WATERMARK = None

# An event counts as "late" when ingest_time - event_time is more than
# this many minutes.
LATE_THRESHOLD_MIN = 10

# ------------------------------------------------------------
# STEP 1: Identify affected dates
# (event_dates that received new or late-arriving events since the
#  watermark)
# ------------------------------------------------------------

if GOLD_WATERMARK is None or not spark.catalog.tableExists("gold_event_metrics"):
    # No watermark, or no Gold table to update yet: everything is "new",
    # otherwise unaffected dates would never be written at all.
    new_events = silver_df
else:
    new_events = silver_df.filter(
        F.col("ingest_time") > F.to_timestamp(F.lit(GOLD_WATERMARK))
    )

affected_dates = new_events.select("event_date").distinct()

affected_dates.show()

# ------------------------------------------------------------
# STEP 2: Recompute Gold ONLY for affected dates
# Every Silver row of an affected date is re-aggregated (not just the
# new rows), because each Gold row is a complete aggregate over its
# (event_date, user_id). A late event for 2024-03-01 must be combined
# with the events of that day that were already processed.
# ------------------------------------------------------------

silver_filtered = silver_df.join(
    F.broadcast(affected_dates),  # tiny: one row per affected date
    on="event_date",
    how="inner",
)

gold_updates = (
    silver_filtered
    .withColumn(
        "latency_min",
        F.timestamp_diff("minute", "event_time", "ingest_time")
    )
    .groupBy("event_date", "user_id")
    .agg(
        F.count("*").alias("total_events"),
        F.sum("amount").alias("total_amount"),
        F.sum(
            F.when(F.col("latency_min") > LATE_THRESHOLD_MIN, 1).otherwise(0)
        ).alias("late_events")
    )
    .withColumn(
        "late_pct",
        # total_events >= 1 for every group produced by groupBy, so no
        # division by zero.
        F.round(F.col("late_events") * 100.0 / F.col("total_events"), 2)
    )
)

gold_updates.show()

# ------------------------------------------------------------
# STEP 3: Overwrite ONLY affected partitions in Gold
# A plain mode("overwrite") replaces the WHOLE table, silently dropping
# every date that is not in gold_updates. Dynamic partition overwrite
# (Delta Lake 2.0+) replaces only the event_date partitions that
# gold_updates actually contains. An empty gold_updates leaves Gold
# untouched.
# ------------------------------------------------------------

(
    gold_updates.write
    .mode("overwrite")
    .option("partitionOverwriteMode", "dynamic")
    .partitionBy("event_date")
    .saveAsTable("gold_event_metrics")
)

# ============================================================
# FINAL RESULT
# Read Gold back from the catalog, so what is shown is what was
# actually persisted rather than the in-memory DataFrame.
# ============================================================

gold_final = spark.table("gold_event_metrics")
gold_final.sort("event_date", "user_id").show()

+--------+-------+-------------------+-------------------+------+----------+
|event_id|user_id|         event_time|        ingest_time|amount|event_date|
+--------+-------+-------------------+-------------------+------+----------+
|       1|     u1|2024-03-01 10:00:00|2024-03-01 10:01:00|   100|2024-03-01|
|       2|     u1|2024-03-01 11:00:00|2024-03-01 11:05:00|    50|2024-03-01|
|       3|     u2|2024-03-02 09:00:00|2024-03-02 09:02:00|    70|2024-03-02|
|       4|     u1|2024-03-01 09:30:00|2024-03-05 12:00:00|    30|2024-03-01|
+--------+-------+-------------------+-------------------+------+----------+

+----------+
|event_date|
+----------+
|2024-03-01|
|2024-03-02|
+----------+

+----------+-------+------------+------------+-----------+--------+
|event_date|user_id|total_events|total_amount|late_events|late_pct|
+----------+-------+------------+------------+-----------+--------+
|2024-03-01|     u1|           3|         180|          1|   33.33|
|2024-03-02|     u2|           1

In [0]:
# Tear down the demo tables so the notebook can be re-run from a clean slate.
for table_name in ("bronze_events", "silver_events", "gold_event_metrics"):
    spark.sql(f"DROP TABLE IF EXISTS {table_name}")

DataFrame[]