In [0]:
import dlt
from pyspark.sql.functions import col, lit, to_timestamp, when, current_timestamp,to_date,concat



# Utility: Apply DQ Checks

def apply_dq(df, table_name, rules):
    bad_df = df.withColumn("error_reason", lit(None))
    for cond, msg in rules:
        bad_df = bad_df.withColumn(
            "error_reason",
            when(~cond, msg).otherwise(col("error_reason"))
        )
    good_df = bad_df.filter(col("error_reason").isNull())
    bad_df = (
        bad_df.filter(col("error_reason").isNotNull())
              .withColumn("source_table", lit(table_name))
              .withColumn("dq_run_timestamp", current_timestamp())
    )
    return good_df, bad_df



# Cleaning Functions

def clean_address(df):
    df = df.withColumn("AddressID", col("AddressID").cast("int")) \
           .withColumn("AddressLine1", col("AddressLine1").cast("string")) \
           .withColumn("City", col("City").cast("string")) \
           .withColumn("StateProvinceID", col("StateProvinceID").cast("int")) \
           .withColumn("PostalCode", col("PostalCode").cast("int")) \
           .withColumn("ModifiedDate", to_date(col("ModifiedDate"))) \
           .withColumn("year", col("year").cast("int")) 
    rules = [
        (col("AddressID").isNotNull(), "AddressID cannot be null"),
        # (col("PostalCode").isNotNull(), "PostalCode cannot be null"),
        (col("ModifiedDate").isNotNull(), "ModifiedDate IS NOT NULL")
    ]
    return apply_dq(df, "address", rules)


def clean_product(df):
    df = df.withColumn("ProductID", col("ProductID").cast("int")) \
           .withColumn("Name", col("Name").cast("string")) \
           .withColumn("ProductNumber", col("ProductNumber").cast("string")) \
           .withColumn("StandardCost", col("StandardCost").cast("double")) \
           .withColumn("ListPrice", col("ListPrice").cast("double")) \
           .withColumn("SellStartDate", to_timestamp(col("SellStartDate"))) \
           .withColumn("SellEndDate", to_timestamp(col("SellEndDate"))) \
           .withColumn("ModifiedDate", to_timestamp(col("ModifiedDate")))
    rules = [
        (col("ProductID").isNotNull(), "ProductID cannot be null"),
        (col("Name").isNotNull(), "Name cannot be null"),
        (col("StandardCost") >= 0, "StandardCost must be non-negative"),
        (col("ListPrice") >= 0, "ListPrice must be non-negative")
    ]
    return apply_dq(df, "product", rules)


def clean_addresstype(df):
    return apply_dq(
        df.withColumn("AddressTypeID", col("AddressTypeID").cast("int"))
          .withColumn("Name", col("Name").cast("string"))
          .withColumn("ModifiedDate", to_timestamp(col("ModifiedDate"))),
        "addresstype",
        [(col("AddressTypeID").isNotNull(), "AddressTypeID cannot be null")]
    )


def clean_businessentityaddress(df):
    return apply_dq(
        df.withColumn("AddressID", col("AddressID").cast("int"))
          .withColumn("BusinessEntityID", col("BusinessEntityID").cast("int"))
          .withColumn("AddressTypeID", col("AddressTypeID").cast("int"))
          .withColumn("ModifiedDate", to_date(col("ModifiedDate")))
          .withColumn("Year", col("Year").cast("int")),
        "businessentityaddress",
        [(col("AddressID").isNotNull(), "AddressID cannot be null"),
         (col("BusinessEntityID").isNotNull(), "BusinessEntityID cannot be null")]
    )


def clean_culture(df):
    return apply_dq(
        df.withColumn("CultureID", col("CultureID").cast("string"))
          .withColumn("Name", col("Name").cast("string"))
          .withColumn("ModifiedDate", to_timestamp(col("ModifiedDate"))),
        "culture",
        [(col("CultureID").isNotNull(), "CultureID cannot be null")]
    )


def clean_customer(df):
    return apply_dq(
        df.withColumn("CustomerID", col("CustomerID").cast("int"))
          .withColumn("PersonID", col("PersonID").cast("Double"))
          .withColumn("StoreID", col("StoreID").cast("Double"))
          .withColumn("TerritoryID", col("TerritoryID").cast("int"))
          .withColumn("ModifiedDate", to_date(col("ModifiedDate"))),
        "customer",
        [(col("CustomerID").isNotNull(), "CustomerID cannot be null"),
         (col("PersonID").isNotNull(), "PersonID cannot be null")]
    )


