# Proyecto Final - Big Data (Problema #1)
**Estudiante: Justin Arce**

Este notebook contiene el pipeline completo (Fases 1-5) para el Problema #1: Predicción de la gravedad de accidentes de bicicleta en Madrid. El notebook está diseñado para ejecutarse de inicio a fin.

## Paso 0: Instalación de Dependencias y Configuración

In [None]:
# Instalar 'optuna' para la Optimización de Hiperparámetros (HPO)
!pip3 install optuna

In [None]:
# --- Importación de Librerías --- 
import os
import math
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import optuna
from datetime import date
import time

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, DateType, StringType, DoubleType
from pyspark.sql.functions import col, expr, cast, concat_ws, lag, to_date, lit, substring, coalesce, when, regexp_replace, trim, udf, row_number

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, FeatureHasher, StandardScaler
from pyspark.ml.classification import LogisticRegression, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.mllib.evaluation import MulticlassMetrics

In [None]:
# --- Configuración de Spark y Base de Datos (basado en Tarea3_bigdata.ipynb) --- 

# 1. Iniciar Sesión de Spark
spark = SparkSession.builder \
    .appName("ProyectoBigData_SingleNotebook") \
    .config("spark.jars", "/src/postgresql-42.2.14.jar") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .config("spark.sql.session.timeZone", "UTC") \
    .getOrCreate()

# 2. Rutas de Datos Crudos
PATH_ACCIDENTES = "datos/accidentes/"
PATH_ESTACIONES = "datos/estaciones/Estaciones_control_datos_meteorologicos.csv"
PATH_METEO = "datos/meteo/"

# 3. Configuración de Base de Datos PostgreSQL (para Fases 2 y 3)
# Usamos la IP del gateway de Docker y el puerto expuesto de 'start_local_env.sh'
DB_URL = "jdbc:postgresql://172.17.0.1:5433/accidentes_db"
DB_TABLE = "dataset_final_ml"
DB_PROPS = {
    "user": "postgres",
    "password": "testPassword",
    "driver": "org.postgresql.Driver",
    "encoding": "UTF-8" 
}

# 4. Semilla Fija para Reproducibilidad
SEED = 42

print("Spark y configuración listos.")

## Fase 1: ETL - Preprocesamiento de Fuentes

**Objetivo:** Cargar, limpiar, transformar y unir (append) las 3 fuentes de datos por separado.

In [None]:
# --- Funciones de Ayuda (Sanitización) ---

def sanitizar_columnas_string(df: DataFrame, columnas: list) -> DataFrame:
    """
    Limpia espacios en blanco (trim) y remueve caracteres de nueva línea
    en las columnas de string especificadas.
    """
    df_sanitizado = df
    for c in columnas:
        if c in df.columns:
            df_sanitizado = df_sanitizado.withColumn(
                c,
                trim(regexp_replace(col(c), "[\n\r]", ""))
            )
    return df_sanitizado

# --- Funciones de Fuente A (Accidentes) ---

def cargar_datos_fuente_a(spark: SparkSession, ruta_directorio: str) -> DataFrame:
    """
    Carga y une (append) todos los CSV del directorio 'datos/accidentes/'.
    Maneja los schemas diferentes (lesividad vs tipo_lesividad).
    Limpia coordenadas y fechas.
    """
    print(f"Cargando Fuente A (Accidentes) desde {ruta_directorio}...")

    # Cargar todos los CSVs en el directorio.
    # No usamos inferSchema, castearemos manualmente para robustez.
    df_unido = spark.read \
        .option("header", "true") \
        .option("delimiter", ";") \
        .option("encoding", "ISO-8859-1") \
        .csv(ruta_directorio)

    # 1. Manejar Schemas Diferentes (lesividad vs tipo_lesividad)
    if "lesividad" not in df_unido.columns:
        df_unido = df_unido.withColumn("lesividad", lit(None).cast(StringType()))
    if "tipo_lesividad" not in df_unido.columns:
        df_unido = df_unido.withColumn("tipo_lesividad", lit(None).cast(StringType()))

    df_con_lesividad = df_unido.withColumn(
        "lesividad_unida",
        coalesce(col("lesividad"), col("tipo_lesividad"))
    ).drop("lesividad", "tipo_lesividad")

    # 2. Limpiar Coordenadas (Quitar '.' de miles y reemplazar ',' decimal)
    df_coords_limpias = df_con_lesividad.withColumn(
        "coordenada_x_utm_limpia",
        regexp_replace(col("coordenada_x_utm"), "[\\.]", "") # Quitar puntos de miles
    ).withColumn(
        "coordenada_x_utm_limpia",
        regexp_replace(col("coordenada_x_utm_limpia"), ",", ".") # Reemplazar coma decimal
    ).withColumn(
        "coordenada_y_utm_limpia",
        regexp_replace(col("coordenada_y_utm"), "[\\.]", "")
    ).withColumn(
        "coordenada_y_utm_limpia",
        regexp_replace(col("coordenada_y_utm_limpia"), ",", ".")
    ).withColumn(
        "coordenada_x_utm",
        col("coordenada_x_utm_limpia").cast(DoubleType())
    ).withColumn(
        "coordenada_y_utm",
        col("coordenada_y_utm_limpia").cast(DoubleType())
    ).drop("coordenada_x_utm_limpia", "coordenada_y_utm_limpia")
    
    # 3. Limpiar y Formatear Fecha y Hora
    df_fecha_limpia = df_coords_limpias.withColumn(
        "fecha",
        to_date(col("fecha"), "dd/MM/yyyy")
    ).withColumn(
        "hora",
        concat_ws("H", lit(""), substring(col("hora"), 1, 2)) # "10:00:00" -> "H10"
    )
    
    # 4. Sanitizar columnas de string
    columnas_string = ["tipo_accidente", "distrito", "sexo", "estado_meteorológico"]
    df_sanitizado = sanitizar_columnas_string(df_fecha_limpia, columnas_string)

    return df_sanitizado

