# Modelado con Optimización de Hiperparámetros

Este script realiza la optimización de hiperparámetros usando Optuna para múltiples modelos y métricas, selecciona el mejor modelo y lo guarda para producción.

In [None]:
# -*- coding: utf-8 -*-

import os
import warnings
import joblib
import optuna
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import f1_score, recall_score, matthews_corrcoef
from sklearn.model_selection import StratifiedKFold
from sklearn.ensemble import RandomForestClassifier, ExtraTreesClassifier, HistGradientBoostingClassifier
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from catboost import CatBoostClassifier
from imblearn.over_sampling import SMOTE

# Importar configuración y utilidades compartidas
from config import (
    CSV_ENTRENAR, CSV_VALIDAR, MODELO_PRELIMINAR, MODELO_FINAL,
    RANDOM_STATE, THRESHOLDS, N_TRIALS_OPTUNA, MODELOS_DISPONIBLES,
    UMBRAL_CICLOS_DEFAULT, PESO_FALSOS_POSITIVOS, METRICAS,
    configurar_logging
)
from utils import crear_derivadas, calcular_score_balanceado, evaluar_con_umbral

# Suppress every Python warning for the rest of the session.
# NOTE(review): this blanket filter also hides deprecation and numerical
# warnings raised by the ML libraries — confirm that is intended.
warnings.filterwarnings('ignore')
# Only let Optuna emit messages at WARNING level or above.
optuna.logging.set_verbosity(optuna.logging.WARNING)

# Module-level logger built by the project's logging helper
logger = configurar_logging(__name__)

## Carga del Umbral Óptimo desde Modelo Preliminar

In [None]:
# ================================================================================
# CARGA DEL UMBRAL DESDE MODELO PRELIMINAR
# ================================================================================

# Resolve the cycle threshold: use the configured default when the preliminary
# artifact is missing, otherwise read it from the stored configuration
# (defaulting again if the 'umbral_ciclos' key is absent).
if not os.path.exists(MODELO_PRELIMINAR):
    UMBRAL_OPTIMO = UMBRAL_CICLOS_DEFAULT
    logger.warning(f"Modelo preliminar no encontrado. Usando umbral por defecto: {UMBRAL_OPTIMO}")
else:
    config_preliminar = joblib.load(MODELO_PRELIMINAR)
    UMBRAL_OPTIMO = config_preliminar.get('umbral_ciclos', UMBRAL_CICLOS_DEFAULT)
    logger.info(f"Umbral cargado desde modelo preliminar: {UMBRAL_OPTIMO}")

# Base models to tune (taken verbatim from the shared configuration)
MODELOS_BASE = MODELOS_DISPONIBLES

# Identifiers of the metrics each study optimises; the last one is the label
# used for the custom balanced score.
METRICA_1, METRICA_2, METRICA_3 = 'f1', 'recall', 'mcc'
METRICA_PERSONALIZADA = 'Score Propio'

## Funciones de Optimización con Optuna

In [None]:
# ================================================================================
# FUNCIONES DE OPTIMIZACIÓN CON OPTUNA
# ================================================================================

