# MLP Binario con dataset real

In [None]:
# Manejo de datos
import numpy as np
import pandas as pd
import random
import os

# Modelos de Deep Learning
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from collections import Counter

#Preprocesamiento de datos
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Gráficos
from sklearn.metrics import confusion_matrix, classification_report, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import seaborn as sns
plt.style.use('fivethirtyeight')
plt.rcParams['lines.linewidth'] = 1.5
plt.rcParams['font.size'] = 10

# Configuración warnings (Quitar en caso de errores desconocidos)
import warnings
warnings.filterwarnings('ignore')

# Versions of the packages in use
import sklearn  # only needed here, to report its version

color = '\033[1m\033[38;5;208m'  # ANSI escape: bold + 256-colour code 208
print(f"{color}Versión de las librerias utilizadas:")
print(f"{color}- Version torch: {torch.__version__}")
print(f"{color}- Version pandas: {pd.__version__}")
print(f"{color}- Version numpy: {np.__version__}")
# Report scikit-learn's own version (this line used to print the pandas version).
print(f"{color}- Version sklearn: {sklearn.__version__}")

In [None]:
# Total number of rows to use out of the 100k-row dataset
num_datos = 1001 # sizes listed by the author: 1k, 5k, 10k, 50k, 100k

In [None]:
# Load the dataset
try:
    data = pd.read_csv('/content/gdrive/My Drive/G11/Datasets/dataset_original.csv')
except Exception as e:
    print(f"Error: {e}")
    # Re-raise after reporting: swallowing the error would leave `data`
    # undefined and only postpone the failure to the next cell.
    raise
else:
    print("Archivo cargado correctamente.")

In [None]:
# One-hot encode the categorical columns, dropping the first level of each
categorical_cols = ['gender', 'smoking_history']
data = pd.get_dummies(data, columns=categorical_cols, drop_first=True)

# Discard every row that still holds a missing value
data.dropna(inplace=True)

# Keep only letters, digits and underscores in the column names
clean_names = data.columns.str.replace('[^A-Za-z0-9_]+', '', regex=True)
data.columns = clean_names

print("Preprocesamiento listo. Nuevas columnas:")
print(data.columns)
data.head()
data_sample = data

In [None]:
# Seed Python, NumPy and PyTorch with the same value
seed = 110425
for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
    seed_fn(seed)

# Ask cuDNN for deterministic kernels and turn off its auto-tuner
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

# Use CUDA when it is available, otherwise the CPU
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
print(f"Usando dispositivo: {device}")

In [None]:
# Resample while keeping the class proportions (91.5% class 0 / 8.5% class 1)
class_0, class_1 = (
    data[data['diabetes'] == label].sample(n=int(num_datos * share), random_state=42)
    for label, share in ((0, 0.915), (1, 0.085))
)
# Concatenate both classes, shuffle the rows and rebuild a clean index
balanced_train_data = pd.concat([class_0, class_1])
balanced_train_data = balanced_train_data.sample(frac=1, random_state=42).reset_index(drop=True)
data_sample = balanced_train_data

In [None]:
# 1. Split the sample into features (x) and target (y).
# An explicit float dtype is requested: get_dummies may produce boolean
# columns, and a frame mixing bool and numeric columns can otherwise be
# converted to an object-dtype array.
x = data_sample.drop(columns=["diabetes"]).to_numpy(dtype=np.float64)
y = data_sample["diabetes"].to_numpy()

# 2. Train / validation / test split, stratified on the target
X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.20, random_state=42, stratify=y) # 20% for test
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.15, random_state=42, stratify=y_train) # 15% of the remainder for validation

# 3. Scaling: fit on the training split only, then reuse the fitted scaler
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_val = scaler.transform(X_val)
X_test = scaler.transform(X_test)


In [None]:
# Convert every split to tensors: float32 features and int64 (long) labels
X_train, X_val, X_test = (
    torch.tensor(features, dtype=torch.float32)
    for features in (X_train, X_val, X_test)
)
y_train, y_val, y_test = (
    torch.tensor(labels, dtype=torch.long)
    for labels in (y_train, y_val, y_test)
)

In [None]:
# Mini-batch size used by the data loaders
batchsize = 32


class DiabetesDataset(Dataset):
    """Dataset pairing a feature container with its labels.

    Args:
        X: Indexable collection of feature rows.
        y: Indexable collection of labels, indexed the same way as ``X``.
    """

    def __init__(self, X, y):
        self.X, self.y = X, y

    def __len__(self):
        """Return the number of samples, i.e. ``len(X)``."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``(features, label)`` pair stored at ``idx``."""
        features = self.X[idx]
        label = self.y[idx]
        return features, label

# Wrap each split in a DiabetesDataset
train_data, val_data, test_data = (
    DiabetesDataset(features, labels)
    for features, labels in ((X_train, y_train), (X_val, y_val), (X_test, y_test))
)

# Only the training loader shuffles its samples
train_loader = DataLoader(train_data, batch_size=batchsize, shuffle=True)
val_loader, test_loader = (
    DataLoader(split, batch_size=batchsize, shuffle=False)
    for split in (val_data, test_data)
)

In [None]:
#Definir el Modelo a usar
class MLP(nn.Module):
    """Fully connected classifier: input -> 256 -> 128 -> 64 -> 2 logits.

    LeakyReLU follows each hidden layer; dropout (0.4, then 0.2) follows
    the first two hidden layers only.

    Args:
        input_dim: Number of input features. When omitted, it falls back to
            the column count of the module-level array ``x``, which keeps
            the zero-argument ``MLP()`` call working.
    """

    def __init__(self, input_dim=None):
        super().__init__()
        if input_dim is None:
            # Backward-compatible default: width of the global feature matrix
            input_dim = x.shape[1]
        self.hc1 = nn.Linear(input_dim, 256)
        self.hc2 = nn.Linear(256, 128)
        self.hc3 = nn.Linear(128, 64)
        self.hc4 = nn.Linear(64, 2)
        self.act = nn.LeakyReLU()
        self.dp1 = nn.Dropout(0.4)
        self.dp2 = nn.Dropout(0.2)

    def forward(self, input):
        """Return the raw two-class logits for a batch of feature rows."""
        l1 = self.dp1(self.act(self.hc1(input)))
        l2 = self.dp2(self.act(self.hc2(l1)))
        l3 = self.act(self.hc3(l2))
        output = self.hc4(l3)
        return output

In [None]:
# Build the model and its Adam optimizer (learning rate plus weight decay)
lr = 1e-4
testeo = MLP()
opt = torch.optim.Adam(testeo.parameters(), lr=lr, weight_decay=1e-4)

# Class weights = total samples / samples of that class, so the rarer class
# weighs more in the loss
class_counts = Counter(y)
print(class_counts)
total_samples = len(y)
num_classes = len(class_counts)
class_weights = [0.0] * num_classes
for class_id in class_counts:
    class_weights[class_id] = total_samples / class_counts[class_id]
print("\nPesos iniciales calculados (más alto para clases raras):")
print(class_weights)
weights_tensor = torch.tensor(class_weights, dtype=torch.float)
print(f"\nTensor de pesos para CrossEntropyLoss: {weights_tensor}")

# Cross-entropy loss using the weights computed above
loss_func = nn.CrossEntropyLoss(weight=weights_tensor)


In [None]:
# Paths where the weights (state dict) and the full pickled model are stored or loaded from;
# num_datos is embedded in both file names
testeo_path = f'/content/gdrive/My Drive/G11/Guardar_modelo/MLP/Binary_original/Binary_original_{num_datos}_pytorch.pth'
testeo_path_FULL = f'/content/gdrive/My Drive/G11/Guardar_modelo/MLP/Binary_original/Binary_original_{num_datos}_pytorch_FULL.pth'

In [None]:
# Cargar Pesos
#testeo.load_state_dict(torch.load(testeo_path))
# Cargar Modelo
#testeo = torch.load(testeo_path_FULL)

In [None]:
# Training with early stopping on the validation loss

n_epochs = 2000

# Mean loss per epoch: "TL" = training, "VL" = validation
history = {
    "TL" : [],
    "VL" : []
}

# Stop after `patience` consecutive epochs without an improvement of at
# least `delta` in the validation loss
early_stopping = {
    "delta" : 1e-5,
    "patience": 10
}

best_val_loss = float("inf")
best_state = None  # copy of the weights taken at the best validation loss
aux = 0  # epochs elapsed since the last improvement

