In [0]:
# retail_silver (actualizado para procesamiento incremental)

from pyspark.sql.functions import col, round, trim, upper, length, to_timestamp, when
from pyspark.sql.utils import AnalysisException

# Bronze -> Silver load for the online-retail dataset.
#
# Reads the Bronze Delta table, cleans and enriches it, appends to the Silver
# Delta table only the rows that Silver does not already contain, and exposes
# the cleaned batch as the temp view "silver_online_retail".

# 1. Read from Bronze
ruta_bronze = "/Volumes/workspace/default/retail_lakehouse/bronze"
df_bronze = spark.read.format("delta").load(ruta_bronze)

# 2. Clean and transform

# Columns that must be non-null for a row to be kept.
columnas_criticas = [
    "InvoiceNo",
    "StockCode",
    "Quantity",
    "UnitPrice",
    "CustomerID",
    "InvoiceDate",
]

df_silver_clean = (
    df_bronze
    # Drop rows missing a critical field or with non-positive quantity/price.
    .dropna(subset=columnas_criticas)
    .filter((col("Quantity") > 0) & (col("UnitPrice") > 0))
    # Normalise text columns: upper-case, then strip surrounding whitespace.
    .withColumn("Description", trim(upper(col("Description"))))
    .withColumn("Country", trim(upper(col("Country"))))
    # Cast the invoice date to a timestamp (a no-op if it already is one).
    # NOTE(review): the null filter above runs before this cast, so a value
    # that fails to parse may remain as a null InvoiceDate - confirm whether
    # such rows should be dropped as well.
    .withColumn("InvoiceDate", to_timestamp(col("InvoiceDate")))
    # Line total, rounded to 2 decimals.
    .withColumn("TotalVenta", round(col("Quantity") * col("UnitPrice"), 2))
    # Flag rows by description: "SI" when it is non-empty, "NO" when it is
    # empty or null (a null length fails the comparison and hits otherwise()).
    .withColumn(
        "DescripcionValida",
        when(length(col("Description")) > 0, "SI").otherwise("NO"),
    )
    # Remove exact duplicates within this batch.
    .dropDuplicates()
)

# 3. Write to Silver incrementally
ruta_silver = "/Volumes/workspace/default/retail_lakehouse/silver"

# A blind append would re-insert every Bronze row on each run, because the
# whole Bronze table is read above. Load what Silver already holds so that only
# rows not yet present are appended.
try:
    df_silver_existente = spark.read.format("delta").load(ruta_silver)
    # Touch the schema so a missing table fails here even when the session
    # resolves reads lazily.
    _ = df_silver_existente.schema
except AnalysisException:
    # Silver could not be resolved (e.g. first run, nothing written yet):
    # the whole batch is new.
    df_silver_existente = None

if df_silver_existente is None:
    df_silver_nuevos = df_silver_clean
else:
    # subtract() is EXCEPT DISTINCT: positional and null-safe, so the existing
    # rows are projected onto the batch's column order first. The batch is
    # already de-duplicated, so no wanted row is lost.
    df_silver_nuevos = df_silver_clean.subtract(
        df_silver_existente.select(*df_silver_clean.columns)
    )

df_silver_nuevos.write.format("delta").mode("append").save(ruta_silver)

# 4. Register the cleaned batch as a temp view for later use
df_silver_clean.createOrReplaceTempView("silver_online_retail")