def obtener_parametros_modelo(trial, model_name):
    """
    Build the hyperparameter set to evaluate for one Optuna trial.

    Each supported model gets its own search space (sampled through
    ``trial.suggest_*``) merged with the fixed settings that are never tuned
    (random seed, class weighting, verbosity, parallelism).

    Args:
        trial: Optuna trial used to sample the tunable values.
        model_name: One of 'XGBoost', 'LightGBM', 'RandomForest',
            'ExtraTrees', 'HistGradientBoosting' or 'CatBoost'.

    Returns:
        Tuple ``(params, model_class)`` where ``params`` is the keyword
        dictionary to pass to ``model_class``.

    Raises:
        ValueError: If ``model_name`` is not one of the supported models.
    """
    if model_name == 'XGBoost':
        params = {
            'n_estimators': trial.suggest_int('n_estimators', 100, 500),
            'max_depth': trial.suggest_int('max_depth', 3, 10),
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.2),
            'scale_pos_weight': trial.suggest_int('scale_pos_weight', 5, 30),
            'min_child_weight': trial.suggest_int('min_child_weight', 1, 10),
            'subsample': trial.suggest_float('subsample', 0.6, 1.0),
            'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
            'reg_alpha': trial.suggest_float('reg_alpha', 0, 1),
            'random_state': RANDOM_STATE,
            'eval_metric': 'aucpr',
            'n_jobs': -1
        }
        return params, XGBClassifier

    if model_name == 'LightGBM':
        params = {
            'n_estimators': trial.suggest_int('n_estimators', 100, 500),
            'max_depth': trial.suggest_int('max_depth', 3, 10),
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.2),
            'min_child_samples': trial.suggest_int('min_child_samples', 3, 20),
            'subsample': trial.suggest_float('subsample', 0.6, 1.0),
            'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
            'reg_alpha': trial.suggest_float('reg_alpha', 0, 1),
            'class_weight': 'balanced',
            'random_state': RANDOM_STATE,
            'verbose': -1,
            'n_jobs': -1
        }
        return params, LGBMClassifier

    if model_name == 'RandomForest':
        params = {
            'n_estimators': trial.suggest_int('n_estimators', 100, 500),
            'max_depth': trial.suggest_int('max_depth', 5, 15),
            'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 10),
            'min_samples_split': trial.suggest_int('min_samples_split', 2, 15),
            'max_features': trial.suggest_categorical('max_features', ['sqrt', 'log2', None]),
            'class_weight': 'balanced_subsample',
            'random_state': RANDOM_STATE,
            'n_jobs': -1
        }
        return params, RandomForestClassifier

    if model_name == 'ExtraTrees':
        params = {
            'n_estimators': trial.suggest_int('n_estimators', 100, 500),
            'max_depth': trial.suggest_int('max_depth', 5, 20),
            'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 10),
            'min_samples_split': trial.suggest_int('min_samples_split', 2, 15),
            'max_features': trial.suggest_categorical('max_features', ['sqrt', 'log2', None]),
            'class_weight': 'balanced_subsample',
            'random_state': RANDOM_STATE,
            'n_jobs': -1
        }
        return params, ExtraTreesClassifier

    if model_name == 'HistGradientBoosting':
        params = {
            'max_iter': trial.suggest_int('max_iter', 100, 500),
            'max_depth': trial.suggest_int('max_depth', 3, 10),
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.2),
            'max_leaf_nodes': trial.suggest_int('max_leaf_nodes', 15, 63),
            'l2_regularization': trial.suggest_float('l2_regularization', 1e-6, 1.0, log=True),
            'class_weight': 'balanced',
            'random_state': RANDOM_STATE
        }
        return params, HistGradientBoostingClassifier

    if model_name == 'CatBoost':
        params = {
            'iterations': trial.suggest_int('iterations', 100, 500),
            'depth': trial.suggest_int('depth', 3, 10),
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.2),
            'l2_leaf_reg': trial.suggest_float('l2_leaf_reg', 1e-3, 10.0, log=True),
            'allow_writing_files': False,
            'auto_class_weights': 'Balanced',
            'random_state': RANDOM_STATE,
            'verbose': 0,
            'thread_count': -1
        }
        return params, CatBoostClassifier

    # Fail loudly instead of returning None, which the callers would only
    # notice as an opaque tuple-unpacking TypeError.
    raise ValueError(f"Modelo no soportado: {model_name!r}")

