In [0]:
import uuid
from datetime import datetime, timezone

from delta.tables import DeltaTable
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# -----------------------
# Config
# -----------------------
# Database that receives every table written by this script.
SILVER_DB = "silver"
# Single Delta table collecting the rejected rows of all sources.
QUARANTINE_TABLE = f"{SILVER_DB}.quarantine"
# Accepted fertilizer_type values; the fertilizer step compares against them
# after applying initcap/trim to the raw "type" column.
VALID_FERT_TYPES = ["Heavy Metal", "Organic", "Inorganic", "Dry", "Toxic"]
# Random identifier stamped on every quarantine row written by this run.
JOB_RUN_ID = str(uuid.uuid4())
# Spark Column expression (not a Python timestamp value).
INGEST_TS = F.current_timestamp()

# Raw bronze column name -> snake_case name used in the silver tables.
# drop_replaced_columns() uses this map to remove the raw column once the
# renamed column has been derived.
COLUMN_RENAMES = {
    "consumerid": "consumer_id",
    "purchaseid": "purchase_id",
    "fertilizerid": "fertilizer_id"
}

# Make sure the target database and the quarantine table exist before any
# step tries to write to them.
spark.sql(f"CREATE DATABASE IF NOT EXISTS {SILVER_DB}")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {QUARANTINE_TABLE} (
    source_table STRING,
    pk_value STRING,
    fk_values STRING,
    full_row STRING,
    failure_reason STRING,
    ingest_timestamp TIMESTAMP,
    job_run_id STRING,
    occurrence_count INT
) USING DELTA
""")

# -----------------------
# Helpers
# -----------------------
def df_has_rows(df):
    """Return True when ``df`` yields at least one row.

    Only a single row is requested, so no full count is needed.
    """
    first_rows = df.take(1)
    return len(first_rows) > 0

def write_to_quarantine(df, source_table, pk_col, fk_expr=None, failure_reason="invalid"):
    """Append the rows of ``df`` to the quarantine table.

    Args:
        df: DataFrame holding the rejected rows.
        source_table: Label stored in ``source_table`` for every row.
        pk_col: Name of the column of ``df`` stored (as string) in ``pk_value``.
        fk_expr: Optional Column expression evaluated against ``df`` and
            stored (as string) in ``fk_values``; NULL is stored when omitted.
        failure_reason: Text stored in ``failure_reason`` for every row.

    Returns:
        None. Nothing is written when ``df`` is empty.
    """
    if not df_has_rows(df):
        return

    # Serialise the row BEFORE any helper column exists, so full_row contains
    # only the rejected record and not the derived fk_values value.
    full_row_col = F.to_json(F.struct([F.col(c) for c in df.columns])).alias("full_row")

    if fk_expr is None:
        fk_values_col = F.lit(None).cast("string")
    else:
        fk_values_col = fk_expr.cast("string")

    df_to_write = df.select(
        F.lit(source_table).alias("source_table"),
        F.col(pk_col).cast("string").alias("pk_value"),
        fk_values_col.alias("fk_values"),
        full_row_col,
        F.lit(failure_reason).alias("failure_reason"),
        INGEST_TS.alias("ingest_timestamp"),
        F.lit(JOB_RUN_ID).alias("job_run_id"),
    )

    (
        df_to_write.write.format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .saveAsTable(QUARANTINE_TABLE)
    )

def dedupe(df, pk_cols, orderby=None):
    """Keep one row per ``pk_cols`` combination.

    Args:
        df: DataFrame to de-duplicate.
        pk_cols: Column names identifying a logical record.
        orderby: Optional ordering (column name, Column, or a list of
            either). When given, the first row of each key under this
            ordering is kept; otherwise ``dropDuplicates`` picks the survivor.

    Returns:
        A DataFrame with at most one row per key.
    """
    # Test for "no ordering" explicitly. A PySpark Column cannot be used in a
    # boolean context (it raises ValueError), so a plain `if orderby:` would
    # crash for an argument such as F.col("ts").desc().
    no_ordering = orderby is None or (
        isinstance(orderby, (str, list, tuple)) and len(orderby) == 0
    )
    if no_ordering:
        return df.dropDuplicates(pk_cols)

    window_spec = Window.partitionBy(pk_cols).orderBy(orderby)
    return (
        df.withColumn("_rn", F.row_number().over(window_spec))
        .filter(F.col("_rn") == 1)
        .drop("_rn")
    )

def upsert_delta(target_table: str, df, key_cols):
    """Upsert ``df`` into the Delta table ``target_table``.

    If the table does not exist yet it is created from ``df``. Otherwise
    ``df`` is projected onto the target's columns (columns absent from ``df``
    become typed NULLs; columns of ``df`` unknown to the target are not
    written) and merged on ``key_cols``: matching rows are updated, the rest
    inserted.

    Args:
        target_table: Name of the Delta table to write.
        df: Source DataFrame.
        key_cols: Column names forming the merge key.

    Returns:
        None. Nothing is written when ``df`` is empty.

    Raises:
        ValueError: In the merge path, if ``key_cols`` is empty or a key
            column is missing from ``df`` or from the target table.
    """
    if not df_has_rows(df):
        return

    if not spark.catalog.tableExists(target_table):
        df.write.format("delta").option("mergeSchema", "true").saveAsTable(target_table)
        return

    target_fields = spark.table(target_table).schema.fields
    target_cols = [field.name for field in target_fields]

    # Refuse to merge on an incomplete key: silently skipping a missing key
    # column would match rows on fewer columns than the caller asked for
    # (or produce an empty condition).
    key_cols = list(key_cols)
    if not key_cols:
        raise ValueError(f"Cannot merge into {target_table}: no key columns given")
    unusable_keys = [k for k in key_cols if k not in df.columns or k not in target_cols]
    if unusable_keys:
        raise ValueError(
            f"Cannot merge into {target_table}: key columns {unusable_keys} "
            "are missing from the source DataFrame or the target table"
        )

    # Align source DF to target schema; give filler NULLs the target's type so
    # the source never carries untyped (NullType) columns into the merge.
    source_cols = set(df.columns)
    df_aligned = df.select(
        [
            F.col(field.name).alias(field.name)
            if field.name in source_cols
            else F.lit(None).cast(field.dataType).alias(field.name)
            for field in target_fields
        ]
    )

    # Backticks keep the condition parseable for unusual column names.
    merge_cond = " AND ".join(f"t.`{k}` = s.`{k}`" for k in key_cols)

    (
        DeltaTable.forName(spark, target_table).alias("t")
        .merge(df_aligned.alias("s"), merge_cond)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

def drop_replaced_columns(df, rename_map):
    """Drop the raw columns of ``df`` that ``rename_map`` marks as replaced.

    ``rename_map`` maps an old column name to its new name. An old column is
    dropped only if it is present in ``df`` and its name differs from the new
    one. ``df`` is returned unchanged when nothing qualifies.
    """
    present = df.columns
    stale = []
    for raw_name, new_name in rename_map.items():
        if raw_name != new_name and raw_name in present:
            stale.append(raw_name)

    if not stale:
        return df
    return df.drop(*stale)

# -----------------------
# Load bronze tables
# -----------------------
# Logical source name -> fully qualified bronze table name.
bronze_tables = dict(
    consumer="bronze.consumer",
    purchase="bronze.purchase",
    avocado="bronze.avocado",
    fertilizer="bronze.fertilizer",
)
# One DataFrame per source, keyed by the same logical name.
bronze_df = {}
for source_name, table_name in bronze_tables.items():
    bronze_df[source_name] = spark.table(table_name)

# -----------------------
# Transformations
# -----------------------

## Consumer
# Derive the typed id, normalise sex (trim + initcap) and cast age to int.
consumer = (
    bronze_df["consumer"]
    .withColumn("consumer_id", F.col("consumerid").cast("long"))
    .withColumn("sex", F.initcap(F.trim(F.col("Sex"))))
    .withColumn("age", F.col("age").cast("int"))
)

# A consumer is valid when it has an id and an age within 0..130.
# A NULL age makes the range test NULL, which would satisfy neither the
# "valid" filter nor a hand-written "malformed" filter; coalescing to False
# sends such rows to quarantine instead of losing them.
consumer_is_valid = F.coalesce(
    F.col("consumer_id").isNotNull() & F.col("age").between(0, 130),
    F.lit(False),
)

consumer_valid = consumer.filter(consumer_is_valid)
# The merge below keys on consumer_id, so the source must carry each key once.
consumer_valid = dedupe(consumer_valid, ["consumer_id"])
consumer_valid = drop_replaced_columns(consumer_valid, COLUMN_RENAMES)
# Exact complement of the valid set: every input row lands in one of the two.
consumer_malformed = consumer.filter(~consumer_is_valid)

upsert_delta(f"{SILVER_DB}.validated_consumer", consumer_valid, ["consumer_id"])
write_to_quarantine(consumer_malformed, "consumer", "consumer_id", failure_reason="invalid consumer")

## Purchase
# Derive typed identifiers, parse graphed_date from its first 10 characters,
# then cast the remaining numeric attributes to int in place.
purchase = (
    bronze_df["purchase"]
    .withColumn("purchase_id", F.col("purchaseid").cast("long"))
    .withColumn("consumer_id", F.col("consumerid").cast("long"))
    .withColumn("graphed_date", F.try_to_date(F.substring("graphed_date", 1, 10), "yyyy-MM-dd"))
)
for int_col in ("avocado_bunch_id", "price_index", "reporting_year", "grocery_store_id"):
    purchase = purchase.withColumn(int_col, F.col(int_col).cast("int"))

# Both identifiers must be present for a purchase to be accepted.
purchase_ids_present = F.col("purchase_id").isNotNull() & F.col("consumer_id").isNotNull()
purchase_valid = purchase.filter(purchase_ids_present)

# Keep a single row per purchase_id, preferring the latest graphed_date
# (rows without a date rank last).
window_spec = Window.partitionBy("purchase_id").orderBy(F.col("graphed_date").desc_nulls_last())
purchase_ranked = purchase_valid.withColumn("_rn", F.row_number().over(window_spec))
purchase_valid_deduped = drop_replaced_columns(
    purchase_ranked.filter(F.col("_rn") == 1).drop("_rn"),
    COLUMN_RENAMES,
)

# isNull/isNotNull never evaluate to NULL, so the negation selects exactly
# the rows with a missing purchase_id or consumer_id.
purchase_malformed = purchase.filter(~purchase_ids_present)

upsert_delta(f"{SILVER_DB}.validated_purchase", purchase_valid_deduped, ["purchase_id"])
write_to_quarantine(
    purchase_malformed,
    "purchase",
    "purchase_id",
    F.concat(F.lit("consumer_id="), F.col("consumerid").cast("string")),
    "invalid purchase",
)

## Avocado
# Derive typed identifiers, parse the three dates from their first 10
# characters and cast the numeric attributes to int.
avocado = (
    bronze_df["avocado"]
    .withColumn("purchase_id", F.col("purchaseid").cast("long"))
    .withColumn("consumer_id", F.col("consumerid").cast("long"))
    .withColumn("born_date", F.try_to_date(F.substring("born_date", 1, 10), "yyyy-MM-dd"))
    .withColumn("picked_date", F.try_to_date(F.substring("picked_date", 1, 10), "yyyy-MM-dd"))
    .withColumn("sold_date", F.try_to_date(F.substring("sold_date", 1, 10), "yyyy-MM-dd"))
    .withColumn("avocado_ripe_index", F.col("ripe_index_when_picked").cast("int"))
    .withColumn("avocado_bunch_id", F.col("avocado_bunch_id").cast("int"))
)

# Valid = both identifiers present and born <= picked <= sold.
# A NULL date (missing or unparseable) makes a comparison NULL, and a NULL
# predicate passes neither a "valid" nor a hand-written "malformed" filter.
# Coalescing to False routes those rows to quarantine instead of dropping them.
avocado_is_valid = F.coalesce(
    F.col("purchase_id").isNotNull()
    & F.col("consumer_id").isNotNull()
    & (F.col("picked_date") >= F.col("born_date"))
    & (F.col("sold_date") >= F.col("picked_date")),
    F.lit(False),
)

avocado_valid = avocado.filter(avocado_is_valid)
avocado_valid = drop_replaced_columns(avocado_valid, COLUMN_RENAMES)

# Exact complement of the valid set: every input row lands in one of the two.
avocado_malformed = avocado.filter(~avocado_is_valid)

# NOTE(review): the merge keys on purchase_id alone; this assumes at most one
# avocado row per purchase in the source - confirm against the bronze data.
upsert_delta(f"{SILVER_DB}.validated_avocado", avocado_valid, ["purchase_id"])
write_to_quarantine(
    avocado_malformed,
    "avocado",
    "purchase_id",
    F.concat(F.lit("consumer_id="), F.col("consumerid").cast("string")),
    "invalid avocado dates or missing identifiers",
)

## Fertilizer
# Derive typed identifiers and a normalised fertilizer_type (trim + initcap).
fert = (
    bronze_df["fertilizer"]
    .withColumn("purchase_id", F.col("purchaseid").cast("long"))
    .withColumn("consumer_id", F.col("consumerid").cast("long"))
    .withColumn("fertilizer_id", F.col("fertilizerid").cast("long"))
    .withColumn("fertilizer_type", F.initcap(F.trim(F.col("type"))))
    # mg is passed through unchanged; the reference is kept so a bronze table
    # without this column still fails at this point.
    .withColumn("mg", F.col("mg"))
)

# Valid = fertilizer and purchase ids present and a recognised type.
# isin() on a NULL type yields NULL, which passes neither a "valid" nor a
# hand-written "malformed" filter; coalescing to False routes rows with a
# missing type to quarantine instead of dropping them.
fert_is_valid = F.coalesce(
    F.col("fertilizer_id").isNotNull()
    & F.col("purchase_id").isNotNull()
    & F.col("fertilizer_type").isin(*VALID_FERT_TYPES),
    F.lit(False),
)

# One row per fertilizer_id, because the merge below keys on it.
fert_valid = dedupe(fert.filter(fert_is_valid), ["fertilizer_id"])
fert_valid = drop_replaced_columns(fert_valid, COLUMN_RENAMES)

# Exact complement of the valid set: every input row lands in one of the two.
fert_malformed = fert.filter(~fert_is_valid)

upsert_delta(f"{SILVER_DB}.validated_fertilizer", fert_valid, ["fertilizer_id"])
write_to_quarantine(
    fert_malformed,
    "fertilizer",
    "fertilizer_id",
    F.concat(
        F.lit("purchase_id="), F.col("purchaseid").cast("string"),
        F.lit(",consumer_id="), F.col("consumerid").cast("string"),
    ),
    "invalid fertilizer type or missing ids",
)

# -----------------------
# Referential Integrity
# -----------------------
# Quarantine every validated purchase whose consumer_id has no match in
# validated_consumer. The check is skipped unless both tables exist.
purchase_table = f"{SILVER_DB}.validated_purchase"
consumer_table = f"{SILVER_DB}.validated_consumer"

if all(spark.catalog.tableExists(name) for name in (purchase_table, consumer_table)):
    val_purchase = spark.table(purchase_table)
    val_consumer = spark.table(consumer_table).select("consumer_id").distinct()

    # Anti join: keeps only purchases without a matching consumer row.
    missing_consumers = val_purchase.join(val_consumer, on="consumer_id", how="left_anti")

    write_to_quarantine(
        missing_consumers,
        "purchase",
        "purchase_id",
        F.concat(F.lit("consumer_id="), F.col("consumer_id").cast("string")),
        "referential integrity: missing consumer",
    )

# -----------------------
# Aggregate Quarantine
# -----------------------
# Collapse identical quarantine entries into one row each. Unlike a plain
# groupBy(full_row, failure_reason).count(), this keeps every column of the
# quarantine schema and carries earlier occurrence counts forward.
quarantine_df = spark.table(QUARANTINE_TABLE)

# Column name -> type, in the order used by the quarantine table DDL.
quarantine_schema = {
    "source_table": "string",
    "pk_value": "string",
    "fk_values": "string",
    "full_row": "string",
    "failure_reason": "string",
    "ingest_timestamp": "timestamp",
    "job_run_id": "string",
    "occurrence_count": "int",
}

# A table rewritten by an earlier version of this step may only hold
# full_row / failure_reason / occurrence_count; add whatever is missing as a
# typed NULL so the aggregation below always resolves.
for col_name, col_type in quarantine_schema.items():
    if col_name not in quarantine_df.columns:
        quarantine_df = quarantine_df.withColumn(col_name, F.lit(None).cast(col_type))

quarantine_agg_df = (
    quarantine_df
    .groupBy("source_table", "pk_value", "fk_values", "full_row", "failure_reason")
    .agg(
        # Most recent sighting and the run that produced it.
        F.max("ingest_timestamp").alias("ingest_timestamp"),
        F.max_by("job_run_id", "ingest_timestamp").alias("job_run_id"),
        # Rows appended since the last aggregation carry no count and weigh 1;
        # rows collapsed earlier contribute the count they already hold.
        F.sum(F.coalesce(F.col("occurrence_count"), F.lit(1))).cast("int").alias("occurrence_count"),
    )
    .select(*quarantine_schema)
)

(
    quarantine_agg_df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(QUARANTINE_TABLE)
)

print("Silver transformation complete.")
# Timezone-aware replacement for the deprecated datetime.utcnow(); the printed
# text keeps the same "<ISO timestamp>Z" form.
finished_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
print(f"job_run_id = {JOB_RUN_ID}; time = {finished_at}")
