<a href="https://colab.research.google.com/github/adrian22c/Proyecto-CPD/blob/main/Pipline_Proyecto_CPD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install lightgbm dask-ml xgboost seaborn -q

In [None]:
import os, warnings, datetime, json, joblib, time, psutil, gc
warnings.filterwarnings("ignore")

import pandas as pd, numpy as np
import matplotlib.pyplot as plt, seaborn as sns
from matplotlib.patches import Patch
import dask.dataframe as dd
from dask.distributed import Client
from dask.diagnostics import ResourceProfiler, Profiler, CacheProfiler, ProgressBar
from dask_ml.preprocessing import StandardScaler as DaskStandardScaler
from dask_ml.model_selection import GridSearchCV as DaskGridSearchCV
from lightgbm import LGBMRegressor
from sklearn.feature_selection import mutual_info_regression
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
from sklearn.model_selection import TimeSeriesSplit, cross_val_score
from sklearn.preprocessing import StandardScaler as SKScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import Ridge
import xgboost as xgb

# Global plotting configuration, applied to every figure created below.
plt.style.use('seaborn-v0_8-whitegrid')
sns.set_palette("husl")
# Same resolution for on-screen rendering and for files written by savefig.
plt.rcParams['figure.dpi'] = 300
plt.rcParams['savefig.dpi'] = 300
plt.rcParams['font.size'] = 10

# -----------------------------------------------------------
# 1. CONFIGURACIÓN Y LOGGING
# -----------------------------------------------------------
class ExperimentLogger:
    """Tiny experiment logger: echoes to stdout and appends to a text file.

    The log file is ``<base_dir>/experiment_log.txt``. It is reopened in
    append mode on every call, so successive runs accumulate in one file.
    """

    def __init__(self, base_dir):
        """Store the log location and the wall-clock start of the experiment.

        Args:
            base_dir: Directory that holds ``experiment_log.txt``. It must
                already exist when the first message is logged.
        """
        self.base_dir = base_dir
        self.log_file = f"{base_dir}/experiment_log.txt"
        self.start_time = time.time()

    def log(self, message):
        """Print ``message`` with a timestamp prefix and append it to the log file.

        Args:
            message: Text to record; it is interpolated with an f-string, so
                any object with a string representation is accepted.
        """
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        log_msg = f"[{timestamp}] {message}"
        print(log_msg)
        # Messages contain non-ASCII text (accents, "R²", "±"); pin the
        # encoding so the write does not depend on the platform locale.
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(log_msg + "\n")

    def log_metrics(self, title, metrics_dict):
        """Log a titled block with one ``key: value`` line per metric.

        Args:
            title: Heading written as ``=== title ===``.
            metrics_dict: Mapping of metric name to value. ``float`` values
                are formatted with six decimals; anything else is logged
                as-is.
        """
        self.log(f"\n=== {title} ===")
        for key, value in metrics_dict.items():
            if isinstance(value, float):
                self.log(f"{key}: {value:.6f}")
            else:
                self.log(f"{key}: {value}")

# All inputs and outputs live under one Google Drive folder.
BASE = '/content/drive/MyDrive/CMAPSSData'
RESULTS_DIR = BASE + '/paper_results'
CLEAN = BASE + '/clean'
FEAT_DIR = BASE + '/features'
MODEL_DIR = BASE + '/models'
PLOTS_DIR = RESULTS_DIR + '/plots'

# Create every output directory up front so later writes cannot fail on a
# missing folder.
for d in (RESULTS_DIR, CLEAN, FEAT_DIR, MODEL_DIR, PLOTS_DIR):
    os.makedirs(d, exist_ok=True)

# Module-wide logger; its log file is written inside RESULTS_DIR.
logger = ExperimentLogger(RESULTS_DIR)
logger.log("=== INICIANDO EXPERIMENTO CIENTÍFICO ===")

# -----------------------------------------------------------
# 2. LECTURA Y PREPROCESAMIENTO
# -----------------------------------------------------------
def load_and_preprocess_data():
    """Load the FD001 files, label the training set with RUL and drop flat columns.

    Steps:
        1. Read ``train_FD001.txt`` / ``test_FD001.txt`` (whitespace separated,
           no header) with Dask and ``RUL_FD001.txt`` with pandas.
        2. Add a ``RUL`` column to the training set: last cycle of the unit
           minus the current cycle.
        3. On a 10% sample of the training set, flag columns with at most one
           distinct value ("constant") or with fewer distinct values than 1%
           of the sample size ("near constant") and drop them from both sets.
        4. Persist the cleaned sets under ``CLEAN``.

    Returns:
        tuple: ``(train, test, rul)`` -- two Dask DataFrames and a pandas
        DataFrame with a single ``RUL`` column -- or ``(None, None, None)``
        when loading fails (the error is logged, not raised).
    """
    logger.log("Iniciando carga y preprocesamiento de datos...")

    COLS = ['unit_number', 'time_in_cycles'] + \
           [f'operational_setting_{i}' for i in range(1, 4)] + \
           [f'sensor_measurement_{i}' for i in range(1, 22)]

    try:
        # sep=r'\s+' replaces delim_whitespace=True, which pandas deprecated
        # in 2.2; both split on runs of whitespace.
        train = dd.read_csv(f'{BASE}/train_FD001.txt', header=None,
                            sep=r'\s+', names=COLS)
        test = dd.read_csv(f'{BASE}/test_FD001.txt', header=None,
                           sep=r'\s+', names=COLS)
        rul = pd.read_csv(f'{BASE}/RUL_FD001.txt', header=None, names=['RUL'])

        # len() makes Dask evaluate the frames, so problems in the lazy read
        # are reported here rather than later in the pipeline.
        logger.log(f"Datos cargados - Train: {len(train)} filas, Test: {len(test)} filas")

    except Exception as e:
        logger.log(f"Error cargando datos: {e}")
        return None, None, None

    # RUL label for training rows: cycles left until the unit's last cycle.
    train = train.assign(RUL=train.groupby('unit_number')['time_in_cycles'].transform('max') - train['time_in_cycles'])

    # Seeded so the set of dropped columns is reproducible between runs.
    sample = train.sample(frac=0.1, random_state=42).compute()
    sample_size = len(sample)
    const_cols = []
    near_const_cols = []

    for col in sample.columns:
        if col in ['unit_number', 'time_in_cycles', 'RUL']:
            continue
        unique_vals = sample[col].nunique()
        unique_ratio = unique_vals / sample_size

        if unique_vals <= 1:
            const_cols.append(col)
        elif unique_ratio < 0.01:  # fewer distinct values than 1% of the sample
            near_const_cols.append(col)

    logger.log(f"Columnas constantes encontradas: {len(const_cols)}")
    logger.log(f"Columnas casi constantes encontradas: {len(near_const_cols)}")

    # Drop the same columns from both sets so their schemas stay aligned.
    cols_to_drop = const_cols + near_const_cols
    if cols_to_drop:
        train = train.drop(columns=cols_to_drop)
        test = test.drop(columns=cols_to_drop)
        logger.log(f"Eliminadas {len(cols_to_drop)} columnas problemáticas")

    # Persist the cleaned data.
    train.to_parquet(f'{CLEAN}/train_FD001_clean.parquet')
    test.to_parquet(f'{CLEAN}/test_FD001_clean.parquet')
    rul.to_csv(f'{CLEAN}/rul_FD001.csv', index=False)

    return train, test, rul