def crear_variable_objetivo(df_accidentes: DataFrame) -> DataFrame:
    """
    Crea la variable objetivo binaria 'accidente_grave'.
    """
    print("Creando variable objetivo 'accidente_grave'...")
    
    df_con_lesividad_int = df_accidentes.withColumn(
        "cod_lesividad_int",
        col("cod_lesividad").cast(IntegerType())
    )

    df_con_objetivo = df_con_lesividad_int.withColumn(
        "accidente_grave",
        when(
            (col("cod_lesividad_int") == 3) | (col("cod_lesividad_int") == 4), 1
        ).otherwise(0)
    ).drop("cod_lesividad_int")
    
    return df_con_objetivo

# --- Funciones de Fuente B (Estaciones) ---

def cargar_datos_fuente_b(spark: SparkSession, ruta_archivo: str) -> DataFrame:
    """
    Carga el CSV de Estaciones (Fuente B).
    """
    print(f"Cargando Fuente B (Estaciones) desde {ruta_archivo}...")
    
    df = spark.read \
        .option("header", "true") \
        .option("delimiter", ";") \
        .option("encoding", "ISO-8859-1") # Corrección para 'CÓDIGO'
        .csv(ruta_archivo)
        
    df_limpio = df.withColumn(
        "COORDENADA_X_ETRS89",
        regexp_replace(col("COORDENADA_X_ETRS89"), ",", ".").cast(DoubleType())
    ).withColumn(
        "COORDENADA_Y_ETRS89",
        regexp_replace(col("COORDENADA_Y_ETRS89"), ",", ".").cast(DoubleType())
    )
    
    df_seleccionado = df_limpio.select(
        col("CÓDIGO").alias("COD_ESTACION_B"),
        col("CÓDIGO_CORTO").cast(IntegerType()),
        col("COORDENADA_X_ETRS89"),
        col("COORDENADA_Y_ETRS89")
    )
    
    return df_seleccionado

# --- Funciones de Fuente C (Meteo) ---

