In [0]:
import dlt
from pyspark.sql.functions import coalesce, col, concat_ws, lit, when

# Catalog and schema that hold the physical Bronze source tables read below.
SOURCE_CATALOG, SOURCE_SCHEMA = "adb_catalog", "bronze"

# -- CUSTOMERS SILVER --
# Streaming target for the customers CDC flow. Every rule below is an
# "expect_all_or_drop" expectation: incoming rows that fail one are dropped.
dlt.create_streaming_table(
    name="customers_silver",
    comment="Cleaned and deduplicated customer records. Implements SCD Type 1 logic.",
    expect_all_or_drop=dict(
        valid_customer_id="customerId IS NOT NULL",
        valid_email="email LIKE '%@%.%'",
        no_schema_drift="_rescued_data IS NULL",
    ),
)

# Apply bronze customer changes to customers_silver, keyed on customerId and
# ordered by ingestion_timestamp; stored as SCD Type 1 (no history kept).
dlt.create_auto_cdc_flow(
    target="customers_silver",
    source=".".join([SOURCE_CATALOG, SOURCE_SCHEMA, "customers_bronze"]),
    keys=["customerId"],
    sequence_by=col("ingestion_timestamp"),
    stored_as_scd_type="1",
)

# -- customers_error_table to capture only failures --
@dlt.table(
    name="customers_error_table",
    comment="customers records that failed data quality checks in the Silver layer.",
)
def customers_error_table():
    """Stream bronze customer rows that fail at least one Silver-layer rule.

    The pass conditions below mirror the ``expect_all_or_drop`` rules declared
    on ``customers_silver``. A row is captured when any condition is not
    definitely TRUE. Each condition is wrapped in ``coalesce(..., false)`` so
    that a NULL result counts as a failure instead of being silently dropped
    by Spark's three-valued filter logic. For example, a NULL ``email`` makes
    ``email LIKE '%@%.%'`` NULL. NOTE(review): confirm this matches how the
    pipeline's expectations treat NULL rule results.

    Adds an ``error_reason`` column that lists every failed rule,
    comma-separated and in the order declared below. Examples: ``"bad_email"``
    or ``"null_id,schema_drift"``.
    """
    # (reason label, condition that a valid row satisfies)
    checks = [
        ("null_id", col("customerId").isNotNull()),
        ("bad_email", col("email").like("%@%.%")),
        ("schema_drift", col("_rescued_data").isNull()),
    ]
    # A rule fails unless its condition is definitely TRUE (NULL -> failure).
    failures = [(reason, ~coalesce(ok, lit(False))) for reason, ok in checks]

    any_failure = failures[0][1]
    for _, failed in failures[1:]:
        any_failure = any_failure | failed

    return (
        spark.readStream.table(f"{SOURCE_CATALOG}.{SOURCE_SCHEMA}.customers_bronze")
        .filter(any_failure)
        # when() without otherwise() yields NULL for passed rules;
        # concat_ws skips NULLs, leaving only the failed labels.
        .withColumn(
            "error_reason",
            concat_ws(",", *[when(failed, lit(reason)) for reason, failed in failures]),
        )
    )

# -- Orders Silver --
# Streaming target for the orders CDC flow. Every rule below is an
# "expect_all_or_drop" expectation: incoming rows that fail one are dropped.
dlt.create_streaming_table(
    name="orders_silver",
    # The table comment lists all three drop rules; the previous text omitted
    # the schema-drift rule.
    comment=(
        "Cleaned order transactions. Rows with null IDs, non-positive amounts, "
        "or rescued (schema-drift) data are dropped."
    ),
    expect_all_or_drop={
        "valid_order_id": "orderId IS NOT NULL",
        "positive_amount": "order_amount > 0",
        "no_schema_drift": "_rescued_data IS NULL",
    },
)

# Apply bronze order changes to orders_silver, keyed on orderId and ordered
# by ingestion_timestamp; stored as SCD Type 1 (no history kept).
dlt.create_auto_cdc_flow(
    target="orders_silver",
    source=f"{SOURCE_CATALOG}.{SOURCE_SCHEMA}.orders_bronze",
    keys=["orderId"],
    sequence_by=col("ingestion_timestamp"),
    stored_as_scd_type="1",
)

# -- ORDERS orders_error_table to capture only failures --
@dlt.table(
    name="orders_error_table",
    comment="orders records that failed data quality checks in the Silver layer.",
)
def orders_error_table():
    """Stream bronze order rows that fail at least one Silver-layer rule.

    The pass conditions below mirror the ``expect_all_or_drop`` rules declared
    on ``orders_silver``. A row is captured when any condition is not
    definitely TRUE. Each condition is wrapped in ``coalesce(..., false)`` so
    that a NULL result counts as a failure instead of being silently dropped
    by Spark's three-valued filter logic. For example, a NULL ``order_amount``
    makes ``order_amount > 0`` NULL; such a row is labelled
    ``non_positive_amount``. NOTE(review): confirm this matches how the
    pipeline's expectations treat NULL rule results.

    Adds an ``error_reason`` column that lists every failed rule,
    comma-separated and in the order declared below. Examples:
    ``"non_positive_amount"`` or ``"null_order_id,schema_drift"``.
    """
    # (reason label, condition that a valid row satisfies)
    checks = [
        ("null_order_id", col("orderId").isNotNull()),
        ("non_positive_amount", col("order_amount") > 0),
        ("schema_drift", col("_rescued_data").isNull()),
    ]
    # A rule fails unless its condition is definitely TRUE (NULL -> failure).
    failures = [(reason, ~coalesce(ok, lit(False))) for reason, ok in checks]

    any_failure = failures[0][1]
    for _, failed in failures[1:]:
        any_failure = any_failure | failed

    return (
        spark.readStream.table(f"{SOURCE_CATALOG}.{SOURCE_SCHEMA}.orders_bronze")
        .filter(any_failure)
        # when() without otherwise() yields NULL for passed rules;
        # concat_ws skips NULLs, leaving only the failed labels.
        .withColumn(
            "error_reason",
            concat_ws(",", *[when(failed, lit(reason)) for reason, failed in failures]),
        )
    )