# -----------------------------------------------------------
# 3. INGENIERÍA DE CARACTERÍSTICAS
# -----------------------------------------------------------
def advanced_feature_engineering(train, test, rul):
    """Select informative sensors and build per-unit time-series features.

    Sensors are ranked by mutual information with ``RUL`` on a seeded 20%
    sample of the training set; those strictly above the 70th percentile are
    kept. For every kept sensor the raw value, rolling mean/std/min/max and a
    lagged difference for each window in ``WINDOWS``, a cumulative sum and a
    first difference are computed within each ``unit_number``.

    Args:
        train: Training data (Dask or pandas DataFrame) including ``RUL``.
        test: Test data of the same kind as ``train``, without ``RUL``.
        rul: pandas DataFrame with an ``RUL`` column holding one value per
            test unit, in unit order (row 0 belongs to unit 1). It is not
            modified.

    Returns:
        tuple: ``(train_features, test_features, selected_sensors)``. Both
        feature frames are pandas DataFrames sorted by unit and cycle with a
        fresh 0..n-1 index; ``test_features`` also carries a ``RUL`` column.

    Side effects:
        Writes the mutual-information table and both feature sets under
        ``FEAT_DIR`` and the mutual-information bar chart under ``PLOTS_DIR``.
    """
    logger.log("Iniciando ingeniería de características avanzada...")

    # Work in pandas for simplicity; Dask frames are materialised first.
    if hasattr(train, 'compute'):
        logger.log("Convirtiendo Dask DataFrames a pandas...")
        train_pd = train.compute()
        test_pd = test.compute()
    else:
        logger.log("Trabajando con pandas DataFrames...")
        train_pd = train
        test_pd = test

    # Sensor selection based on mutual information with the target.
    sensor_cols = [c for c in train_pd.columns if 'sensor_measurement' in c]
    # Seeded so the selected sensor set is reproducible between runs.
    sample = train_pd.sample(frac=0.2, random_state=42)

    mi_scores = mutual_info_regression(sample[sensor_cols], sample['RUL'], random_state=42)
    mi_df = pd.DataFrame({'sensor': sensor_cols, 'mi_score': mi_scores}).sort_values('mi_score', ascending=False)

    # Keep the sensors strictly above the 70th percentile (roughly the top 30%).
    threshold = np.percentile(mi_scores, 70)
    selected_sensors = mi_df[mi_df['mi_score'] > threshold]['sensor'].tolist()

    logger.log(f"Sensores seleccionados: {len(selected_sensors)} de {len(sensor_cols)}")
    mi_df.to_csv(f'{FEAT_DIR}/mutual_information_analysis.csv', index=False)

    # Bar chart of the mutual-information scores with the selection threshold.
    plt.figure(figsize=(12, 6))
    plt.bar(range(len(mi_df)), mi_df['mi_score'])
    plt.axhline(y=threshold, color='r', linestyle='--', label=f'Threshold ({threshold:.3f})')
    plt.xlabel('Sensores (ordenados por MI)')
    plt.ylabel('Información Mutua')
    plt.title('Análisis de Información Mutua - Selección de Sensores')
    plt.legend()
    plt.tight_layout()
    plt.savefig(f'{PLOTS_DIR}/mutual_information_analysis.png', bbox_inches='tight')
    plt.close()

    # Rolling-window sizes, in cycles.
    WINDOWS = [3, 5, 10, 15]

    def create_advanced_features(df, sensors, with_rul=True):
        """Return the per-unit feature table for ``sensors`` (NaN filled with 0)."""
        # A fresh, unique index guarantees that every derived Series aligns
        # row-for-row, whatever index the input frame carried (e.g. repeated
        # labels coming from several Dask partitions).
        df = df.sort_values(['unit_number', 'time_in_cycles']).reset_index(drop=True)
        base_cols = ['unit_number', 'time_in_cycles'] + (['RUL'] if with_rul else [])

        # Collect columns in a dict and concatenate once instead of inserting
        # them one at a time into the frame.
        features = {}
        for sensor in sensors:
            if sensor not in df.columns:
                continue

            grouped = df.groupby('unit_number')[sensor]

            # Raw reading.
            features[f'{sensor}_raw'] = df[sensor]

            for window in WINDOWS:
                rolling = grouped.rolling(window, min_periods=1)

                # Rolling statistics come back indexed by (unit, row); drop
                # the unit level to get back to the row index.
                for stat in ('mean', 'std', 'min', 'max'):
                    features[f'{sensor}_{stat}_w{window}'] = (
                        getattr(rolling, stat)().reset_index(level=0, drop=True)
                    )

                # Trend: change relative to ``window`` cycles earlier. The
                # result already carries the row index.
                features[f'{sensor}_trend_w{window}'] = grouped.diff(window)

            # Degradation indicators.
            features[f'{sensor}_cumsum'] = grouped.cumsum()
            features[f'{sensor}_diff'] = grouped.diff()

        result = pd.concat([df[base_cols], pd.DataFrame(features, index=df.index)], axis=1)
        # Lagged and rolling-std values are undefined at the start of a unit.
        return result.fillna(0)

    logger.log("Aplicando transformaciones de características...")

    train_features = create_advanced_features(train_pd, selected_sensors, True)
    test_features = create_advanced_features(test_pd, selected_sensors, False)

    # Attach the true RUL to the test set. Units are numbered from 1 in file
    # order; build the lookup positionally instead of shifting ``rul.index``
    # in place, which corrupted the caller's frame on repeated calls.
    rul_by_unit = dict(zip(range(1, len(rul) + 1), rul['RUL']))
    max_cycles = test_features.groupby('unit_number')['time_in_cycles'].transform('max')
    # The RUL file gives the remaining life at each unit's LAST recorded
    # cycle, so earlier cycles have MORE life left: add the cycles still to
    # be observed (the original subtracted them).
    test_features['RUL'] = (
        test_features['unit_number'].map(rul_by_unit)
        + (max_cycles - test_features['time_in_cycles'])
    )

    # Persist the feature sets.
    train_features.to_parquet(f'{FEAT_DIR}/train_features_advanced.parquet')
    test_features.to_parquet(f'{FEAT_DIR}/test_features_advanced.parquet')

    logger.log(f"Características creadas - Train: {train_features.shape}, Test: {test_features.shape}")

    return train_features, test_features, selected_sensors

# -----------------------------------------------------------
# 4. ANÁLISIS EXPLORATORIO DE DATOS (EDA)
# -----------------------------------------------------------
def comprehensive_eda(train_features, selected_sensors):
    """Produce the exploratory figures and summary table for the training set.

    Args:
        train_features: pandas DataFrame with ``unit_number``,
            ``time_in_cycles``, ``RUL`` and ``<sensor>_raw`` columns.
        selected_sensors: Sensor names whose ``_raw`` columns are analysed.

    Side effects:
        Saves three PNG files under ``PLOTS_DIR`` (RUL distributions, the
        correlation heat map, sensor traces for the first six units) and
        ``descriptive_statistics.csv`` under ``FEAT_DIR``.
    """
    logger.log("Realizando análisis exploratorio de datos...")

    hist_style = dict(alpha=0.7, edgecolor='black')
    per_unit = train_features.groupby('unit_number')

    # ---- Figure 1: three histograms describing RUL and unit lifetimes ----
    dist_fig = plt.figure(figsize=(15, 5))

    rul_ax = dist_fig.add_subplot(1, 3, 1)
    rul_ax.hist(train_features['RUL'], bins=50, **hist_style)
    rul_ax.set_xlabel('RUL (ciclos)')
    rul_ax.set_ylabel('Frecuencia')
    rul_ax.set_title('Distribución de RUL')

    max_rul_ax = dist_fig.add_subplot(1, 3, 2)
    per_unit['RUL'].max().hist(ax=max_rul_ax, bins=30, **hist_style)
    max_rul_ax.set_xlabel('RUL Máximo por Unidad')
    max_rul_ax.set_ylabel('Frecuencia')
    max_rul_ax.set_title('RUL Máximo por Motor')

    life_ax = dist_fig.add_subplot(1, 3, 3)
    life_ax.hist(per_unit['time_in_cycles'].max(), bins=30, **hist_style)
    life_ax.set_xlabel('Ciclos Totales por Unidad')
    life_ax.set_ylabel('Frecuencia')
    life_ax.set_title('Duración de Vida por Motor')

    dist_fig.tight_layout()
    dist_fig.savefig(f'{PLOTS_DIR}/rul_distribution_analysis.png', bbox_inches='tight')
    plt.close(dist_fig)

    # ---- Figure 2: correlation between raw selected sensors and RUL ----
    sensor_raw_cols = []
    for name in selected_sensors:
        raw_name = f'{name}_raw'
        if raw_name in train_features.columns:
            sensor_raw_cols.append(raw_name)
    correlations = train_features[sensor_raw_cols + ['RUL']].corr()

    plt.figure(figsize=(14, 12))
    # Hide the upper triangle (diagonal included); it mirrors the lower one.
    upper_triangle = np.triu(np.ones_like(correlations, dtype=bool))
    sns.heatmap(correlations, mask=upper_triangle, annot=True, cmap='coolwarm',
                center=0, square=True, fmt='.2f', cbar_kws={"shrink": .8})
    plt.title('Matriz de Correlación - Sensores Seleccionados vs RUL')
    plt.tight_layout()
    plt.savefig(f'{PLOTS_DIR}/correlation_matrix_selected_sensors.png', bbox_inches='tight')
    plt.close()

    # ---- Figure 3: traces of up to six sensors for the first six units ----
    trace_fig, trace_axes = plt.subplots(2, 3, figsize=(18, 10))
    panels = trace_axes.ravel()

    first_units = train_features['unit_number'].unique()[:6]
    unit_colors = plt.cm.tab10(np.linspace(0, 1, len(first_units)))

    # Panel k belongs to the k-th sensor; a sensor without a raw column
    # leaves its panel empty.
    for panel, sensor in zip(panels, selected_sensors[:6]):
        raw_name = f'{sensor}_raw'
        if raw_name not in train_features.columns:
            continue

        for unit, unit_color in zip(first_units, unit_colors):
            unit_rows = train_features[train_features['unit_number'] == unit]
            panel.plot(unit_rows['time_in_cycles'], unit_rows[raw_name],
                       alpha=0.7, color=unit_color, linewidth=1)

        panel.set_xlabel('Ciclos')
        panel.set_ylabel('Valor del Sensor')
        panel.set_title(f'{sensor} - Evolución Temporal')
        panel.grid(True, alpha=0.3)

    trace_fig.tight_layout()
    trace_fig.savefig(f'{PLOTS_DIR}/sensor_temporal_evolution.png', bbox_inches='tight')
    plt.close(trace_fig)

    # ---- Descriptive statistics of the target and the raw sensors ----
    summary = train_features[['RUL'] + sensor_raw_cols].describe()
    summary.to_csv(f'{FEAT_DIR}/descriptive_statistics.csv')

    logger.log("Análisis exploratorio completado")