# range(n_epochs): the previous n_epochs + 1 bound allowed one extra epoch
for i in range(n_epochs):
    testeo.train()
    epoch_loss = 0
    for X_batch, y_batch in train_loader:
        output = testeo(X_batch)
        loss = loss_func(output, y_batch)

        opt.zero_grad()
        loss.backward()
        opt.step()

        epoch_loss += loss.item()
    epoch_loss /= len(train_loader)
    history["TL"].append(epoch_loss)

    testeo.eval()
    epoch_loss = 0
    # Forward pass and loss both run without autograd during validation
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            preds_val = testeo(X_batch)
            val_loss = loss_func(preds_val, y_batch)
            epoch_loss += val_loss.item()
    epoch_loss /= len(val_loader)
    history["VL"].append(epoch_loss)

    if epoch_loss + early_stopping["delta"] < best_val_loss:
        best_val_loss = epoch_loss
        # Snapshot the weights; clone so later optimizer steps cannot alter them
        best_state = {name: tensor.detach().clone() for name, tensor in testeo.state_dict().items()}
        aux = 0
    else:
        aux += 1
        if aux >= early_stopping["patience"]:
            print(f"Terminando el entrenamiento en la época {i}")
            break

# Restore the best weights; otherwise the model keeps the weights from the
# last epoch, which may be up to `patience` epochs past the best one
if best_state is not None:
    testeo.load_state_dict(best_state)
    

In [None]:
# Create the destination folders first so a missing directory cannot make
# the save fail after training has already finished
os.makedirs(os.path.dirname(testeo_path), exist_ok=True)
os.makedirs(os.path.dirname(testeo_path_FULL), exist_ok=True)
# Save the weights (state dict)
torch.save(testeo.state_dict(), testeo_path)
# Save the complete model object
torch.save(testeo, testeo_path_FULL)

In [None]:
# Training and validation loss per epoch
for history_key, legend_label in (("TL", "Train"), ("VL", "Val")):
    plt.plot(history[history_key], label=legend_label)
plt.xlabel('Epoch')
plt.ylabel('Loss Train')
plt.legend()
plt.show()

In [None]:
# Accuracy on the training and test sets
# Force inference mode so dropout is off even when the weights were loaded
# from disk rather than trained in this session
testeo.eval()
with torch.no_grad():
    output_train = testeo(X_train)
    output = testeo(X_test)

# Train: predicted class = index of the largest logit in each row
y_hat_train = output_train.numpy().argmax(axis=1)
print(f'Training Accuracy: {accuracy_score(y_train, y_hat_train):.2f}')
# Test
y_hat = output.numpy().argmax(axis=1)
print(f'Testing Accuracy: {accuracy_score(y_test, y_hat):.2f}')

# Precision, recall, f1-score and support on the test set
print(classification_report(y_test, y_hat))

In [None]:
# Confusion matrix of the test-set predictions
cm = confusion_matrix(y_test, y_hat)
cm_plot = ConfusionMatrixDisplay(confusion_matrix=cm)
cm_plot.plot()
plt.show()

# XGBoost(ML) Binario con dataset real

In [None]:
#Imports necesarios para el análisis de datos
import numpy as np
import pandas as pd

#Librerías para visualización
import matplotlib.pyplot as plt
import seaborn as sns

#XGBoost para el modelo de clasificación
import xgboost as xgb

#Herramientas de sklearn para entrenamiento y evaluación
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from collections import Counter

#Se usa para guardar el modelo entrenado
import joblib

#Suprimir warnings para una salida más limpia
import warnings
warnings.filterwarnings("ignore")

In [None]:
# Load the diabetes dataset from its CSV file
data = pd.read_csv('/content/gdrive/My Drive/G11/Datasets/dataset_original.csv')

In [None]:
# Normalise the gender column: cast to str, strip surrounding whitespace, upper-case
data['gender'] = data['gender'].astype(str).str.strip().str.upper()

In [None]:
# Print the DataFrame summary for a visual check
data.info()

In [None]:
#Se recategoriza el historial de tabaquismo en grupos más simples
def recategorize_smoking(smoking_status):
    """Collapse a raw smoking-history value into a coarser group.

    Returns ``'non-smoker'`` for ``'never'`` / ``'No Info'``, ``'current'``
    for ``'current'`` and ``'past_smoker'`` for ``'ever'`` / ``'former'`` /
    ``'not current'``. Any other value yields ``None``.
    """
    groups = (
        (('never', 'No Info'), 'non-smoker'),
        (('current',), 'current'),
        (('ever', 'former', 'not current'), 'past_smoker'),
    )
    for raw_values, group_label in groups:
        if smoking_status in raw_values:
            return group_label
    return None
    
# Replace every smoking_history value with the result of recategorize_smoking
data['smoking_history'] = data['smoking_history'].apply(recategorize_smoking)

In [None]:
# Sample size used for training
total_samples = 5000

# Per-class counts keeping the 91.5% / 8.5% proportions.
# round() plus the complement is used instead of int() on each float product:
# a product that lands just below an integer would be truncated and drop a
# row, so the two counts would no longer add up to total_samples.
class_1_samples = round(total_samples * 0.085)
class_0_samples = total_samples - class_1_samples

#print(f"=== CONFIGURACIÓN: {total_samples} muestras totales ===")
#print(f"Clase 0 (No diabetes): {class_0_samples}")
#print(f"Clase 1 (Diabetes): {class_1_samples}")

# Draw each class separately (capped at the rows available), then shuffle
negatives = data[data['diabetes'] == 0]
positives = data[data['diabetes'] == 1]
stratified_data = pd.concat([
    negatives.sample(n=min(class_0_samples, len(negatives)), random_state=42),
    positives.sample(n=min(class_1_samples, len(positives)), random_state=42),
]).sample(frac=1, random_state=42).reset_index(drop=True)

#print(f"\nDistribución final:")
#print(stratified_data['diabetes'].value_counts())
#print(f"Proporciones finales:")
#print(stratified_data['diabetes'].value_counts(normalize=True))

data = stratified_data

In [None]:
# Preprocessing configuration
preprocessor = ColumnTransformer(
    transformers=[
        # Standardise the numeric variables
        ('num', StandardScaler(), ['age', 'bmi', 'HbA1c_level', 'blood_glucose_level','hypertension','heart_disease']),
        # One-hot encode the categorical variables. handle_unknown='ignore'
        # encodes a category unseen during fit as all zeros instead of
        # raising, which matters when a rare category appears only in a
        # validation fold or in the test split.
        ('cat', OneHotEncoder(handle_unknown='ignore'), ['gender','smoking_history'])
    ])

# Features and target
X = data.drop('diabetes', axis=1)
y = data['diabetes']

In [None]:
# Derive the class weights automatically to offset the class imbalance
class_counts = Counter(y)
#print(f"Conteo de clases: {class_counts}")

total_samples = len(y)
weight_for_0, weight_for_1 = (total_samples / class_counts[label] for label in (0, 1))

# scale_pos_weight is passed to XGBoost as the ratio between the class-1 and
# class-0 weights, so errors on the minority (diabetes) class are penalised more
scale_pos_weight = weight_for_1 / weight_for_0
#print(f"Scale pos weight calculado: {scale_pos_weight}")

# Pipeline = preprocessing step followed by the weighted classifier
xgb_model = xgb.XGBClassifier(scale_pos_weight=scale_pos_weight)
clf = Pipeline(steps=[('preprocessor', preprocessor),
                      ('classifier', xgb_model)])

In [None]:
# Hyper-parameter grid for GridSearch. The `classifier__` prefix addresses the
# pipeline step named 'classifier'. Only learning_rate has more than one
# candidate, so the grid holds two combinations.
param_grid = {
    'classifier__n_estimators': [100],
    'classifier__max_depth': [3],
    'classifier__learning_rate': [0.001, 0.01],
    'classifier__subsample': [0.8],
    'classifier__colsample_bytree': [0.8]
}

In [None]:
# Hyper-parameter search with 5-fold cross-validation
grid_search = GridSearchCV(clf, param_grid, cv=5)

# Train/test split. stratify=y keeps the share of the minority class the same
# in both parts, matching the stratified splits used for the other models.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

# Run the search on the training split
grid_search.fit(X_train, y_train)

#print("Best Parameters: ", grid_search.best_params_)

In [None]:
# Save the trained model
import os  # local import: the import cell of this section does not bring in os

