In [2]:
import os
import gc
import json
import numpy as np
import pandas as pd
from datetime import datetime
import subprocess
from sklearn.feature_selection import f_classif, mutual_info_classif, SelectKBest
from sklearn.ensemble import RandomForestClassifier
import dask.dataframe as dd
import dask.array as da
from joblib import Parallel, delayed
import psutil

In [13]:
import os
import numpy as np
import pandas as pd
import dask.dataframe as dd
from sklearn.feature_selection import f_classif, mutual_info_classif
from sklearn.ensemble import RandomForestClassifier
import gc
import json
from datetime import datetime

# =============================================
# Configuración de rendimiento y memoria
# =============================================
MAX_FEATURES = 500
COLS_PER_GROUP = 5
DTYPE = np.float32
SAMPLE_SIZE_MODEL = 50000
SAMPLE_FRACTION = 0.3

def get_memory_usage():
    """Obtiene el uso de memoria actual en GB."""
    import psutil
    return psutil.Process(os.getpid()).memory_info().rss / 1024 ** 3

def is_boolean_column(series):
    """Verifica si una columna es booleana (0/1)."""
    unique_values = series.dropna().unique()
    return set(unique_values).issubset({0, 1})

def safe_transform(data, transform_fn):
    """Aplica transformaciones numéricas seguras."""
    with np.errstate(all='ignore'):
        result = transform_fn(np.abs(data))
    return np.nan_to_num(result, nan=0.0, posinf=0.0, neginf=0.0).astype(DTYPE)

def process_feature_group(df, cols_group, target_col):
    """Procesa características incluyendo cuadradas y cúbicas."""
    features = []
    for col in cols_group:
        if col == target_col:
            continue
            
        col_data = df[col].astype(DTYPE).values
        
        if not is_boolean_column(df[col]):
            target_data = df[target_col].astype(DTYPE).values
            
            # Todas las transformaciones
            features.extend([
                col_data,                                   # Original
                safe_transform(col_data, np.log1p),         # Log
                safe_transform(col_data, np.sqrt),          # Raíz cuadrada
                col_data ** 2,                              # Cuadrada
                col_data ** 3,                              # Cúbica
                col_data * target_data                       # Interacción
            ])
        else:
            features.append(col_data)
    
    return np.column_stack(features)

def clean_nan(df):
    """Limpieza agresiva de NaN."""
    df = df.dropna(axis=1, how='all')
    for col in df.columns:
        if df[col].isna().any():
            median_val = df[col].median(skipna=True)
            df[col] = df[col].fillna(median_val if not np.isnan(median_val) else 0.0)
    return df

