In [0]:
%sql
ALTER TABLE santig_120781.ingesta_nueva_bronze.orders 
SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name');

In [0]:
%sql
ALTER TABLE santig_120781.ingesta_nueva_bronze.orders DROP COLUMN IF EXISTS _c7;
ALTER TABLE santig_120781.ingesta_nueva_bronze.orders DROP COLUMN IF EXISTS _c8;
ALTER TABLE santig_120781.ingesta_nueva_bronze.orders DROP COLUMN IF EXISTS _c9;

In [0]:
%sql DESCRIBE TABLE santig_120781.ingesta_nueva_bronze.orders

col_name,data_type,comment
order_id,int,
user_id,int,
eval_set,string,
order_number,int,
order_dow,int,
order_hour_of_day,int,
days_since_prior_order,double,


In [0]:
from pyspark.sql import functions as F

# ============================================================
# CONFIGURACION
# ============================================================
path_ingesta = "abfss://raw@adlsmardata1307.dfs.core.windows.net/ingesta_nueva/"
CATALOGO     = "santig_120781"
ESQUEMA      = f"{CATALOGO}.ingesta_nueva_bronze"

# Columnas exactas que debe tener cada tabla Bronze (sin columnas basura)
columnas_bronze = {
    "orders": ["order_id", "user_id", "eval_set", "order_number",
               "order_dow", "order_hour_of_day", "days_since_prior_order", "_process_date"],
    "products": ["product_id", "product_name", "aisle_id", "department_id", "_process_date"],
    "departments": ["department_id", "department", "_process_date"],
    "aisles": ["aisle_id", "aisle", "_process_date"],
    "order_products__prior": ["order_id", "product_id", "add_to_cart_order", "reordered", "_process_date"],
    "order_products__train": ["order_id", "product_id", "add_to_cart_order", "reordered", "_process_date"],
}

# ============================================================
# PROCESO DE INGESTA
# ============================================================
try:
    files = dbutils.fs.ls(path_ingesta)
except Exception as e:
    raise Exception(f"No se pudo acceder a la ruta de ingesta: {path_ingesta}\nDetalle: {e}")

archivos_procesados = 0
archivos_con_error  = 0

for file in files:
    if not file.name.endswith(".csv"):
        continue

    table_name = file.name.split('_')[0].replace(".csv", "")
    full_table = f"{ESQUEMA}.{table_name}"

    print(f"\nProcesando: {file.name}  ->  {full_table}")

    try:
        # 1. Lectura del CSV
        df_nuevo = spark.read \
            .option("header", "True") \
            .option("inferSchema", "True") \
            .option("sep", ";") \
            .csv(file.path)

        # 2. Agregar columna de auditoria
        df_nuevo = df_nuevo.withColumn("_process_date", F.current_timestamp())

        # 3. Seleccionar SOLO las columnas validas (elimina _c7, _c8, _c9, etc.)
        columnas_validas = columnas_bronze.get(table_name)

        if columnas_validas is None:
            print(f"  AVISO: Tabla '{table_name}' no esta en el mapa de columnas. Se usaran todas las columnas del CSV.")
            df_para_guardar = df_nuevo
        else:
            columnas_disponibles = [c for c in columnas_validas if c in df_nuevo.columns]
            columnas_faltantes   = [c for c in columnas_validas if c not in df_nuevo.columns]

            if columnas_faltantes:
                print(f"  AVISO: Columnas faltantes en el CSV (se ignoraran): {columnas_faltantes}")

            df_para_guardar = df_nuevo.select(*columnas_disponibles)

        # 4. Verificar que el DataFrame no esta vacio
        if df_para_guardar.limit(1).count() == 0:
            print(f"  AVISO: El archivo {file.name} esta vacio. Se omite.")
            continue

        # 5. Escritura: overwrite si la tabla no existe, append si ya existe
        tabla_existe = spark.catalog.tableExists(full_table)

        if not tabla_existe:
            df_para_guardar.write.format("delta") \
                .mode("overwrite") \
                .option("overwriteSchema", "true") \
                .saveAsTable(full_table)
            print(f"  OK: Tabla creada y datos cargados.")
        else:
            # Validar esquema antes del append para evitar el AnalysisException
            esquema_tabla = set([f.name for f in spark.read.table(full_table).schema.fields])
            esquema_df    = set([f.name for f in df_para_guardar.schema.fields])

            if esquema_tabla != esquema_df:
                diferencias = esquema_tabla.symmetric_difference(esquema_df)
                raise Exception(
                    f"Mismatch de esquema detectado antes de escribir.\n"
                    f"  Columnas en tabla:     {sorted(esquema_tabla)}\n"
                    f"  Columnas en DataFrame: {sorted(esquema_df)}\n"
                    f"  Diferencias:           {diferencias}\n"
                    f"  Ejecuta el ALTER TABLE para limpiar la tabla y vuelve a correr."
                )

            df_para_guardar.write.format("delta") \
                .mode("append") \
                .saveAsTable(full_table)
            print(f"  OK: Datos aÃ±adidos en modo append.")

        archivos_procesados += 1

    except Exception as e:
        print(f"  ERROR procesando {file.name}: {e}")
        archivos_con_error += 1

# ============================================================
# RESUMEN
# ============================================================
print(f"\n{'='*55}")
print(f"RESUMEN DE INGESTA")
print(f"  Archivos procesados exitosamente : {archivos_procesados}")
print(f"  Archivos con error               : {archivos_con_error}")
print(f"{'='*55}")

# ============================================================
# LIMPIEZA DE CARPETA (solo si todo fue exitoso)
# ============================================================
if archivos_con_error == 0 and archivos_procesados > 0:
    dbutils.fs.rm(path_ingesta, recurse=True)
    dbutils.fs.mkdirs(path_ingesta)
    print("Carpeta de ingesta limpiada correctamente.")
elif archivos_con_error > 0:
    print("No se limpio la carpeta porque hubo errores. Revisa los archivos fallidos.")
else:
    print("No habia archivos CSV para procesar.")