model_dir = '/content/gdrive/My Drive/G11/Guardar_modelo/ML/Binary/'
os.makedirs(model_dir, exist_ok=True)

# The file name embeds total_samples; the best estimator found by the search is dumped
model_path = f'{model_dir}Binary_{total_samples}_xgboost.pkl'
joblib.dump(grid_search.best_estimator_, model_path)

In [None]:
# Predict on the test split to assess the model
y_pred = grid_search.predict(X_test)

print("Model Accuracy: ", accuracy_score(y_test, y_pred))
print(classification_report(y_test, y_pred))

# Build the confusion matrix and draw it as an annotated heatmap
cm = confusion_matrix(y_test, y_pred)
plt.figure(figsize=(8, 6))
heatmap_options = {'annot': True, 'fmt': 'd', 'cmap': 'Blues'}
sns.heatmap(cm, **heatmap_options)
plt.title('Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.show()

# TabNet Binario con dataset real

Importación de librerías

In [None]:
import pandas as pd
import numpy as np
import os

import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.metrics import roc_auc_score, confusion_matrix, accuracy_score, classification_report
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler

import torch
from pytorch_tabnet.tab_model import TabNetClassifier

import warnings
warnings.filterwarnings('ignore')

Ajustamos el número de muestras (El máximo del dataset es 100.000)

In [None]:
# Number of rows to sample from the dataset
N_Samples = 50000

Carga del dataset

In [None]:
try:
    data = pd.read_csv('/content/gdrive/My Drive/G11/Datasets/dataset_original.csv')
except FileNotFoundError:
    print("Error: No se encontró el archivo 'dataset_original.csv'.")
    # Re-raise instead of calling exit(): exit() is a helper intended for the
    # interactive interpreter, and re-raising keeps the traceback while still
    # stopping the cell.
    raise

Codificar las features categóricas

In [None]:
target_column = 'diabetes'
categorical_features = ['gender', 'smoking_history']
# Every remaining column (neither categorical nor the target) is treated as numerical
excluded_columns = categorical_features + [target_column]
numerical_features = [col for col in data.columns if col not in excluded_columns]
# Integer-encode each categorical column and keep its fitted encoder
label_encoders_X = {}
for col in categorical_features:
    encoder = LabelEncoder()
    data[col] = encoder.fit_transform(data[col].astype(str))
    label_encoders_X[col] = encoder

Ajustar el número de muestras con la misma proporción entre clases

In [None]:
class_shares = data['diabetes'].value_counts(normalize=True)
min_class_count = data['diabetes'].value_counts().min()
if N_Samples > len(data) or int(N_Samples * class_shares.min()) > min_class_count:
    print(f"Advertencia: El tamaño de muestra es muy grande. Se usará el dataset completo.")
    data = data.copy()
else:
    proportions = class_shares
    # Sample each class explicitly and concatenate the pieces. This draws the
    # same rows as the earlier groupby(...).apply(...) form but does not rely
    # on apply() operating on the grouping column, which newer pandas
    # releases deprecate.
    data = pd.concat([
        group.sample(int(N_Samples * proportions[label]), random_state=42)
        for label, group in data.groupby('diabetes')
    ])

print(f"Cantidad de datos a considerar: {len(data)}")
print("\nProporción de clases en el nuevo dataset:")
print(data['diabetes'].value_counts(normalize=True))

In [None]:
data[target_column] = data[target_column].astype(int)

X = data.drop(columns=[target_column])
y = data[target_column]

# 80% for training and 20% for test, stratified on the target
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Scale the numerical features (scaler fitted on the training split only).
# The splits are copied and the columns replaced wholesale: writing floats
# through .loc into integer columns of a frame derived from another one
# depends on implicit upcasting, which recent pandas versions deprecate.
scaler = StandardScaler()
X_train = X_train.copy()
X_test = X_test.copy()
X_train[numerical_features] = scaler.fit_transform(X_train[numerical_features])
X_test[numerical_features] = scaler.transform(X_test[numerical_features])

X_train_np = X_train.values
y_train_np = y_train.values
X_test_np = X_test.values
y_test_np = y_test.values

## Saltar esta celda en caso de querer usar el modelo guardado -->

Preparación de parámetros, inicialización y entrenamiento del modelo

In [None]:
# TabNet requirement: positional indices of the categorical columns
cat_idxs = [X.columns.get_loc(col) for col in categorical_features]
# TabNet requirement: number of distinct categories in each categorical column
cat_dims = [len(le.classes_) for le in label_encoders_X.values()]

DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
weighted_loss_fn = torch.nn.CrossEntropyLoss() # loss function (no class weights are passed, despite the name)

# Hold out part of the training split for early stopping. Monitoring the test
# set instead would let it drive model selection and make the final test
# metrics optimistic.
X_fit_np, X_val_np, y_fit_np, y_val_np = train_test_split(
    X_train_np, y_train_np, test_size=0.15, random_state=42, stratify=y_train_np
)

# Build the TabNetClassifier
clf = TabNetClassifier(
    cat_idxs=cat_idxs,
    cat_dims=cat_dims,
    optimizer_params=dict(lr=1e-3), # learning rate
    optimizer_fn=torch.optim.Adam, # Adam optimizer
    verbose=1,
    seed=42
)

# Train the TabNetClassifier
print(f"\n Iniciando Entrenamiento de TabNet:{' ' + DEVICE.upper()}")
clf.fit(
    X_train=X_fit_np, y_train=y_fit_np,
    eval_set=[(X_val_np, y_val_np)],
    patience=10, # early stopping after 10 epochs without improvement
    max_epochs=100,
    eval_metric=['accuracy', 'logloss'], # evaluation metrics
    loss_fn=weighted_loss_fn, # loss function
    batch_size=256 # samples per training step
)

In [None]:
# Destination of the saved TabNet model
carpeta = "/content/gdrive/My Drive/G11/Guardar_modelo/TabNet"
archivo_xgb = f'Binary_TabNet_{N_Samples}'
directorio = os.getcwd()

# os.path.join drops earlier components once it meets an absolute one, so with
# the absolute `carpeta` above the working directory is not part of `ruta`
ruta = os.path.join(directorio, carpeta, archivo_xgb)

# Create the parent folder of the target path if needed, then save the model
directorio_final_para_guardar = os.path.dirname(ruta)
os.makedirs(directorio_final_para_guardar, exist_ok=True)
clf.save_model(ruta)

Se puede cargar el modelo descomentando las primeras líneas

In [None]:
# Load the saved model
#preTrained = TabNetClassifier()
#preTrained.load_model(ruta)

y_pred = clf.predict(X_test_np) # replace clf with preTrained when using the saved model
test_accuracy = accuracy_score(y_test_np, y_pred)

print("\n--- Resultados de la Evaluación Final ---")
print(f"Precisión (Accuracy) en el conjunto de prueba: {test_accuracy:.4f}")

# Display names for classes 0 and 1, shared by the report and the heatmap
class_names = ['No Diabetes', 'Diabetes']

print("\nReporte de Clasificación:")
print(classification_report(y_test_np, y_pred, target_names=class_names))

print("\nMatriz de Confusión:")
cm = confusion_matrix(y_test_np, y_pred)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Prediction Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix')
plt.show()

print("\nCurva de Pérdida del Entrenamiento:")
train_loss = clf.history['loss']
val_loss = clf.history['val_0_logloss']
plt.figure(figsize=(10, 6))
for curve, curve_label in ((train_loss, 'Train Loss (Logloss)'), (val_loss, 'Validation Loss (Logloss)')):
    plt.plot(curve, label=curve_label)
plt.title('Curva de Loss del Entrenamiento')
plt.xlabel('Epochs')
plt.ylabel('Loss train')
plt.legend()
plt.grid(True)
plt.show()

# GANs

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.preprocessing import MinMaxScaler

import torch
import torch.nn as nn
import torch.optim as optim

from imblearn.over_sampling import SMOTE 

# --- GAN and training hyper-parameters ---
LATENT_DIM = 100  # size of the noise vector fed to the generator
EPOCHS = 0  # number of training epochs; with 0 the training loop body never runs
BATCH_SIZE = 64
SAMPLE_INTERVAL = 1  # the losses are printed every SAMPLE_INTERVAL epochs
LEARNING_RATE_G = 0.0002  # Adam learning rate of the generator
LEARNING_RATE_D = 0.00001  # Adam learning rate of the discriminator
BETA1 = 0.5  # first Adam beta, used by both optimizers
SAVE_INTERVAL = 100  # epoch interval of the checkpoint branch in the training loop

iteration = 3  # run identifier embedded in the loss-plot file name
# Files holding the generator / discriminator state dicts
generator_path = "/content/gdrive/My Drive/G11/Guardar_modelo/GANs/Multi/generator_gan_pytorch.pth"
discriminator_path = "/content/gdrive/My Drive/G11/Guardar_modelo/GANs/Multi/discriminator_gan_pytorch.pth"

# Pick the device (GPU when available)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Usando dispositivo: {device}")

# --- Load the original CSV and drop the identifier columns ---
input_file = '/content/gdrive/My Drive/G11/Datasets/datasetMulti.csv'
data = pd.read_csv(input_file).drop(columns=['ID', 'No_Pation'])

# --- Normalise CLASS (trim whitespace) and Gender (trim + upper-case) ---
data['CLASS'] = data['CLASS'].astype(str).str.strip()
gender_clean = data['Gender'].astype(str).str.strip().str.upper()

# Map Gender to numeric codes
data['Gender'] = gender_clean.map({'F': 0, 'M': 1})

# Map CLASS to numeric codes
class_mapping = {'N': 0, 'P': 1, 'Y': 2}
data['CLASS'] = data['CLASS'].map(class_mapping)

original_columns = data.columns.tolist()

# --- Class counts before SMOTE ---
print("\n--- Conteo de valores para CLASS (Original antes de SMOTE): ---")
print(data['CLASS'].value_counts().sort_index())

# --- 2. Balance the classes with SMOTE ---
print("\nIniciando balanceo con SMOTE...")

# Features (X) and target (y)
X_smote, y_smote = data.drop('CLASS', axis=1), data['CLASS']

# Target of 844 samples for each of the three classes
sampling_strategy = dict.fromkeys((0, 1, 2), 844)

# Run SMOTE
smote = SMOTE(sampling_strategy=sampling_strategy, random_state=42)
X_resampled, y_resampled = smote.fit_resample(X_smote, y_smote)

# Reassemble the balanced data into one DataFrame for the GAN preprocessing
data_df = pd.DataFrame(X_resampled, columns=X_smote.columns)
data_df['CLASS'] = y_resampled

# --- Class counts after SMOTE ---
print("\n--- Conteo de valores para CLASS (Después de SMOTE): ---")
print(data_df['CLASS'].value_counts().sort_index())

# --- 3. Preprocessing ---
print("Iniciando preprocesamiento...")
processed_data_parts = []
scalers_dict = {}
column_info_for_generator_output = []

special_cols_log_scale = ['AGE', 'BMI']
diabetes_col = 'CLASS'
gender_col = 'Gender'  # categorical column

# Gender: one-hot encoding (one column per distinct value)
num_classes_gender = data_df[gender_col].nunique()
gender_codes = data_df[gender_col].astype(int)
gender_one_hot = np.eye(num_classes_gender)[gender_codes]
processed_data_parts.append(gender_one_hot)
column_info_for_generator_output.append({'name': gender_col, 'type': 'one_hot', 'num_classes': num_classes_gender})
print(f"Preprocesado {gender_col} con One-Hot Encoding. Shape: {gender_one_hot.shape}")

# Continuous columns: AGE and BMI get log1p before MinMaxScaler; every other
# original column except Gender gets MinMaxScaler only. Both target [0, 1].
other_cols = [col for col in original_columns if col not in [gender_col] + special_cols_log_scale]
scaling_plan = [(name, True) for name in special_cols_log_scale] + [(name, False) for name in other_cols]
for col_name, use_log in scaling_plan:
    original_values = data_df[col_name].values.reshape(-1, 1)
    scaler_input = np.log1p(original_values) if use_log else original_values
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaled_values = scaler.fit_transform(scaler_input)
    processed_data_parts.append(scaled_values)
    # Keep the fitted scaler and the original value range of each column
    scalers_dict[col_name] = {'scaler': scaler, 'log_applied': use_log, 'original_min': original_values.min(), 'original_max': original_values.max()}
    column_info_for_generator_output.append({'name': col_name, 'type': 'scaled_continuous_sigmoid'})
    method = "log1p + MinMaxScaler" if use_log else "MinMaxScaler"
    print(f"Preprocesado {col_name} con {method}. Shape: {scaled_values.shape}")

# Stack all processed parts column-wise
X_train_processed_np = np.concatenate(processed_data_parts, axis=1).astype(np.float32)
DATA_DIM = X_train_processed_np.shape[1]
print(f"Forma final de los datos procesados (X_train_processed_np): {X_train_processed_np.shape}")

# Convert the data to PyTorch tensors on the selected device
X_train_tensor = torch.tensor(X_train_processed_np, dtype=torch.float32).to(device)
y_train_tensor = torch.tensor(y_resampled.astype(int), dtype=torch.long).to(device)
num_classes = len(np.unique(y_resampled))

# --- 3. Definir el modelo GAN (PyTorch) ---

class Generator(nn.Module):
    """Conditional generator mapping (noise, one-hot class) to ``data_dim`` values in [0, 1].

    Four hidden stages (512, 1024, 2048, 4096 units), each made of
    Linear -> LeakyReLU(0.2) -> Dropout -> BatchNorm1d, followed by a final
    Linear layer and a Sigmoid.

    Args:
        latent_dim: Length of the noise vector.
        n_classes: Length of the one-hot class vector.
        data_dim: Number of output features.
    """

    def __init__(self, latent_dim, n_classes, data_dim):
        super().__init__()
        # (hidden width, dropout probability) of each stage, in order
        hidden_spec = ((512, 0.4), (1024, 0.3), (2048, 0.2), (4096, 0.1))
        layers = []
        in_features = latent_dim + n_classes
        for width, drop_rate in hidden_spec:
            layers.append(nn.Linear(in_features, width))
            layers.append(nn.LeakyReLU(0.2, inplace=True))
            layers.append(nn.Dropout(drop_rate))
            layers.append(nn.BatchNorm1d(width, momentum=0.8))
            in_features = width
        layers.append(nn.Linear(in_features, data_dim))
        layers.append(nn.Sigmoid())  # overall output in [0, 1]
        self.model = nn.Sequential(*layers)

    def forward(self, z, class_onehot):
        """Concatenate noise and class along dim 1 and run the network."""
        conditioned = torch.cat((z, class_onehot), dim=1)
        return self.model(conditioned)

class Discriminator(nn.Module):
    """Conditional discriminator scoring a (sample, one-hot class) pair in [0, 1].

    Args:
        data_dim: Number of features of a sample.
        n_classes: Length of the one-hot class vector.
    """

    def __init__(self, data_dim, n_classes):
        super().__init__()
        stack = [
            nn.Linear(data_dim + n_classes, 1024),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(1024, 512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout(0.3),
            nn.Linear(512, 1),
            nn.Sigmoid(),  # binary output (real / fake)
        ]
        self.model = nn.Sequential(*stack)

    def forward(self, x, class_onehot):
        """Concatenate sample and class along dim 1 and return the score."""
        conditioned = torch.cat((x, class_onehot), dim=1)
        return self.model(conditioned)

# Instantiate both networks on the selected device
generator = Generator(LATENT_DIM, num_classes, DATA_DIM).to(device)
discriminator = Discriminator(DATA_DIM, num_classes).to(device)

# Load the stored weights into each network
for network, weights_path in ((generator, generator_path), (discriminator, discriminator_path)):
    network.load_state_dict(torch.load(weights_path, map_location=device))

# Loss function
adversarial_loss = nn.BCELoss().to(device)

# Optimizers: Adam with separate learning rates and shared betas
adam_betas = (BETA1, 0.999)
optimizer_G = optim.Adam(generator.parameters(), lr=LEARNING_RATE_G, betas=adam_betas)
optimizer_D = optim.Adam(discriminator.parameters(), lr=LEARNING_RATE_D, betas=adam_betas)

print("\n--- Arquitectura del Generador (PyTorch) ---")
print(generator)
print("\n--- Arquitectura del Discriminador (PyTorch) ---")
print(discriminator)


# --- 4. Training loop (PyTorch) ---
print("\nIniciando entrenamiento de la GAN con PyTorch...")
d_loss_history = []
g_loss_history = []
d_acc_history = []  # intended for the discriminator accuracy; not filled by this loop

# Checkpoint writing is off by default. The "models saved" message is now
# printed only when the files are really written; before, it was printed
# while the save calls were commented out.
SAVE_CHECKPOINTS = False
GENERATOR_STEPS = 4  # generator updates per discriminator update

for epoch in range(EPOCHS):
    # Reshuffle samples and labels with the same permutation every epoch
    perm = torch.randperm(X_train_tensor.size(0))
    X_train_shuffled = X_train_tensor[perm]
    y_train_shuffled = y_train_tensor[perm]

    d_loss_epoch = 0
    g_loss_epoch = 0
    num_batches = X_train_shuffled.size(0) // BATCH_SIZE
    for i in range(num_batches):
        batch_slice = slice(i * BATCH_SIZE, (i + 1) * BATCH_SIZE)

        # ---------------------
        #  Train the discriminator
        # ---------------------
        discriminator.train()
        generator.eval()

        # Real data: Gaussian noise (std 0.1) added, target label 0.9
        real_imgs = X_train_shuffled[batch_slice]
        real_classes = y_train_shuffled[batch_slice]
        real_classes_onehot = torch.nn.functional.one_hot(real_classes, num_classes).float()
        real_imgs = real_imgs + 0.1 * torch.randn_like(real_imgs)
        real_labels = torch.full((real_imgs.size(0), 1), 0.9, device=device)

        # Fake data: random classes, same noise level, target label 0.1.
        # Built under no_grad because the generator is not updated in this
        # step, so no autograd graph is needed.
        noise = torch.randn(real_imgs.size(0), LATENT_DIM, dtype=torch.float32).to(device)
        fake_classes = torch.randint(0, num_classes, (real_imgs.size(0),), device=device)
        fake_classes_onehot = torch.nn.functional.one_hot(fake_classes, num_classes).float()
        with torch.no_grad():
            fake_imgs = generator(noise, fake_classes_onehot)
            fake_imgs = fake_imgs + 0.1 * torch.randn_like(fake_imgs)
        fake_labels = torch.full((fake_imgs.size(0), 1), 0.1, device=device)

        # Loss on real data
        optimizer_D.zero_grad()
        real_output = discriminator(real_imgs, real_classes_onehot)
        d_loss_real = adversarial_loss(real_output, real_labels)

        # Loss on fake data
        fake_output = discriminator(fake_imgs, fake_classes_onehot)
        d_loss_fake = adversarial_loss(fake_output, fake_labels)
        d_loss = (d_loss_real + d_loss_fake) / 2
        d_loss_epoch += d_loss.item()
        d_loss.backward()
        optimizer_D.step()

        # ---------------------
        #  Train the generator
        # ---------------------
        generator.train()
        for _ in range(GENERATOR_STEPS):
            optimizer_G.zero_grad()
            # Generate a batch with the generator
            noise_g = torch.randn(BATCH_SIZE, LATENT_DIM, dtype=torch.float32).to(device)
            gen_classes = torch.randint(0, num_classes, (BATCH_SIZE,), device=device)
            gen_classes_onehot = torch.nn.functional.one_hot(gen_classes, num_classes).float()
            gen_imgs_for_g = generator(noise_g, gen_classes_onehot)
            real_labels_for_g = torch.ones(gen_imgs_for_g.size(0), 1, dtype=torch.float32).to(device)
            # Score the generated batch with the discriminator
            output_g = discriminator(gen_imgs_for_g, gen_classes_onehot)
            # Generator loss: generated data should be scored as real
            g_loss = adversarial_loss(output_g, real_labels_for_g)
            g_loss_epoch += g_loss.item()
            g_loss.backward()
            optimizer_G.step()

    # Record the mean losses of the epoch
    d_loss_history.append(d_loss_epoch / num_batches)
    g_loss_history.append(g_loss_epoch / (num_batches * GENERATOR_STEPS))

    if SAVE_CHECKPOINTS and (epoch + 1) % SAVE_INTERVAL == 0:
        torch.save(generator.state_dict(), generator_path)
        torch.save(discriminator.state_dict(), discriminator_path)
        print(f"Modelos guardados en '{generator_path}' y '{discriminator_path}'")

    if (epoch + 1) % SAMPLE_INTERVAL == 0:
        # Reports the losses of the last batch of the epoch
        print(f"{epoch + 1}/{EPOCHS} [D loss: {d_loss.item():.4f}] [G loss: {g_loss.item():.4f}]")

# --- Guardar modelos entrenados ---
#torch.save(generator.state_dict(), generator_path)
#torch.save(discriminator.state_dict(), discriminator_path)
#print(f"Modelos guardados en '{generator_path}' y '{discriminator_path}'")

# --- Plot the loss history ---
import os  # local import: the import lines of this section do not bring in os

plt.figure(figsize=(10, 5))
plt.plot(d_loss_history, label='Discriminator Loss')
plt.plot(g_loss_history, label='Generator Loss')
plt.title("Historial de Pérdidas de la GAN (PyTorch)")
plt.xlabel("Época")
plt.ylabel("Pérdida")
plt.legend()
# The figure goes to a relative folder; create it first so savefig does not
# fail when the folder is missing from the working directory
loss_plot_path = f'Guardar_modelo/GANs/Multi/gan_loss_history_pytorch_{EPOCHS}_{iteration}.png'
os.makedirs(os.path.dirname(loss_plot_path), exist_ok=True)
plt.savefig(loss_plot_path)
plt.show()


# --- 5. Generation of the final synthetic data (PyTorch) ---
print("\nGenerando datos sintéticos finales con PyTorch...")
num_samples_per_class = 50000  # rows to generate per class
num_classes = data_df['CLASS'].nunique()
generation_batch_size = 512

generator.eval()
all_class_labels = []
synthetic_df_final = pd.DataFrame()
all_generated_data_scaled = []

# Generate class by class, in batches of at most generation_batch_size rows
with torch.no_grad():
    for class_value in range(num_classes):
        for batch_start in range(0, num_samples_per_class, generation_batch_size):
            current_batch_size = min(generation_batch_size, num_samples_per_class - batch_start)
            noise = torch.randn(current_batch_size, LATENT_DIM, dtype=torch.float32).to(device)
            # One-hot condition: every row of the batch requests class_value
            class_onehot = torch.zeros(current_batch_size, num_classes, device=device)
            class_onehot[:, class_value] = 1
            generated_batch_scaled = generator(noise, class_onehot).cpu().numpy()
            all_generated_data_scaled.append(generated_batch_scaled)
            all_class_labels.extend([class_value] * current_batch_size)


# Stack the batches and attach the requested class of each generated row
generated_data_scaled_final_np = np.concatenate(all_generated_data_scaled, axis=0)
all_class_labels = np.array(all_class_labels)

synthetic_df_final['CLASS'] = all_class_labels

# Decode the generator output column by column: one-hot groups are collapsed
# with argmax (and mapped back to their original labels), scaled continuous
# columns are taken back to their original scale.
current_col_idx_in_generated = 0
for col_info in column_info_for_generator_output:
    col_name = col_info['name']
    col_type = col_info['type']
    slice_start = current_col_idx_in_generated

    if col_type == 'one_hot':
        num_classes_other = col_info['num_classes']
        current_col_idx_in_generated = slice_start + num_classes_other
        one_hot_part = generated_data_scaled_final_np[:, slice_start:current_col_idx_in_generated]
        synthetic_df_final[col_name] = np.argmax(one_hot_part, axis=1)

        # 'CLASS' and 'Gender' go back to their original labels
        if col_name == 'CLASS':
            # Inverse of the mapping {'N': 0, 'P': 1, 'Y': 2}
            reverse_class_mapping = {code: label for label, code in class_mapping.items()}
            synthetic_df_final[col_name] = synthetic_df_final[col_name].map(reverse_class_mapping)
        elif col_name == 'Gender':
            # Inverse of the mapping {'F': 0, 'M': 1}
            synthetic_df_final[col_name] = synthetic_df_final[col_name].map({0: 'F', 1: 'M'})
        continue

    if col_type != 'scaled_continuous_sigmoid':
        # Any other column type consumes no generator outputs
        continue

    current_col_idx_in_generated = slice_start + 1
    generated_values_scaled = generated_data_scaled_final_np[:, slice_start:current_col_idx_in_generated]

    s_info = scalers_dict[col_name]
    inverted_values = s_info['scaler'].inverse_transform(generated_values_scaled)
    if s_info['log_applied']:
        # Undo the log1p applied before scaling
        inverted_values = np.expm1(inverted_values)

    # Round when the original column only holds whole numbers
    original_column = data_df[col_name]
    original_dtype = original_column.dtype
    holds_whole_numbers = original_dtype == 'int64' or (
        original_dtype == 'float64' and np.all(original_column == original_column.astype(int))
    )
    final_values = np.round(inverted_values) if holds_whole_numbers else inverted_values

    # Keep the values inside the range observed in the original data
    final_values = np.clip(final_values, s_info['original_min'], s_info['original_max'])
    synthetic_df_final[col_name] = final_values.flatten().astype(original_dtype)

# Restore the column order of the original dataset
synthetic_df_final = synthetic_df_final[original_columns]


temp_df_for_balancing = synthetic_df_final.copy()
print("Value counts in temp_df_for_balancing['CLASS'] before balancing:")
print(temp_df_for_balancing['CLASS'].value_counts())

# Bring every class to exactly num_samples_per_class rows
balanced_synthetic = []
for class_value in range(num_classes):
    class_samples = temp_df_for_balancing[temp_df_for_balancing['CLASS'] == class_value]
    if len(class_samples) == 0:
        print(f"Warning: No synthetic samples found for class {class_value}. Skipping this class.")
        continue
    # Draw with replacement only when the class is short of rows. Sampling
    # with replacement from a class that already has enough rows would drop
    # part of its unique rows and duplicate others for no benefit.
    needs_oversampling = len(class_samples) < num_samples_per_class
    balanced_synthetic.append(
        class_samples.sample(n=num_samples_per_class, replace=needs_oversampling, random_state=42)
    )

if not balanced_synthetic:
    # pd.concat([]) would fail with a much less helpful message
    raise ValueError("No synthetic samples matched any class value; nothing to balance.")

# Merge the per-class samples and shuffle the rows
synthetic_df_final_balanced = pd.concat(balanced_synthetic).sample(frac=1, random_state=42).reset_index(drop=True)

# Map the integer class codes back to their original labels
reverse_class_mapping = {v: k for k, v in class_mapping.items()}
synthetic_df_final_balanced['CLASS'] = synthetic_df_final_balanced['CLASS'].map(reverse_class_mapping)

# Save the generated data to a CSV file
import os  # local import: needed only to create the figure folder

output_file_pytorch = f'/content/gdrive/My Drive/G11/Datasets/generated_data_gan_pytorch_{EPOCHS}_{iteration}.csv'
synthetic_df_final_balanced.to_csv(output_file_pytorch, index=False)
print(f"\nDatos sintéticos generados y guardados en: {output_file_pytorch}")

# --- Summary statistics of original vs. generated data ---
print("\n--- Descripción de los datos originales: ---")
print(data_df.describe())
print("\n--- Descripción de los datos sintéticos (PyTorch): ---")
print(synthetic_df_final_balanced.describe())

print("\n--- Conteo de valores para CLASS (Original): ---")
print(data_df['CLASS'].value_counts(normalize=True).sort_index())
print("\n--- Conteo de valores para CLASS (Sintético - PyTorch): ---")
print(synthetic_df_final_balanced['CLASS'].value_counts(normalize=True).sort_index())

# One panel per feature, original and synthetic densities overlaid
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
for ax, feature in zip(axes, ('AGE', 'BMI')):
    sns.histplot(data_df[feature], ax=ax, color='blue', label='Original', kde=True, stat="density")
    sns.histplot(synthetic_df_final_balanced[feature], ax=ax, color='green', label='Sintético (PyTorch)', kde=True, stat="density")
    ax.set_title(f'Distribución de {feature}')
    ax.legend()

plt.tight_layout()
# savefig does not create missing directories, so create the folder first
comparison_plot_dir = 'Guardar_modelo/GANs/Multi'
os.makedirs(comparison_plot_dir, exist_ok=True)
plt.savefig(f'{comparison_plot_dir}/generated_data_distributions_comparison_pytorch_{EPOCHS}_{iteration}.png')
plt.show()

print("\nFinalizado con PyTorch.")

In [None]:
# --- Helper to compare and visualise distributions ---
# Manual check only: shows how the generated data compares with the real data.
def compare_data_distributions(real_df, generated_df, numerical_cols, categorical_cols, output_prefix="comparison"):
    """Print and plot side-by-side statistics of a real and a synthetic DataFrame.

    Args:
        real_df: DataFrame with the reference data.
        generated_df: DataFrame with the synthetic data; it must contain the
            columns named in ``numerical_cols`` and ``categorical_cols``.
        numerical_cols: Column names compared through variances and
            histogram/KDE plots.
        categorical_cols: Column names compared through normalised value
            counts and bar plots.
        output_prefix: Prefix of the PNG files written by the function.

    Returns:
        None. The function prints to stdout, shows the figures and writes
        ``{output_prefix}_numerical_distributions.png`` and
        ``{output_prefix}_categorical_distributions.png``; each figure is
        skipped when its column list is empty.
    """
    print("\n--- Iniciando comparación de distribuciones (Reales vs. Sintéticas) ---")

    # 1. General descriptive statistics
    print("\n--- Estadísticas Descriptivas - Datos Originales (después de SMOTE) ---")
    print(real_df.describe())
    print("\n--- Estadísticas Descriptivas - Datos Sintéticos ---")
    print(generated_df.describe())

    # 2. Value counts (categorical) and variances (numerical)
    print("\n--- Conteo de Clases / Valores (Categóricas) ---")
    for col in categorical_cols:
        print(f"\nColumna: {col}")
        print("Real:")
        print(real_df[col].value_counts(normalize=True).sort_index())
        print("Sintético:")
        print(generated_df[col].value_counts(normalize=True).sort_index())

    print("\n--- Varianzas de Columnas Numéricas ---")
    real_variances = real_df[numerical_cols].var()
    gen_variances = generated_df[numerical_cols].var()
    comparison_variances = pd.DataFrame({'Real_Variance': real_variances, 'Synthetic_Variance': gen_variances})
    print(comparison_variances)

    # 3. Histograms / KDE for the numerical columns
    print("\n--- Visualizando distribuciones numéricas ---")
    # len() is used (not truthiness) so Index/array inputs work as well
    if len(numerical_cols) > 0:
        num_plots_per_row = 3
        num_rows_numerical = (len(numerical_cols) + num_plots_per_row - 1) // num_plots_per_row
        # squeeze=False always yields a 2-D array of axes, whatever the grid size
        fig_num, axes_num = plt.subplots(
            num_rows_numerical,
            num_plots_per_row,
            figsize=(5 * num_plots_per_row, 4 * num_rows_numerical),
            squeeze=False,
        )
        axes_num = axes_num.flatten()

        for i, col in enumerate(numerical_cols):
            sns.histplot(real_df[col], ax=axes_num[i], color='blue', label='Real', kde=True, stat="density", alpha=0.6)
            sns.histplot(generated_df[col], ax=axes_num[i], color='green', label='Sintético', kde=True, stat="density", alpha=0.6)
            axes_num[i].set_title(f'Distribución de {col}')
            axes_num[i].legend()

        # Remove the grid cells that received no plot
        for unused_ax in axes_num[len(numerical_cols):]:
            fig_num.delaxes(unused_ax)

        plt.tight_layout()
        plt.savefig(f"{output_prefix}_numerical_distributions.png")
        plt.show()
    else:
        # A grid with zero rows cannot be created, so skip the figure
        print("No hay columnas numéricas para graficar.")

    # 4. Bar plots for the categorical columns
    print("\n--- Visualizando distribuciones categóricas ---")
    if len(categorical_cols) > 0:
        num_plots_per_row = 2
        num_rows_categorical = (len(categorical_cols) + num_plots_per_row - 1) // num_plots_per_row
        fig_cat, axes_cat = plt.subplots(
            num_rows_categorical,
            num_plots_per_row,
            figsize=(6 * num_plots_per_row, 5 * num_rows_categorical),
            squeeze=False,
        )
        axes_cat = axes_cat.flatten()

        for i, col in enumerate(categorical_cols):
            real_counts = real_df[col].value_counts(normalize=True).sort_index()
            gen_counts = generated_df[col].value_counts(normalize=True).sort_index()
            # Categories seen in either dataset; a missing one counts as 0
            all_categories = real_counts.index.union(gen_counts.index)

            df_plot = pd.DataFrame({
                'Category': all_categories,
                'Real': real_counts.reindex(all_categories, fill_value=0),
                'Sintético': gen_counts.reindex(all_categories, fill_value=0)
            }).melt(id_vars='Category', var_name='Dataset', value_name='Proportion')

            sns.barplot(x='Category', y='Proportion', hue='Dataset', data=df_plot, ax=axes_cat[i], palette={'Real': 'blue', 'Sintético': 'green'})
            axes_cat[i].set_title(f'Distribución de {col}')
            axes_cat[i].set_ylabel('Proporción')

        # Remove the grid cells that received no plot
        for unused_ax in axes_cat[len(categorical_cols):]:
            fig_cat.delaxes(unused_ax)

        plt.tight_layout()
        plt.savefig(f"{output_prefix}_categorical_distributions.png")
        plt.show()
    else:
        print("No hay columnas categóricas para graficar.")

    print("\n--- Fin de la comparación de distribuciones ---")

# Numerical and categorical columns to compare
numerical_features_for_comparison = [
    'AGE', 'Urea', 'Cr', 'HbA1c', 'Chol',
    'TG', 'HDL', 'LDL', 'VLDL', 'BMI',
]
categorical_features_for_comparison = ['Gender', 'CLASS']

# Run the comparison of the original data against the balanced synthetic data
compare_data_distributions(
    data_df,
    synthetic_df_final_balanced,
    numerical_features_for_comparison,
    categorical_features_for_comparison,
    output_prefix="gan_data_comparison",
)

# MLP Multiclase mixto entre dataset real y generado

In [None]:
# Manejo de datos
import numpy as np
import pandas as pd
import random
import os

# Modelos de Deep Learning
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

#Preprocesamiento de datos
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Gráficos
from sklearn.metrics import confusion_matrix, classification_report, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import seaborn as sns
plt.style.use('fivethirtyeight')
plt.rcParams['lines.linewidth'] = 1.5
plt.rcParams['font.size'] = 10

# Configuración warnings (Quitar en caso de errores desconocidos)
import warnings
warnings.filterwarnings('ignore')

from imblearn.over_sampling import SMOTE

# Versions of the packages in use
import sklearn  # imported here only to read its version string

color = '\033[1m\033[38;5;208m'
print(f"{color}Versión de las librerias utilizadas:")
print(f"{color}- Version torch: {torch.__version__}")
print(f"{color}- Version pandas: {pd.__version__}")
print(f"{color}- Version numpy: {np.__version__}")
# Report the scikit-learn version itself (this line used to print pandas')
print(f"{color}- Version sklearn: {sklearn.__version__}")

In [None]:
# Number of rows per class that the mixed (real + generated) dataset will have
num_sample_per_class = 1000

In [None]:
# Load the original dataset
try:
    data = pd.read_csv('/content/gdrive/My Drive/G11/Datasets/datasetMulti_original.csv')
except Exception as e:
    print(f"Error: {e}")
    # Re-raise: continuing would leave `data` undefined, or silently pointing
    # at a DataFrame loaded by an earlier cell.
    raise
else:
    print("Archivo cargado correctamente.")

In [None]:
# Load the dataset generated by the GAN model
try:
    data_gen = pd.read_csv('/content/gdrive/My Drive/G11/Datasets/datasetMulti_Gen.csv')
except Exception as e:
    print(f"Error: {e}")
    # Re-raise: the following cells cannot work without `data_gen`
    raise
else:
    print("Archivo cargado correctamente.")

In [None]:
# Drop the identifier columns. Only the ones actually present are dropped, so
# a file that carries just one of them no longer raises a KeyError.
columns_to_drop = ['ID', 'No_Pation']
existing_columns = [col for col in columns_to_drop if col in data.columns]
if existing_columns:
    data = data.drop(columns=existing_columns)

# Normalise the text of CLASS and Gender: strip surrounding whitespace and,
# for Gender, unify the letter case
data['CLASS'] = data['CLASS'].astype(str).str.strip()
data['Gender'] = data['Gender'].astype(str).str.strip().str.upper()

# Encode the target column 'CLASS' as integers
class_mapping = {'N': 0, 'P': 1, 'Y': 2}
data['CLASS'] = data['CLASS'].map(class_mapping)

# One-hot encode the categorical column 'Gender' (first level dropped)
data = pd.get_dummies(data, columns=['Gender'], drop_first=True)

# Drop rows with missing values, if any (labels outside class_mapping
# become NaN in the step above and are removed here too)
data.dropna(inplace=True)

# Strip special characters from the column names
data.columns = data.columns.str.replace('[^A-Za-z0-9_]+', '', regex=True)

data_sample = data

In [None]:

# Encode the target column 'CLASS' as integers
class_mapping = {'N': 0, 'P': 1, 'Y': 2}
data_gen['CLASS'] = data_gen['CLASS'].map(class_mapping)

# One-hot encode 'Gender' (first level dropped) and discard any row that
# still holds a missing value
data_gen = pd.get_dummies(data_gen, columns=['Gender'], drop_first=True).dropna()

# Strip special characters from the column names
data_gen.columns = data_gen.columns.str.replace('[^A-Za-z0-9_]+', '', regex=True)

data_gen_sample = data_gen

In [None]:
# Seed every random number generator used by the notebook with the same value
seed = 110425
for seed_rng in (random.seed, np.random.seed, torch.manual_seed):
    seed_rng(seed)

# cuDNN settings: deterministic mode on, benchmark mode off
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

# Use the GPU when one is available, the CPU otherwise
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Usando dispositivo: {device}")

In [None]:
# Keep the original data and top up, with generated rows, every class that
# has fewer rows than num_sample_per_class.
data_count = data['CLASS'].value_counts()

# Parts are collected in a list so that a class needing no top-up is simply
# skipped instead of leaving a variable undefined (or stale from a prior run).
generated_parts = []
for class_value in (0, 1, 2):
    if class_value not in data_count:
        # Classes absent from the original data are not topped up
        continue
    missing_rows = num_sample_per_class - data_count[class_value]
    if missing_rows <= 0:
        continue
    # NOTE: sample() raises ValueError when data_gen holds fewer than
    # missing_rows rows of this class.
    generated_parts.append(
        data_gen[data_gen['CLASS'] == class_value].sample(n=missing_rows, random_state=42)
    )

if generated_parts:
    balanced_train_data_gen = pd.concat(generated_parts).sample(frac=1, random_state=42).reset_index(drop=True)
else:
    # Nothing to add: keep an empty frame with the columns of data_gen
    balanced_train_data_gen = data_gen.iloc[0:0].copy()

data_gen_sample = balanced_train_data_gen

In [None]:

# 1. Split the real sample into features (x) and target (y)
x = data_sample.drop(columns=["CLASS"]).to_numpy()
y = data_sample["CLASS"].to_numpy()

# 2. Real data: 20% for test, then 15% of the remainder for validation
X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.20, random_state=42, stratify=y)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.15, random_state=42, stratify=y_train)

