In [1]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from imblearn.over_sampling import SMOTE # Para balancear el dataset

# --- Código de Preprocesamiento de Datos (Proporcionado por el usuario) ---

def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Realiza la limpieza inicial del conjunto de datos
    """
    df_clean = df.copy()
    # Eliminar columna 'id' si existe y no es útil como característica
    if 'id' in df_clean.columns:
        df_clean = df_clean.drop(columns=['id'])

    # Manejo de valores faltantes en BMI
    # Asegurarse de que 'Other' en gender no cause problemas si es una categoría rara
    df_clean = df_clean[df_clean['gender'] != 'Other'] # O imputar si se prefiere

    # Imputar BMI ANTES de eliminar outliers que puedan afectarlo
    imputer = SimpleImputer(strategy='median')
    df_clean['bmi'] = imputer.fit_transform(df_clean[['bmi']])

    # Eliminar filas donde 'smoking_status' es 'Unknown' podría ser una opción,
    # o tratarlo como una categoría separada si LabelEncoder lo maneja.
    # Por ahora, LabelEncoder lo tratará como una categoría más.

    # Eliminar valores atípicos extremos en glucose_level (opcional, considerar impacto)
    # q_low = df_clean['avg_glucose_level'].quantile(0.01) # Considerar umbrales menos extremos
    # q_high = df_clean['avg_glucose_level'].quantile(0.99)
    # df_clean = df_clean[
    #     (df_clean['avg_glucose_level'] >= q_low) &
    #     (df_clean['avg_glucose_level'] <= q_high)
    # ]
    # Considerar también outliers en 'age' y 'bmi' si es necesario

    return df_clean

def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Realiza la ingeniería de características
    """
    df_engineered = df.copy()

    # Crear categorías de BMI
    df_engineered['bmi_category'] = pd.cut(
        df_engineered['bmi'],
        bins=[0, 18.5, 24.9, 29.9, np.inf],
        labels=['Bajo peso', 'Normal', 'Sobrepeso', 'Obeso'],
        right=False # Asegura que 18.5 cae en 'Normal', etc.
    )

    # Crear categorías de edad
    df_engineered['age_group'] = pd.cut(
        df_engineered['age'],
        bins=[0, 18, 35, 50, 65, np.inf],
        labels=['<18', '18-35', '36-50', '51-65', '>65'],
        right=False
    )

    # Crear categorías de glucosa
    df_engineered['glucose_category'] = pd.cut(
        df_engineered['avg_glucose_level'],
        bins=[0, 70, 100, 125, np.inf],
        labels=['Bajo', 'Normal', 'Pre-diabetes', 'Diabetes'],
        right=False
    )

    # Convertir las nuevas columnas categóricas a tipo 'category' puede ser útil
    for col in ['bmi_category', 'age_group', 'glucose_category']:
         if col in df_engineered.columns: # Check if column exists after potential filtering
            df_engineered[col] = df_engineered[col].astype('category')


    return df_engineered

def encode_variables(df: pd.DataFrame) -> pd.DataFrame:
    """
    Codifica las variables categóricas usando Label Encoding.
    PRECAUCIÓN: LabelEncoder asigna números arbitrarios (0, 1, 2...).
    Para redes neuronales, One-Hot Encoding suele ser preferible para
    variables nominales (sin orden inherente) para evitar que el modelo
    interprete un orden inexistente. Para simplificar, usamos LabelEncoder aquí,
    pero considera pd.get_dummies() para un enfoque más robusto.
    """
    df_encoded = df.copy()

    # Identificar columnas categóricas (object o category dtype)
    categorical_columns = df_encoded.select_dtypes(include=['object', 'category']).columns

    encoders = {} # Guardar encoders por si se necesitan después
    for column in categorical_columns:
        # Asegurarse de que no haya NaNs antes de codificar
        if df_encoded[column].isnull().any():
           # Opción 1: Imputar con una categoría específica como 'Desconocido'
           df_encoded[column] = df_encoded[column].cat.add_categories('Desconocido').fillna('Desconocido')
           # Opción 2: Imputar con la moda (si aplica)
           # mode_val = df_encoded[column].mode()[0]
           # df_encoded[column] = df_encoded[column].fillna(mode_val)

        le = LabelEncoder()
        df_encoded[column] = le.fit_transform(df_encoded[column])
        encoders[column] = le # Guardar el encoder

    # Eliminar las columnas numéricas originales que fueron categorizadas si se desea
    # df_encoded = df_encoded.drop(columns=['age', 'bmi', 'avg_glucose_level'], errors='ignore')

    return df_encoded, encoders # Devolver encoders puede ser útil