def procesar_fuente_c(spark: SparkSession, ruta_directorio: str) -> DataFrame:
    """
    Carga todos los CSVs de 'datos/meteo/', los une, y aplica los dos pivotes
    y la ingeniería de features temporales (lag).
    """
    print(f"Procesando Fuente C (Meteo) desde {ruta_directorio}...")

    # Cargar TODOS los CSVs en el directorio de meteo
    df_raw = spark.read \
        .option("header", "true") \
        .option("delimiter", ";") \
        .option("encoding", "ISO-8859-1") \
        .csv(ruta_directorio)

    # Forzar columnas H y V a String ANTES del stack (para evitar error DATATYPE_MISMATCH)
    df_casteado = df_raw
    h_cols_v_cols = [c for c in df_raw.columns if c.startswith('H') or c.startswith('V')]
    for c_name in h_cols_v_cols:
        df_casteado = df_casteado.withColumn(c_name, col(c_name).cast(StringType()))

    # 1. Primer Pivote (Stack)
    stack_expr_list = []
    for i in range(1, 25):
        h_col = f"H{i:02d}"
        v_col = f"V{i:02d}"
        # Corrección: stack espera (constante, valor, constante, valor...)
        stack_expr_list.append(f"'{h_col}', {h_col}, {v_col}")
    
    stack_expr = f"stack(24, {', '.join(stack_expr_list)}) AS (HORA, VALOR, VALIDACION)"
    
    df_largo = df_casteado.select(
        col("ESTACION").cast(IntegerType()),
        col("MAGNITUD").cast(IntegerType()),
        col("ANO").cast(IntegerType()),
        col("MES").cast(IntegerType()),
        col("DIA").cast(IntegerType()),
        expr(stack_expr)
    )

    # Filtrar solo valores válidos ('V') y limpiar comas
    df_valido = df_largo.filter(trim(col("VALIDACION")) == "V") \
                         .withColumn("VALOR_NUM", 
                                     regexp_replace(col("VALOR"), ",", ".").cast(DoubleType())) \
                         .drop("VALOR", "VALIDACION")

    # 2. Segundo Pivote (Magnitudes)
    df_pivotado = df_valido.groupBy("ESTACION", "ANO", "MES", "DIA", "HORA") \
                           .pivot("MAGNITUD", [81, 83, 89]) \
                           .avg("VALOR_NUM") \
                           .withColumnRenamed("81", "VV_81_t=0") \
                           .withColumnRenamed("83", "T_83_t=0") \
                           .withColumnRenamed("89", "P_89_t=0")

    # 3. Ingeniería de Features Temporales (Lag)
    window_spec = Window.partitionBy("ESTACION").orderBy("ANO", "MES", "DIA", "HORA")

    df_con_lag = df_pivotado.withColumn("T_83_t-1h", lag("T_83_t=0", 1).over(window_spec)) \
                            .withColumn("VV_81_t-1h", lag("VV_81_t=0", 1).over(window_spec)) \
                            .withColumn("P_89_t-1h", lag("P_89_t=0", 1).over(window_spec)) \
                            .withColumn("T_83_t-2h", lag("T_83_t=0", 2).over(window_spec)) \
                            .withColumn("VV_81_t-2h", lag("VV_81_t=0", 2).over(window_spec)) \
                            .withColumn("P_89_t-2h", lag("P_89_t=0", 2).over(window_spec))

    # 4. Formatear Fecha y renombrar
    df_final_meteo = df_con_lag.withColumn(
        "FECHA",
        to_date(concat_ws("-", col("ANO"), col("MES"), col("DIA")), "yyyy-MM-dd")
    ).withColumnRenamed("ESTACION", "COD_ESTACION")

    # Rellenar nulos (de lags y pivots) con 0
    df_relleno = df_final_meteo.na.fill(0)
    
    return df_relleno

In [None]:
# --- EJECUTAR FASE 1 --- 
start_time = time.time()

print("--- INICIANDO FASE 1: PREPROCESAMIENTO ---")

df_a_cargado = cargar_datos_fuente_a(spark, PATH_ACCIDENTES)
df_accidentes = crear_variable_objetivo(df_a_cargado)
df_estaciones = cargar_datos_fuente_b(spark, PATH_ESTACIONES)
df_meteo = procesar_fuente_c(spark, PATH_METEO)

df_accidentes.cache()
df_estaciones.cache()
df_meteo.cache()

print(f"\n--- FASE 1 COMPLETADA en {time.time() - start_time:.2f} segundos ---")
print(f"Filas Fuente A (Accidentes): {df_accidentes.count()}")
print(f"Filas Fuente B (Estaciones): {df_estaciones.count()}")
print(f"Filas Fuente C (Meteo Procesado): {df_meteo.count()}")

## Fase 2: Cruce de Fuentes y Materialización

**Objetivo:** Aplicar el cruce espacial (A+B) y temporal (ABC) y guardar el resultado en PostgreSQL.

In [None]:
# --- Funciones de Fase 2 (Cruce y Materialización) ---

@udf(returnType=DoubleType())
def calcular_distancia_euclidiana(x1, y1, x2, y2):
    """
    UDF para calcular la distancia euclidiana.
    """
    if x1 is None or y1 is None or x2 is None or y2 is None:
        return float('inf')
    return math.sqrt((x1 - x2)**2 + (y1 - y2)**2)