def clean_emailaddress(df):
    df = df.withColumn("EmailAddressID", col("EmailAddressID").cast("int")) \
           .withColumn("BusinessEntityID", col("BusinessEntityID").cast("int")) \
           .withColumn("EmailAddress", col("EmailAddress").cast("string")) \
           .withColumn("ModifiedDate", to_date(col("ModifiedDate"))) \
           .withColumn("year" , col("year").cast("int"))
    rules = [
        (col("EmailAddressID").isNotNull(), "EmailAddressID cannot be null"),
        # (col("EmailAddress").rlike("^[A-Za-z0-9+_.-]+@[A-Za-z0-9.-]+$"), "Invalid email format"),
        (col("ModifiedDate").isNotNull(), "ModifiedDate IS NOT NULL")
    ]
    return apply_dq(df, "emailaddress", rules)


def clean_person(df):
    return apply_dq(
        df.withColumn("BusinessEntityID", col("BusinessEntityID").cast("int"))
          .withColumn("FirstName", col("FirstName").cast("string"))
          .withColumn("LastName", col("LastName").cast("string"))
          .withColumn("ModifiedDate", to_date(col("ModifiedDate")))
          .withColumn("NameStyle", col("NameStyle").cast("boolean"))
          .withColumn("Year", col("Year").cast("int"))
          .withColumn("EmailPromotion", col("EmailPromotion").cast("int")),
        "person",
        [
            (col("BusinessEntityID").isNotNull(), "BusinessEntityID cannot be null")
        ]
    )


def clean_productcategory(df):
    return apply_dq(
        df.withColumn("ProductCategoryID", col("ProductCategoryID").cast("int"))
          .withColumn("Name", col("Name").cast("string"))
          .withColumn("ModifiedDate", to_timestamp(col("ModifiedDate"))),
        "productcategory",
        [(col("ProductCategoryID").isNotNull(), "ProductCategoryID cannot be null")]
    )


def clean_productdescription(df):
    return apply_dq(
        df.withColumn("ProductDescriptionID", col("ProductDescriptionID").cast("int"))
          .withColumn("Description", col("Description").cast("string"))
          .withColumn("ModifiedDate", to_timestamp(col("ModifiedDate"))),
        "productdescription",
        [(col("ProductDescriptionID").isNotNull(), "ProductDescriptionID cannot be null")]
    )


def clean_productmodel(df):
    return apply_dq(
        df.withColumn("ProductModelID", col("ProductModelID").cast("int"))
          .withColumn("Name", col("Name").cast("string"))
          .withColumn("ModifiedDate", to_timestamp(col("ModifiedDate"))),
        "productmodel",
        [(col("ProductModelID").isNotNull(), "ProductModelID cannot be null")]
    )


def clean_productmodelproductdescriptionculture(df):
    return apply_dq(
        df.withColumn("ProductModelID", col("ProductModelID").cast("int"))
          .withColumn("ProductDescriptionID", col("ProductDescriptionID").cast("int"))
          .withColumn("CultureID", col("CultureID").cast("int"))
          .withColumn("ModifiedDate", to_timestamp(col("ModifiedDate"))),
        "productmodelproductdescriptionculture",
        [(col("ProductModelID").isNotNull(), "ProductModelID cannot be null")]
    )


def clean_productsubcategory(df):
    return apply_dq(
        df.withColumn("ProductSubcategoryID", col("ProductSubcategoryID").cast("int"))
          .withColumn("ProductCategoryID", col("ProductCategoryID").cast("int"))
          .withColumn("Name", col("Name").cast("string"))
          .withColumn("ModifiedDate", to_timestamp(col("ModifiedDate"))),
        "productsubcategory",
        [(col("ProductSubcategoryID").isNotNull(), "ProductSubcategoryID cannot be null")]
    )


def clean_salesorderdetail(df):
    return apply_dq(
        df.withColumn("SalesOrderID", col("SalesOrderID").cast("int"))
          .withColumn("SalesOrderDetailID", col("SalesOrderDetailID").cast("int"))
          .withColumn("ProductID", col("ProductID").cast("int"))
          .withColumn("ModifiedDate", to_timestamp(col("ModifiedDate"))),
        "salesorderdetail",
        [(col("SalesOrderID").isNotNull(), "SalesOrderID cannot be null")]
    )