# 3. Generated data: select its columns by name, in the order of the real
#    data. The two sources are merged positionally below, so a different
#    column order would silently mix features; a missing column now raises
#    a KeyError instead.
data_gen_aligned = data_gen_sample[list(data_sample.columns)]
x_gen = data_gen_aligned.drop(columns=["CLASS"]).to_numpy()
y_gen = data_gen_aligned["CLASS"].to_numpy()

# Same 20% / 15% split for the generated data
X_gen_train, X_gen_test, y_gen_train, y_gen_test = train_test_split(x_gen, y_gen, test_size=0.20, random_state=42, stratify=y_gen)
X_gen_train, X_gen_val, y_gen_train, y_gen_val = train_test_split(X_gen_train, y_gen_train, test_size=0.15, random_state=42, stratify=y_gen_train)

# 4. Back to pandas objects
X_train_orig_pd = pd.DataFrame(X_train)
y_train_orig_pd = pd.Series(y_train)
X_test_orig_pd = pd.DataFrame(X_test)
y_test_orig_pd = pd.Series(y_test)
X_val_orig_pd = pd.DataFrame(X_val)
y_val_orig_pd = pd.Series(y_val)

X_gen_train_pd = pd.DataFrame(X_gen_train)
y_gen_train_pd = pd.Series(y_gen_train)
X_gen_test_pd = pd.DataFrame(X_gen_test)
y_gen_test_pd = pd.Series(y_gen_test)
X_gen_val_pd = pd.DataFrame(X_gen_val)
y_gen_val_pd = pd.Series(y_gen_val)