def cruzar_fuentes(spark: SparkSession, df_accidentes: DataFrame, df_estaciones: DataFrame, df_meteo: DataFrame) -> DataFrame:
    """
    Implementa la lógica de cruce espacial (A+B) y temporal (Fase 2).
    """
    
    print("Iniciando Fase 2.1: Cruce Espacial (A+B)...")
    # 1. Cruce Espacial (A + B)
    df_distancias = df_accidentes.crossJoin(df_estaciones)
    
    df_con_dist = df_distancias.withColumn(
        "distancia",
        calcular_distancia_euclidiana(
            col("coordenada_x_utm"), col("coordenada_y_utm"),
            col("COORDENADA_X_ETRS89"), col("COORDENADA_Y_ETRS89")
        )
    )
    
    # Seleccionar la estación más cercana (rank=1)
    window_dist = Window.partitionBy("num_expediente").orderBy("distancia")
    
    df_acc_con_estacion = df_con_dist.withColumn("rank", row_number().over(window_dist)) \
                                     .filter(col("rank") == 1) \
                                     .drop("rank", "distancia")
    
    count_pre_temporal = df_acc_con_estacion.count()
    print(f"Cruce espacial completado. Filas resultantes: {count_pre_temporal}")

    # 2. Cruce Temporal (Final)
    print("Iniciando Fase 2.2: Cruce Temporal (Final)...")
    
    df_join = df_acc_con_estacion.join(
        df_meteo,
        [
            df_acc_con_estacion.fecha == df_meteo.FECHA,
            df_acc_con_estacion.hora == df_meteo.HORA,
            df_acc_con_estacion.CÓDIGO_CORTO == df_meteo.COD_ESTACION
        ],
        "left"
    )
    
    # Eliminamos las columnas de la derecha (Meteo) que causan ambigüedad
    df_final_ml = df_join \
        .drop(df_meteo.FECHA) \
        .drop(df_meteo.HORA) \
        .drop(df_meteo.COD_ESTACION)
    
    count_post_temporal = df_final_ml.count()
    print(f"Cruce temporal completado. Filas finales: {count_post_temporal}")
    
    # 3. Schema Final
    columnas_numericas = [
        "T_83_t=0", "VV_81_t=0", "P_89_t=0",
        "T_83_t-1h", "VV_81_t-1h", "P_89_t-1h",
        "T_83_t-2h", "VV_81_t-2h", "P_89_t-2h"
    ]
    columnas_categoricas = ["tipo_accidente", "distrito", "sexo", "estado_meteorológico"]
    columnas_clave = ["num_expediente", "accidente_grave"]
    
    columnas_finales = columnas_clave.copy()
    
    for c in columnas_numericas:
        if c in df_final_ml.columns:
            columnas_finales.append(c)
            
    for c in columnas_categoricas:
         if c in df_final_ml.columns:
            columnas_finales.append(c)
            
    # Rellenar nulos
    df_final_seleccionado = df_final_ml.select(columnas_finales)
    df_final_relleno = df_final_seleccionado.na.fill(0, subset=[c for c in columnas_numericas if c in columnas_finales])
    df_final_relleno = df_final_relleno.na.fill("Desconocido", subset=[c for c in columnas_categoricas if c in columnas_finales])

    return df_final_relleno

def materializar_datos(df: DataFrame, url: str, tabla: str, props: dict):
    """
    Escribe el DataFrame final en PostgreSQL.
    """
    print(f"Materializando en PostgreSQL local (tabla: {tabla})...")

    df.write \
      .format("jdbc") \
      .option("url", url) \
      .option("dbtable", tabla) \
      .option("user", props["user"]) \
      .option("password", props["password"]) \
      .option("driver", props["driver"]) \
      .option("encoding", props["encoding"]) \
      .mode("overwrite") \
      .save()
    
    print(f"Datos escritos en PostgreSQL, tabla '{tabla}'.")

In [None]:
# --- EJECUTAR FASE 2 --- 
start_time = time.time()
print("\n--- INICIANDO FASE 2: CRUCE Y MATERIALIZACIÓN ---")

df_final_ml = cruzar_fuentes(spark, df_accidentes, df_estaciones, df_meteo)

df_final_ml.cache()
print(f"Dataset final de ML generado con {df_final_ml.count()} filas.")
print("Schema final:")
df_final_ml.printSchema()

print("Mostrando 5 filas de muestra (con encoding correcto):")
df_final_ml.show(5, truncate=False)

materializar_datos(df_final_ml, DB_URL, DB_TABLE, DB_PROPS)

# Limpiar DataFrames cacheados de Fase 1
df_accidentes.unpersist()
df_estaciones.unpersist()
df_meteo.unpersist()

print(f"\n--- FASE 2 COMPLETADA en {time.time() - start_time:.2f} segundos ---")

## Fase 3: EDA (Pre-Experimento)

