In [0]:
# 02_clean_silver (Unity Catalog)
from perf_lab_utils import perf_lab
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Notebook parameters. "catalog" defaults to whatever current_catalog()
# reports for this session; "schema" defaults to "mini_lakehouse".
dbutils.widgets.text(
    "catalog",
    spark.sql("SELECT current_catalog()").first()[0],
)
dbutils.widgets.text("schema", "mini_lakehouse")

# Read both widget values once; they are used to qualify every table name.
CATALOG, SCHEMA = (dbutils.widgets.get(w) for w in ("catalog", "schema"))

def tn(name: str) -> str:
    """Return *name* qualified as ``<CATALOG>.<SCHEMA>.<name>``.

    The three parts are joined verbatim with dots; no identifier quoting
    or escaping is applied.
    """
    return "{}.{}.{}".format(CATALOG, SCHEMA, name)

# Point the session at the configured catalog and schema.
for _use_stmt in (f"USE CATALOG {CATALOG}", f"USE SCHEMA {SCHEMA}"):
    spark.sql(_use_stmt)

# Bronze inputs, referenced by fully qualified name.
customers_b = spark.table(tn("customers_bronze"))
orders_b = spark.table(tn("orders_bronze"))
items_b = spark.table(tn("order_items_bronze"))

# Row counts of the raw inputs, printed as a quick sanity check.
print(
    "Bronze counts:",
    "customers=", customers_b.count(),
    "orders=", orders_b.count(),
    "items=", items_b.count(),
)

def dedup_latest(df, key_cols, ts_col="ingest_ts"):
    """Keep one row per combination of ``key_cols``.

    When ``ts_col`` is a column of ``df``, rows are ranked per key by
    ``ts_col`` descending and only the top-ranked row is kept. Rows that
    share the same ``ts_col`` value within a key are not ordered any
    further by this function. When ``ts_col`` is absent, this falls back
    to ``df.dropDuplicates(key_cols)``.

    Args:
        df: DataFrame to de-duplicate.
        key_cols: Column names identifying a logical record.
        ts_col: Name of the column used to pick the latest row.

    Returns:
        A DataFrame with the same columns as ``df``; the temporary
        ``_rn`` ranking column is dropped before returning.
    """
    if ts_col in df.columns:
        newest_first = (
            Window
            .partitionBy([F.col(key) for key in key_cols])
            .orderBy(F.col(ts_col).desc())
        )
        ranked = df.withColumn("_rn", F.row_number().over(newest_first))
        return ranked.filter(F.col("_rn") == 1).drop("_rn")

    # No timestamp to rank by: plain de-duplication on the keys.
    return df.dropDuplicates(key_cols)

def ensure_col(df, col_name, dtype):
    """Return ``df`` with a ``col_name`` column guaranteed to exist.

    If ``col_name`` is already listed in ``df.columns`` the DataFrame is
    returned unchanged. Otherwise an all-null column cast to ``dtype`` is
    added under that name.
    """
    if col_name in df.columns:
        return df
    null_of_dtype = F.lit(None).cast(dtype)
    return df.withColumn(col_name, null_of_dtype)

# Customers: make sure the key column exists and is a string.
c = ensure_col(customers_b, "customer_id", "string")
c = c.withColumn("customer_id", F.col("customer_id").cast("string"))

# Trim whitespace on the descriptive text columns that are present.
for colname in ["first_name", "last_name", "country"]:
    if colname not in c.columns:
        continue
    c = c.withColumn(colname, F.trim(F.col(colname)))

# Emails are trimmed and lower-cased when the column is present.
if "email" in c.columns:
    c = c.withColumn("email", F.lower(F.trim(F.col("email"))))

# Drop rows whose customer_id is null or an empty string.
c = c.filter(
    F.col("customer_id").isNotNull()
    & (F.length(F.col("customer_id")) > 0)
)

# One row per customer_id, preferring the highest ingest_ts when available.
c_silver_df = dedup_latest(c, ["customer_id"], ts_col="ingest_ts")

# Orders: make sure both key columns exist and are strings.
o = orders_b
for _key in ("order_id", "customer_id"):
    o = ensure_col(o, _key, "string")
    o = o.withColumn(_key, F.col(_key).cast("string"))

# Build order_ts from the first available source column
# (order_ts, then order_date); otherwise use a null timestamp.
_ts_source = next(
    (name for name in ("order_ts", "order_date") if name in o.columns),
    None,
)
if _ts_source is None:
    o = o.withColumn("order_ts", F.lit(None).cast("timestamp"))
else:
    o = o.withColumn("order_ts", F.to_timestamp(F.col(_ts_source)))

# Status values are trimmed and upper-cased when the column is present.
if "status" in o.columns:
    o = o.withColumn("status", F.upper(F.trim(F.col("status"))))

# Drop rows with a null or empty order_id or customer_id.
o = o.filter(
    F.col("order_id").isNotNull()
    & (F.length(F.col("order_id")) > 0)
    & F.col("customer_id").isNotNull()
    & (F.length(F.col("customer_id")) > 0)
)