# Función para escalar y dividir datos
def prepare_for_modeling(df: pd.DataFrame,
                         target: str = 'stroke',
                         test_size: float = 0.2,
                         random_state: int = 42) -> tuple:
    """
    Prepara los datos para el modelado: separa X/y, divide train/test, y escala X.
    """
    # Separar características y objetivo
    X = df.drop(columns=[target])
    y = df[target]

    # División train-test
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state, stratify=y # Stratify es importante en clasificación desbalanceada
    )

    # Escalado de características numéricas
    # Identificar columnas numéricas para escalar (excluir las ya codificadas si son el resultado final)
    # Si usamos LabelEncoder en todo, todas las X serán numéricas en este punto.
    numeric_cols = X_train.select_dtypes(include=np.number).columns # Asumiendo que todas son numéricas tras encode

    scaler = StandardScaler()
    # Ajustar y transformar en el conjunto de entrenamiento
    X_train_scaled = scaler.fit_transform(X_train[numeric_cols])
    # Solo transformar en el conjunto de prueba (usando el ajuste del entrenamiento)
    X_test_scaled = scaler.transform(X_test[numeric_cols])

    # Convertir de nuevo a DataFrame para mantener nombres de columnas (opcional pero útil)
    X_train_scaled = pd.DataFrame(X_train_scaled, index=X_train.index, columns=numeric_cols)
    X_test_scaled = pd.DataFrame(X_test_scaled, index=X_test.index, columns=numeric_cols)

    # Asegurarse de que y_train, y_test sean Series de Pandas para el paso de SMOTE
    if not isinstance(y_train, pd.Series): y_train = pd.Series(y_train, index=X_train.index)
    if not isinstance(y_test, pd.Series): y_test = pd.Series(y_test, index=X_test.index)


    return X_train_scaled, X_test_scaled, y_train, y_test, scaler

def balance_dataset(X_train, y_train, random_state=42):
    """
    Aplica SMOTE para balancear el conjunto de datos de entrenamiento.
    """
    print(f"Distribución antes de SMOTE:\n{y_train.value_counts(normalize=True)}")
    # Manejo del desbalance de clases con SMOTE
    smote = SMOTE(random_state=random_state)
    # SMOTE espera arrays de numpy generalmente
    X_train_np = X_train.to_numpy() if isinstance(X_train, pd.DataFrame) else X_train
    y_train_np = y_train.to_numpy() if isinstance(y_train, pd.Series) else y_train

    X_train_balanced, y_train_balanced = smote.fit_resample(X_train_np, y_train_np)
    print(f"Forma después de SMOTE: {X_train_balanced.shape}")
    print(f"Distribución después de SMOTE:\n{pd.Series(y_train_balanced).value_counts(normalize=True)}")
    return X_train_balanced, y_train_balanced


# --- Definición de la Red Neuronal (Proporcionada por el usuario) ---
class FeedForwardNN(nn.Module):
    def __init__(self, input_size, hidden_sizes=[128, 64, 32]):
        super().__init__()
        layers = []
        prev_size = input_size

        # Capas ocultas con normalización por lotes y activación mejorada
        for hidden_size in hidden_sizes:
            layers.extend([
                nn.Linear(prev_size, hidden_size),
                nn.BatchNorm1d(hidden_size), # BatchNorm ayuda a estabilizar
                nn.ReLU(),
                nn.Dropout(0.3) # Dropout para regularización
            ])
            prev_size = hidden_size

        # Capa de salida
        layers.append(nn.Linear(prev_size, 1)) # Salida única para clasificación binaria
        # No añadimos Sigmoid aquí, usaremos BCEWithLogitsLoss que es más estable
        # layers.append(nn.Sigmoid())

        self.layers = nn.Sequential(*layers)

    def forward(self, x):
        return self.layers(x)