# -----------------------------------------------------------
# 5. MODELADO CON MÚLTIPLES ALGORITMOS
# -----------------------------------------------------------
def comprehensive_modeling(train_features, test_features):
    """Tune, fit and evaluate four regressors on the engineered features.

    For LightGBM, XGBoost, RandomForest and Ridge a grid search with a
    five-fold ``TimeSeriesSplit`` picks the hyper-parameters (scored by
    negative RMSE); the refitted best estimator is then evaluated on the test
    set. Ridge is trained on standardised features, the tree-based models on
    the unscaled ones.

    Args:
        train_features: pandas DataFrame with the feature columns plus
            ``unit_number``, ``time_in_cycles`` and ``RUL``.
        test_features: pandas DataFrame with the same columns.

    Returns:
        tuple: ``(results, trained_models, y_test, scaler)`` where
        ``results`` maps model name to a dict with ``rmse``, ``mae``, ``r2``,
        ``cv_rmse_mean``, ``cv_rmse_std``, ``training_time``,
        ``best_params`` and ``predictions``; ``trained_models`` maps model
        name to the fitted best estimator; ``y_test`` is the test target and
        ``scaler`` the fitted ``StandardScaler``. A model whose training
        raises is logged and left out of both dicts.
    """
    # Not part of the file-level imports; imported once here instead of on
    # every loop iteration.
    from sklearn.model_selection import GridSearchCV

    logger.log("Iniciando modelado comprehensivo...")

    # Every column except identifiers and the target is a feature.
    feature_cols = [c for c in train_features.columns
                    if c not in ['unit_number', 'time_in_cycles', 'RUL']]

    X_train = train_features[feature_cols].fillna(0)
    y_train = train_features['RUL']
    X_test = test_features[feature_cols].fillna(0)
    y_test = test_features['RUL']

    logger.log(f"Características utilizadas: {len(feature_cols)}")
    logger.log(f"Datos de entrenamiento: {X_train.shape}")
    logger.log(f"Datos de prueba: {X_test.shape}")

    # Standardisation is fitted on the training set only.
    scaler = SKScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    models = {
        'LightGBM': LGBMRegressor(objective='regression', random_state=42, n_jobs=-1, verbose=-1),
        'XGBoost': xgb.XGBRegressor(random_state=42, n_jobs=-1, verbosity=0),
        'RandomForest': RandomForestRegressor(random_state=42, n_jobs=-1),
        'Ridge': Ridge(random_state=42)
    }

    # Hyper-parameter grids, keyed like ``models``.
    param_grids = {
        'LightGBM': {
            'learning_rate': [0.01, 0.1, 0.2],
            'n_estimators': [100, 200, 500],
            'max_depth': [3, 6, 9],
            'num_leaves': [31, 50, 100]
        },
        'XGBoost': {
            'learning_rate': [0.01, 0.1, 0.2],
            'n_estimators': [100, 200, 500],
            'max_depth': [3, 6, 9]
        },
        'RandomForest': {
            'n_estimators': [50, 100, 200],
            'max_depth': [10, 20, None],
            'min_samples_split': [2, 5, 10]
        },
        'Ridge': {
            'alpha': [0.1, 1.0, 10.0, 100.0]
        }
    }

    results = {}
    trained_models = {}

    # Forward-chaining splits: each fold validates on rows after its training rows.
    tscv = TimeSeriesSplit(n_splits=5)

    for model_name, model in models.items():
        logger.log(f"\n--- Entrenando {model_name} ---")
        start_time = time.time()

        # Scaled inputs for Ridge, original inputs for the tree-based models.
        X_train_model = X_train_scaled if model_name == 'Ridge' else X_train
        X_test_model = X_test_scaled if model_name == 'Ridge' else X_test

        try:
            grid_search = GridSearchCV(
                model, param_grids[model_name],
                cv=tscv, scoring='neg_root_mean_squared_error',
                n_jobs=-1, verbose=0
            )

            grid_search.fit(X_train_model, y_train)
            best_model = grid_search.best_estimator_

            # Test-set evaluation.
            y_pred = best_model.predict(X_test_model)

            rmse = np.sqrt(mean_squared_error(y_test, y_pred))
            mae = mean_absolute_error(y_test, y_pred)
            r2 = r2_score(y_test, y_pred)

            # The grid search already scored the winning configuration on
            # these same folds; reuse those scores instead of refitting it
            # five more times with cross_val_score.
            best_idx = grid_search.best_index_
            cv_rmse_mean = -grid_search.cv_results_['mean_test_score'][best_idx]
            cv_rmse_std = grid_search.cv_results_['std_test_score'][best_idx]

            training_time = time.time() - start_time

            results[model_name] = {
                'rmse': rmse,
                'mae': mae,
                'r2': r2,
                'cv_rmse_mean': cv_rmse_mean,
                'cv_rmse_std': cv_rmse_std,
                'training_time': training_time,
                'best_params': grid_search.best_params_,
                'predictions': y_pred
            }

            trained_models[model_name] = best_model

            logger.log_metrics(f"{model_name} Resultados", {
                'RMSE': rmse,
                'MAE': mae,
                'R²': r2,
                'CV RMSE (mean±std)': f"{cv_rmse_mean:.3f}±{cv_rmse_std:.3f}",
                'Tiempo entrenamiento': f"{training_time:.2f}s"
            })

        except Exception as e:
            # Best effort: a failing model must not abort the remaining ones.
            logger.log(f"Error entrenando {model_name}: {e}")
            continue

    return results, trained_models, y_test, scaler

