In [1]:
# Read watermark
# Fetch the last_processed_date recorded for this pipeline in
# gold.pipeline_metadata. Later cells use it to anchor the sliding-window
# filter on appointment_date.
watermark_rows = spark.sql("""
SELECT last_processed_date
FROM gold.pipeline_metadata
WHERE pipeline_name = 'fact_appointments_pipeline'
""").collect()

# Fail loudly when the metadata row is missing or NULL. A missing row would
# otherwise surface as a bare IndexError, and a NULL watermark would make the
# downstream date filter evaluate to NULL and silently select no rows.
if not watermark_rows or watermark_rows[0][0] is None:
    raise ValueError(
        "No watermark found in gold.pipeline_metadata for "
        "'fact_appointments_pipeline'; seed last_processed_date before running."
    )

watermark = watermark_rows[0][0]  # row 0, col 0

print("Current watermark:", watermark)


StatementMeta(, c8ccea0f-fcff-4e64-ba37-2bd3222dface, 3, Finished, Available, Finished, False)

Current watermark: 2025-01-08


In [2]:
# Add sliding window filter
from pyspark.sql.functions import col, date_sub, lit

# Number of days before the watermark that are re-read on every run.
# Presumably this exists to pick up late-arriving or corrected appointments;
# tune here rather than editing the filter expression.
LOOKBACK_DAYS = 7

# Keep only appointments dated on or after (watermark - LOOKBACK_DAYS).
df_fact_base = (
    spark.table("silver.appointments_clean")
    .filter(
        col("appointment_date") >= date_sub(lit(watermark), LOOKBACK_DAYS)
    )
)



StatementMeta(, c8ccea0f-fcff-4e64-ba37-2bd3222dface, 4, Finished, Available, Finished, False)

In [3]:
# Building Incremental Aggregation
# `sum` is imported under an alias so the Python builtin `sum` is not shadowed
# for the remainder of the notebook.
from pyspark.sql.functions import col, count, sum as spark_sum, when

# NOTE: df_fact_base is intentionally NOT re-read from
# silver.appointments_clean here. It is the sliding-window-filtered frame built
# in the earlier cell; re-reading the table would discard that filter and
# re-aggregate the entire history on every run.
df_practice = spark.table("gold.dim_practice")
df_date = spark.table("gold.dim_date")

df_incremental = (
    df_fact_base
    # One output row per practice per appointment date.
    .groupBy("practice_code", "appointment_date")
    .agg(
        count("*").alias("total_appointments"),
        spark_sum(when(col("current_slot_status") == "DNA", 1).otherwise(0)).alias("dna_count"),
        spark_sum(when(col("current_slot_status") == "Walked Out", 1).otherwise(0)).alias("walked_out_count"),
        # Slots with status "Left" are counted as completed appointments.
        spark_sum(when(col("current_slot_status") == "Left", 1).otherwise(0)).alias("completed_count")
    )
    # Left joins keep aggregate rows that have no matching dimension row, so
    # the surrogate key comes back NULL and the downstream null-key DQ checks
    # can catch it. An inner join would silently drop those rows instead.
    .join(df_practice, "practice_code", "left")
    .join(df_date, "appointment_date", "left")
    # dna_rate / walked_out_rate were previously computed and then dropped by
    # this select; they are not part of the fact row and can be derived from
    # the counts, so they are no longer computed here.
    .select(
        "practice_key",
        "practice_code",
        "date_key",
        "total_appointments",
        "dna_count",
        "walked_out_count",
        "completed_count"
    )
)



StatementMeta(, c8ccea0f-fcff-4e64-ba37-2bd3222dface, 5, Finished, Available, Finished, False)

In [4]:
# DQ checks / guardrails on the aggregated increment.
# Each check counts offending rows and aborts the run if any are found.

# Check 1: surrogate keys must not be null.
null_practice_key_rows = df_incremental.where(col("practice_key").isNull()).count()
assert null_practice_key_rows == 0, "Null practice_key detected."