# 5. Merge each original split with its generated counterpart
def _stack_and_shuffle(generated_part, original_part):
    """Stack generated rows on top of original rows, shuffle, return a numpy array.

    The shuffle uses a fixed random_state. The code relies on features and
    labels of equal length receiving the same row order when each goes
    through its own call.
    """
    stacked = pd.concat([generated_part, original_part])
    return stacked.sample(frac=1, random_state=42).reset_index(drop=True).to_numpy()


X_train = _stack_and_shuffle(X_gen_train_pd, X_train_orig_pd)
y_train = _stack_and_shuffle(y_gen_train_pd, y_train_orig_pd)

X_test = _stack_and_shuffle(X_gen_test_pd, X_test_orig_pd)
y_test = _stack_and_shuffle(y_gen_test_pd, y_test_orig_pd)

X_val = _stack_and_shuffle(X_gen_val_pd, X_val_orig_pd)
y_val = _stack_and_shuffle(y_gen_val_pd, y_val_orig_pd)

# 6. Scale: fit on the training split only, reuse the same scaler elsewhere
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_val = scaler.transform(X_val)
X_test = scaler.transform(X_test)


In [None]:
# Convert every split to tensors
# Features (train / val / test) as float32
X_train, X_val, X_test = (
    torch.tensor(features, dtype=torch.float32)
    for features in (X_train, X_val, X_test)
)