# -----------------------------------------------------------
# 6. ANÁLISIS DE RENDIMIENTO DASK vs PANDAS
# -----------------------------------------------------------
def dask_vs_pandas_comparison(train_features, test_features):
    """Time an eager pandas/scikit-learn run against a Dask-based run.

    The pandas path standardises the features and fits one LightGBM model.
    The Dask path standardises with ``dask_ml`` and fits LightGBM through a
    ``dask_ml`` grid search (single parameter combination, three time-series
    folds). For each path the wall-clock time, the change in system-wide used
    memory reported by ``psutil`` and the test RMSE / R² are recorded.

    Args:
        train_features: pandas DataFrame with features, identifiers and ``RUL``.
        test_features: pandas DataFrame with the same columns.

    Returns:
        dict: ``{'pandas': {...}, 'dask': {...}}``; each inner dict has the
        keys ``time``, ``memory_gb``, ``rmse``, ``r2`` and ``approach``.
    """
    logger.log("Comparando rendimiento Dask vs Pandas...")

    non_features = ('unit_number', 'time_in_cycles', 'RUL')
    predictors = [name for name in train_features.columns if name not in non_features]

    def used_memory_gb():
        """System-wide used memory, in units of 1e9 bytes."""
        return psutil.virtual_memory().used / 1e9

    # Inputs shared by both approaches.
    train_x = train_features[predictors].fillna(0)
    train_y = train_features['RUL']
    test_x = test_features[predictors].fillna(0)
    test_y = test_features['RUL']

    # Dask views of the same data.
    train_x_dask = dd.from_pandas(train_x, npartitions=4)
    train_y_dask = dd.from_pandas(train_y, npartitions=4)
    test_x_dask = dd.from_pandas(test_x, npartitions=2)

    # Deliberately simple model for the comparison.
    lgbm_kwargs = {'n_estimators': 100, 'max_depth': 6, 'random_state': 42, 'n_jobs': -1}

    # ---------------- pandas (eager) ----------------
    logger.log("Probando enfoque Pandas (eager)...")
    eager_started = time.time()
    eager_mem_start = used_memory_gb()

    eager_scaler = SKScaler()
    train_x_eager = eager_scaler.fit_transform(train_x)
    test_x_eager = eager_scaler.transform(test_x)

    eager_model = LGBMRegressor(**lgbm_kwargs, verbose=-1)
    eager_model.fit(train_x_eager, train_y)
    eager_pred = eager_model.predict(test_x_eager)

    pandas_time = time.time() - eager_started
    pandas_memory = used_memory_gb() - eager_mem_start

    pandas_rmse = np.sqrt(mean_squared_error(test_y, eager_pred))
    pandas_r2 = r2_score(test_y, eager_pred)

    # ---------------- Dask (lazy) ----------------
    logger.log("Probando enfoque Dask (lazy)...")
    lazy_started = time.time()
    lazy_mem_start = used_memory_gb()

    # Standardisation through dask_ml.
    lazy_scaler = DaskStandardScaler()
    train_x_lazy = lazy_scaler.fit_transform(train_x_dask)
    test_x_lazy = lazy_scaler.transform(test_x_dask)

    # Search through dask_ml; three folds only, to keep it quick.
    folds = TimeSeriesSplit(n_splits=3)
    search_space = {'learning_rate': [0.1], 'n_estimators': [100]}

    lazy_search = DaskGridSearchCV(
        LGBMRegressor(random_state=42, n_jobs=-1, verbose=-1),
        search_space, cv=folds, scoring='neg_root_mean_squared_error'
    )

    with ResourceProfiler(), Profiler():
        lazy_search.fit(train_x_lazy, train_y_dask)
        lazy_pred = lazy_search.predict(test_x_lazy.compute())

    dask_time = time.time() - lazy_started
    dask_memory = used_memory_gb() - lazy_mem_start

    dask_rmse = np.sqrt(mean_squared_error(test_y, lazy_pred))
    dask_r2 = r2_score(test_y, lazy_pred)

    # Collect both measurements.
    pandas_summary = {
        'time': pandas_time,
        'memory_gb': pandas_memory,
        'rmse': pandas_rmse,
        'r2': pandas_r2,
        'approach': 'eager'
    }
    dask_summary = {
        'time': dask_time,
        'memory_gb': dask_memory,
        'rmse': dask_rmse,
        'r2': dask_r2,
        'approach': 'lazy'
    }
    comparison_results = {'pandas': pandas_summary, 'dask': dask_summary}

    if dask_time > 0:
        speedup_text = f"{pandas_time/dask_time:.2f}x"
    else:
        speedup_text = "N/A"

    logger.log_metrics("Comparación Pandas vs Dask", {
        'Pandas - Tiempo': f"{pandas_time:.2f}s",
        'Pandas - Memoria': f"{pandas_memory:.2f}GB",
        'Pandas - RMSE': f"{pandas_rmse:.4f}",
        'Dask - Tiempo': f"{dask_time:.2f}s",
        'Dask - Memoria': f"{dask_memory:.2f}GB",
        'Dask - RMSE': f"{dask_rmse:.4f}",
        'Speedup': speedup_text
    })

    return comparison_results

# -----------------------------------------------------------
# 7. ANÁLISIS DE ESCALABILIDAD
# -----------------------------------------------------------
def scalability_analysis():
    """Benchmark simple Dask reductions under several worker/thread layouts.

    For each ``(workers, threads per worker)`` pair a local Dask ``Client``
    is started, a 50000 x 50 synthetic frame is partitioned into
    ``workers * threads`` pieces, and the time to compute its column means,
    standard deviations and doubled sums is measured together with the
    change in system-wide used memory reported by ``psutil``.

    Returns:
        list[dict]: One entry per configuration that completed, with the keys
        ``n_workers``, ``threads_per_worker``, ``total_cores``,
        ``execution_time``, ``memory_used_gb`` and ``throughput``
        (rows per second). Failed configurations are logged and omitted.

    Side effects:
        Writes ``scalability_analysis.csv`` under ``RESULTS_DIR``.
    """
    logger.log("Realizando análisis de escalabilidad...")

    # (workers, threads per worker) layouts to try.
    worker_configs = [
        (1, 1),
        (2, 1),
        (2, 2),
        (4, 1),
        (4, 2),
    ]

    scalability_results = []

    # Synthetic data for a controlled test.
    np.random.seed(42)
    n_samples = 50000
    n_features = 50

    X_synthetic = np.random.randn(n_samples, n_features)

    for n_workers, threads_per_worker in worker_configs:
        logger.log(f"Probando configuración: {n_workers} workers, {threads_per_worker} threads c/u")

        client = None
        try:
            client = Client(n_workers=n_workers, threads_per_worker=threads_per_worker,
                            silence_logs=30, dashboard_address=None)

            df_dask = dd.from_pandas(
                pd.DataFrame(X_synthetic),
                npartitions=n_workers * threads_per_worker
            )

            start_time = time.time()
            memory_before = psutil.virtual_memory().used / 1e9

            # Benchmark workload; only the elapsed time matters, the values
            # are discarded.
            df_dask.mean().compute()
            df_dask.std().compute()
            (df_dask * 2).sum().compute()

            execution_time = time.time() - start_time
            memory_after = psutil.virtual_memory().used / 1e9
            memory_used = memory_after - memory_before

            total_cores = n_workers * threads_per_worker

            scalability_results.append({
                'n_workers': n_workers,
                'threads_per_worker': threads_per_worker,
                'total_cores': total_cores,
                'execution_time': execution_time,
                'memory_used_gb': memory_used,
                'throughput': n_samples / execution_time if execution_time > 0 else 0
            })

            logger.log(f"Configuración {n_workers}x{threads_per_worker}: {execution_time:.2f}s, {memory_used:.2f}GB")

        except Exception as e:
            logger.log(f"Error en configuración {n_workers}x{threads_per_worker}: {e}")

        finally:
            # Always release the client, including when the benchmark failed;
            # otherwise its workers would still be running during the next
            # configuration.
            if client is not None:
                client.close()
                time.sleep(2)  # short pause so the shutdown can finish

    # Persist the measurements.
    scalability_df = pd.DataFrame(scalability_results)
    scalability_df.to_csv(f'{RESULTS_DIR}/scalability_analysis.csv', index=False)

    return scalability_results

# -----------------------------------------------------------
# 8. VISUALIZACIONES COMPREHENSIVAS
# -----------------------------------------------------------
def _plot_model_metric_bars(results):
    """Save the 2x2 bar-chart grid comparing RMSE, R², MAE and training time."""
    model_names = list(results.keys())
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))

    # (axis, results key, face colour, edge colour, y label, title,
    #  vertical offset of the value label, value format, draw y=0 line)
    panels = [
        (axes[0, 0], 'rmse', 'skyblue', 'navy', 'RMSE',
         'Comparación de RMSE por Modelo', 1, '{:.2f}', False),
        (axes[0, 1], 'r2', 'lightcoral', 'darkred', 'R² Score',
         'Comparación de R² por Modelo', 0.01, '{:.3f}', True),
        (axes[1, 0], 'mae', 'lightgreen', 'darkgreen', 'MAE',
         'Comparación de MAE por Modelo', 1, '{:.2f}', False),
        (axes[1, 1], 'training_time', 'gold', 'orange', 'Tiempo de Entrenamiento (s)',
         'Tiempo de Entrenamiento por Modelo', 0.5, '{:.1f}s', False),
    ]

    for ax, key, face, edge, ylabel, title, offset, fmt, zero_line in panels:
        values = [results[m][key] for m in model_names]
        bars = ax.bar(model_names, values, color=face, edgecolor=edge, alpha=0.7)
        ax.set_ylabel(ylabel)
        ax.set_title(title)
        ax.tick_params(axis='x', rotation=45)
        if zero_line:
            ax.axhline(y=0, color='black', linestyle='--', alpha=0.5)
        for bar, val in zip(bars, values):
            ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + offset,
                    fmt.format(val), ha='center', va='bottom')

    plt.tight_layout()
    plt.savefig(f'{PLOTS_DIR}/model_comparison_comprehensive.png', bbox_inches='tight')
    plt.close()