**Objetivo:** Analizar el dataset limpio (`df_final_ml`) antes de entrenar.

In [None]:
# --- EJECUTAR FASE 3 --- 
print("\n--- INICIANDO FASE 3: ANÁLISIS EXPLORATORIO DE DATOS (EDA) ---")

# 1. Análisis de Variable Objetivo (sensibilidad a clases)
print("Análisis de Desbalanceo:")
df_ml.groupBy('accidente_grave').count().show()

# 2. Análisis de Predictores Numéricos (histogramas, boxplots, matriz)
numeric_cols_to_plot = [c for c in df_ml.columns if c.startswith(('T_', 'VV_', 'P_'))]
df_sample_pd = df_ml.select(['accidente_grave'] + numeric_cols_to_plot).sample(fraction=0.1, seed=SEED).toPandas()

if not df_sample_pd.empty and numeric_cols_to_plot:
    # Histogramas
    df_sample_pd[numeric_cols_to_plot].hist(bins=30, figsize=(15, 5))
    plt.suptitle("Histogramas de Predictores Numéricos")
    plt.show()

    # Boxplots Bivariados
    sns.boxplot(data=df_sample_pd, x='accidente_grave', y=numeric_cols_to_plot[0])
    plt.title(f"{numeric_cols_to_plot[0]} vs. Gravedad del Accidente")
    plt.show()

    # Matriz de Correlación
    corr_matrix = df_sample_pd[numeric_cols_to_plot].corr()
    sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', fmt='.2f')
    plt.title("Matriz de Correlación de Features Numéricas")
    plt.show()
else:
    print("No se encontraron columnas numéricas (T_, VV_, P_) o la muestra está vacía.")

# 3. Dividir Datos (Split) para Experimento
training_data, test_data = df_ml.randomSplit([0.8, 0.2], seed=SEED)
training_data.cache()
test_data.cache()
print(f"Datos divididos: {training_data.count()} para entrenamiento, {test_data.count()} para prueba final.")

print(f"\n--- FASE 3 COMPLETADA ---")

## Fase 4: Experimento Robusto (DOE)

**Objetivo:** Ejecutar el plan "Robusto Ultra-Eficiente" (18 corridas) y guardar los resultados.

In [None]:
# --- Funciones de Fase 4 (Experimento) ---

def construir_pipeline(modelo, factor_b, factor_c, factor_d):
    """
    Construye dinámicamente el pipeline de Spark ML basado en los factores del DOE.
    """
    
    # Factor D: Ventana Temporal
    numeric_features = ["T_83_t=0", "VV_81_t=0", "P_89_t=0"] # L1
    if factor_d in ["t-1h", "t-2h"]:
        numeric_features += ["T_83_t-1h", "VV_81_t-1h", "P_89_t-1h"] # L2
    if factor_d == "t-2h":
        numeric_features += ["T_83_t-2h", "VV_81_t-2h", "P_89_t-2h"] # L3
        
    categorical_features = ["tipo_accidente", "distrito", "sexo", "estado_meteorológico"]

    stages = []
    
    # Convertir target 'accidente_grave' (Int) a 'label' (Double) para Spark ML
    label_converter = col("accidente_grave").cast(DoubleType()).alias("label")
    # NOTA: Esto no es una etapa de pipeline, se aplica al DF antes del .fit()

    # Factor C: Estrategia Categórica
    feature_cols = list(numeric_features)
    
    if factor_c == "Drop":
        pass # No se añaden features categóricas
    elif factor_c == "OHE":
        indexers = [StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep") for c in categorical_features]
        encoders = [OneHotEncoder(inputCol=f"{c}_idx", outputCol=f"{c}_vec") for c in categorical_features]
        stages += indexers + encoders
        feature_cols += [f"{c}_vec" for c in categorical_features]
    elif factor_c == "Hasher":
        hasher = FeatureHasher(inputCols=categorical_features, outputCol="hashed_features")
        stages.append(hasher)
        feature_cols.append("hashed_features")

    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_unscaled", handleInvalid="skip")
    stages.append(assembler)

    # Lógica Acoplada (Normalización)
    input_features_col = "features_unscaled"
    if modelo == "LR":
        scaler = StandardScaler(inputCol="features_unscaled", outputCol="features_scaled")
        stages.append(scaler)
        input_features_col = "features_scaled"
    
    # Modelo (Factor A)
    model_instance = None
    if modelo == "LR":
        model_instance = LogisticRegression(featuresCol=input_features_col, labelCol="label")
    elif modelo == "GBT":
        model_instance = GBTClassifier(featuresCol=input_features_col, labelCol="label", seed=SEED)
    
    stages.append(model_instance)
    return Pipeline(stages=stages), model_instance, label_converter

def ejecutar_hpo(spark, training_data_with_label, pipeline, model, modelo_str):
    """
    Etapa A: HPO Ultra-Ligera (3 trials, K=3)
    """
    evaluator = BinaryClassificationEvaluator(metricName="areaUnderPR", labelCol="label")

    def objective(trial):
        param_map = {}
        if modelo_str == "LR":
            regParam = trial.suggest_loguniform("regParam", 1e-4, 1.0)
            elasticNetParam = trial.suggest_uniform("elasticNetParam", 0.0, 1.0)
            param_map[model.regParam] = regParam
            param_map[model.elasticNetParam] = elasticNetParam
        elif modelo_str == "GBT":
            maxDepth = trial.suggest_int("maxDepth", 2, 5)
            maxIter = trial.suggest_int("maxIter", 10, 20)
            param_map[model.maxDepth] = maxDepth
            param_map[model.maxIter] = maxIter
        
        cv = CrossValidator(
            estimator=pipeline,
            estimatorParamMaps=[param_map],
            evaluator=evaluator,
            numFolds=3, # K-Fold=3
            seed=SEED
        )
        
        cv_model = cv.fit(training_data_with_label)
        return cv_model.avgMetrics[0] # Maximizar AUC-PR
    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=3) # 3 trials
    return study.best_params