def process_dataset(dataset_path, norm_name, experiment_folder):
    print(f"\nProcesando {norm_name}")
    print(f"Memoria inicial: {get_memory_usage():.2f} GB")
    
    # Configuración de directorios
    output_folder = os.path.join(experiment_folder, norm_name)
    os.makedirs(output_folder, exist_ok=True)
    
    # Carga de datos
    ddf = dd.read_parquet(dataset_path)
    target_col = 'nivel_triage'
    
    # Identificar columnas numéricas (excluyendo target)
    numeric_cols = [col for col in ddf.columns 
                   if (col != target_col) and (np.issubdtype(ddf[col].dtype, np.number))]
    
    # Filtrar booleanas
    non_boolean = []
    for col in numeric_cols:
        sample = ddf[col].head(500)
        if not is_boolean_column(sample):
            non_boolean.append(col)
    
    print(f"Columnas válidas: {len(non_boolean)}/{len(numeric_cols)}")
    
    # Procesamiento en grupos
    col_groups = [non_boolean[i:i+COLS_PER_GROUP] 
                 for i in range(0, len(non_boolean), COLS_PER_GROUP)]
    
    # Procesar por particiones
    for partition_idx in range(ddf.npartitions):
        print(f"\nPartición {partition_idx+1}/{ddf.npartitions}")
        partition_df = ddf.get_partition(partition_idx).compute()
        target_values = partition_df[target_col].astype(DTYPE).values
        
        all_features = []
        feature_names = []
        
        for group_idx, cols in enumerate(col_groups):
            print(f"Grupo {group_idx+1}/{len(col_groups)}", end='\r')
            
            # Generar características (6 transformaciones por columna)
            features = process_feature_group(partition_df, cols, target_col)
            names = [f"{col}_{suf}" for col in cols 
                    for suf in ['orig', 'log', 'sqrt', 'squared', 'cubic', 'interact']]
            
            all_features.append(features)
            feature_names.extend(names)
        
        # Crear DataFrame temporal con target
        temp_df = pd.DataFrame(np.hstack(all_features), columns=feature_names, dtype=DTYPE)
        temp_df[target_col] = target_values
        
        # Guardar en disco
        temp_path = os.path.join(output_folder, f"temp_part_{partition_idx}.parquet")
        temp_df.to_parquet(temp_path)
        
        del temp_df, partition_df
        gc.collect()
    
    # =============================================
    # Selección de características
    # =============================================
    selectors = {
        "Linear": f_classif,
        "Nonlinear": mutual_info_classif,
        "Model-Based": RandomForestClassifier(n_estimators=30, n_jobs=-1)
    }
    
    for sel_name, selector in selectors.items():
        print(f"\n[SELECTOR] {sel_name}")
        
        full_ddf, sample, X, y = None, None, None, None
        
        try:
            # Cargar datos
            full_ddf = dd.read_parquet(os.path.join(output_folder, "temp_part_*.parquet"))
            
            # Muestreo para reducir memoria
            sample = full_ddf.sample(frac=SAMPLE_FRACTION).compute()
            X = sample.drop(target_col, axis=1)
            y = sample[target_col]
            
            # Limpieza final
            X = clean_nan(X)
            
            # Verificar NaN
            if X.isna().sum().sum() > 0:
                raise ValueError(f"NaN residuales: {X.isna().sum().sum()}")
            
            # Selección de características
            if sel_name == "Model-Based":
                model = selector.fit(X.sample(SAMPLE_SIZE_MODEL), y.sample(SAMPLE_SIZE_MODEL))
                scores = pd.Series(model.feature_importances_, index=X.columns)
            else:
                with np.errstate(all='ignore'):
                    scores = pd.Series(selector(X, y)[0], index=X.columns)
            
            # Seleccionar y guardar resultados
            scores = scores.replace([np.inf, -np.inf], np.nan).fillna(0)
            top_features = scores.nlargest(MAX_FEATURES).index.tolist()
            
            # Guardar reporte CSV
            report_df = pd.DataFrame({'Feature': top_features, 'Score': scores[top_features].values})
            report_path = os.path.join(output_folder, f"{norm_name}_{sel_name}_report.csv")
            report_df.to_csv(report_path, index=False)
            print(f"✅ Reporte CSV guardado: {report_path}")
            
            # Guardar dataset final
            final_df = full_ddf[top_features + [target_col]].compute()
            final_path = os.path.join(output_folder, f"{norm_name}_{sel_name}_final.parquet")
            final_df.to_parquet(final_path, engine='pyarrow', compression='snappy')
            print(f"✅ Dataset Parquet guardado: {final_path}")
            
        except Exception as e:
            print(f"❌ Error en {sel_name}: {str(e)}")
        finally:
            for var in ['full_ddf', 'sample', 'X', 'y']:
                if var in locals():
                    del locals()[var]
            gc.collect()
    
    # Limpiar temporales
    for f in os.listdir(output_folder):
        if f.startswith("temp_"):
            os.remove(os.path.join(output_folder, f))