def _plot_best_model_diagnostics(results, y_test):
    """Save scatter, residual and error-histogram plots for the lowest-RMSE model."""
    best_model_name = min(results.keys(), key=lambda x: results[x]['rmse'])
    best_predictions = results[best_model_name]['predictions']

    fig, axes = plt.subplots(1, 3, figsize=(18, 6))

    # Predicted vs. true values, with the identity line as reference.
    axes[0].scatter(y_test, best_predictions, alpha=0.6, s=20)
    axes[0].plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'r--', linewidth=2)
    axes[0].set_xlabel('RUL Real')
    axes[0].set_ylabel('RUL Predicho')
    axes[0].set_title(f'Predicciones vs Real - {best_model_name}')
    axes[0].grid(True, alpha=0.3)

    # Residuals (true minus predicted) against the prediction.
    residuals = y_test - best_predictions
    axes[1].scatter(best_predictions, residuals, alpha=0.6, s=20)
    axes[1].axhline(y=0, color='r', linestyle='--', linewidth=2)
    axes[1].set_xlabel('RUL Predicho')
    axes[1].set_ylabel('Residuales')
    axes[1].set_title('Análisis de Residuales')
    axes[1].grid(True, alpha=0.3)

    # Distribution of the residuals.
    axes[2].hist(residuals, bins=30, alpha=0.7, edgecolor='black')
    axes[2].axvline(x=0, color='r', linestyle='--', linewidth=2)
    axes[2].set_xlabel('Error (Real - Predicho)')
    axes[2].set_ylabel('Frecuencia')
    axes[2].set_title('Distribución del Error')
    axes[2].grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig(f'{PLOTS_DIR}/best_model_analysis.png', bbox_inches='tight')
    plt.close()


def _plot_scalability(scalability_results):
    """Save time, throughput and memory curves against the total core count."""
    scalability_df = pd.DataFrame(scalability_results)

    fig, axes = plt.subplots(1, 3, figsize=(18, 6))

    # (column, line format, y label, title)
    curves = [
        ('execution_time', 'bo-', 'Tiempo de Ejecución (s)', 'Escalabilidad: Tiempo vs Cores'),
        ('throughput', 'go-', 'Throughput (muestras/s)', 'Escalabilidad: Throughput vs Cores'),
        ('memory_used_gb', 'ro-', 'Memoria Utilizada (GB)', 'Uso de Memoria vs Cores'),
    ]

    for ax, (column, line_format, ylabel, title) in zip(axes, curves):
        ax.plot(scalability_df['total_cores'], scalability_df[column],
                line_format, linewidth=2, markersize=8)
        ax.set_xlabel('Número Total de Cores')
        ax.set_ylabel(ylabel)
        ax.set_title(title)
        ax.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig(f'{PLOTS_DIR}/scalability_analysis.png', bbox_inches='tight')
    plt.close()


def _plot_dask_vs_pandas(comparison_results):
    """Save grouped bars of time, memory and RMSE for the pandas and Dask runs."""
    categories = ['Tiempo (s)', 'Memoria (GB)', 'RMSE']
    metric_keys = ['time', 'memory_gb', 'rmse']
    pandas_values = [comparison_results['pandas'][k] for k in metric_keys]
    dask_values = [comparison_results['dask'][k] for k in metric_keys]

    x = np.arange(len(categories))
    width = 0.35

    fig, ax = plt.subplots(figsize=(12, 8))
    bars1 = ax.bar(x - width/2, pandas_values, width, label='Pandas (Eager)',
                   color='lightblue', edgecolor='navy', alpha=0.7)
    bars2 = ax.bar(x + width/2, dask_values, width, label='Dask (Lazy)',
                   color='lightcoral', edgecolor='darkred', alpha=0.7)

    ax.set_xlabel('Métricas')
    ax.set_ylabel('Valores')
    ax.set_title('Comparación Rendimiento: Dask vs Pandas')
    ax.set_xticks(x)
    ax.set_xticklabels(categories)
    ax.legend()
    ax.grid(True, alpha=0.3)

    # Print each value just above its bar.
    for bars in [bars1, bars2]:
        for bar in bars:
            height = bar.get_height()
            ax.text(bar.get_x() + bar.get_width()/2., height + height*0.01,
                    f'{height:.3f}', ha='center', va='bottom')

    plt.tight_layout()
    plt.savefig(f'{PLOTS_DIR}/dask_vs_pandas_comparison.png', bbox_inches='tight')
    plt.close()


def _plot_cross_validation(results):
    """Save the bar chart of cross-validated RMSE with its standard deviation."""
    model_names = list(results.keys())
    cv_means = [results[m]['cv_rmse_mean'] for m in model_names]
    cv_stds = [results[m]['cv_rmse_std'] for m in model_names]

    fig, ax = plt.subplots(figsize=(12, 8))
    bars = ax.bar(model_names, cv_means, yerr=cv_stds, capsize=5,
                  color='mediumpurple', edgecolor='indigo', alpha=0.7)
    ax.set_ylabel('RMSE (Cross-Validation)')
    ax.set_title('Validación Cruzada - RMSE con Intervalos de Confianza')
    ax.tick_params(axis='x', rotation=45)
    ax.grid(True, alpha=0.3)

    for bar, mean, std in zip(bars, cv_means, cv_stds):
        ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + std + 1,
                f'{mean:.2f}±{std:.2f}', ha='center', va='bottom')

    plt.tight_layout()
    plt.savefig(f'{PLOTS_DIR}/cross_validation_results.png', bbox_inches='tight')
    plt.close()


def create_comprehensive_visualizations(results, trained_models, y_test, scalability_results, comparison_results):
    """Render every figure of the study into ``PLOTS_DIR``.

    Args:
        results: Mapping of model name to its metrics dict (``rmse``, ``mae``,
            ``r2``, ``training_time``, ``cv_rmse_mean``, ``cv_rmse_std``,
            ``predictions``). May be empty if every model failed to train.
        trained_models: Not used by this function; kept so the signature
            stays compatible with existing callers.
        y_test: True test targets (pandas Series) matching the predictions.
        scalability_results: List of per-configuration dicts; the
            scalability figure is skipped when it is empty.
        comparison_results: ``{'pandas': {...}, 'dask': {...}}``; the
            comparison figure is skipped when it is empty.

    Side effects:
        Writes up to five PNG files. The three model-based figures are
        skipped (with a log entry) when ``results`` is empty, instead of
        failing while looking for the best model.
    """
    logger.log("Creando visualizaciones comprehensivas para el paper...")

    # Figures are produced in the same order as before: model metrics, best
    # model diagnostics, scalability, Dask vs pandas, cross-validation.
    if results:
        _plot_model_metric_bars(results)
        _plot_best_model_diagnostics(results, y_test)
    else:
        logger.log("Sin resultados de modelos: se omiten las gráficas de modelos")

    if scalability_results:
        _plot_scalability(scalability_results)

    if comparison_results:
        _plot_dask_vs_pandas(comparison_results)

    if results:
        _plot_cross_validation(results)

    logger.log("Todas las visualizaciones creadas exitosamente")

# -----------------------------------------------------------
# 9. GENERACIÓN DE REPORTE
# -----------------------------------------------------------
def _format_ratio(numerator, denominator):
    """Return ``numerator / denominator`` formatted as ``'1.23x'``, or ``'N/A'`` if the denominator is zero."""
    if denominator == 0:
        return "N/A"
    return f"{numerator / denominator:.2f}x"