# Labels (train / val / test) as long
y_train, y_val, y_test = (
    torch.tensor(labels, dtype=torch.long)
    for labels in (y_train, y_val, y_test)
)

In [None]:
# Batch size shared by the train, validation and test DataLoaders
batchsize = 32
class DiabetesDataset(Dataset):
    """Dataset that pairs each entry of ``X`` with the entry of ``y`` at the same index."""

    def __init__(self, X, y):
        # Both containers are stored as given; no copy and no length check
        self.X, self.y = X, y

    def __len__(self):
        # The size is taken from the feature container
        n_rows = len(self.X)
        return n_rows

    def __getitem__(self, idx):
        features = self.X[idx]
        label = self.y[idx]
        return features, label

# Training split: batches are reshuffled on every pass
train_data = DiabetesDataset(X=X_train, y=y_train)
train_loader = DataLoader(dataset=train_data, batch_size=batchsize, shuffle=True)

# Validation split: fixed order
val_data = DiabetesDataset(X=X_val, y=y_val)
val_loader = DataLoader(dataset=val_data, batch_size=batchsize, shuffle=False)

# Test split: fixed order
test_data = DiabetesDataset(X=X_test, y=y_test)
test_loader = DataLoader(dataset=test_data, batch_size=batchsize, shuffle=False)