In [None]:
def optimizar_modelo_balanceado(model_name, X_train_full, y_train_full, y_ciclos_full, umbral, cv_splits=5):
    """
    Build an Optuna objective that maximises the custom balanced score.

    For every trial the objective runs a stratified K-fold CV. On each fold it
    oversamples the training part with SMOTE (falling back to the raw fold if
    SMOTE raises ``ValueError``), fits the candidate model, and keeps the best
    balanced score found over a fixed grid of probability thresholds
    (0.1 to 0.9 in steps of 0.1). The trial value is the mean of those
    per-fold best scores.

    Args:
        model_name: Name of the model, forwarded to ``obtener_parametros_modelo``.
        X_train_full: Training features (DataFrame or array indexable by
            integer arrays).
        y_train_full: Binary training labels (array-like).
        y_ciclos_full: Cycle counts aligned row by row with ``X_train_full``.
        umbral: Cycle threshold forwarded to ``evaluar_con_umbral``.
        cv_splits: Number of CV folds.

    Returns:
        Callable ``objetivo(trial)`` suitable for ``study.optimize``.
    """
    # Everything below is invariant across trials, so compute it once here
    # instead of on every call of the objective.
    columnas_x = X_train_full.columns if isinstance(X_train_full, pd.DataFrame) else None
    X_np = X_train_full.to_numpy() if columnas_x is not None else X_train_full
    # Coerce to ndarray so the fold indices are always applied positionally,
    # even if a pandas Series with a non-default index is passed in.
    y_np = np.asarray(y_train_full)
    ciclos_np = np.asarray(y_ciclos_full)
    thresholds_opt = np.arange(0.1, 0.91, 0.1)

    def objetivo(trial):
        params, model_class = obtener_parametros_modelo(trial, model_name)

        cv = StratifiedKFold(n_splits=cv_splits, shuffle=True, random_state=RANDOM_STATE)
        scores_cv = []

        for train_idx, val_idx in cv.split(X_np, y_np):
            X_fold_train, y_fold_train = X_np[train_idx], y_np[train_idx]
            X_fold_val, y_fold_val = X_np[val_idx], y_np[val_idx]
            y_fold_ciclos = ciclos_np[val_idx]

            # SMOTE needs k_neighbors < number of minority samples; cap at 3.
            k_neighbors = min(3, int(y_fold_train.sum()) - 1)
            smote = SMOTE(random_state=RANDOM_STATE, k_neighbors=k_neighbors)

            try:
                X_fold_train_bal, y_fold_train_bal = smote.fit_resample(X_fold_train, y_fold_train)
            except ValueError:
                # Best effort: too few positives to oversample, train on the raw fold.
                X_fold_train_bal, y_fold_train_bal = X_fold_train, y_fold_train

            model = model_class(**params)

            # Restore column names so fit/predict see consistent feature names.
            if columnas_x is not None:
                X_fold_train_bal = pd.DataFrame(X_fold_train_bal, columns=columnas_x)
                X_fold_val_df = pd.DataFrame(X_fold_val, columns=columnas_x)
            else:
                X_fold_val_df = X_fold_val

            model.fit(X_fold_train_bal, y_fold_train_bal)
            probas_val = model.predict_proba(X_fold_val_df)[:, 1]

            # Keep the best balanced score over the threshold grid.
            mejor_score_fold = float('-inf')
            for th in thresholds_opt:
                # A falsy result from evaluar_con_umbral is skipped.
                res = evaluar_con_umbral(y_fold_val, probas_val, y_fold_ciclos, umbral, th)
                if res:
                    sc = calcular_score_balanceado(res['pct_membranas'], res['FP'], res['membranas_total'], PESO_FALSOS_POSITIVOS)
                    if sc > mejor_score_fold:
                        mejor_score_fold = sc

            # -100 is the penalty used when no threshold produced a usable result.
            scores_cv.append(mejor_score_fold if mejor_score_fold > float('-inf') else -100)

        return np.mean(scores_cv)

    return objetivo