def main():
    experiment_name = "experimento_final"
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    
    # Configuración de paths
    base_path = os.path.dirname(os.getcwd())
    config_path = os.path.join(base_path, "config.json")
    
    with open(config_path, "r") as f:
        config = json.load(f)
    
    normalized_path = os.path.join(base_path, config["paths"]["intermediate"]["normalized"])
    outputs_path = os.path.join(base_path, config["paths"]["outputs"])
    experiment_folder = os.path.join(outputs_path, "experiments", f"{experiment_name}_{timestamp}")
    os.makedirs(experiment_folder, exist_ok=True)
    
    # Procesar datasets
    datasets = {
        "MaxAbs": os.path.join(normalized_path, "02_df_Maxabs.parquet"),
        "MinMax": os.path.join(normalized_path, "02_df_MinMax.parquet"),
        "NoNorm": os.path.join(normalized_path, "02_df_None.parquet"),
        "Robust": os.path.join(normalized_path, "02_df_Robust.parquet"),
        "Standard": os.path.join(normalized_path, "02_df_Standard.parquet")
    }
    
    for norm_name, path in datasets.items():
        process_dataset(path, norm_name, experiment_folder)

if __name__ == "__main__":
    main()


Procesando MaxAbs
Memoria inicial: 4.32 GB
Columnas válidas: 106/567

Partición 1/1
Grupo 22/22
[SELECTOR] Linear
✅ Reporte CSV guardado: c:\Users\Administrador\Documents\PythonScripts\Tesis\tesisaustral\outputs\experiments\experimento_final_20250129_232410\MaxAbs\MaxAbs_Linear_report.csv
✅ Dataset Parquet guardado: c:\Users\Administrador\Documents\PythonScripts\Tesis\tesisaustral\outputs\experiments\experimento_final_20250129_232410\MaxAbs\MaxAbs_Linear_final.parquet

[SELECTOR] Nonlinear
✅ Reporte CSV guardado: c:\Users\Administrador\Documents\PythonScripts\Tesis\tesisaustral\outputs\experiments\experimento_final_20250129_232410\MaxAbs\MaxAbs_Nonlinear_report.csv
✅ Dataset Parquet guardado: c:\Users\Administrador\Documents\PythonScripts\Tesis\tesisaustral\outputs\experiments\experimento_final_20250129_232410\MaxAbs\MaxAbs_Nonlinear_final.parquet

[SELECTOR] Model-Based
✅ Reporte CSV guardado: c:\Users\Administrador\Documents\PythonScripts\Tesis\tesisaustral\outputs\experiments\exper

In [11]:
import os
import numpy as np
import pandas as pd
import dask.dataframe as dd
from sklearn.feature_selection import f_classif, mutual_info_classif
from sklearn.ensemble import RandomForestClassifier
import gc
import json
from datetime import datetime

# =============================================
# Configuración crítica de rendimiento
# =============================================
MAX_FEATURES = 500
COLS_PER_GROUP = 5
DTYPE = np.float32
SAMPLE_SIZE_MODEL = 50000
SAMPLE_FRACTION = 0.3

def get_memory_usage():
    """Obtiene el uso de memoria actual en GB."""
    import psutil
    return psutil.Process(os.getpid()).memory_info().rss / 1024 ** 3

def is_boolean_column(series):
    """Verifica si una columna es booleana (0/1)."""
    unique_values = series.dropna().unique()
    return set(unique_values).issubset({0, 1})

def safe_transform(data, transform_fn):
    """Aplica transformaciones numéricas seguras."""
    with np.errstate(all='ignore'):
        result = transform_fn(np.abs(data))
    return np.nan_to_num(result, nan=0.0, posinf=0.0, neginf=0.0).astype(DTYPE)

def process_feature_group(df, cols_group, target_col):
    """Procesa características excluyendo el target."""
    features = []
    for col in cols_group:
        # Excluir el target de las transformaciones
        if col == target_col:
            continue
            
        col_data = df[col].astype(DTYPE).values
        
        if not is_boolean_column(df[col]):
            target_data = df[target_col].astype(DTYPE).values  # Usar target para interacción
            
            features.extend([
                col_data,
                safe_transform(col_data, np.log1p),
                safe_transform(col_data, np.sqrt),
                col_data * target_data  # Interacción
            ])
        else:
            features.append(col_data)
    
    return np.column_stack(features)