def generate_scientific_report(results, comparison_results, scalability_results, selected_sensors):
    """Build the Markdown report and the JSON summary of the experiment.

    The report is written to ``{RESULTS_DIR}/scientific_report.md`` and the raw
    numbers to ``{RESULTS_DIR}/comprehensive_results.json``.

    Args:
        results: Mapping of model name to a metrics dict. Each metrics dict is
            read for the keys ``'rmse'``, ``'mae'``, ``'r2'``,
            ``'training_time'``, ``'cv_rmse_mean'``, ``'cv_rmse_std'`` and
            ``'best_params'``; ``'feature_names'`` is optional.
        comparison_results: Falsy to omit the Dask-vs-Pandas sections, otherwise
            a dict with ``'pandas'`` and ``'dask'`` entries, each providing
            ``'time'``, ``'memory_gb'`` and ``'rmse'``.
        scalability_results: Falsy to omit the scalability section, otherwise an
            iterable of dicts with ``'n_workers'``, ``'threads_per_worker'``,
            ``'total_cores'``, ``'execution_time'``, ``'memory_used_gb'`` and
            ``'throughput'``.
        selected_sensors: Sized collection of the selected sensor names.

    Returns:
        str: The full Markdown text of the report.

    Raises:
        ValueError: If ``results`` is empty, since no best model can be chosen.
    """
    logger.log("Generando reporte científico completo...")

    if not results:
        raise ValueError("generate_scientific_report requires at least one model result")

    generated_at = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # The best model is the one with the lowest test RMSE.
    best_model = min(results, key=lambda name: results[name]['rmse'])
    best_rmse = results[best_model]['rmse']
    best_r2 = results[best_model]['r2']
    n_features = sum(1 for c in results[best_model].get('feature_names', []) if c)

    # Pre-compute both comparison fragments so the summary bullet and the table
    # always report the same, zero-safe ratios.
    comparison_summary = ""
    comparison_table = ""
    if comparison_results:
        pandas_stats = comparison_results['pandas']
        dask_stats = comparison_results['dask']
        pandas_time = pandas_stats['time']
        dask_time = dask_stats['time']
        speedup = pandas_time / dask_time if dask_time > 0 else 1
        # Memory deltas can legitimately be 0, so the ratio must not divide blindly.
        memory_ratio = _format_ratio(pandas_stats['memory_gb'], dask_stats['memory_gb'])

        comparison_summary = f"""
- **Speedup Dask vs Pandas:** {speedup:.2f}x
- **Eficiencia de memoria:** Dask utilizó {dask_stats['memory_gb']:.2f}GB vs {pandas_stats['memory_gb']:.2f}GB de Pandas
"""
        comparison_table = f"""

## COMPARACIÓN DASK VS PANDAS

| Métrica | Pandas (Eager) | Dask (Lazy) | Mejora |
|---------|----------------|-------------|---------|
| Tiempo | {pandas_time:.2f}s | {dask_time:.2f}s | {speedup:.2f}x |
| Memoria | {pandas_stats['memory_gb']:.2f}GB | {dask_stats['memory_gb']:.2f}GB | {memory_ratio} |
| RMSE | {pandas_stats['rmse']:.4f} | {dask_stats['rmse']:.4f} | - |
"""

    # Collect the sections in document order and join once at the end.
    parts = [f"""
# REPORTE CIENTÍFICO: PREDICCIÓN DISTRIBUIDA DE VIDA ÚTIL REMANENTE (RUL)
## Análisis Comparativo de Dask vs Enfoques Tradicionales

**Fecha de generación:** {generated_at}

## RESUMEN EJECUTIVO

Este estudio evalúa la eficacia de Dask para el procesamiento distribuido en la predicción de vida útil remanente (RUL) de motores turbofan utilizando el dataset C-MAPSS de la NASA.

### Hallazgos Principales:
"""]

    parts.append(f"""
- **Mejor modelo:** {best_model} (RMSE: {best_rmse:.4f}, R²: {best_r2:.4f})
- **Sensores seleccionados:** {len(selected_sensors)} de 21 disponibles
- **Características generadas:** {n_features} características engineered
""")

    parts.append(comparison_summary)

    parts.append(f"""

## METODOLOGÍA

### Preprocesamiento de Datos:
1. **Eliminación de variables constantes:** Variables con varianza cero o casi cero fueron removidas
2. **Selección de sensores:** Utilizando información mutua, se seleccionaron los {len(selected_sensors)} sensores más informativos
3. **Ingeniería de características:** Se crearon características de ventana móvil (3, 5, 10, 15 ciclos), tendencias y estadísticas acumulativas

### Modelos Evaluados:
""")

    # One subsection per evaluated model.
    for model_name, model_results in results.items():
        parts.append(f"""
#### {model_name}:
- RMSE: {model_results['rmse']:.4f}
- MAE: {model_results['mae']:.4f}
- R²: {model_results['r2']:.4f}
- Tiempo entrenamiento: {model_results['training_time']:.2f}s
- CV RMSE: {model_results['cv_rmse_mean']:.4f} ± {model_results['cv_rmse_std']:.4f}
- Mejores parámetros: {model_results['best_params']}
""")

    if scalability_results:
        parts.append("""

## ANÁLISIS DE ESCALABILIDAD

Se evaluaron diferentes configuraciones de workers y threads:

| Workers | Threads/Worker | Total Cores | Tiempo (s) | Memoria (GB) | Throughput |
|---------|----------------|-------------|------------|--------------|------------|
""")
        parts.extend(
            f"| {row['n_workers']} | {row['threads_per_worker']} | {row['total_cores']} | {row['execution_time']:.2f} | {row['memory_used_gb']:.2f} | {row['throughput']:.0f} |\n"
            for row in scalability_results
        )

    parts.append(comparison_table)

    parts.append(f"""

## CONCLUSIONES Y RECOMENDACIONES

### Rendimiento del Modelo:
- El modelo {best_model} mostró el mejor rendimiento con RMSE de {best_rmse:.4f}
- La validación cruzada temporal confirma la robustez del modelo
- Se requiere mejorar la ingeniería de características para obtener R² positivo

### Eficiencia Computacional:
- Dask demostró ventajas en el procesamiento de datos distribuido
- La escalabilidad horizontal es efectiva hasta cierto punto
- El overhead de coordinación se vuelve significativo con configuraciones pequeñas

### Recomendaciones para Investigación Futura:
1. **Explorar arquitecturas de deep learning** (LSTM, CNN) para capturar patrones temporales complejos
2. **Implementar técnicas de ensemble** combinando múltiples modelos
3. **Optimizar hiperparámetros** con búsquedas más exhaustivas
4. **Evaluar en datasets más grandes** para aprovechar completamente Dask
5. **Investigar técnicas de feature selection** más sofisticadas

## ARCHIVOS GENERADOS

### Modelos:
- `{MODEL_DIR}/best_model_{best_model.lower()}.pkl`: Mejor modelo entrenado
- `{MODEL_DIR}/scaler.pkl`: Normalizador ajustado

### Datos:
- `{FEAT_DIR}/train_features_advanced.parquet`: Características de entrenamiento
- `{FEAT_DIR}/test_features_advanced.parquet`: Características de prueba
- `{FEAT_DIR}/mutual_information_analysis.csv`: Análisis de selección de sensores

### Resultados:
- `{RESULTS_DIR}/comprehensive_results.json`: Todos los resultados numéricos
- `{RESULTS_DIR}/scalability_analysis.csv`: Resultados de escalabilidad
- `{PLOTS_DIR}/`: Todas las visualizaciones generadas

### Visualizaciones Clave:
- `model_comparison_comprehensive.png`: Comparación entre todos los modelos
- `best_model_analysis.png`: Análisis detallado del mejor modelo
- `scalability_analysis.png`: Resultados de escalabilidad
- `dask_vs_pandas_comparison.png`: Comparación de enfoques
- `mutual_information_analysis.png`: Selección de sensores
- `correlation_matrix_selected_sensors.png`: Correlaciones entre variables

---
*Reporte generado automáticamente por el sistema de análisis distribuido*
""")

    report_content = "".join(parts)

    # Persist the Markdown report (UTF-8 because the text contains accents).
    with open(f'{RESULTS_DIR}/scientific_report.md', 'w', encoding='utf-8') as f:
        f.write(report_content)

    # Persist every numeric result as JSON next to the report.
    complete_results = {
        'experiment_timestamp': datetime.datetime.now().isoformat(),
        'model_results': results,
        'comparison_results': comparison_results,
        'scalability_results': scalability_results,
        'selected_sensors': selected_sensors,
        'best_model': best_model,
        'dataset_info': {
            'name': 'C-MAPSS FD001',
            'source': 'NASA Prognostics Center of Excellence'
        }
    }

    # default=str stringifies any value json cannot encode natively.
    with open(f'{RESULTS_DIR}/comprehensive_results.json', 'w') as f:
        json.dump(complete_results, f, indent=2, default=str)

    logger.log("Reporte científico generado exitosamente")
    return report_content

