In [0]:
from pyspark.sql.functions import *
from delta.tables import *

In [0]:
# Load the raw WMS shipment records from the bronze layer and preview them.
bronze_shipments_table = "inventory_project.bronze.wms_shipments_raw"
df = spark.read.table(bronze_shipments_table)
display(df)

In [0]:
# Standardise the bronze columns:
#   - string identifiers and carrier are whitespace-trimmed,
#   - ship_date is trimmed and parsed as a yyyy-MM-dd date,
#   - quantity is cast to int,
#   - status is trimmed and upper-cased,
#   - cdc_op is collapsed to a single-letter operation code.
trimmed_id_columns = ("shipment_id", "order_id", "product_id", "lot_id", "serial_id")

# The CDC operation is matched on the trimmed, upper-cased text. "U" is tested
# first, then "I", then "D"; every other value (including NULL) falls back to "U".
cdc_op_clean = upper(trim(col("cdc_op")))
cdc_op_code = (
    when(cdc_op_clean.contains("U"), "U")
    .when(cdc_op_clean.contains("I"), "I")
    .when(cdc_op_clean.contains("D"), "D")
    .otherwise("U")
)

df_std = df.select(
    *[trim(col(column_name)).alias(column_name) for column_name in trimmed_id_columns],
    to_date(trim(col("ship_date")), "yyyy-MM-dd").alias("ship_date"),
    col("quantity").cast("int").alias("quantity"),
    trim(col("carrier")).alias("carrier"),
    upper(trim(col("status"))).alias("status"),
    cdc_op_code.alias("cdc_op"),
)

In [0]:
# Step 5: Validation rules
# A row is valid when shipment_id, order_id, product_id and ship_date are all
# non-null, quantity is strictly positive, and cdc_op is one of I / U / D.
#
# Spark uses three-valued logic: `quantity > 0` evaluates to NULL (not False)
# when quantity is NULL, and both filter(cond) and filter(~cond) discard rows
# where the condition is NULL. Coalescing to False makes the predicate strictly
# boolean so such rows are routed to df_invalid instead of vanishing from both
# outputs.
valid_condition = coalesce(
    col("shipment_id").isNotNull() &
    col("order_id").isNotNull() &
    col("product_id").isNotNull() &
    col("ship_date").isNotNull() &
    (col("quantity") > 0) &
    col("cdc_op").isin("I", "U", "D"),
    lit(False)
)

# Default missing status -> UNKNOWN (NULL and empty string are both treated as missing)
df_std = df_std.withColumn(
    "status",
    when(col("status").isNull() | (col("status") == ""), "UNKNOWN").otherwise(col("status"))
)

# Valid rows keep one record per (shipment_id, cdc_op) pair; everything else
# is kept aside as invalid.
df_valid = df_std.filter(valid_condition).dropDuplicates(["shipment_id", "cdc_op"])
df_invalid = df_std.filter(~valid_condition)

In [0]:
# Step 6: Merge valid rows into Silver (CDC idempotent logic)
# When the silver table already exists, apply the CDC operations keyed on
# shipment_id:
#   - matched   + 'U' -> update every business column from the source row
#   - matched   + 'D' -> delete the target row
#   - unmatched + 'I' -> insert the source row
# Any other combination (matched 'I', unmatched 'U' / 'D') hits no clause and
# leaves the target untouched.
#
# NOTE(review): the source may still carry more than one row per shipment_id
# (one per distinct cdc_op). Delta MERGE can fail when several source rows
# match the same target row -- confirm whether an ordering/sequence column is
# available to pick the latest event per shipment before merging.
if spark.catalog.tableExists("inventory_project.silver.wms_shipment"):
    delta_table = DeltaTable.forName(spark, "inventory_project.silver.wms_shipment")
    (
        delta_table.alias("t")
        .merge(
            df_valid.alias("s"),
            "t.shipment_id = s.shipment_id"
        )
        .whenMatchedUpdate(
            condition="s.cdc_op = 'U'",
            set={
                "order_id": "s.order_id",
                "product_id": "s.product_id",
                "lot_id": "s.lot_id",
                "serial_id": "s.serial_id",
                "ship_date": "s.ship_date",
                "quantity": "s.quantity",
                "carrier": "s.carrier",
                "status": "s.status",
                "cdc_op": "s.cdc_op"
            }
        )
        .whenMatchedDelete(condition="s.cdc_op = 'D'")
        .whenNotMatchedInsertAll(condition="s.cdc_op = 'I'")
        .execute()
    )
else:
    # First load: the table does not exist yet, so create it from the valid rows.
    # Delete events are excluded -- the merge branch above never inserts a 'D'
    # row, so materialising them here would leave deleted shipments in silver.
    (
        df_valid.filter(col("cdc_op") != "D")
        .write.format("delta")
        .mode("overwrite")
        .saveAsTable("inventory_project.silver.wms_shipment")
    )

# Step 7: Write invalid rows to error table
# The rejected rows are saved as CSV under the quarantine volume path; the
# "overwrite" mode replaces whatever a previous run left there.
quarantine_path = "/Volumes/inventory_project/silver/quarantine_layer/wms_shipment"
quarantine_writer = df_invalid.write.format("csv").mode("overwrite")
quarantine_writer.save(quarantine_path)

# End the notebook run, returning "SUCCESS" as its exit value.
dbutils.notebook.exit("SUCCESS")