# One row per order_id, preferring the highest ingest_ts when available.
o = dedup_latest(o, ["order_id"], ts_col="ingest_ts")

# Referential check: keep only orders whose customer_id is in customers silver.
o_before = o.count()
_known_customers = (
    c_silver_df
    .select("customer_id")
    .dropDuplicates(["customer_id"])
)
o_silver_df = o.join(_known_customers, on="customer_id", how="inner")
print("Orders removed due to missing customer reference:", o_before - o_silver_df.count())

# Order items: make sure the key and measure columns exist.
i = items_b
i = ensure_col(i, "order_id", "string")
i = ensure_col(i, "product_id", "string")
i = ensure_col(i, "quantity", "int")
i = ensure_col(i, "unit_price", "double")

# The measures may arrive under the raw names (qty / price) or already under
# the canonical names (quantity / unit_price). Read the raw column when it
# is present, otherwise fall back to the canonical column ensured above.
# Referencing qty / price unconditionally fails with an unresolved-column
# error when bronze only carries the canonical names.
# The presence check is lower-cased so a raw column spelled e.g. "Qty" is
# still picked up, as it was when the column was referenced directly.
_item_cols = {name.lower() for name in i.columns}
_qty_src = "qty" if "qty" in _item_cols else "quantity"
_price_src = "price" if "price" in _item_cols else "unit_price"

i = (i
     .withColumn("order_id", F.col("order_id").cast("string"))
     .withColumn("product_id", F.col("product_id").cast("string"))
     .withColumn("quantity", F.col(_qty_src).cast("int"))
     .withColumn("unit_price", F.col(_price_src).cast("double"))
     # drop() ignores names that are not columns, so this is safe either way.
     .drop("qty", "price")
)

# Drop rows with a null or empty order_id or product_id.
i = i.filter(
    F.col("order_id").isNotNull() & (F.length(F.col("order_id")) > 0) &
    F.col("product_id").isNotNull() & (F.length(F.col("product_id")) > 0)
)

# Negative measures are rejected; null measures are allowed through.
i = i.filter((F.col("quantity").isNull()) | (F.col("quantity") >= 0))
i = i.filter((F.col("unit_price").isNull()) | (F.col("unit_price") >= 0))

# line_amount is quantity * unit_price, or null when either input is null.
i = i.withColumn(
    "line_amount",
    F.when(F.col("quantity").isNotNull() & F.col("unit_price").isNotNull(),
           F.col("quantity") * F.col("unit_price"))
     .otherwise(F.lit(None).cast("double"))
)

# One row per (order_id, product_id), preferring the highest ingest_ts.
i = dedup_latest(i, ["order_id", "product_id"], ts_col="ingest_ts")

# Referential check: keep only items whose order_id is in orders silver.
i_before = i.count()
i_silver_df = i.join(
    o_silver_df.select("order_id").dropDuplicates(["order_id"]),
    on="order_id",
    how="inner"
)
print("Items removed due to missing order reference:", i_before - i_silver_df.count())

# Sum line_amount per order from the items that passed the silver checks.
_total_per_order = F.sum("line_amount").alias("order_total_amount")
order_totals = i_silver_df.groupBy("order_id").agg(_total_per_order)

# Left join so orders with no items are kept (their total is null).
o_silver_df = o_silver_df.join(
    order_totals,
    on="order_id",
    how="left",
)

# Run perf_lab on each silver DataFrame before it is written.
perf_lab(
    c_silver_df,
    "customers_silver",
    keys=["customer_id"],
    null_cols=["customer_id"],
    emit_view="perf_metrics",
)
perf_lab(
    o_silver_df,
    "orders_silver",
    keys=["order_id"],
    null_cols=["order_id", "customer_id"],
    emit_view="perf_metrics",
)
perf_lab(
    i_silver_df,
    "order_items_silver",
    keys=["order_id", "product_id"],
    null_cols=["order_id", "product_id"],
    negative_cols=["quantity", "unit_price"],
    emit_view="perf_metrics",
)

# Persist the three silver tables as Delta, replacing any existing contents.
for _silver_df, _silver_name in (
    (c_silver_df, "customers_silver"),
    (o_silver_df, "orders_silver"),
    (i_silver_df, "order_items_silver"),
):
    _silver_df.write.mode("overwrite").format("delta").saveAsTable(tn(_silver_name))

print("Created tables:", tn("customers_silver"), tn("orders_silver"), tn("order_items_silver"))

# Re-read the saved tables so the counts reflect what was actually written.
print(
    "Silver counts:",
    "customers=", spark.table(tn("customers_silver")).count(),
    "orders=", spark.table(tn("orders_silver")).count(),
    "items=", spark.table(tn("order_items_silver")).count(),
)

# Show each result in the notebook under its own heading.
for _heading, _silver_df in (
    ("\n=== Customers Silver ===", c_silver_df),
    ("\n=== Orders Silver ===", o_silver_df),
    ("\n=== Order Items Silver ===", i_silver_df),
):
    print(_heading)
    display(_silver_df)