In [None]:
# Model definition

class MLP(nn.Module):
    """Fully connected classifier: input -> 256 -> 128 -> 64 -> num_classes.

    LeakyReLU follows each hidden layer; dropout (p=0.4, then p=0.2) follows
    the first two. The forward pass returns raw logits.

    Args:
        input_dim: Number of input features. When omitted, it is read from
            the notebook-level feature matrix ``x`` (``x.shape[1]``), which
            keeps ``MLP()`` working as before.
        num_classes: Number of output logits (default 3).
    """

    def __init__(self, input_dim=None, num_classes=3):
        super().__init__()
        if input_dim is None:
            # Backward-compatible default: size of the global feature matrix
            input_dim = x.shape[1]
        self.hc1 = nn.Linear(input_dim, 256)
        self.hc2 = nn.Linear(256, 128)
        self.hc3 = nn.Linear(128, 64)
        self.hc4 = nn.Linear(64, num_classes)
        self.act = nn.LeakyReLU()
        self.dp1 = nn.Dropout(0.4)
        self.dp2 = nn.Dropout(0.2)

    def forward(self, input):
        """Return the logits for a batch of feature rows."""
        l1 = self.dp1(self.act(self.hc1(input)))
        l2 = self.dp2(self.act(self.hc2(l1)))
        l3 = self.act(self.hc3(l2))
        output = self.hc4(l3)
        return output

