In [0]:
# Este código se ejecuta en una celda de un Notebook de Databricks
# Su responsabilidad es detectar un único archivo de novedades,
# validarlo, fusionarlo (MERGE) con la tabla 'category' y archivarlo.

from pyspark.sql.functions import col, count, lit
from pathlib import Path

# 1. --- Detección Automática del Archivo a Procesar ---
updates_volume_path = "/Volumes/workspace/tecnomundo_data_raw/uploads_dimensions"
archive_path = f"{updates_volume_path}/archive" # Directorio para el historial

print(f"Buscando archivos de novedades en: {updates_volume_path}")

try:
    # Listar todos los archivos y carpetas en el directorio del volumen
    all_items = dbutils.fs.ls(updates_volume_path)
    
    # Filtrar para encontrar solo archivos CSV que no estén en la carpeta de archivo
    csv_files_to_process = [f for f in all_items if f.name.lower().endswith('.csv') and not f.isDir()]
    
    # --- Validación de Archivo Único ---
    if len(csv_files_to_process) == 0:
        print("No se encontraron nuevos archivos CSV de novedades para procesar. Finalizando el proceso.")
        dbutils.notebook.exit("No hay archivos para procesar.")
    elif len(csv_files_to_process) > 1:
        print(f"Error: Se encontraron {len(csv_files_to_process)} archivos de novedades. Solo debe haber uno.")
        for f in csv_files_to_process:
            print(f" - {f.name}")
        dbutils.notebook.exit("Proceso abortado. Por favor, deje solo un archivo de novedades en el directorio.")
    
    # Si pasamos las validaciones, tenemos exactamente un archivo para procesar
    file_to_process_info = csv_files_to_process[0]
    updates_csv_path = file_to_process_info.path
    print(f"Archivo único detectado para procesar: {updates_csv_path}")

except Exception as e:
    print(f"Error al listar los archivos en el volumen: {e}")
    dbutils.notebook.exit("Fallo al acceder al directorio de novedades. Verifique los permisos del clúster.")


# 2. --- Configuración de Tablas ---
target_dimension_table = "workspace.tecnomundo_data_dimensions.category"
print(f"Tabla de destino a actualizar: {target_dimension_table}")


# 3. --- Lectura y Preparación de las Novedades ---
print("\nLeyendo archivo CSV de novedades...")
try:
    df_updates = (spark.read
                  .format("csv")
                  .option("header", "true")
                  .load(updates_csv_path))
    
    # Creamos una vista temporal para usarla en las validaciones y el MERGE
    df_updates.createOrReplaceTempView("category_updates_temp_view")
    print("Novedades leídas y listas para la fusión.")

except Exception as e:
    print(f"Error al leer el archivo CSV desde el Volume: {e}")
    dbutils.notebook.exit("No se pudo leer el archivo de novedades. Verifica que la ruta y el nombre del archivo sean correctos.")


# 4. --- Validaciones Previas al MERGE ---
print("\n--- INICIANDO VALIDACIONES PREVIAS AL MERGE ---")

# --- Validación 1: Verificar la estructura del archivo de novedades ---
print("Validación 1: Comprobando que el archivo de novedades tenga las columnas requeridas...")
required_columns = {"codigo_producto", "nombre_del_producto", "categoria"}
actual_columns = set(df_updates.columns)

if not required_columns.issubset(actual_columns):
    missing_cols = required_columns - actual_columns
    found_cols = list(actual_columns)
    error_message = (f"Proceso abortado: Estructura de archivo incorrecta. "
                     f"Columnas requeridas que faltan: {missing_cols}. "
                     f"Columnas encontradas en el archivo: {found_cols}.")
    print(f"Error: {error_message}")
    dbutils.notebook.exit(error_message)
print("  - OK: La estructura del archivo de novedades es correcta.")


# --- Validación 2: Comprobar si hay duplicados en el archivo de entrada ---
print("\nValidación 2: Comprobando duplicados en el archivo de novedades...")
duplicates_in_source_df = df_updates.groupBy("codigo_producto").count().filter(col("count") > 1)
if duplicates_in_source_df.count() > 0:
    print("Error: Se encontraron los siguientes códigos de producto duplicados en el archivo de entrada:")
    duplicates_in_source_df.show()
    dbutils.notebook.exit("Proceso abortado debido a duplicados en el archivo de origen.")
print("  - OK: No se encontraron duplicados en el archivo de origen.")


# --- Validación 3: Análisis de Cambios ---
print("\nValidación 3: Analizando cambios a realizar (INSERT vs UPDATE)...")
df_to_update = spark.sql(f"""
    SELECT source.codigo_producto FROM category_updates_temp_view source
    INNER JOIN {target_dimension_table} target ON source.codigo_producto = target.codigo_producto
""")
print(f"  - Se van a ACTUALIZAR {df_to_update.count()} productos existentes.")
if df_to_update.count() > 0: df_to_update.show(5, truncate=False)

df_to_insert = spark.sql(f"""
    SELECT source.codigo_producto FROM category_updates_temp_view source
    LEFT JOIN {target_dimension_table} target ON source.codigo_producto = target.codigo_producto
    WHERE target.codigo_producto IS NULL
""")
print(f"  - Se van a INSERTAR {df_to_insert.count()} productos nuevos.")
if df_to_insert.count() > 0: df_to_insert.show(5, truncate=False)

print("--- VALIDACIONES COMPLETADAS ---")


# 5. --- Fusión de Datos con MERGE INTO ---
print(f"\nEjecutando MERGE en la tabla '{target_dimension_table}'...")
try:
    merge_result = spark.sql(f"""
      MERGE INTO {target_dimension_table} AS target
      USING category_updates_temp_view AS source
      ON target.codigo_producto = source.codigo_producto
      WHEN MATCHED THEN
        UPDATE SET
          target.nombre_del_producto = source.nombre_del_producto,
          target.categoria = source.categoria
      WHEN NOT MATCHED THEN
        INSERT (codigo_producto, nombre_del_producto, categoria)
        VALUES (source.codigo_producto, source.nombre_del_producto, source.categoria)
    """)
    print("¡Éxito! La tabla de dimensiones ha sido actualizada.")
    merge_result.show()

    # --- 6. Archivar el archivo procesado ---
    print("\nArchivando el archivo de novedades procesado...")
    dbutils.fs.mkdirs(archive_path) # Asegura que la carpeta de archivo exista
    processed_file_name = Path(updates_csv_path).name
    dbutils.fs.mv(updates_csv_path, f"{archive_path}/{processed_file_name}")
    print(f"Archivo procesado movido a: {archive_path}/{processed_file_name}")

except Exception as e:
    print(f"Ocurrió un error durante la operación MERGE o el archivado: {e}")


# 7. --- Verificación Final ---
print("\nMostrando una muestra de la tabla de dimensiones actualizada:")
display(spark.table(target_dimension_table).orderBy(col("codigo_producto").desc()))