def medir_robustez(spark, training_data_with_label, pipeline, best_params):
    """
    Etapa B: Medición S/N (K=3, Seed Fija)
    """
    evaluator = BinaryClassificationEvaluator(metricName="areaUnderPR", labelCol="label")
    
    model_stage = pipeline.getStages()[-1]
    param_grid = ParamGridBuilder()
    
    for param, value in best_params.items():
        model_param = model_stage.getParam(param)
        param_grid = param_grid.addGrid(model_param, [value])
        
    param_map = param_grid.build()

    cv = CrossValidator(
        estimator=pipeline,
        estimatorParamMaps=param_map,
        evaluator=evaluator,
        numFolds=3, # K-Fold=3
        seed=SEED # Semilla Fija
    )
    
    cv_model = cv.fit(training_data_with_label)
    
    metrics = cv_model.avgMetrics # Esta es la lista de métricas por fold
    
    metrics_array = np.array(metrics)
    metrics_array[metrics_array <= 0] = 1e-9 # Evitar división por cero
    
    sn_ratio = -10 * np.log10(np.mean(1 / (metrics_array**2)))
    
    return sn_ratio, metrics

In [None]:
# --- EJECUTAR FASE 4 --- 
start_time = time.time()
print("\n--- INICIANDO FASE 4: EJECUCIÓN DEL EXPERIMENTO DOE ---")

# Definir los 2 Torneos L9 (3^3)
MODELOS = ["LR", "GBT"]
DESBALANCEO = ["Weights", "RUS", "None"]
CATEGORICA = ["Drop", "OHE", "Hasher"]
TEMPORAL = ["t=0", "t-1h", "t-2h"]

# Array Ortogonal L9 (3^3) - para factores B, C, D
L9 = [
  (0, 0, 0), # Run 1 (L1, L1, L1)
  (0, 1, 1), # Run 2 (L1, L2, L2)
  (0, 2, 2), # Run 3 (L1, L3, L3)
  (1, 0, 1), # Run 4 (L2, L1, L2)
  (1, 1, 2), # Run 5 (L2, L2, L3)
  (1, 2, 0), # Run 6 (L2, L3, L1)
  (2, 0, 2), # Run 7 (L3, L1, L3)
  (2, 1, 0), # Run 8 (L3, L2, L1)
  (2, 2, 1)  # Run 9 (L3, L3, L2)
]

resultados_doe = []