# --- Función para preparar datos para PyTorch ---
def prepare_data(X_train, X_test, y_train, y_test):
    """
    Prepara los datos (NumPy arrays) para PyTorch y los mueve al dispositivo.
    """
    # Configurar dispositivo para PyTorch
    if torch.backends.mps.is_available():
        device = torch.device("mps")
        print("Usando Apple Metal (MPS)")
    elif torch.cuda.is_available():
        device = torch.device("cuda")
        print("Usando CUDA GPU")
    else:
        device = torch.device("cpu")
        print("Usando CPU")

    # Convertir DataFrames/Series a arreglos numpy si aún no lo son
    if isinstance(X_train, pd.DataFrame): X_train = X_train.to_numpy()
    if isinstance(X_test, pd.DataFrame): X_test = X_test.to_numpy()
    if isinstance(y_train, pd.Series): y_train = y_train.to_numpy()
    if isinstance(y_test, pd.Series): y_test = y_test.to_numpy()

    # Asegurar que X_train y X_test sean bidimensionales (ya deberían serlo tras StandardScaler)
    if len(X_train.shape) == 1: X_train = X_train.reshape(-1, 1)
    if len(X_test.shape) == 1: X_test = X_test.reshape(-1, 1)

    # Convertir a Tensores de PyTorch (usar FloatTensor para X, podría ser LongTensor para y si la loss lo requiere, pero BCEWithLogitsLoss prefiere Float)
    X_train = torch.FloatTensor(X_train).to(device)
    X_test = torch.FloatTensor(X_test).to(device)
    y_train = torch.FloatTensor(y_train).reshape(-1, 1).to(device) # BCEWithLogitsLoss espera y como Float
    y_test = torch.FloatTensor(y_test).reshape(-1, 1).to(device)

    return X_train, X_test, y_train, y_test, device


# --- Flujo Principal ---

# 1. Carga de datos
DATA_PATH = "data/healthcare-dataset-stroke-data.csv" # Asegúrate que la ruta sea correcta
try:
    df = pd.read_csv(DATA_PATH)
except FileNotFoundError:
    print(f"Error: No se encontró el archivo en {DATA_PATH}")
    exit() # Salir si no se encuentra el archivo


# 2. Preprocesamiento
df_clean = clean_data(df)
df_engineered = engineer_features(df_clean)
df_encoded, _ = encode_variables(df_engineered) # Guardamos los encoders por si acaso

# 3. Preparación para Modelado (Divide y Escala)
X_train_scaled, X_test_scaled, y_train, y_test, scaler = prepare_for_modeling(df_encoded)

# 4. Balanceo del Dataset de Entrenamiento (SMOTE)
#    Asegúrate que X_train_scaled es NumPy array o compatible con SMOTE
X_train_balanced, y_train_balanced = balance_dataset(X_train_scaled, y_train)

# 5. Preparación para PyTorch (Convertir a Tensores y mover a Dispositivo)
X_train_tensor, X_test_tensor, y_train_tensor, y_test_tensor, device = prepare_data(
    X_train_balanced, X_test_scaled.to_numpy(), y_train_balanced, y_test.to_numpy() # Asegurar que X_test es numpy
)

# --- Configuración del Entrenamiento ---

# Hiperparámetros
INPUT_SIZE = X_train_tensor.shape[1] # Número de características tras preprocesamiento
HIDDEN_SIZES = [128, 64, 32] # Arquitectura definida en la clase
OUTPUT_SIZE = 1 # Salida única para clasificación binaria
LEARNING_RATE = 0.001
EPOCHS = 100 # Número de veces que se itera sobre todo el dataset
BATCH_SIZE = 64 # Tamaño de los lotes de datos para entrenar

# Crear DataLoaders para manejar los lotes eficientemente
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)

test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
test_loader = DataLoader(dataset=test_dataset, batch_size=BATCH_SIZE, shuffle=False) # No barajar en test

# Instanciar el modelo y moverlo al dispositivo
model = FeedForwardNN(input_size=INPUT_SIZE, hidden_sizes=HIDDEN_SIZES).to(device)

# Función de Pérdida y Optimizador
# Usar BCEWithLogitsLoss es numéricamente más estable que Sigmoid + BCELoss
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

