In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Key of this pipeline's row in silver_db.pipeline_control.
PIPELINE_NAME = "transactions_pipeline"

# Handle on the raw bronze transactions table.
# NOTE(review): not referenced by the cells below — the incremental read
# re-opens the table by name; consider reusing this or removing it.
bronze_df = spark.table("bronze_db.transactions_raw")

In [0]:
# Read this pipeline's watermark (last processed ingestion_date) from the
# control table. Exactly one control row per pipeline is expected: zero rows
# would otherwise surface as an opaque IndexError, and several rows would
# silently pick an arbitrary one (collect() order is not guaranteed).
control_df = spark.table("silver_db.pipeline_control") \
    .filter(F.col("pipeline_name") == PIPELINE_NAME)

control_rows = control_df.select("last_processed_date").collect()

if len(control_rows) != 1:
    raise ValueError(
        "Expected exactly one row in silver_db.pipeline_control for "
        f"pipeline_name={PIPELINE_NAME!r}, found {len(control_rows)}"
    )

# May be None if the control row was seeded without a watermark.
last_processed_date = control_rows[0][0]

print("Last processed ingestion_date:", last_processed_date)

In [0]:
# Incremental slice of bronze for this run: ingestion_dates strictly after the
# stored watermark, capped at the newest ingestion_date visible right now.
#
# Why the cap: this DataFrame is lazy, and the watermark cell recomputes
# max(ingestion_date) from it only *after* both merges have executed. Without
# an upper bound, rows for a newer ingestion_date that land in bronze mid-run
# would raise that max (advancing the watermark) without having been merged,
# and the strict `>` filter would skip them on every later run.
#
# Why the NULL branch: `ingestion_date > NULL` matches no rows, so a NULL
# watermark (e.g. a freshly seeded control row) would process nothing, forever.
# NOTE(review): confirm a full backfill is the intended first-run behavior.
# NOTE(review): rows arriving later with an ingestion_date equal to an already
# processed watermark are still skipped by the strict `>` — inherent to a
# date-granular watermark.
bronze_source_df = spark.table("bronze_db.transactions_raw")

if last_processed_date is None:
    bronze_pending_df = bronze_source_df
else:
    bronze_pending_df = bronze_source_df.filter(
        F.col("ingestion_date") > F.lit(last_processed_date)
    )

# Global aggregate -> always exactly one row; value is None if no rows match.
run_upper_bound = bronze_pending_df.select(F.max("ingestion_date")).first()[0]

if run_upper_bound is None:
    # Nothing new to process: keep an empty DataFrame with the bronze schema.
    bronze_incremental_df = bronze_pending_df.limit(0)
else:
    bronze_incremental_df = bronze_pending_df.filter(
        F.col("ingestion_date") <= F.lit(run_upper_bound)
    )

In [0]:
valid_statuses = ["SUCCESS", "FAILED", "PENDING"]

# A row is invalid when any mandatory field is missing, the amount is not
# strictly positive, or the status is not one of `valid_statuses`.
#
# The explicit transaction_status.isNull() check is required: `isin` on a NULL
# status returns NULL and `~NULL` is NULL, so for an otherwise-good row the
# whole OR-chain would be NULL, `when` would not fire, and `otherwise(True)`
# would mark the row valid.
invalid_condition = (
    F.col("transaction_id").isNull()
    | F.col("transaction_date").isNull()
    | F.col("partner_id").isNull()
    | F.col("amount").isNull()
    | (F.col("amount") <= 0)
    | F.col("transaction_status").isNull()
    | ~F.col("transaction_status").isin(valid_statuses)
)

# is_valid is always a non-NULL boolean: any TRUE term makes the OR-chain TRUE.
validated_df = bronze_incremental_df.withColumn(
    "is_valid",
    F.when(invalid_condition, F.lit(False)).otherwise(F.lit(True))
)

In [0]:
# Keep a single row per (transaction_id, ingestion_date): the one with the
# greatest load_timestamp (Column.desc() sorts NULLs last).
# NOTE(review): ties on load_timestamp make the surviving row nondeterministic;
# consider adding a tie-breaker column to the ordering.
# NOTE(review): rows with a NULL transaction_id share one partition per
# ingestion_date, so all but one of them are dropped here (and never reach the
# rejected table) — confirm this is acceptable.
window_spec = (
    Window
    .partitionBy("transaction_id", "ingestion_date")
    .orderBy(F.col("load_timestamp").desc())
)

deduped_df = (
    validated_df
    .withColumn("row_num", F.row_number().over(window_spec))
    .where(F.col("row_num") == 1)
    .drop("row_num")
)

In [0]:
# Route deduplicated rows by the validation flag. is_valid is a boolean column,
# so it can be used directly as the predicate; the helper column is dropped from
# both outputs.
clean_df = deduped_df.where(F.col("is_valid")).drop("is_valid")

rejected_df = (
    deduped_df
    .where(~F.col("is_valid"))
    .withColumn(
        "rejection_reason",
        F.lit("Failed mandatory field / value validation")
    )
    .drop("is_valid")
)

In [0]:
from delta.tables import DeltaTable

# Insert-only merge into the clean silver table: rows whose
# (transaction_id, ingestion_date) already exist are left untouched, so
# re-running the same slice does not duplicate data.
clean_table = DeltaTable.forName(spark, "silver_db.transactions_clean")

clean_merge_condition = (
    "t.transaction_id = s.transaction_id AND t.ingestion_date = s.ingestion_date"
)

(
    clean_table.alias("t")
    .merge(clean_df.alias("s"), clean_merge_condition)
    .whenNotMatchedInsertAll()
    .execute()
)

In [0]:
# Insert-only merge into the rejected table, keyed on
# (transaction_id, ingestion_date).
#
# transaction_id is compared with the NULL-safe `<=>` operator. A missing
# transaction_id is itself a rejection reason, so rejected rows can carry NULL
# there. With plain `=`, NULL never matches NULL, and those rows would be
# re-inserted as duplicates whenever the same ingestion_date is processed
# again (e.g. after a failure before the watermark update).
rejected_table = DeltaTable.forName(spark, "silver_db.transactions_rejected")

rejected_table.alias("t").merge(
    rejected_df.alias("s"),
    "t.transaction_id <=> s.transaction_id AND t.ingestion_date = s.ingestion_date"
).whenNotMatchedInsertAll().execute()

In [0]:
# Advance this pipeline's watermark to the newest ingestion_date in the slice
# just processed.
#
# If the slice was empty, max() is None. Interpolating it would run
# DATE('None'), which yields NULL (or fails with ANSI mode enabled) and would
# wipe the stored watermark. In that case the update is skipped.
max_ingestion_date = bronze_incremental_df \
    .select(F.max("ingestion_date")).first()[0]

if max_ingestion_date is None:
    print("No new bronze rows; watermark left at:", last_processed_date)
else:
    # PIPELINE_NAME is a notebook constant (not external input), so f-string
    # interpolation is safe here; it keeps the WHERE clause in sync with the
    # control-table read.
    spark.sql(f"""
UPDATE silver_db.pipeline_control
SET
    last_processed_date = DATE('{max_ingestion_date}'),
    updated_at = current_timestamp()
WHERE pipeline_name = '{PIPELINE_NAME}'
""")