In [0]:
import dlt
from pyspark.sql.functions import col, trim
 
# Unity Catalog coordinates of the Volume that holds the source file.
CATALOG = "workspace"
SCHEMA = "damg7370"
VOLUME = "datastore"

# Full Volume path of the Material Master CSV, assembled from the parts above
# so that relocating the data only requires editing one of those constants.
SOURCE_CSV = f"/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}/material_master_1k.csv"
 
@dlt.table(
    name="material_master_bronze",
    comment="Bronze: raw pipe-delimited Material Master from Volume (as-is)."
)
def material_master_bronze():
    """Load the Material Master file from the Volume without transforming it.

    The file at ``SOURCE_CSV`` is read in batch mode as CSV, with ``|`` as the
    field delimiter and the first line treated as the header. No schema is
    supplied and schema inference is not requested.

    Returns:
        DataFrame: the rows of the CSV file exactly as Spark parsed them.
    """
    # Same two reader options as before, set in a single call.
    reader = spark.read.options(header=True, delimiter="|")
    return reader.csv(SOURCE_CSV)
 
@dlt.table(
    name="material_master_silver",
    comment="Silver: cleaned & standardized Material Master."
)
@dlt.expect_or_drop("not_null_material_id", "material_id IS NOT NULL")
def material_master_silver():
    """Standardize the bronze Material Master table.

    Every column is kept, with its name converted to lower case, and
    ``material_id`` is stripped of leading and trailing spaces. The
    ``not_null_material_id`` expectation drops rows whose ``material_id``
    is NULL.

    NOTE(review): an id consisting only of spaces trims to an empty string,
    not NULL, so such a row passes the expectation. Confirm that is intended.

    Returns:
        DataFrame: the bronze rows with lower-cased column names and a
        trimmed ``material_id``.
    """
    bronze = dlt.read("material_master_bronze")

    # Lower-case the names first so "material_id" resolves regardless of the
    # casing used in the CSV header.
    lowercased = [col(name).alias(name.lower()) for name in bronze.columns]
    standardized = bronze.select(lowercased)

    return standardized.withColumn("material_id", trim(col("material_id")))