# --- Bucle de Entrenamiento ---
print("\n--- Iniciando Entrenamiento ---")
for epoch in range(EPOCHS):
    model.train() # Poner el modelo en modo entrenamiento (activa Dropout, BatchNorm en modo train)
    running_loss = 0.0

    for i, (inputs, labels) in enumerate(train_loader):
        # Los DataLoaders ya deberían tener los tensores en el dispositivo correcto
        # si se crearon a partir de tensores ya movidos. Si no:
        # inputs, labels = inputs.to(device), labels.to(device)

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels) # Calcular la pérdida

        # Backward pass y optimización
        optimizer.zero_grad() # Limpiar gradientes anteriores
        loss.backward() # Calcular gradientes
        optimizer.step() # Actualizar pesos

        running_loss += loss.item()

    # Imprimir pérdida promedio de la época
    epoch_loss = running_loss / len(train_loader)
    print(f"Epoch [{epoch+1}/{EPOCHS}], Loss: {epoch_loss:.4f}")

    # --- Evaluación Opcional por Época (en el conjunto de test) ---
    if (epoch + 1) % 10 == 0: # Evaluar cada 10 épocas, por ejemplo
        model.eval() # Poner el modelo en modo evaluación (desactiva Dropout, BatchNorm usa estadísticas acumuladas)
        test_loss = 0.0
        correct = 0
        total = 0
        all_preds = []
        all_labels = []

        with torch.no_grad(): # Desactivar cálculo de gradientes para evaluación
            for inputs, labels in test_loader:
                # inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                test_loss += loss.item()

                # Convertir logits a probabilidades (aplicando Sigmoid) y luego a predicciones (0 o 1)
                predicted = torch.sigmoid(outputs) > 0.5
                # predicted = (outputs > 0).float() # Alternativa si se usa BCEWithLogitsLoss (logits > 0 equivale a sigmoid > 0.5)

                total += labels.size(0)
                correct += (predicted == labels).sum().item()

                # Guardar para reporte de clasificación
                all_preds.extend(predicted.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())


        avg_test_loss = test_loss / len(test_loader)
        accuracy = 100 * correct / total
        print(f"Epoch [{epoch+1}/{EPOCHS}] - Evaluación Test: Loss: {avg_test_loss:.4f}, Accuracy: {accuracy:.2f}%")
        # print(classification_report(all_labels, all_preds, target_names=['No Stroke', 'Stroke']))


print("\n--- Entrenamiento Finalizado ---")

# --- Evaluación Final ---
model.eval()
all_preds = []
all_labels = []
with torch.no_grad():
    for inputs, labels in test_loader:
        # inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        # Convertir logits a predicciones
        predicted = (torch.sigmoid(outputs) > 0.5).float()
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

print("\n--- Reporte de Clasificación Final (Test Set) ---")
print(confusion_matrix(all_labels, all_preds))
print(classification_report(all_labels, all_preds, target_names=['No Stroke (0)', 'Stroke (1)']))
final_accuracy = accuracy_score(all_labels, all_preds)
print(f"Accuracy Final en Test: {final_accuracy * 100:.2f}%")

# Puedes guardar el modelo entrenado si lo deseas
# torch.save(model.state_dict(), 'stroke_ffnn_model.pth')
# print("Modelo guardado en stroke_ffnn_model.pth")

Distribución antes de SMOTE:
stroke
0    0.951309
1    0.048691
Name: proportion, dtype: float64
Forma después de SMOTE: (7776, 13)
Distribución después de SMOTE:
0    0.5
1    0.5
Name: proportion, dtype: float64
Usando CPU

--- Iniciando Entrenamiento ---
Epoch [1/100], Loss: 0.5196
Epoch [2/100], Loss: 0.4452
Epoch [3/100], Loss: 0.4235
Epoch [4/100], Loss: 0.4137
Epoch [5/100], Loss: 0.3976
Epoch [6/100], Loss: 0.3886
Epoch [7/100], Loss: 0.3824
Epoch [8/100], Loss: 0.3752
Epoch [9/100], Loss: 0.3682
Epoch [10/100], Loss: 0.3595
Epoch [10/100] - Evaluación Test: Loss: 0.4430, Accuracy: 75.44%
Epoch [11/100], Loss: 0.3482
Epoch [12/100], Loss: 0.3410
Epoch [13/100], Loss: 0.3395
Epoch [14/100], Loss: 0.3348
Epoch [15/100], Loss: 0.3279
Epoch [16/100], Loss: 0.3232
Epoch [17/100], Loss: 0.3180
Epoch [18/100], Loss: 0.3070
Epoch [19/100], Loss: 0.3054
Epoch [20/100], Loss: 0.3088
Epoch [20/100] - Evaluación Test: Loss: 0.4054, Accuracy: 78.47%
Epoch [21/100], Loss: 0.3004
Epoch [22/10