In [0]:
# IMPORTS
from delta.tables import DeltaTable
from pyspark.sql.functions import col, to_date, when, trim, lower
from pyspark.sql.types import DecimalType
from pyspark.sql.functions import year, month, dayofmonth
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import re


In [0]:
def process_data(audit_date: str):
    """
    Función para transformar los datos de raw y guardar en processed.
    
    Args:
        run_date (str): fecha de ejecución (yyyy-MM-dd) desde orquestador.
    """
    
    # --- Lectura de raw filtrando por audit_date ---
    PATH_RAW = "/mnt/datalake/tfm/mercadona/raw/"
    datos_raw = spark.read.format("delta").load(PATH_RAW)
    datos_raw = datos_raw.filter(col("audit_date") == audit_date)
    
    # --- Renombrar columnas y adaptar tipos ---
    datos_raw = datos_raw.withColumnRenamed("nombre", "producto")
    datos_raw = datos_raw.withColumn("precio_unitario", col("precio_unitario").cast(DecimalType(10,2))) \
                         .withColumn("precio_pack", col("precio_pack").cast(DecimalType(10,2))) \
                         .withColumn("precio_referencia", col("precio_referencia").cast(DecimalType(10,2))) \
                         .withColumn("precio_anterior", col("precio_anterior").cast(DecimalType(10,2))) \
                         .withColumn("tamano_unidad", col("tamano_unidad").cast(DecimalType(10,2))) \
                         .withColumn("unidades_totales", col("unidades_totales").cast("int")) \
                         .withColumn("iva", col("iva").cast("int")) \
                         .withColumn("audit_date", to_date(col("audit_date"), "yyyy-MM-dd"))
    
    # --- Limpieza de strings ---
    datos_raw = datos_raw.withColumn("producto", trim(lower(col("producto")))) \
                         .withColumn("categoria", trim(lower(col("categoria"))))
    
    # --- Tratamiento de variables ---
    datos_raw = datos_raw.withColumn("precio_bajado",
                                     when(col("precio_anterior").isNotNull(), True).otherwise(False))
    datos_raw = datos_raw.withColumn("precio_anterior",
                                     when(col("precio_anterior").isNull(), col("precio_unitario")).otherwise(col("precio_anterior")))
    datos_raw = datos_raw.withColumn("porcentaje_descuento",
                                     when(col("precio_bajado") == True, 
                                          ((col("precio_anterior") - col("precio_unitario")) / col("precio_anterior")) * 100
                                         ).otherwise(0)
                                    ).withColumn("porcentaje_descuento", col("porcentaje_descuento").cast(DecimalType(10,2)))
    
    # --- Categorías grandes ---
    datos_raw = datos_raw.withColumn("categoria_grande",
        when(col("categoria").isin("aceite de oliva", "otros aceites"), "Aceites")
        .when(col("categoria").isin("vinagre y otros aderezos"), "Vinagres y salsas")
        .when(col("categoria").isin("cerveza botella y botellín", "cerveza lata", "combinado de cerveza"), "Cervezas")
        .when(col("categoria").isin("rioja", "ribera del duero", "otros vinos tintos", "vino tinto de mesa", "tinto de verano y sangría"), "Vinos")
        .when(col("categoria").isin("cereales integrales y muesli", "barritas de cereales", "cereales"), "Cereales")
        .when(col("categoria").isin("arroz"), "Arroces y legumbres")
        .when(col("categoria").isin("huevos"), "Huevos y lácteos")
        .when(col("categoria").isin("detergente en polvo y monodosis", "detergente líquido y gel", "detergente lavado a mano", "suavizante", "quitamanchas", "activador y antical lavadora", "planchado"), "Limpieza")
        .when(col("categoria").isin("pepinos y otros encurtidos", "aceitunas verdes", "aceitunas negras", "cóctel y banderillas"), "Conservas")
        .otherwise("Otros")
    )
    
    # --- Columnas de tiempo ---
    datos_raw = datos_raw.withColumn("anio", year(col("audit_date"))) \
                         .withColumn("mes", month(col("audit_date"))) \
                         .withColumn("dia", dayofmonth(col("audit_date")))
    
    # --- Función para extraer marca ---
    def extraer_marca(producto):
        producto = producto.strip().lower()
        marcas = [
            "hacendado", "bronchales", "vichy catalan", "cortes", "la casera",
            "nesquik", "colacao", "la chocolatera", "bosque verde", "beltrán",
            "enervit sport", "kellogg's", "brüggen", "stars", "campo nature",
            "huerta de barros", "steinburg", "heineken", "amstel", "desperados",
            "casón histórico", "sandevid", "don simón", "el elegido", "fidencio",
            "bodegas borsao", "the guv'nor", "davida", "mar de uvas", "torre oria"
        ]
        for marca in marcas:
            if marca in producto:
                return marca
        return None
    
    extraer_marca_udf = F.udf(extraer_marca, StringType())
    datos_raw = datos_raw.withColumn("marca", extraer_marca_udf(col("producto"))).dropDuplicates(["id_producto", "audit_date"])
    
    # --- Guardar en processed (Delta Lake) ---
    PATH_PROCESSED = "/mnt/datalake/tfm/mercadona/processed/"
    datos_raw.write.format("delta") \
        .mode("append") \
        .partitionBy("audit_date") \
        .save(PATH_PROCESSED)
    
    print(f"[OK] Datos processed guardados para {audit_date}")

dbutils.widgets.text("run_date", "")
audit_date = dbutils.widgets.get("run_date")

result = process_data(audit_date)
dbutils.notebook.exit(result)