def optimizar_modelo_metricas(model_name, X_train_full, y_train_full, metric, cv_splits=5):
    """
    Build an Optuna objective that maximises a standard classification metric.

    For every trial the objective runs a stratified K-fold CV. On each fold it
    oversamples the training part with SMOTE (falling back to the raw fold if
    SMOTE raises ``ValueError``), fits the candidate model and scores its hard
    predictions on the held-out part. The trial value is the mean fold score.

    Args:
        model_name: Name of the model, forwarded to ``obtener_parametros_modelo``.
        X_train_full: Training features (DataFrame or array indexable by
            integer arrays).
        y_train_full: Binary training labels (array-like).
        metric: Metric to optimise: 'f1', 'recall' or 'mcc'.
        cv_splits: Number of CV folds.

    Returns:
        Callable ``objetivo(trial)`` suitable for ``study.optimize``.

    Raises:
        ValueError: If ``metric`` is not one of the supported names. Without
            this check every trial would score 0 and the study would silently
            optimise nothing.
    """
    metricas_soportadas = ('f1', 'recall', 'mcc')
    if metric not in metricas_soportadas:
        raise ValueError(
            f"Métrica no soportada: {metric!r}. Opciones válidas: {', '.join(metricas_soportadas)}"
        )
    funcion_metrica = {
        'f1': f1_score,
        'recall': recall_score,
        'mcc': matthews_corrcoef,
    }[metric]

    # Trial-invariant preparation, done once instead of on every trial.
    columnas_f = X_train_full.columns if isinstance(X_train_full, pd.DataFrame) else None
    X_np = X_train_full.to_numpy() if columnas_f is not None else X_train_full
    # Coerce to ndarray so the fold indices are always applied positionally,
    # even if a pandas Series with a non-default index is passed in.
    y_np = np.asarray(y_train_full)

    def objetivo(trial):
        params, model_class = obtener_parametros_modelo(trial, model_name)

        cv = StratifiedKFold(n_splits=cv_splits, shuffle=True, random_state=RANDOM_STATE)
        scores_cv = []

        for train_idx, val_idx in cv.split(X_np, y_np):
            X_fold_train, y_fold_train = X_np[train_idx], y_np[train_idx]
            X_fold_val, y_fold_val = X_np[val_idx], y_np[val_idx]

            # SMOTE needs k_neighbors < number of minority samples; cap at 3.
            k_n = min(3, int(y_fold_train.sum()) - 1)
            smote = SMOTE(random_state=RANDOM_STATE, k_neighbors=k_n)

            try:
                X_fold_train_bal, y_fold_train_bal = smote.fit_resample(X_fold_train, y_fold_train)
            except ValueError:
                # Best effort: too few positives to oversample, train on the raw fold.
                X_fold_train_bal, y_fold_train_bal = X_fold_train, y_fold_train

            model = model_class(**params)

            # Restore column names so fit/predict see consistent feature names.
            if columnas_f is not None:
                X_fold_train_bal = pd.DataFrame(X_fold_train_bal, columns=columnas_f)
                X_fold_val_df = pd.DataFrame(X_fold_val, columns=columnas_f)
            else:
                X_fold_val_df = X_fold_val

            model.fit(X_fold_train_bal, y_fold_train_bal)

            y_pred = model.predict(X_fold_val_df)
            scores_cv.append(funcion_metrica(y_fold_val, y_pred))

        return np.mean(scores_cv)

    return objetivo

In [None]:
def crear_modelo(model_name, best_params):
    """
    Instantiate a model from the best hyperparameters found by a study.

    ``best_params`` only carries the tuned values, so the fixed settings
    (seed, class weighting, verbosity, parallelism) are added here and mirror
    the ones used in ``obtener_parametros_modelo``.

    Args:
        model_name: One of 'XGBoost', 'LightGBM', 'RandomForest',
            'ExtraTrees', 'HistGradientBoosting' or 'CatBoost'.
        best_params: Dictionary with the tuned hyperparameters.

    Returns:
        An unfitted, configured estimator instance.

    Raises:
        ValueError: If ``model_name`` is not one of the supported models.
    """
    if model_name == 'XGBoost':
        return XGBClassifier(**best_params, random_state=RANDOM_STATE, eval_metric='aucpr', n_jobs=-1)
    if model_name == 'LightGBM':
        return LGBMClassifier(**best_params, class_weight='balanced', random_state=RANDOM_STATE, verbose=-1, n_jobs=-1)
    if model_name == 'RandomForest':
        return RandomForestClassifier(**best_params, class_weight='balanced_subsample', random_state=RANDOM_STATE, n_jobs=-1)
    if model_name == 'ExtraTrees':
        return ExtraTreesClassifier(**best_params, class_weight='balanced_subsample', random_state=RANDOM_STATE, n_jobs=-1)
    if model_name == 'HistGradientBoosting':
        return HistGradientBoostingClassifier(**best_params, class_weight='balanced', random_state=RANDOM_STATE)
    if model_name == 'CatBoost':
        return CatBoostClassifier(**best_params, allow_writing_files=False, auto_class_weights='Balanced',
                                  random_state=RANDOM_STATE, verbose=0, thread_count=-1)

    # Returning None here would only surface later as an AttributeError on
    # `.fit`; report the real problem instead.
    raise ValueError(f"Modelo no soportado: {model_name!r}")