def clean_nan(df):
    """Limpieza agresiva de NaN."""
    df = df.dropna(axis=1, how='all')
    for col in df.columns:
        if df[col].isna().any():
            median_val = df[col].median(skipna=True)
            df[col] = df[col].fillna(median_val if not np.isnan(median_val) else 0.0)
    return df

def process_dataset(dataset_path, norm_name, experiment_folder):
    print(f"\nProcesando {norm_name}")
    print(f"Memoria inicial: {get_memory_usage():.2f} GB")
    
    # Configuración de rutas
    output_folder = os.path.join(experiment_folder, norm_name)
    os.makedirs(output_folder, exist_ok=True)
    
    # Carga de datos con Dask
    ddf = dd.read_parquet(dataset_path)
    target_col = 'nivel_triage'
    
    # Identificar columnas numéricas (excluyendo target)
    numeric_cols = [col for col in ddf.columns 
                   if (col != target_col) and (np.issubdtype(ddf[col].dtype, np.number))]
    
    # Filtrar booleanas
    non_boolean = []
    for col in numeric_cols:
        sample = ddf[col].head(500)
        if not is_boolean_column(sample):
            non_boolean.append(col)
    
    print(f"Columnas válidas: {len(non_boolean)}/{len(numeric_cols)}")
    
    # Procesamiento en grupos
    col_groups = [non_boolean[i:i+COLS_PER_GROUP] 
                 for i in range(0, len(non_boolean), COLS_PER_GROUP)]
    
    # Procesar por particiones
    for partition_idx in range(ddf.npartitions):
        print(f"\nPartición {partition_idx+1}/{ddf.npartitions}")
        partition_df = ddf.get_partition(partition_idx).compute()
        target_values = partition_df[target_col].astype(DTYPE).values  # Guardar target
        
        all_features = []
        feature_names = []
        
        for group_idx, cols in enumerate(col_groups):
            print(f"Grupo {group_idx+1}/{len(col_groups)}", end='\r')
            
            # Generar características (excluyendo target)
            features = process_feature_group(partition_df, cols, target_col)
            names = [f"{col}_{suf}" for col in cols 
                    for suf in ['orig', 'log', 'sqrt', 'interact']]
            
            all_features.append(features)
            feature_names.extend(names)
        
        # Crear DataFrame temporal CON target
        temp_df = pd.DataFrame(np.hstack(all_features), columns=feature_names, dtype=DTYPE)
        temp_df[target_col] = target_values  # Añadir target sin transformar
        
        # Guardar en disco
        temp_path = os.path.join(output_folder, f"temp_part_{partition_idx}.parquet")
        temp_df.to_parquet(temp_path)
        
        del temp_df, partition_df
        gc.collect()
    
    # =============================================
    # Selección de características (excluyendo target)
    # =============================================
    selectors = {
        "Linear": f_classif,
        "Nonlinear": mutual_info_classif,
        "Model-Based": RandomForestClassifier(n_estimators=30, n_jobs=-1)
    }
    
    for sel_name, selector in selectors.items():
        print(f"\n[SELECTOR] {sel_name}")
        
        # Variables inicializadas para evitar errores
        full_ddf, sample, X, y = None, None, None, None
        
        try:
            # Cargar datos
            full_ddf = dd.read_parquet(os.path.join(output_folder, "temp_part_*.parquet"))
            
            # Muestreo para reducir memoria
            sample = full_ddf.sample(frac=SAMPLE_FRACTION).compute()
            X = sample.drop(target_col, axis=1)  # Excluir target de features
            y = sample[target_col]               # Target separado
            
            # Limpieza final
            X = clean_nan(X)
            
            # Verificar NaN
            if X.isna().sum().sum() > 0:
                raise ValueError(f"NaN residuales: {X.isna().sum().sum()}")
            
            # Selección de características
            if sel_name == "Model-Based":
                model = selector.fit(X.sample(SAMPLE_SIZE_MODEL), y.sample(SAMPLE_SIZE_MODEL))
                scores = pd.Series(model.feature_importances_, index=X.columns)
            else:
                with np.errstate(all='ignore'):
                    scores = pd.Series(selector(X, y)[0], index=X.columns)
            
            # Seleccionar top features
            scores = scores.replace([np.inf, -np.inf], np.nan).fillna(0)
            top_features = scores.nlargest(MAX_FEATURES).index.tolist()
            
            # Guardar dataset final CON target
            final_df = full_ddf[top_features + [target_col]].compute()
            
            output_path = os.path.join(output_folder, f"{norm_name}_{sel_name}_final.parquet")
            final_df.to_parquet(output_path, engine='pyarrow', compression='snappy')
            print(f"✅ Guardado: {output_path} ({final_df.shape})")
            
        except Exception as e:
            print(f"❌ Error en {sel_name}: {str(e)}")
        finally:
            # Limpieza segura de variables
            for var in ['full_ddf', 'sample', 'X', 'y']:
                if var in locals():
                    del locals()[var]
            gc.collect()
    
    # Limpiar temporales
    for f in os.listdir(output_folder):
        if f.startswith("temp_"):
            os.remove(os.path.join(output_folder, f))

