In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np

## Model

In [2]:
class Config:
    """Hyperparameter container for the MLP and its training loop.

    Every option is a plain instance attribute with a default value.
    Defaults can be replaced at construction time with keyword arguments,
    e.g. ``Config(learning_rate=1e-2, hidden_layers=[64])``.

    Raises:
        TypeError: if a keyword argument does not name an existing option
            (protects against silently ignored typos).
    """

    def __init__(self, **overrides):
        # --- ARCHITECTURE ---
        self.input_dim = 100
        self.output_dim = 10
        self.hidden_layers = [128, 64]  # number and size of hidden layers
        self.activation = 'relu'        # relu, tanh, sigmoid, leakyrelu
        self.weight_init = 'xavier'     # he, xavier, normal, uniform
        self.use_batch_norm = True
        self.dropout_rate = 0.3         # values <= 0 disable dropout
        self.skip_connections = False   # residual sum after shape-preserving Linear layers

        # --- OPTIMIZATION ---
        self.optimizer = 'adam'         # sgd, adam, rmsprop, adamw
        self.learning_rate = 1e-3
        self.momentum = 0.9             # SGD momentum
        self.beta1 = 0.9                # Adam first-moment decay
        self.beta2 = 0.999              # Adam second-moment decay
        self.epsilon = 1e-8             # Adam numerical-stability term
        self.batch_size = 32
        self.epochs = 50
        self.gradient_clipping = 1.0    # max gradient norm; a falsy value disables clipping
        self.scheduler = 'steplr'       # steplr, cosine, none

        # --- REGULARIZATION ---
        self.L1_lambda = 0.0            # L1 penalty weight added to the loss when > 0
        self.L2_lambda = 1e-4           # weight-decay coefficient handed to the optimizer
        self.early_stopping_patience = 5  # epochs without validation improvement before stopping

        # --- LOSS FUNCTION ---
        self.loss_function = 'crossentropy'  # mse, crossentropy

        # Apply caller overrides last. Only names defined above are accepted,
        # so a misspelled option fails loudly instead of being ignored.
        known_options = vars(self)
        for name, value in overrides.items():
            if name not in known_options:
                raise TypeError(f"Config got an unexpected option: {name!r}")
            setattr(self, name, value)