## Carga y Preparación de Datos

In [None]:
# ================================================================================
# CARGA Y PREPARACIÓN DE DATOS
# ================================================================================

logger.info("Cargando datos...")

df_train = pd.read_csv(CSV_ENTRENAR)
df_test = pd.read_csv(CSV_VALIDAR)

# Drop the index column in place wherever it is present
for df in (df_train, df_test):
    df.drop(columns=['Indice'], inplace=True, errors='ignore')

# Feature engineering through the shared helper (train first, then test)
df_train_der = crear_derivadas(df_train)
df_test_der = crear_derivadas(df_test)

# Every derived column except the target is a model feature
columnas = [nombre_col for nombre_col in df_train_der.columns if nombre_col != 'Ciclos']

X_train, X_test = df_train_der[columnas], df_test_der[columnas]
y_train_ciclos = df_train['Ciclos'].to_numpy()
y_test_ciclos = df_test['Ciclos'].to_numpy()

# Standardise features; the scaler is fitted on the training split only
scaler = StandardScaler().fit(X_train)
X_train_esc = pd.DataFrame(scaler.transform(X_train), columns=columnas)
X_test_esc = pd.DataFrame(scaler.transform(X_test), columns=columnas)

# Binary target: 1 when the cycle count falls below the threshold
y_train_umbral = (y_train_ciclos < UMBRAL_OPTIMO).astype(int)
y_test_umbral = (y_test_ciclos < UMBRAL_OPTIMO).astype(int)

# Oversample the training set; k_neighbors is capped by the positives available
k_neighbors = min(3, y_train_umbral.sum() - 1)
smote_final = SMOTE(k_neighbors=k_neighbors, random_state=RANDOM_STATE)

try:
    X_train_res_final, y_train_bal_final = smote_final.fit_resample(X_train_esc, y_train_umbral)
    X_train_bal_final = pd.DataFrame(X_train_res_final, columns=columnas)
except ValueError as e:
    # Fall back to the unbalanced training set when SMOTE cannot run
    logger.warning(f"SMOTE falló: {e}")
    X_train_bal_final, y_train_bal_final = X_train_esc, y_train_umbral

logger.info(f"Datos cargados: {len(df_train)} train, {len(df_test)} test")

## Optimización de Hiperparámetros

In [None]:
# ================================================================================
# FUNCIÓN DE ORQUESTACIÓN
# ================================================================================