In [None]:
# Learning rate
lr = 1e-4

# Model, loss function and optimizer
testeo = MLP()
loss_func = nn.CrossEntropyLoss()
opt = torch.optim.Adam(params=testeo.parameters(), lr=lr, weight_decay=1e-4)


In [None]:
# Output paths for the trained MLP, named after num_sample_per_class:
# testeo_path receives the state_dict, testeo_path_FULL the complete model object
testeo_path = f'/content/gdrive/My Drive/G11/Guardar_modelo/MLP/Multi/Multi_{num_sample_per_class}_pytorch.pth'
testeo_path_FULL = f'/content/gdrive/My Drive/G11/Guardar_modelo/MLP/Multi/Multi_{num_sample_per_class}_pytorch_FULL.pth'

In [None]:
#Cargar Pesos
#testeo.load_state_dict(torch.load(testeo_path))
#Cargar Modelo
#testeo = torch.load(testeo_path_FULL)

In [None]:
# Training with early stopping on the validation loss

n_epochs = 2000
history = {
    "TL": [],  # mean training loss per epoch
    "VL": [],  # mean validation loss per epoch
}
early_stopping = {
    "delta": 1e-5,   # minimum decrease that counts as an improvement
    "patience": 10,  # epochs without improvement tolerated before stopping
}
best_val_loss = float("inf")
best_state = None  # copy of the weights that produced best_val_loss
aux = 0  # consecutive epochs without improvement

# range(n_epochs), not n_epochs + 1: train for at most n_epochs epochs
for i in range(n_epochs):
    # --- Training pass ---
    testeo.train()
    epoch_loss = 0
    for X_batch, y_batch in train_loader:
        output = testeo(X_batch)
        loss = loss_func(output, y_batch)

        opt.zero_grad()
        loss.backward()
        opt.step()

        epoch_loss += loss.item()
    epoch_loss /= len(train_loader)
    history["TL"].append(epoch_loss)

    # --- Validation pass (no gradients, dropout disabled) ---
    testeo.eval()
    epoch_loss = 0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            preds_val = testeo(X_batch)
            val_loss = loss_func(preds_val, y_batch)
            epoch_loss += val_loss.item()
    epoch_loss /= len(val_loader)
    history["VL"].append(epoch_loss)

    # --- Early stopping ---
    if epoch_loss + early_stopping["delta"] < best_val_loss:
        best_val_loss = epoch_loss
        # Snapshot the weights: state_dict() returns references to the live
        # tensors, so each one is cloned.
        best_state = {name: tensor.detach().clone() for name, tensor in testeo.state_dict().items()}
        aux = 0
    else:
        aux += 1
        if aux >= early_stopping["patience"]:
            print(f"Terminando el entrenamiento en la época {i}")
            break

# Restore the best weights. Otherwise the model that gets saved and evaluated
# would be the one from the last epoch, i.e. after `patience` epochs without
# improvement.
if best_state is not None:
    testeo.load_state_dict(best_state)
    

In [None]:
# Save the weights only (state_dict)
torch.save(testeo.state_dict(), testeo_path)
# Save the complete model object
torch.save(testeo, testeo_path_FULL)

In [None]:
# Training vs. validation loss per epoch
for history_key, curve_label in (("TL", "Train"), ("VL", "Val")):
    plt.plot(history[history_key], label=curve_label)

plt.xlabel('Epoch')
plt.ylabel('Loss Train')
plt.legend()
plt.show()

In [None]:
# Accuracy on the training and test sets

# Switch to evaluation mode explicitly: a model that was just built or loaded
# from disk starts in training mode, where dropout would distort the results.
testeo.eval()

# Train
with torch.no_grad():
    output_train = testeo(X_train)
y_hat_train = output_train.numpy().argmax(axis=1)
print(f'Training Accuracy: {accuracy_score(y_train, y_hat_train):.2f}')

# Test
with torch.no_grad():
    output = testeo(X_test)
y_hat = output.numpy().argmax(axis=1)
print(f'Testing Accuracy: {accuracy_score(y_test, y_hat):.2f}')

# Precision, recall, f1-score and support on the test set. The label set is
# fixed so the report and the matrix always cover the three classes, even
# when one of them is missing from y_test and from the predictions.
class_labels = [0, 1, 2]
print(classification_report(y_test, y_hat, labels=class_labels, target_names=['N', 'P', 'Y']))

cm = confusion_matrix(y_test, y_hat, labels=class_labels)
cm_plot = ConfusionMatrixDisplay(confusion_matrix=cm)
cm_plot.plot()
plt.show()
plt.show()