In [3]:
class MLP(nn.Module):
    """Configurable multi-layer perceptron.

    The hidden stack is built from ``config.hidden_layers``. Each entry adds
    a ``Linear`` layer, an optional ``BatchNorm1d``, the configured activation
    and an optional ``Dropout``. A final ``Linear`` maps to
    ``config.output_dim`` and returns raw scores (no output activation).

    Args:
        config: object exposing ``input_dim``, ``output_dim``,
            ``hidden_layers``, ``activation``, ``weight_init``,
            ``use_batch_norm``, ``dropout_rate`` and ``skip_connections``.

    Raises:
        ValueError: if ``config.activation`` or ``config.weight_init`` is not
            one of the supported names.
    """

    # The annotation is a string so the class can be defined even when
    # ``Config`` is not (yet) in scope.
    def __init__(self, config: "Config"):
        super().__init__()
        self.config = config
        self.skip_connections = config.skip_connections

        layers = []
        in_dim = config.input_dim
        for hidden_dim in config.hidden_layers:
            layers.append(nn.Linear(in_dim, hidden_dim))

            if config.use_batch_norm:
                layers.append(nn.BatchNorm1d(hidden_dim))

            layers.append(self._get_activation(config.activation))

            if config.dropout_rate > 0:
                layers.append(nn.Dropout(config.dropout_rate))

            # The next Linear layer consumes this layer's output width.
            in_dim = hidden_dim

        self.hidden = nn.Sequential(*layers)
        self.output_layer = nn.Linear(in_dim, config.output_dim)

        self._initialize_weights(config.weight_init)

    def forward(self, x):
        """Run the network on ``x`` and return the output-layer scores.

        With ``skip_connections`` enabled, the input of every hidden
        ``Linear`` layer is added to its output whenever both tensors have
        the same shape; layers that change the width get no residual.
        """
        if not self.skip_connections:
            x = self.hidden(x)
            return self.output_layer(x)

        # Simple skip connections (residual sum)
        out = x
        for layer in self.hidden:
            prev = out
            out = layer(out)
            if isinstance(layer, nn.Linear) and prev.shape == out.shape:
                out = out + prev
        return self.output_layer(out)

    def _get_activation(self, name):
        """Return a new activation module for ``name`` (case-insensitive).

        Raises:
            ValueError: if ``name`` is not relu, tanh, sigmoid or leakyrelu.
        """
        name = name.lower()
        if name == 'relu':
            return nn.ReLU()
        elif name == 'tanh':
            return nn.Tanh()
        elif name == 'sigmoid':
            return nn.Sigmoid()
        elif name == 'leakyrelu':
            return nn.LeakyReLU(0.01)
        else:
            raise ValueError(f"Función de activación no soportada: {name}")

    def _initialize_weights(self, init_type):
        """Initialise every ``Linear`` layer's weights and zero its bias.

        Args:
            init_type: one of xavier, he, normal, uniform (case-insensitive).

        Raises:
            ValueError: if ``init_type`` is not supported. The check happens
                before any weight is touched, so a misspelled name cannot
                silently fall back to PyTorch's default initialisation.
        """
        init_type = init_type.lower()
        initializers = {
            'xavier': nn.init.xavier_uniform_,
            'he': lambda weight: nn.init.kaiming_uniform_(weight, nonlinearity='relu'),
            'normal': lambda weight: nn.init.normal_(weight, 0.0, 0.02),
            'uniform': lambda weight: nn.init.uniform_(weight, -0.1, 0.1),
        }
        if init_type not in initializers:
            raise ValueError(f"Inicialización de pesos no soportada: {init_type}")

        init_weight = initializers[init_type]
        for m in self.modules():
            if isinstance(m, nn.Linear):
                init_weight(m.weight)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0.0)

In [4]:
def _build_criterion(config):
    """Return the loss module selected by ``config.loss_function`` (mse or crossentropy)."""
    name = str(config.loss_function).lower()
    if name == 'mse':
        return nn.MSELoss()
    if name == 'crossentropy':
        return nn.CrossEntropyLoss()
    raise ValueError(f"Función de pérdida no soportada: {config.loss_function}")


def _build_optimizer(model, config):
    """Return the optimizer selected by ``config.optimizer``, wired to the config values."""
    name = str(config.optimizer).lower()
    params = model.parameters()
    if name == 'adam':
        return optim.Adam(params, lr=config.learning_rate,
                          betas=(config.beta1, config.beta2), eps=config.epsilon,
                          weight_decay=config.L2_lambda)
    if name == 'adamw':
        # Pass the configured regularisation explicitly; otherwise AdamW
        # applies its own built-in default weight decay and ignores L2_lambda.
        return optim.AdamW(params, lr=config.learning_rate,
                           betas=(config.beta1, config.beta2), eps=config.epsilon,
                           weight_decay=config.L2_lambda)
    if name == 'sgd':
        return optim.SGD(params, lr=config.learning_rate,
                         momentum=config.momentum, weight_decay=config.L2_lambda)
    if name == 'rmsprop':
        # config.momentum is intentionally not forwarded here, so RMSprop
        # keeps its own momentum default.
        return optim.RMSprop(params, lr=config.learning_rate,
                             eps=config.epsilon, weight_decay=config.L2_lambda)
    raise ValueError(f"Optimizador no soportado: {config.optimizer}")


def _build_scheduler(optimizer, config):
    """Return the LR scheduler selected by ``config.scheduler``, or None for any other value."""
    name = str(config.scheduler).lower()
    if name == 'steplr':
        return optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5)
    if name == 'cosine':
        return optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=10)
    return None