# -----------------------------------------------------------
# 10. FUNCIÓN PRINCIPAL
# -----------------------------------------------------------
def main():
    """Run the complete scientific pipeline from data loading to the report.

    The stages are executed in order: load/preprocess, feature engineering,
    exploratory analysis, modelling, Dask-vs-Pandas comparison, scalability
    analysis, visualizations, model persistence and report generation.

    Returns:
        dict: On success, a dict with ``'success': True`` plus ``'results'``,
        ``'best_model'``, ``'total_time'`` and ``'output_dir'``. On any
        failure (including data that could not be loaded), a dict with
        ``'success': False`` and an ``'error'`` message. A dict is always
        returned so callers can safely read ``['success']``.
    """

    logger.log("=== INICIANDO PIPELINE CIENTÍFICO COMPLETO ===")
    total_start_time = time.time()

    try:
        # Stage 1: loading and preprocessing
        logger.log("\n ETAPA 1: CARGA Y PREPROCESAMIENTO")
        train_dd, test_dd, rul = load_and_preprocess_data()
        if train_dd is None:
            logger.log(" Error: No se pudieron cargar los datos")
            # Report the failure in the same shape as the except branch below
            # instead of returning None, which the caller would try to index.
            return {'success': False, 'error': 'No se pudieron cargar los datos'}

        # Reload from the cleaned files as pandas DataFrames
        train_features_path = f'{CLEAN}/train_FD001_clean.parquet'
        test_features_path = f'{CLEAN}/test_FD001_clean.parquet'

        # Both files are read together, so both must exist; otherwise fall
        # back to materializing the frames returned by stage 1.
        if os.path.exists(train_features_path) and os.path.exists(test_features_path):
            train = pd.read_parquet(train_features_path)
            test = pd.read_parquet(test_features_path)
        else:
            train = train_dd.compute()
            test = test_dd.compute()

        # Stage 2: feature engineering
        logger.log("\n ETAPA 2: INGENIERÍA DE CARACTERÍSTICAS")
        train_features, test_features, selected_sensors = advanced_feature_engineering(train, test, rul)

        # Stage 3: exploratory analysis
        logger.log("\n ETAPA 3: ANÁLISIS EXPLORATORIO DE DATOS")
        comprehensive_eda(train_features, selected_sensors)

        # Stage 4: modelling and evaluation
        logger.log("\n ETAPA 4: MODELADO Y EVALUACIÓN")
        results, trained_models, y_test, scaler = comprehensive_modeling(train_features, test_features)

        # Stage 5: Dask vs Pandas comparison
        logger.log("\n ETAPA 5: COMPARACIÓN DASK VS PANDAS")
        comparison_results = dask_vs_pandas_comparison(train_features, test_features)

        # Stage 6: scalability analysis
        logger.log("\n ETAPA 6: ANÁLISIS DE ESCALABILIDAD")
        scalability_results = scalability_analysis()

        # Stage 7: visualizations
        logger.log("\n ETAPA 7: GENERACIÓN DE VISUALIZACIONES")
        create_comprehensive_visualizations(results, trained_models, y_test,
                                          scalability_results, comparison_results)

        # Stage 8: persist the model with the lowest RMSE and the scaler
        logger.log("\n ETAPA 8: GUARDADO DE MODELOS")
        best_model_name = min(results, key=lambda name: results[name]['rmse'])
        best_model = trained_models[best_model_name]

        joblib.dump(best_model, f'{MODEL_DIR}/best_model_{best_model_name.lower()}.pkl')
        joblib.dump(scaler, f'{MODEL_DIR}/scaler.pkl')

        # Stage 9: scientific report (written to disk by the callee)
        logger.log("\n ETAPA 9: GENERACIÓN DE REPORTE CIENTÍFICO")
        generate_scientific_report(results, comparison_results,
                                   scalability_results, selected_sensors)

        # Final summary
        total_time = time.time() - total_start_time
        logger.log("\n PIPELINE COMPLETADO EXITOSAMENTE")
        logger.log(f" Tiempo total: {total_time:.2f} segundos")
        logger.log(f" Resultados guardados en: {RESULTS_DIR}")
        logger.log(f" Mejor modelo: {best_model_name} (RMSE: {results[best_model_name]['rmse']:.4f})")

        # Log the generated files as an indented tree rooted at RESULTS_DIR
        logger.log("\n ARCHIVOS GENERADOS:")
        for root, _dirs, files in os.walk(RESULTS_DIR):
            # Depth relative to RESULTS_DIR drives the indentation.
            level = root.replace(RESULTS_DIR, '').count(os.sep)
            indent = ' ' * 2 * level
            logger.log(f"{indent}{os.path.basename(root)}/")
            subindent = ' ' * 2 * (level + 1)
            for file in files:
                logger.log(f"{subindent}{file}")

        return {
            'success': True,
            'results': results,
            'best_model': best_model_name,
            'total_time': total_time,
            'output_dir': RESULTS_DIR
        }

    except Exception as e:
        # Top-level boundary: log the full traceback and report the failure
        # to the caller rather than letting the exception escape.
        logger.log(f" ERROR CRÍTICO: {str(e)}")
        import traceback
        logger.log(traceback.format_exc())
        return {'success': False, 'error': str(e)}

# -----------------------------------------------------------
# EJECUTAR EXPERIMENTO
# -----------------------------------------------------------
if __name__ == "__main__":
    # Silence noisy library warnings for the run
    warnings.filterwarnings("ignore", category=FutureWarning)
    warnings.filterwarnings("ignore", category=UserWarning)

    print(" Iniciando Experimento Científico - Predicción RUL con Dask")
    print("=" * 60)

    # Run the complete pipeline
    final_results = main()

    # Guard against main() handing back None (or a dict without the expected
    # keys) instead of a result dict, so a failed run prints the failure
    # banner rather than raising TypeError/KeyError here.
    if final_results and final_results.get('success'):
        print("\n ¡EXPERIMENTO COMPLETADO CON ÉXITO!")
        print(f" Mejor modelo: {final_results['best_model']}")
        print(f" Tiempo total: {final_results['total_time']:.2f}s")
        print(f" Resultados en: {final_results['output_dir']}")
    else:
        fallback_error = 'el pipeline terminó sin devolver resultados'
        error_message = final_results.get('error', fallback_error) if final_results else fallback_error
        print(f"\n EXPERIMENTO FALLÓ: {error_message}")

    print("=" * 60)

[2025-08-03 17:32:32] === INICIANDO EXPERIMENTO CIENTÍFICO ===
🚀 Iniciando Experimento Científico - Predicción RUL con Dask
[2025-08-03 17:32:32] === INICIANDO PIPELINE CIENTÍFICO COMPLETO ===
[2025-08-03 17:32:32] 
🔄 ETAPA 1: CARGA Y PREPROCESAMIENTO
[2025-08-03 17:32:32] Iniciando carga y preprocesamiento de datos...




[2025-08-03 17:32:32] Datos cargados - Train: 20631 filas, Test: 13096 filas




[2025-08-03 17:32:32] Columnas constantes encontradas: 7
[2025-08-03 17:32:32] Columnas casi constantes encontradas: 3
[2025-08-03 17:32:32] Eliminadas 10 columnas problemáticas




[2025-08-03 17:32:33] 
🔄 ETAPA 2: INGENIERÍA DE CARACTERÍSTICAS
[2025-08-03 17:32:33] Iniciando ingeniería de características avanzada...
[2025-08-03 17:32:33] Trabajando con pandas DataFrames...
[2025-08-03 17:32:33] Sensores seleccionados: 4 de 13
[2025-08-03 17:32:33] Aplicando transformaciones de características...
[2025-08-03 17:32:34] Características creadas - Train: (20631, 95), Test: (13096, 95)
[2025-08-03 17:32:34] 
🔄 ETAPA 3: ANÁLISIS EXPLORATORIO DE DATOS
[2025-08-03 17:32:34] Realizando análisis exploratorio de datos...
[2025-08-03 17:32:38] Análisis exploratorio completado
[2025-08-03 17:32:38] 
🔄 ETAPA 4: MODELADO Y EVALUACIÓN
[2025-08-03 17:32:38] Iniciando modelado comprehensivo...
[2025-08-03 17:32:38] Características utilizadas: 92
[2025-08-03 17:32:38] Datos de entrenamiento: (20631, 92)
[2025-08-03 17:32:38] Datos de prueba: (13096, 92)
[2025-08-03 17:32:38] 
--- Entrenando LightGBM ---
[2025-08-03 18:57:12] 
=== LightGBM Resultados ===
[2025-08-03 18:57:12] RMSE: 

INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:37859
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:40449/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:39903'


[2025-08-03 20:56:18] 
=== Comparación Pandas vs Dask ===
[2025-08-03 20:56:18] Pandas - Tiempo: 1.24s
[2025-08-03 20:56:18] Pandas - Memoria: 0.00GB
[2025-08-03 20:56:18] Pandas - RMSE: 176.7530
[2025-08-03 20:56:18] Dask - Tiempo: 7.35s
[2025-08-03 20:56:18] Dask - Memoria: 0.09GB
[2025-08-03 20:56:18] Dask - RMSE: 176.0701
[2025-08-03 20:56:18] Speedup: 0.17x
[2025-08-03 20:56:18] 
🔄 ETAPA 6: ANÁLISIS DE ESCALABILIDAD
[2025-08-03 20:56:18] Realizando análisis de escalabilidad...
[2025-08-03 20:56:18] Probando configuración: 1 workers, 1 threads c/u


INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:37985 name: 0
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:37985
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:48628
INFO:distributed.scheduler:Receive client connection: Client-4d6c534b-70ac-11f0-8105-0242ac1c000c
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:48630
INFO:distributed.scheduler:Remove client Client-4d6c534b-70ac-11f0-8105-0242ac1c000c
INFO:distributed.core:Received 'close-stream' from tcp://127.0.0.1:48630; closing.
INFO:distributed.scheduler:Remove client Client-4d6c534b-70ac-11f0-8105-0242ac1c000c
INFO:distributed.scheduler:Close client connection: Client-4d6c534b-70ac-11f0-8105-0242ac1c000c
INFO:distributed.scheduler:Retire worker addresses (stimulus_id='retire-workers-1754254581.2234986') (0,)
INFO:distributed.nanny:Closing Nanny at 'tcp://127.0.0.1:39903'. Reason: nanny-close
INFO:distributed.nanny:Nanny asking worker t

[2025-08-03 20:56:21] Configuración 1x1: 2.02s, 0.18GB


INFO:distributed.nanny:Nanny at 'tcp://127.0.0.1:39903' closed.
INFO:distributed.scheduler:Closing scheduler. Reason: unknown
INFO:distributed.scheduler:Scheduler closing all comms
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:45807
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:40941/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:38017'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:41795'


[2025-08-03 20:56:23] Probando configuración: 2 workers, 1 threads c/u


INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:33625 name: 0
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:33625
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:54682
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:42525 name: 1
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:42525
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:54690
INFO:distributed.scheduler:Receive client connection: Client-507cc356-70ac-11f0-8105-0242ac1c000c
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:54698
INFO:distributed.scheduler:Remove client Client-507cc356-70ac-11f0-8105-0242ac1c000c
INFO:distributed.core:Received 'close-stream' from tcp://127.0.0.1:54698; closing.
INFO:distributed.scheduler:Remove client Client-507cc356-70ac-11f0-8105-0242ac1c000c
INFO:distributed.scheduler:Close client connection: Client-507cc356-70ac-11f0-8105-0242ac1c0

[2025-08-03 20:56:26] Configuración 2x1: 2.40s, 0.30GB


INFO:distributed.nanny:Nanny at 'tcp://127.0.0.1:38017' closed.
INFO:distributed.nanny:Nanny at 'tcp://127.0.0.1:41795' closed.
INFO:distributed.scheduler:Closing scheduler. Reason: unknown
INFO:distributed.scheduler:Scheduler closing all comms
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:35701
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:34975/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:35735'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:39493'


[2025-08-03 20:56:29] Probando configuración: 2 workers, 2 threads c/u


INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:39681 name: 0
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:39681
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:48694
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:42177 name: 1
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:42177
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:48696
INFO:distributed.scheduler:Receive client connection: Client-53e892e6-70ac-11f0-8105-0242ac1c000c
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:48708
INFO:distributed.scheduler:Remove client Client-53e892e6-70ac-11f0-8105-0242ac1c000c
INFO:distributed.core:Received 'close-stream' from tcp://127.0.0.1:48708; closing.
INFO:distributed.scheduler:Remove client Client-53e892e6-70ac-11f0-8105-0242ac1c000c
INFO:distributed.scheduler:Close client connection: Client-53e892e6-70ac-11f0-8105-0242ac1c0

[2025-08-03 20:56:33] Configuración 2x2: 2.76s, 0.32GB


INFO:distributed.nanny:Nanny at 'tcp://127.0.0.1:39493' closed.
INFO:distributed.nanny:Nanny at 'tcp://127.0.0.1:35735' closed.
INFO:distributed.scheduler:Closing scheduler. Reason: unknown
INFO:distributed.scheduler:Scheduler closing all comms
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:32967
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:46179/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:35921'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:37789'


[2025-08-03 20:56:35] Probando configuración: 4 workers, 1 threads c/u


INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:41689'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:34669'
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:33129 name: 0
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:33129
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:39122
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:32919 name: 3
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:32919
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:39112
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:42957 name: 2
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:42957
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:39114
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:45963 name: 1
INFO:distributed.scheduler:Starting worker compute stream, tcp

[2025-08-03 20:56:43] Configuración 4x1: 6.25s, 0.50GB


INFO:distributed.nanny:Nanny at 'tcp://127.0.0.1:37789' closed.
INFO:distributed.nanny:Nanny at 'tcp://127.0.0.1:35921' closed.
INFO:distributed.nanny:Nanny at 'tcp://127.0.0.1:34669' closed.
INFO:distributed.nanny:Nanny at 'tcp://127.0.0.1:41689' closed.
INFO:distributed.scheduler:Closing scheduler. Reason: unknown
INFO:distributed.scheduler:Scheduler closing all comms
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:41633
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:37449/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:46841'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:38233'


[2025-08-03 20:56:47] Probando configuración: 4 workers, 2 threads c/u


INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:36369'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:44661'
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:36405 name: 3
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:36405
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:45614
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:35075 name: 1
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:35075
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:45620
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:32895 name: 0
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:32895
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:45634
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:39271 name: 2
INFO:distributed.scheduler:Starting worker compute stream, tcp

[2025-08-03 20:56:53] Configuración 4x2: 4.92s, 0.51GB


INFO:distributed.nanny:Nanny at 'tcp://127.0.0.1:38233' closed.
INFO:distributed.nanny:Nanny at 'tcp://127.0.0.1:46841' closed.
INFO:distributed.nanny:Nanny at 'tcp://127.0.0.1:44661' closed.
INFO:distributed.nanny:Nanny at 'tcp://127.0.0.1:36369' closed.
INFO:distributed.scheduler:Closing scheduler. Reason: unknown
INFO:distributed.scheduler:Scheduler closing all comms


[2025-08-03 20:56:56] 
🔄 ETAPA 7: GENERACIÓN DE VISUALIZACIONES
[2025-08-03 20:56:56] Creando visualizaciones comprehensivas para el paper...
[2025-08-03 20:57:01] Todas las visualizaciones creadas exitosamente
[2025-08-03 20:57:01] 
🔄 ETAPA 8: GUARDADO DE MODELOS
[2025-08-03 20:57:01] 
🔄 ETAPA 9: GENERACIÓN DE REPORTE CIENTÍFICO
[2025-08-03 20:57:01] Generando reporte científico completo...
[2025-08-03 20:57:01] Reporte científico generado exitosamente
[2025-08-03 20:57:01] 
✅ PIPELINE COMPLETADO EXITOSAMENTE
[2025-08-03 20:57:01] ⏱️  Tiempo total: 12269.53 segundos
[2025-08-03 20:57:01] 📁 Resultados guardados en: /content/drive/MyDrive/CMAPSSData/paper_results
[2025-08-03 20:57:01] 🏆 Mejor modelo: Ridge (RMSE: 174.1227)
[2025-08-03 20:57:01] 
📋 ARCHIVOS GENERADOS:
[2025-08-03 20:57:01] paper_results/
[2025-08-03 20:57:01]   experiment_log.txt
[2025-08-03 20:57:01]   scalability_analysis.csv
[2025-08-03 20:57:01]   scientific_report.md
[2025-08-03 20:57:01]   comprehensive_results.jso