def correr_optimizacion(tipo, metrica_nombre):
    """
    Run one Optuna study per base model and refit each winner.

    For every model in ``MODELOS_BASE`` a study with a seeded TPE sampler is
    optimised for ``N_TRIALS_OPTUNA`` trials on the scaled training data. The
    best parameters are then used to fit a fresh model on the SMOTE-balanced
    training set.

    Args:
        tipo: 'balanceado' to optimise the custom balanced score, or
            'estandar' to optimise the standard metric named by
            ``metrica_nombre``.
        metrica_nombre: Metric name; printed as progress and, for 'estandar',
            forwarded to ``optimizar_modelo_metricas``.

    Returns:
        Dictionary keyed by model name with 'modelo' (fitted estimator),
        'parametros' (best hyperparameters) and 'cv_score' (best study value).

    Raises:
        ValueError: If ``tipo`` is neither 'balanceado' nor 'estandar'.
    """
    # Reject typos up front: previously any unknown value silently ran the
    # standard-metric branch.
    if tipo not in ('balanceado', 'estandar'):
        raise ValueError(f"Tipo de optimización no soportado: {tipo!r}. Use 'balanceado' o 'estandar'.")

    res_modelos = {}
    print(f"\nOptimizando para métrica: {metrica_nombre}")

    for nombre in MODELOS_BASE:
        print(f"  {nombre}...", end=" ")

        if tipo == 'balanceado':
            objetivo = optimizar_modelo_balanceado(nombre, X_train_esc, y_train_umbral, y_train_ciclos, UMBRAL_OPTIMO, cv_splits=5)
        else:
            objetivo = optimizar_modelo_metricas(nombre, X_train_esc, y_train_umbral, metrica_nombre, cv_splits=5)

        # Seeded sampler so repeated runs explore the same trial sequence.
        estudio = optuna.create_study(
            direction='maximize',
            sampler=optuna.samplers.TPESampler(seed=RANDOM_STATE)
        )
        estudio.optimize(objetivo, n_trials=N_TRIALS_OPTUNA, show_progress_bar=False)

        mejores_parametros = estudio.best_params
        mejor_score_cv = estudio.best_value

        # Refit the winning configuration on the balanced training set.
        mejor_modelo = crear_modelo(nombre, mejores_parametros)
        mejor_modelo.fit(X_train_bal_final, y_train_bal_final)

        res_modelos[nombre] = {
            'modelo': mejor_modelo,
            'parametros': mejores_parametros,
            'cv_score': mejor_score_cv
        }

        print(f"Score: {mejor_score_cv:.4f}")

    return res_modelos

In [None]:
# ================================================================================
# EJECUTAR OPTIMIZACIÓN
# ================================================================================

logger.info("Iniciando exploración de hiperparámetros...")
print("=" * 50)

# One full sweep over the base models per metric, custom score last
modelos_1 = correr_optimizacion('estandar', METRICA_1)
modelos_2 = correr_optimizacion('estandar', METRICA_2)
modelos_3 = correr_optimizacion('estandar', METRICA_3)
modelos_pers = correr_optimizacion('balanceado', METRICA_PERSONALIZADA)

# Flatten into a single mapping keyed "<model>_<metric>", keeping sweep order
mejores_modelos = {
    f"{nombre_base}_{sufijo}": datos
    for sufijo, resultados in (
        (METRICA_1, modelos_1),
        (METRICA_2, modelos_2),
        (METRICA_3, modelos_3),
        (METRICA_PERSONALIZADA, modelos_pers),
    )
    for nombre_base, datos in resultados.items()
}

print("\nExploración terminada.")

## Evaluación de Resultados

In [None]:
# ================================================================================
# EVALUACIÓN EN TEST
# ================================================================================

print("Resultados en conjunto de validación:")
print("="*50)

# Accumulators: every evaluated (model, threshold) result, the positive-class
# probabilities per model, and the best configuration seen across all models.
resultados_optimizados = []
probas_test = {}
mejor_config_global = None
mejor_score_global = float('-inf')
mejor_nombre_global = None

for nombre, info in mejores_modelos.items():
    clf = info['modelo']
    y_proba = clf.predict_proba(X_test_esc)[:, 1]
    probas_test[nombre] = y_proba
    mejor_config = None
    mejor_score = float('-inf')

    for th in THRESHOLDS:
        resultado_threshold = evaluar_con_umbral(y_test_umbral, y_proba, y_test_ciclos, UMBRAL_OPTIMO, th)
        if not resultado_threshold:
            # A falsy result is skipped for this threshold.
            continue

        score = calcular_score_balanceado(
            resultado_threshold['pct_membranas'],
            resultado_threshold['FP'],
            resultado_threshold['membranas_total'],
            PESO_FALSOS_POSITIVOS
        )

        # Record model name and score on EVERY result so the collected rows
        # share the same keys (previously only best-so-far rows got the score).
        resultado_threshold['modelo'] = nombre
        resultado_threshold['score_balanceado'] = score
        resultados_optimizados.append(resultado_threshold)

        # Strict '>' keeps the first threshold on ties.
        if score > mejor_score:
            mejor_score = score
            mejor_config = resultado_threshold

    if mejor_config is not None:
        print(f"{nombre}: th={mejor_config['threshold']:.2f} | "
              f"Membranas={mejor_config['membranas_detectadas']}/{mejor_config['membranas_total']} | "
              f"FP={mejor_config['FP']} | Score={mejor_config['score_balanceado']:.2f}")

        # Track the overall winner (first model wins on ties)
        if mejor_score > mejor_score_global:
            mejor_score_global = mejor_score
            mejor_config_global = mejor_config
            mejor_nombre_global = nombre