def train_model(model, config, train_data, val_data=None, device='cpu'):
    """Train ``model`` in place and return it.

    Args:
        model: the ``nn.Module`` to train; it is moved to ``device``.
        config: object exposing the optimisation, regularisation and loss
            options read below (``loss_function``, ``optimizer``,
            ``learning_rate``, ``momentum``, ``beta1``, ``beta2``,
            ``epsilon``, ``batch_size``, ``epochs``, ``gradient_clipping``,
            ``scheduler``, ``L1_lambda``, ``L2_lambda``,
            ``early_stopping_patience``).
        train_data: dataset yielding ``(inputs, targets)`` pairs.
        val_data: optional dataset of the same form. When given and
            non-empty, the validation loss is reported every epoch and
            drives early stopping.
        device: device passed to ``.to()`` for the model and every batch.

    Returns:
        The same ``model`` object. If early stopping triggers, its weights
        are restored to those of the epoch with the lowest validation loss;
        otherwise the weights are those of the last epoch.

    Raises:
        ValueError: for an unsupported ``loss_function`` or ``optimizer``.
    """
    model.to(device)

    criterion = _build_criterion(config)
    optimizer = _build_optimizer(model, config)
    scheduler = _build_scheduler(optimizer, config)

    # DataLoaders. An empty validation set is treated like no validation set,
    # which also avoids dividing by a zero batch count below.
    train_loader = DataLoader(train_data, batch_size=config.batch_size, shuffle=True)
    has_validation = val_data is not None and len(val_data) > 0
    val_loader = DataLoader(val_data, batch_size=config.batch_size) if has_validation else None

    best_val_loss = np.inf
    best_model_state = None
    patience_counter = 0

    for epoch in range(config.epochs):
        model.train()
        train_loss = 0.0

        for xb, yb in train_loader:
            xb, yb = xb.to(device), yb.to(device)
            optimizer.zero_grad()
            outputs = model(xb)
            loss = criterion(outputs, yb)

            # L1 regularisation over all parameters
            if config.L1_lambda > 0:
                l1_norm = sum(p.abs().sum() for p in model.parameters())
                loss = loss + config.L1_lambda * l1_norm

            loss.backward()

            # Gradient clipping (skipped when the configured value is falsy)
            if config.gradient_clipping:
                nn.utils.clip_grad_norm_(model.parameters(), config.gradient_clipping)

            optimizer.step()
            train_loss += loss.item()

        # Validation
        if val_loader is not None:
            model.eval()
            val_loss = 0.0
            with torch.no_grad():
                for xb, yb in val_loader:
                    xb, yb = xb.to(device), yb.to(device)
                    outputs = model(xb)
                    loss = criterion(outputs, yb)
                    val_loss += loss.item()

            val_loss /= len(val_loader)
            print(f"Epoch [{epoch+1}/{config.epochs}] - "
                  f"Train Loss: {train_loss/len(train_loader):.4f} - Val Loss: {val_loss:.4f}")

            # Early stopping
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_counter = 0
                # state_dict() tensors share storage with the live parameters,
                # so each one must be cloned; a shallow dict copy would be
                # overwritten by the following optimizer steps.
                best_model_state = {
                    key: value.detach().clone()
                    for key, value in model.state_dict().items()
                }
            else:
                patience_counter += 1
                if patience_counter >= config.early_stopping_patience:
                    print("⏹️ Early stopping activado.")
                    # No snapshot exists if the loss never improved (e.g. NaN
                    # from the first epoch); keep the current weights then.
                    if best_model_state is not None:
                        model.load_state_dict(best_model_state)
                    break
        else:
            print(f"Epoch [{epoch+1}/{config.epochs}] - Train Loss: {train_loss/len(train_loader):.4f}")

        if scheduler:
            scheduler.step()

    return model