# Iterar sobre los dos modelos (Torneo A y B)
for MODELO in MODELOS:
    print(f"\n--- INICIANDO TORNEO PARA MODELO: {MODELO} ---")
    
    # Iterar sobre las 9 corridas L9
    for i, (b_idx, c_idx, d_idx) in enumerate(L9):
        run_id = i + 1
        B_VAL = DESBALANCEO[b_idx]
        C_VAL = CATEGORICA[c_idx]
        D_VAL = TEMPORAL[d_idx]

        print(f"\n==> Ejecutando Run {run_id}/9 para {MODELO}: B={B_VAL}, C={C_VAL}, D={D_VAL} <==")
        
        # 1. Construir pipeline base
        pipeline, model_instance, label_converter = construir_pipeline(MODELO, B_VAL, C_VAL, D_VAL)
        
        # 2. Aplicar conversión de label a los datos de training
        training_data_run = training_data.select("*", label_converter)
        
        # 3. Etapa A: HPO Ultra-Ligera (3 trials * 3 folds)
        print("--- Etapa A: HPO ---")
        best_params = ejecutar_hpo(spark, training_data_run, pipeline, model_instance, MODELO)
        print(f"Best Params: {best_params}")

        # 4. Etapa B: Medición S/N (3 folds)
        print("--- Etapa B: S/N ---")
        pipeline_sn, _, _ = construir_pipeline(MODELO, B_VAL, C_VAL, D_VAL)
        sn_ratio, fold_metrics = medir_robustez(spark, training_data_run, pipeline_sn, best_params)
        print(f"S/N Ratio: {sn_ratio:.4f}")

        # 5. Guardar resultados
        resultados_doe.append({
            "modelo": MODELO,
            "run_id": run_id,
            "desbalanceo": B_VAL,
            "categorica": C_VAL,
            "temporal": D_VAL,
            "sn_ratio": sn_ratio,
            "best_params": best_params
        })
        
        print(f"==> Run {run_id}/9 para {MODELO} completado.")

print(f"\n--- FASE 4 COMPLETADA en {time.time() - start_time:.2f} segundos ---")

# Convertir resultados a Pandas DataFrame para análisis
df_resultados = pd.DataFrame(resultados_doe)


## Fase 5: Análisis de Resultados y Duelo Final

**Objetivo:** Analizar los S/N Ratios, seleccionar los dos campeones y enfrentarlos en el `test_data`.

In [None]:
# --- EJECUTAR FASE 5 --- 
print("\n--- INICIANDO FASE 5: ANÁLISIS Y DUELO FINAL ---")

# 1. Análisis Torneo A (LR)
print("\n--- Resultados Torneo LR ---")
df_lr = df_resultados[df_resultados["modelo"] == "LR"]
if not df_lr.empty:
    print(df_lr.sort_values("sn_ratio", ascending=False).to_markdown(index=False))
    
    # Gráfico de Efectos Principales para LR
    fig, axes = plt.subplots(1, 3, figsize=(20, 5))
    sns.pointplot(data=df_lr, x="desbalanceo", y="sn_ratio", ax=axes[0], order=DESBALANCEO).set_title("Factor B (Desbalanceo) - LR")
    sns.pointplot(data=df_lr, x="categorica", y="sn_ratio", ax=axes[1], order=CATEGORICA).set_title("Factor C (Categórica) - LR")
    sns.pointplot(data=df_lr, x="temporal", y="sn_ratio", ax=axes[2], order=TEMPORAL).set_title("Factor D (Temporal) - LR")
    plt.suptitle("Gráficos de Efectos Principales (S/N Ratio) para Logistic Regression", fontsize=16)
    plt.show()
    
    # Identificar Campeón A
    campeon_a_run = df_lr.loc[df_lr['sn_ratio'].idxmax()]
    print(f"\nCampeón Robusto A (LR) (Run ID: {campeon_a_run['run_id']}):\n{campeon_a_run}")
else:
    print("No se encontraron resultados para el Torneo LR.")

# 2. Análisis Torneo B (GBT)
print("\n--- Resultados Torneo GBT ---")
df_gbt = df_resultados[df_resultados["modelo"] == "GBT"]
if not df_gbt.empty:
    print(df_gbt.sort_values("sn_ratio", ascending=False).to_markdown(index=False))
    
    # Gráfico de Efectos Principales para GBT
    fig, axes = plt.subplots(1, 3, figsize=(20, 5))
    sns.pointplot(data=df_gbt, x="desbalanceo", y="sn_ratio", ax=axes[0], order=DESBALANCEO).set_title("Factor B (Desbalanceo) - GBT")
    sns.pointplot(data=df_gbt, x="categorica", y="sn_ratio", ax=axes[1], order=CATEGORICA).set_title("Factor C (Categórica) - GBT")
    sns.pointplot(data=df_gbt, x="temporal", y="sn_ratio", ax=axes[2], order=TEMPORAL).set_title("Factor D (Temporal) - GBT")
    plt.suptitle("Gráficos de Efectos Principales (S/N Ratio) para GBT Classifier", fontsize=16)
    plt.show()
    
    # Identificar Campeón B
    campeon_b_run = df_gbt.loc[df_gbt['sn_ratio'].idxmax()]
    print(f"\nCampeón Robusto B (GBT) (Run ID: {campeon_b_run['run_id']}):\n{campeon_b_run}")