null_date_key_rows = df_incremental.where(col("date_key").isNull()).count()
assert null_date_key_rows == 0, "Null date_key detected."

# Check 2: the three status counts must add up to the row total.
status_count_total = (
    col("dna_count") + col("walked_out_count") + col("completed_count")
)
mismatched_total_rows = df_incremental.where(
    status_count_total != col("total_appointments")
).count()
assert mismatched_total_rows == 0, "Status counts do not sum to total_appointments."


StatementMeta(, c8ccea0f-fcff-4e64-ba37-2bd3222dface, 6, Finished, Available, Finished, False)

In [5]:
# Create temporary view
# Registers the aggregated increment under the name "fact_updates" so that it
# can be referenced as the source relation from Spark SQL (the MERGE statement
# uses this name). Any existing temp view with the same name is replaced.
df_incremental.createOrReplaceTempView("fact_updates")


StatementMeta(, c8ccea0f-fcff-4e64-ba37-2bd3222dface, 7, Finished, Available, Finished, False)

In [6]:
# DQ guardrail: detect "bad appointments".
# A practice/date group with a non-positive total indicates a broken
# aggregation, so stop the run if any such row exists.
invalid_total_rows = df_incremental.filter(col("total_appointments") <= 0).count()

if invalid_total_rows > 0:
    raise Exception("Invalid total_appointments detected.")


StatementMeta(, c8ccea0f-fcff-4e64-ba37-2bd3222dface, 8, Finished, Available, Finished, False)

In [7]:
# Merging
# Upsert the "fact_updates" view into gold.fact_appointments, matching rows on
# (practice_key, date_key):
#   - matched rows have their four count columns overwritten;
#   - unmatched rows are inserted with keys, practice_code and counts.
merge_statement = """
MERGE INTO gold.fact_appointments AS target
USING fact_updates AS source
ON target.practice_key = source.practice_key
AND target.date_key = source.date_key

WHEN MATCHED THEN
  UPDATE SET
    target.total_appointments = source.total_appointments,
    target.dna_count = source.dna_count,
    target.walked_out_count = source.walked_out_count,
    target.completed_count = source.completed_count

WHEN NOT MATCHED THEN
  INSERT (
    practice_key,
    practice_code,
    date_key,
    total_appointments,
    dna_count,
    walked_out_count,
    completed_count
  )
  VALUES (
    source.practice_key,
    source.practice_code,
    source.date_key,
    source.total_appointments,
    source.dna_count,
    source.walked_out_count,
    source.completed_count
  )
"""

# Left as the cell's final expression so the merge-metrics DataFrame is shown.
spark.sql(merge_statement)


StatementMeta(, c8ccea0f-fcff-4e64-ba37-2bd3222dface, 9, Finished, Available, Finished, False)

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [8]:
### Update metadata so the pipeline knows where it stopped

# A single aggregation serves as both the emptiness check and the new
# watermark: max() yields None when df_fact_base has no rows, and also when
# every appointment_date is NULL. Guarding on None avoids a second scan and
# prevents the literal string 'None' from being written to the metadata table.
max_date = df_fact_base.agg({"appointment_date": "max"}).collect()[0][0]

if max_date is not None:
    # max_date is interpolated directly into the SQL text; it comes from the
    # appointment_date column of the pipeline's own data, not from user input.
    spark.sql(f"""
        UPDATE gold.pipeline_metadata
        SET last_processed_date = '{max_date}'
        WHERE pipeline_name = 'fact_appointments_pipeline'
    """)


StatementMeta(, c8ccea0f-fcff-4e64-ba37-2bd3222dface, 10, Finished, Available, Finished, False)

In [10]:
# Sanity check for idempotent behaviour: if the pipeline is run twice on the
# same input, this row count should be the same after both runs.
# NOTE(review): an unchanged count only shows that no duplicate rows were
# inserted; it does not by itself verify that the stored values are correct.
spark.table("gold.fact_appointments").count()


StatementMeta(, c8ccea0f-fcff-4e64-ba37-2bd3222dface, 12, Finished, Available, Finished, False)

6