## Entrenamiento del Modelo Final

In [None]:
# ================================================================================
# ENTRENAMIENTO CON TODOS LOS DATOS
# ================================================================================

# Without a winner there is nothing to retrain; stop with a clear message
# instead of failing below with an opaque KeyError on `mejores_modelos[None]`.
if mejor_nombre_global is None:
    raise RuntimeError(
        "Ningún modelo produjo una configuración válida en la evaluación; "
        "no se puede entrenar el modelo final."
    )

print(f"\nMejor modelo: {mejor_nombre_global} (Score: {mejor_score_global:.2f})")
print("Entrenando modelo final con todos los datos...")

# Hyperparameters of the winning entry
params_ganadores = mejores_modelos[mejor_nombre_global]['parametros']
# Keys are built as "<model>_<metric>"; split on the LAST underscore so a base
# model name that itself contains '_' is not truncated.
nombre_modelo_base = mejor_nombre_global.rsplit('_', 1)[0]

# Merge train and test into a single dataset
X_full = pd.concat([X_train, X_test], axis=0).reset_index(drop=True)
y_full_ciclos = np.concatenate([y_train_ciclos, y_test_ciclos])
y_full_umbral = (y_full_ciclos < UMBRAL_OPTIMO).astype(int)

# Reuse the scaler already fitted on the training split (it is not refitted),
# so the stored scaler and the final model stay consistent.
X_full_esc = pd.DataFrame(scaler.transform(X_full), columns=columnas)

# SMOTE on the merged data; k_neighbors is capped by the positives available
k_neighbors = min(3, y_full_umbral.sum() - 1)
smote_full = SMOTE(random_state=RANDOM_STATE, k_neighbors=k_neighbors)

try:
    X_full_bal, y_full_bal = smote_full.fit_resample(X_full_esc, y_full_umbral)
    X_full_bal = pd.DataFrame(X_full_bal, columns=columnas)
except ValueError as e:
    # Fall back to the unbalanced data when SMOTE cannot run
    logger.warning(f"SMOTE falló en entrenamiento final: {e}")
    X_full_bal, y_full_bal = X_full_esc, y_full_umbral

# Build and fit the final model
modelo_final = crear_modelo(nombre_modelo_base, params_ganadores)
modelo_final.fit(X_full_bal, y_full_bal)

logger.info("Modelo final entrenado.")

## Guardado del Modelo

In [None]:
# ================================================================================
# GUARDADO DEL MODELO
# ================================================================================

# Everything production needs to reproduce the pipeline at inference time
config_produccion = {
    'umbral_ciclos': UMBRAL_OPTIMO,
    'threshold': mejor_config_global['threshold'],  # Consistent key (singular)
    'scaler': scaler,  # Scaler fitted on the training split only
    'modelo': modelo_final,
    'feature_cols': columnas,
    'nombre_modelo': nombre_modelo_base
}

# Create the target directory if needed. When MODELO_FINAL has no directory
# component, dirname() is '' and os.makedirs('') would raise FileNotFoundError,
# so only create it when there is something to create.
directorio_salida = os.path.dirname(MODELO_FINAL)
if directorio_salida:
    os.makedirs(directorio_salida, exist_ok=True)

# Persist the bundle
joblib.dump(config_produccion, MODELO_FINAL)

logger.info(f"Modelo guardado en: {MODELO_FINAL}")
print("\n" + "="*50)
print("MODELADO COMPLETADO")
print("="*50)