In [0]:
from pyspark.sql.functions import col, when, round, current_timestamp, to_date, lit, abs, lower, coalesce
from pyspark.sql.types import DecimalType, LongType, IntegerType

# Three-part (catalog.schema.table) names of the tables this stage reads
# (Bronze) and overwrites (Silver, Auxiliary, Quarantine).
BRONZE_TABLE = "workspace.retail.bronze_sales"
SILVER_TABLE = "workspace.retail.silver_sales"
AUXILIARY_TABLE = "workspace.retail.silver_auxiliary_sales"
QUARANTINE_TABLE = "workspace.retail.silver_quarantine_sales"

# stock_code values that classify_and_split labels "OPS_FEE" (exact,
# case-sensitive match via Column.isin). Kept as a list on purpose:
# Column.isin unpacks a single list/set argument, but not a tuple.
OPS_CODES = [
    "POST",
    "D",
    "M",
    "BANK CHARGES",
    "ADJUST",
    "S",
    "DOT",
    "AMAZONFEE",
    "C2",
    "B",
]

def cast_and_enrich(df):
    """Cast Bronze columns to their Silver types and add derived/audit columns.

    Transformations applied to ``df``:

    * ``quantity`` -> int, ``price`` -> decimal(12, 2).
    * ``customer_id`` -> bigint; values that are null after the cast are
      replaced with ``-1``.
    * ``invoice_date_only`` = ``to_date(invoice_date)``.
    * ``total_amount`` = ``quantity * price`` rounded to 2 decimals and cast
      to decimal(18, 2).
    * ``row_category`` is set to the literal ``"BATCH"``.
    * ``meta_silver_load_at`` = ``current_timestamp()``.

    Args:
        df: Bronze DataFrame. It must contain every column named in the
            projection below (including ``file_path`` and ``env``), otherwise
            the final ``select`` fails.

    Returns:
        A DataFrame whose columns are ordered as: business columns, then
        technical columns, then every column whose name starts with
        ``meta_`` (in the order they appear after enrichment).
    """
    unknown_customer = lit(-1).cast(LongType())

    typed = (
        df.withColumn("quantity", col("quantity").cast(IntegerType()))
        .withColumn("price", col("price").cast(DecimalType(12, 2)))
        # Cast and default in one step: a null bigint becomes -1.
        .withColumn(
            "customer_id",
            coalesce(col("customer_id").cast(LongType()), unknown_customer),
        )
        .withColumn("invoice_date_only", to_date(col("invoice_date")))
    )

    # `round` here is pyspark.sql.functions.round (imported at file level).
    line_total = round(col("quantity") * col("price"), 2).cast(DecimalType(18, 2))

    enriched = (
        typed.withColumn("total_amount", line_total)
        .withColumn("row_category", lit("BATCH"))
        .withColumn("meta_silver_load_at", current_timestamp())
    )

    projection = [
        # business columns
        "invoice", "stock_code", "customer_id", "quantity", "price",
        "total_amount", "description", "country", "invoice_date",
        # technical columns
        "invoice_date_only", "row_category", "file_path", "env",
    ]
    # audit/metadata columns go last, whatever their source
    projection.extend(name for name in enriched.columns if name.startswith("meta_"))

    return enriched.select(*projection)

def classify_and_split(df):
    """Label each row with a ``row_category`` and route it to one of three outputs.

    Rules are checked top to bottom and the first match wins:

    1. ``stock_code`` is in ``OPS_CODES`` (exact, case-sensitive)  -> OPS_FEE
    2. lower-cased ``stock_code`` contains ``"gift"``              -> OPS_GIFT
    3. ``customer_id == -1`` and ``quantity < 0`` and ``price == 0``
                                                                  -> INVENTORY_ADJUSTMENT
    4. ``invoice`` starts with ``"C"`` and ``price > 0``           -> RETURN
    5. ``quantity > 0`` and ``price > 0``                          -> SALE
    6. anything else                                              -> INVALID

    A rule whose condition evaluates to null does not match. The chain ends
    in ``otherwise``, so every input row lands in exactly one output.

    Args:
        df: DataFrame with at least ``stock_code``, ``customer_id``,
            ``quantity``, ``price`` and ``invoice`` (presumably the output of
            ``cast_and_enrich``).

    Returns:
        Tuple ``(sales, auxiliary, quarantine)``:
        sales      -- SALE and RETURN rows, with ``row_category`` dropped;
        auxiliary  -- OPS_FEE, OPS_GIFT and INVENTORY_ADJUSTMENT rows;
        quarantine -- INVALID rows.
    """
    stock_code = col("stock_code")
    quantity = col("quantity")
    price = col("price")

    category_expr = (
        when(stock_code.isin(OPS_CODES), "OPS_FEE")
        .when(lower(stock_code).contains("gift"), "OPS_GIFT")
        .when(
            (col("customer_id") == -1) & (quantity < 0) & (price == 0),
            "INVENTORY_ADJUSTMENT",
        )
        .when(col("invoice").startswith("C") & (price > 0), "RETURN")
        .when((quantity > 0) & (price > 0), "SALE")
        .otherwise("INVALID")
    )

    labelled = df.withColumn("row_category", category_expr)
    category = col("row_category")

    sales = labelled.filter(category.isin("RETURN", "SALE")).drop("row_category")
    auxiliary = labelled.filter(
        category.isin("OPS_FEE", "OPS_GIFT", "INVENTORY_ADJUSTMENT")
    )
    quarantine = labelled.filter(category == "INVALID")

    return sales, auxiliary, quarantine

def main():
    """Run the Bronze -> Silver stage end to end.

    Reads ``BRONZE_TABLE``, applies ``cast_and_enrich`` and then
    ``classify_and_split``. It then overwrites the Silver, Auxiliary and
    Quarantine Delta tables, in that order, with ``overwriteSchema``
    enabled.

    Expects a ``spark`` session in the global scope; it is not created in
    this file.
    """
    sales, auxiliary, quarantine = classify_and_split(
        cast_and_enrich(spark.table(BRONZE_TABLE))
    )

    targets = (
        ("Silver", sales, SILVER_TABLE),
        ("Auxiliary", auxiliary, AUXILIARY_TABLE),
        ("Quarantine", quarantine, QUARANTINE_TABLE),
    )

    for label, frame, table_name in targets:
        # count() is its own Spark action: without caching, the frame is
        # computed once for this log line and again for the write below.
        print(f"Writing {frame.count()} rows to {label}...")
        (
            frame.write.format("delta")
            .mode("overwrite")
            .option("overwriteSchema", "true")
            .saveAsTable(table_name)
        )

    print("Silver Stage Completed.")


if __name__ == "__main__":
    main()