In [0]:
# Import necessary PySpark SQL functions for DataFrame transformations and column operations
from pyspark.sql.functions import col, when, lit, current_timestamp, trim, sha2, concat_ws, row_number

# Import data types for casting columns
from pyspark.sql.types import DoubleType, IntegerType

# Import window specification for deduplication logic
from pyspark.sql.window import Window

# Import DeltaTable for merge operations with Delta Lake
from delta.tables import DeltaTable

In [0]:
%run ../utils/config

In [0]:
# Fully qualified "<schema>.<table>" names for each layer of the pipeline.
# The schema/table variables are not defined in this notebook; presumably they come from ../utils/config.
bronze_table_name = f"{raw_uk_schema}.{raw_products_table}"                         # Bronze: raw product data
silver_table_name = f"{enriched_uk_schema}.{cleaned_products_table}"                # Silver: cleaned / enriched product data
quarantine_table_name = f"{data_quality_uk_schema}.{data_quality_product_table}"    # Quarantine: records that failed validation

In [0]:
# Load today's product records from the Bronze table, filtering by creation date
products_bronze_df = spark.table(bronze_table_name).filter(col("created_at").cast("date") == current_timestamp().cast("date"))

In [0]:
# Define a window specification to partition by product_id and order by created_at descending
window_spec = Window.partitionBy("product_id").orderBy(col("created_at").desc())

# Add a row number to each record within the partition to identify the latest record
deduped_df = products_bronze_df.withColumn("row_num", row_number().over(window_spec))

# Filter to keep only the latest record for each product_id (row_num == 1)
products_bronze_df = deduped_df.filter("row_num == 1").drop("row_num")

# Drop unnecessary columns after deduplication
products_bronze_df = products_bronze_df.drop("system_of_record", "created_at")

In [0]:
# Shape the deduplicated Bronze records into the Silver layout.
# Numeric columns are cast first, in this order, so the column order matches the Bronze input.
numeric_casts = {
    "price": DoubleType(),            # numeric operations / validation (price <= 0)
    "stock_quantity": IntegerType(),  # whole-unit stock counts
}

products_silver_df = products_bronze_df
for column_name, spark_type in numeric_casts.items():
    products_silver_df = products_silver_df.withColumn(column_name, col(column_name).cast(spark_type))

products_silver_df = (
    products_silver_df
    # Audit timestamp, later used for the SCD2 start/end dates
    .withColumn("_processing_timestamp", current_timestamp())
    # Lineage: keep the originating file path under an underscore-prefixed name
    .withColumnRenamed("file_path", "_source_file_path")
    # Placeholder for the validation outcome (filled in by the validation step)
    .withColumn("_error_message", lit(None).cast("string"))
)

In [0]:
def blank_to_null(column_name):
    """Trim leading/trailing spaces from a string column and map NULL or empty results to NULL.

    Args:
        column_name: name of the string column to clean.

    Returns:
        A Column expression: the trimmed value, or NULL when the input is NULL or blank.
    """
    trimmed = trim(col(column_name))
    # when() without otherwise() yields NULL for both the blank case and a NULL input
    return when(trimmed != "", trimmed)


# Standardize the free-text descriptive columns of the Silver frame
products_silver_df = (
    products_silver_df
    .withColumn("product_name", blank_to_null("product_name"))
    .withColumn("category", blank_to_null("category"))
)

In [0]:
# Validation rules in priority order, as (failure condition, error message) pairs.
# When a record breaks several rules, only the message of the earliest rule is reported.
validation_rules = [
    (col("price").isNull() | (col("price") <= 0), "Price is missing or not a positive number."),
    (col("product_id").isNull(), "Product ID is missing."),
    (col("stock_quantity").isNull(), "Stock quantity is missing."),
    (col("product_name").isNull(), "Product name is missing or blank."),
]


def first_failed_rule_message(rules):
    """Build a nested CASE WHEN returning the message of the first rule whose condition holds.

    Args:
        rules: sequence of (Column condition, str message) pairs, highest priority first.

    Returns:
        A Column expression; NULL when no condition holds (i.e. the record is valid).
    """
    if not rules:
        return lit(None)
    (condition, message), *remaining = rules
    return when(condition, message).otherwise(first_failed_rule_message(remaining))


