# LIMPIEZA TABLA PRODUCTOS Y CARGA A SILVER

In [0]:
from pyspark.sql.functions import col, regexp_replace, trim, when, current_timestamp

# 1. Cargar desde Bronze
df_bronze_prod = spark.read.table("workspace.bronze.productos")

# Función para manejar decimales (7,14 -> 7.14)
def fix_decimal_prod(column_name):
    # Quitamos puntos de miles y cambiamos coma por punto
    return regexp_replace(regexp_replace(col(column_name), r'\.', ''), r',', '.')

# 2. PROCESAMIENTO SILVER
df_silver_prod = df_bronze_prod.select(
    col("codigo_de_referencia").cast("string").alias("codigo_producto"),
    col("nombre").cast("string"),
    # Limpieza de categorías
    trim(regexp_replace(
        regexp_replace(
            regexp_replace(col("categoria"), "HIDRÁAULICA", "HIDRÁULICA"),
            "ILUMINACÍON", "ILUMINACIÓN"
        ), r'\s+$', ''
    )).alias("categoria"),
    # Conversión monetaria (DOUBLE)
    fix_decimal_prod("precio_1_con_iva").cast("double").alias("precio_con_iva"),
    fix_decimal_prod("costo_sin_iva").cast("double").alias("costo_sin_iva"),
    # SOLUCIÓN AL ERROR: Primero cast a DOUBLE y luego a INT
    fix_decimal_prod("minimo_de_unidades").cast("double").cast("int").alias("minimo_de_unidades"),
    fix_decimal_prod("maximo_de_unidades").cast("double").cast("int").alias("maximo_de_unidades"),
    fix_decimal_prod("stock").cast("double").cast("int").alias("stock"),
    col("unidad_de_medida").cast("string")
).filter(
    (col("codigo_producto").isNotNull()) & 
    (trim(col("codigo_producto")) != "")
).dropDuplicates(["codigo_producto"])

# 3. Reemplazar nulos y añadir timestamp
df_silver_prod = df_silver_prod.fillna(0, subset=["precio_con_iva", "costo_sin_iva", "stock"]) \
    .withColumn("fecha_carga_silver", current_timestamp())

# 4. Guardar en Silver
df_silver_prod.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("workspace.silver.productos")

print("✅ Tabla 'productos' procesada exitosamente en Capa Silver.")