def main():
    experiment_name = "experimento_final"
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    
    # Configuración de paths
    base_path = os.path.dirname(os.getcwd())
    config_path = os.path.join(base_path, "config.json")
    
    with open(config_path, "r") as f:
        config = json.load(f)
    
    normalized_path = os.path.join(base_path, config["paths"]["intermediate"]["normalized"])
    outputs_path = os.path.join(base_path, config["paths"]["outputs"])
    experiment_folder = os.path.join(outputs_path, "experiments", f"{experiment_name}_{timestamp}")
    os.makedirs(experiment_folder, exist_ok=True)
    
    # Procesar datasets
    datasets = {
        "MaxAbs": os.path.join(normalized_path, "02_df_Maxabs.parquet"),
        "MinMax": os.path.join(normalized_path, "02_df_MinMax.parquet"),
        "NoNorm": os.path.join(normalized_path, "02_df_None.parquet"),
        "Robust": os.path.join(normalized_path, "02_df_Robust.parquet"),
        "Standard": os.path.join(normalized_path, "02_df_Std.parquet")
    }
    
    for norm_name, path in datasets.items():
        process_dataset(path, norm_name, experiment_folder)

if __name__ == "__main__":
    main()


Procesando MaxAbs
Memoria inicial: 5.13 GB
Columnas válidas: 106/567

Partición 1/1
Grupo 22/22
[SELECTOR] Linear
✅ Guardado: c:\Users\Administrador\Documents\PythonScripts\Tesis\tesisaustral\outputs\experiments\experimento_final_20250129_174201\MaxAbs\MaxAbs_Linear_final.parquet ((560486, 425))

[SELECTOR] Nonlinear
✅ Guardado: c:\Users\Administrador\Documents\PythonScripts\Tesis\tesisaustral\outputs\experiments\experimento_final_20250129_174201\MaxAbs\MaxAbs_Nonlinear_final.parquet ((560486, 425))

[SELECTOR] Model-Based
✅ Guardado: c:\Users\Administrador\Documents\PythonScripts\Tesis\tesisaustral\outputs\experiments\experimento_final_20250129_174201\MaxAbs\MaxAbs_Model-Based_final.parquet ((560486, 425))

Procesando MinMax
Memoria inicial: 3.11 GB
Columnas válidas: 106/567

Partición 1/1
Grupo 22/22
[SELECTOR] Linear
✅ Guardado: c:\Users\Administrador\Documents\PythonScripts\Tesis\tesisaustral\outputs\experiments\experimento_final_20250129_174201\MinMax\MinMax_Linear_final.parquet 

FileNotFoundError: An error occurred while calling the read_parquet method registered to the pandas backend.
Original Message: c:/Users/Administrador/Documents/PythonScripts/Tesis/tesisaustral/intermediate/normalized/02_df_No_Norm.parquet