def clean_salesorderheader(df):
    return apply_dq(
        df.withColumn("SalesOrderID", col("SalesOrderID").cast("int"))
          .withColumn("RevisionNumber", col("RevisionNumber").cast("int"))
          .withColumn("OrderDate", to_timestamp(col("OrderDate")))
          .withColumn("DueDate", to_timestamp(col("DueDate")))
          .withColumn("ShipDate", to_timestamp(col("ShipDate")))
          .withColumn("Status", col("Status").cast("int"))
          .withColumn("OnlineOrderFlag", col("OnlineOrderFlag").cast("boolean"))
          .withColumn("SalesOrderNumber", col("SalesOrderNumber").cast("string"))
          .withColumn("PurchaseOrderNumber", col("PurchaseOrderNumber").cast("string"))
          .withColumn("AccountNumber", col("AccountNumber").cast("string"))
          .withColumn("CustomerID", col("CustomerID").cast("int"))
          .withColumn("SalesPersonID", col("SalesPersonID").cast("int"))
          .withColumn("TerritoryID", col("TerritoryID").cast("int"))
          .withColumn("BillToAddressID", col("BillToAddressID").cast("int"))
          .withColumn("ShipToAddressID", col("ShipToAddressID").cast("int"))
          .withColumn("ShipMethodID", col("ShipMethodID").cast("int"))
          .withColumn("CreditCardID", col("CreditCardID").cast("int"))
          .withColumn("CreditCardApprovalCode", col("CreditCardApprovalCode").cast("string"))
          .withColumn("CurrencyRateID", col("CurrencyRateID").cast("int"))
          .withColumn("SubTotal", col("SubTotal").cast("decimal(18,2)"))
          .withColumn("TaxAmt", col("TaxAmt").cast("decimal(18,2)"))
          .withColumn("Freight", col("Freight").cast("decimal(18,2)"))
          .withColumn("TotalDue", col("TotalDue").cast("decimal(18,2)"))
          .withColumn("Comment", col("Comment").cast("string"))
          .withColumn("rowguid", col("rowguid").cast("string"))
          .withColumn("ModifiedDate", to_timestamp(col("ModifiedDate")))
          .withColumn("_rescued_data", col("_rescued_data").cast("string"))
          .withColumn("Entity", col("Entity").cast("string"))
          .withColumn("File_Name", col("File_Name").cast("string"))
          .withColumn("Year", col("Year").cast("int"))
          .withColumn("Inserted_by", col("Inserted_by").cast("string"))
          .withColumn("Ingested_time", col("Ingested_time").cast("timestamp")),

        "salesorderheader",

        [
           
            (col("SalesOrderID").isNotNull(), "SalesOrderID cannot be null"),
            (col("OrderDate").isNotNull(), "OrderDate cannot be null"),
            (col("DueDate") >= col("OrderDate"), "DueDate must be on or after OrderDate"),
            (col("ShipDate").isNull() | (col("ShipDate") >= col("OrderDate")), "ShipDate must be null or on/after OrderDate"),
            (col("TotalDue") == (col("SubTotal") + col("TaxAmt") + col("Freight")), "TotalDue must equal SubTotal + TaxAmt + Freight")
        ]
    )


# Table Map

table_map = {
    "address_bronze": clean_address,
    "addresstype_bronze": clean_addresstype,
    "businessentityaddress_bronze": clean_businessentityaddress,
    "culture_bronze": clean_culture,
    "customer_bronze": clean_customer,
    "emailaddress_bronze": clean_emailaddress,
    "person_bronze": clean_person,
    "product_bronze": clean_product,
    "productcategory_bronze": clean_productcategory,
    "productdescription_bronze": clean_productdescription,
    "productmodel_bronze": clean_productmodel,
    "productmodelproductdescriptionculture_bronze": clean_productmodelproductdescriptionculture,
    "productsubcategory_bronze": clean_productsubcategory,
    "salesorderdetail_bronze": clean_salesorderdetail,
    "salesorderheader_bronze":clean_salesorderheader
}


# -------------------------
# Silver + Quarantine
# -------------------------
for bronze_table, cleaning_fn in table_map.items():
    silver_table = bronze_table.replace("_bronze", "_silver")

    @dlt.table(
        name=f"{silver_table}",
        comment="Silver layer cleaned data"
    )
    def load_silver(bronze_table=bronze_table, cleaning_fn=cleaning_fn):
        df = dlt.read_stream(f"training.ashish.{bronze_table}")
        good_df, bad_df = cleaning_fn(df)
        return good_df

    # Only create quarantine for selected tables
    if bronze_table in ["address_bronze", "product_bronze", "emailaddress_bronze"]:
        quarantine_table = bronze_table.replace("_bronze", "_quarantine")

        @dlt.table(
            name=f"{quarantine_table}",
            comment=f"Quarantine table for {bronze_table}"
        )
        def load_quarantine(bronze_table=bronze_table, cleaning_fn=cleaning_fn):
            df = dlt.read_stream(f"training.ashish.{bronze_table}")
            _, bad_df = cleaning_fn(df)
            return bad_df