else:
    print("No se encontraron resultados para el Torneo GT.")

In [None]:
# --- 3. Duelo Final --- 
print("\n--- Duelo Final en Test Data ---")

# Preparar datos (aplicar conversión de label)
_, _, label_converter = construir_pipeline("LR", "None", "Drop", "t=0") # Solo para obtener el 'label_converter'
training_data_final = training_data.select("*", label_converter)
test_data_final = test_data.select("*", label_converter)

# Re-construir y Entrenar Campeón A (LR)
if 'campeon_a_run' in locals():
    pipeline_a, _ = construir_pipeline(
        'LR', 
        campeon_a_run["desbalanceo"],
        campeon_a_run["categorica"],
        campeon_a_run["temporal"]
    )
    pipeline_a.getStages()[-1].setParams(**campeon_a_run["best_params"])
    
    print("Entrenando modelo final A (LR)...")
    modelo_final_a = pipeline_a.fit(training_data_final)
    predicciones_a = modelo_final_a.transform(test_data_final)
else:
    print("Campeón A no definido. Saltando duelo.")

# Re-construir y Entrenar Campeón B (GBT)
if 'campeon_b_run' in locals():
    pipeline_b, _ = construir_pipeline(
        'GBT', 
        campeon_b_run["desbalanceo"],
        campeon_b_run["categorica"],
        campeon_b_run["temporal"]
    )
    pipeline_b.getStages()[-1].setParams(**campeon_b_run["best_params"])
    
    print("Entrenando modelo final B (GBT)...")
    modelo_final_b = pipeline_b.fit(training_data_final)
    predicciones_b = modelo_final_b.transform(test_data_final)
else:
    print("Campeón B no definido. Saltando duelo.")

# 4. Evaluación Final (AUC-PR y Recall)
evaluator_pr = BinaryClassificationEvaluator(metricName="areaUnderPR", labelCol="label")
evaluator_recall1 = MulticlassClassificationEvaluator(metricName="recallByLabel", metricLabel=1.0, labelCol="label")

if 'predicciones_a' in locals():
    auc_pr_a = evaluator_pr.evaluate(predicciones_a)
    recall_a = evaluator_recall1.evaluate(predicciones_a)
    print(f"\nModelo A (LR) - AUC-PR Final: {auc_pr_a:.4f} | Recall (Clase 1): {recall_a:.4f}")

if 'predicciones_b' in locals():
    auc_pr_b = evaluator_pr.evaluate(predicciones_b)
    recall_b = evaluator_recall1.evaluate(predicciones_b)
    print(f"Modelo B (GBT) - AUC-PR Final: {auc_pr_b:.4f} | Recall (Clase 1): {recall_b:.4f}")

# 5. Matriz de Confusión del Ganador
ganador_preds = None
ganador_nombre = "N/A"

if 'auc_pr_b' in locals() and auc_pr_b > auc_pr_a:
    ganador_preds = predicciones_b
    ganador_nombre = "GBT Classifier"
elif 'auc_pr_a' in locals():
    ganador_preds = predicciones_a
    ganador_nombre = "Logistic Regression"

if ganador_preds:
    preds_and_labels = ganador_preds.select("prediction", "label").rdd.map(lambda r: (float(r.prediction), float(r.label)))
    metrics = MulticlassMetrics(preds_and_labels)
    
    print(f"\nMatriz de Confusión ({ganador_nombre}) en Test Data:")
    print(metrics.confusionMatrix().toArray())
else:
    print("No se pudo determinar un modelo ganador para la matriz de confusión.")

# 6. Conclusión Final
print("\n--- Conclusión Final ---")
if 'auc_pr_a' not in locals() and 'auc_pr_b' not in locals():
    print("No se completaron las ejecuciones para determinar un ganador.")
elif auc_pr_b > auc_pr_a:
    print("El modelo GBT Classifier es el ganador.")
    print(f"Justificación: Demostró la arquitectura más robusta (S/N Ratio: {campeon_b_run['sn_ratio']:.2f}) en el DOE \n y logró el mejor rendimiento (AUC-PR: {auc_pr_b:.4f}) en el Test Data final.")
else:
    print("El modelo Logistic Regression es el ganador.")
    print(f"Justificación: Demostró la arquitectura más robusta (S/N Ratio: {campeon_a_run['sn_ratio']:.2f}) en el DOE \n y logró el mejor rendimiento (AUC-PR: {auc_pr_a:.4f}) en el Test Data final.")
