In [0]:
import dlt
from pyspark.sql import Window
from pyspark.sql import functions as F

# Input extract for the bronze table: a pipe-delimited CSV with a header row
# (see the reader options in material_master_bronze).
SOURCE_PATH = "/Volumes/workspace/quiz-teja/quiz-data/material_master_1k.csv"

@dlt.table(
    name="material_master_bronze",
    comment="Bronze layer: Raw pipe-delimited CSV data with audit metadata",
    table_properties={"quality": "bronze", "layer": "raw"}
)
def material_master_bronze():
    """Load the raw material master extract as the bronze table.

    Batch-reads the pipe-delimited CSV at ``SOURCE_PATH`` (header row, schema
    inferred by Spark) and appends two audit columns:

    * ``ingestion_timestamp`` -- ``current_timestamp()`` at query time.
    * ``source_file`` -- the file-name component of ``SOURCE_PATH``.

    Returns:
        DataFrame that DLT materializes as ``material_master_bronze``.
    """
    # Derive the audit file name from SOURCE_PATH rather than repeating it as a
    # literal, so the lineage column cannot drift when the path is changed.
    source_file_name = SOURCE_PATH.rsplit("/", 1)[-1]

    # `spark` is presumably the session injected by the Databricks runtime.
    return (
        spark.read
            .option("header", True)
            .option("delimiter", "|")
            # NOTE(review): inferSchema makes column types follow the file
            # contents; a raw layer usually keeps text and leaves typing to
            # silver (which already casts every column it needs) -- confirm.
            .option("inferSchema", True)
            .csv(SOURCE_PATH)
            .withColumn("ingestion_timestamp", F.current_timestamp())
            .withColumn("source_file", F.lit(source_file_name))
    )

@dlt.table(
    name="material_master_silver",
    comment="Silver layer: Cleaned, validated, and standardized material master data",
    table_properties={"quality": "silver", "layer": "curated"}
)
@dlt.expect_or_drop("valid_material_id", "material_id IS NOT NULL")
@dlt.expect_or_drop("valid_material_name", "material_name IS NOT NULL")
# Expectations are evaluated on the DataFrame returned below, where unit_cost is
# already a DOUBLE that is NULL unless the raw text matched the numeric pattern.
# The previous RLIKE re-check here was redundant and wrong: the DOUBLE was cast
# back to text (large/small values render as e.g. '1.0E7' and never match), and
# Spark SQL's literal parser turns '\.' into '.', so the dot matched any char.
@dlt.expect_or_drop("valid_unit_cost", "unit_cost IS NOT NULL")
@dlt.expect_or_drop("valid_date", "last_updated IS NOT NULL")
@dlt.expect("cost_in_range", "CAST(unit_cost AS DOUBLE) > 0 AND CAST(unit_cost AS DOUBLE) <= 500")
def material_master_silver():
    """Build the silver material master from the bronze table.

    Standardizes the bronze rows and exposes them under the original names:

    * material_name -- ``initcap`` of the raw name.
    * unit_cost -- DOUBLE when the raw value is one or more digits optionally
      followed by a single decimal point and more digits; otherwise NULL.
    * last_updated -- DATE parsed with ``yyyy-MM-dd``; ``year_month`` is that
      same date formatted as ``yyyy-MM``.
    * lead_time_days / safety_stock / reorder_level -- cast to INT.
    * status / plant -- trimmed and upper-cased; country -- trimmed and
      ``initcap``-ed.
    * category_key -- category + '_' + sub_category (NULL if either is NULL,
      per ``concat`` semantics).
    * processing_timestamp -- ``current_timestamp()`` at query time.

    Exactly one row is kept per material_id. Duplicates are ranked so that a
    row satisfying the ``expect_or_drop`` rules beats one that would be
    dropped, then by the latest ``last_updated``. De-duplicating arbitrarily
    (as ``dropDuplicates`` does) could keep an invalid duplicate that the
    expectations then drop, losing the material altogether. Rows tied on both
    keys are still resolved arbitrarily.

    Returns:
        DataFrame that DLT materializes as ``material_master_silver``.
    """
    # Digits with at most one decimal point; no sign, no surrounding spaces.
    # NOTE(review): if bronze infers unit_cost as numeric, rlike sees Spark's
    # text rendering of the number (possibly scientific notation) -- confirm.
    numeric_cost = r"^[0-9]+\.?[0-9]*$"

    cleaned = (
        dlt.read("material_master_bronze")
        .withColumn("unit_cost_clean",
            F.when(F.col("unit_cost").rlike(numeric_cost),
                   F.col("unit_cost").cast("double"))
             .otherwise(None))
        .withColumn("last_updated_date",
            F.to_date(F.col("last_updated"), "yyyy-MM-dd"))
        .withColumn("lead_time_days_int",
            F.col("lead_time_days").cast("int"))
        .withColumn("safety_stock_int",
            F.col("safety_stock").cast("int"))
        .withColumn("reorder_level_int",
            F.col("reorder_level").cast("int"))
        .withColumn("status_clean",
            F.upper(F.trim(F.col("status"))))
        .withColumn("plant_upper",
            F.upper(F.trim(F.col("plant"))))
        .withColumn("country_proper",
            F.initcap(F.trim(F.col("country"))))
        .withColumn("material_name_proper",
            F.initcap(F.col("material_name")))
        .withColumn("category_key",
            F.concat(F.col("category"), F.lit("_"), F.col("sub_category")))
        # Reuse the parsed date instead of parsing last_updated a second time.
        .withColumn("year_month",
            F.date_format(F.col("last_updated_date"), "yyyy-MM"))
        .withColumn("processing_timestamp",
            F.current_timestamp())
        .select(
            "material_id",
            F.col("material_name_proper").alias("material_name"),
            "category",
            "sub_category",
            "category_key",
            "uom",
            F.col("unit_cost_clean").alias("unit_cost"),
            F.col("last_updated_date").alias("last_updated"),
            F.col("lead_time_days_int").alias("lead_time_days"),
            F.col("safety_stock_int").alias("safety_stock"),
            F.col("reorder_level_int").alias("reorder_level"),
            "supplier_name",
            F.col("country_proper").alias("country"),
            F.col("plant_upper").alias("plant"),
            F.col("status_clean").alias("status"),
            "remarks",
            "year_month",
            "ingestion_timestamp",
            "processing_timestamp",
            "source_file"
        )
    )

    # Mirrors the expect_or_drop rules above (material_id is the partition key,
    # so it is identical within each group and need not be ranked on).
    passes_drop_rules = (
        F.col("material_name").isNotNull()
        & F.col("unit_cost").isNotNull()
        & F.col("last_updated").isNotNull()
    )
    valid_then_latest = Window.partitionBy("material_id").orderBy(
        passes_drop_rules.desc(),               # True sorts before False
        F.col("last_updated").desc_nulls_last(),
    )

    return (
        cleaned
        .withColumn("_dedup_rank", F.row_number().over(valid_then_latest))
        .filter(F.col("_dedup_rank") == 1)
        .drop("_dedup_rank")
    )