error_message_chain = first_failed_rule_message(validation_rules)

# Replace the placeholder _error_message column with the validation outcome
products_silver_df = products_silver_df.withColumn("_error_message", error_message_chain)

In [0]:
# Route each record by its validation outcome:
#   - no _error_message  -> valid (helper column dropped before the Silver merge)
#   - any _error_message -> invalid (message kept for the quarantine table)
has_validation_error = col("_error_message").isNotNull()

invalid_records_df = products_silver_df.filter(has_validation_error)
valid_records_df = products_silver_df.filter(~has_validation_error).drop("_error_message")

In [0]:
# product_hash_key: SHA-256 of product_id, used as the merge key against the Silver table.
# row_hash: SHA-256 of the tracked attributes; the SCD2 merge compares it between source and target.
# NOTE(review): only price and stock_quantity feed row_hash, so a change to product_name or
# category alone is not detected by the merge - confirm this is intended.
product_key_expr = sha2(concat_ws("^", col("product_id")), 256)
tracked_attributes_expr = sha2(concat_ws("^", col("price"), col("stock_quantity")), 256)

valid_records_df = (
    valid_records_df
    .withColumn("product_hash_key", product_key_expr)
    .withColumn("row_hash", tracked_attributes_expr)
)

In [0]:
# Load the Silver Delta table that stores the SCD Type 2 product history
silver_delta_table = DeltaTable.forName(spark, silver_table_name.strip())

# Products whose CURRENT Silver version has a different row_hash than today's record.
# In a MERGE, a source row that matches a target row can only run the matched clause, so it
# cannot both close the old version and insert the new one. Each changed product is therefore
# staged a second time with a NULL merge_key. That copy never matches a target row and falls
# through to the insert clause, creating the replacement version.
changed_records_df = (
    valid_records_df.alias("source")
    .join(
        silver_delta_table.toDF().alias("target"),
        col("source.product_hash_key") == col("target.product_hash_key"),
        "inner",
    )
    .where("target.is_current = true AND target.row_hash <> source.row_hash")
    .selectExpr("CAST(NULL AS STRING) AS merge_key", "source.*")
)

# Staged source = every valid record keyed by product_hash_key, plus the NULL-keyed copies above.
# The keyed rows drive three outcomes: new products are inserted, changed products close their
# current version, and unchanged products match without being updated.
staged_updates_df = changed_records_df.unionByName(
    valid_records_df.selectExpr("product_hash_key AS merge_key", "*")
)

# Only the current version of a product may match. Otherwise closed historical rows (whose
# row_hash differs) would be re-matched and have their end_date overwritten on later runs.
merge_condition = "target.product_hash_key = source.merge_key AND target.is_current = true"

(
    silver_delta_table.alias("target")
    .merge(staged_updates_df.alias("source"), merge_condition)
    # Changed product: close the current version as of the processing date
    .whenMatchedUpdate(
        condition="target.row_hash <> source.row_hash",
        set={
            "is_current": "false",
            "end_date": "CAST(source._processing_timestamp AS DATE)",
        },
    )
    # New product, or replacement version of a changed product: insert as the current version
    .whenNotMatchedInsert(
        values={
            "product_id": "source.product_id",
            "product_name": "source.product_name",
            "category": "source.category",
            "price": "source.price",
            "stock_quantity": "source.stock_quantity",
            "product_hash_key": "source.product_hash_key",
            "row_hash": "source.row_hash",
            "source_system": "source.source_system",
            "_source_file_path": "source._source_file_path",
            # NOTE(review): stores only the date part of the processing timestamp;
            # confirm this matches the Silver column type (use source._processing_timestamp for TIMESTAMP).
            "_processing_timestamp": "CAST(source._processing_timestamp AS DATE)",
            "is_current": "true",
            "start_date": "CAST(source._processing_timestamp AS DATE)",
            "end_date": "null",
        }
    )
    .execute()
)


In [0]:
# Append the records that failed validation (with their _error_message) to the
# quarantine Delta table so they can be reviewed later.
invalid_records_df.write.saveAsTable(
    quarantine_table_name,
    format="delta",
    mode="append",
)