Create a function that returns a CNN model

In [10]:
import torch
import torch.nn as nn
import torch.optim as optim

# Activation name -> torch.nn module class.
# The values are classes, not instances: instantiate one per layer that needs it.
MAP_ACTIVATE_FUNCTIONS = dict(
    relu=nn.ReLU,
    sigmoid=nn.Sigmoid,
    tanh=nn.Tanh,
    leakyrelu=nn.LeakyReLU,
    selu=nn.SELU,
)

# Optimizer name -> torch.optim optimizer class (also uninstantiated).
MAP_OPTIMIZERS = dict(
    adam=optim.Adam,
    adamw=optim.AdamW,
    sgd=optim.SGD,
    rmsprop=optim.RMSprop,
)


In [11]:
# Report whether PyTorch can see a CUDA device.
cuda_available = torch.cuda.is_available()
print(f"¿CUDA está disponible? {cuda_available}")

# Prefer the GPU when one is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Usando el dispositivo: {device}")

¿CUDA está disponible? True
Usando el dispositivo: cuda


In [12]:
def build_cnn_from_individual(individual, num_channels, px_h, px_w, num_classes):
    """
    Build a CNN (as an ``nn.Sequential``) from a dictionary of hyper-parameters.

    Layout of the returned network:
      * ``num_conv_layers`` blocks of Conv2d -> (BatchNorm2d) -> activation -> MaxPool2d,
      * Flatten,
      * ``fully_connected`` square Linear layers (Dropout after the first one only,
        see the review note in the body),
      * a final Linear layer with ``num_classes`` outputs.

    Args:
        individual (dict): Hyper-parameters. Keys read by this function:
            'num_conv_layers' (int): number of convolutional blocks.
            'fully_connected' (int): number of hidden Linear layers.
            'filters' (sequence of int): output channels of each conv layer.
            'kernel_sizes' (sequence of int): kernel size of each conv layer.
            'dropout' (float): dropout probability.
            'activation' (sequence of str): keys of ``MAP_ACTIVATE_FUNCTIONS``;
                cycled through if shorter than the number of conv layers.
        num_channels (int): channels of the input images.
        px_h (int): input image height in pixels.
        px_w (int): input image width in pixels.
        num_classes (int): size of the output layer.

    Returns:
        nn.Sequential: the assembled model, in training mode, with freshly
        initialised (untouched) BatchNorm running statistics.

    Raises:
        KeyError: if a required key is missing from ``individual`` or an
            activation name is not in ``MAP_ACTIVATE_FUNCTIONS``.
        ValueError: if conv layers are requested but 'activation' is empty, or
            'filters' / 'kernel_sizes' have fewer entries than 'num_conv_layers'.
    """
    num_layers = individual['num_conv_layers']
    fully_connected = individual['fully_connected']
    dropout = individual['dropout']
    activations = individual['activation']

    # Fail early with a clear message instead of a ZeroDivisionError / IndexError
    # from deep inside the layer-building loop.
    if num_layers > 0:
        if not activations:
            raise ValueError("individual['activation'] must contain at least one activation name")
        if len(individual['filters']) < num_layers or len(individual['kernel_sizes']) < num_layers:
            raise ValueError(
                "individual['filters'] and individual['kernel_sizes'] need at least "
                f"{num_layers} entries (one per convolutional layer)"
            )

    activation_functions = [MAP_ACTIVATE_FUNCTIONS[act] for act in activations]

    layers = []
    out_channels_previous_layer = num_channels

    for i in range(num_layers):
        out_channels = individual['filters'][i]
        kernel_size = individual['kernel_sizes'][i]

        # padding is fixed at 1, so only 3x3 kernels preserve the spatial size.
        layers.append(
            nn.Conv2d(out_channels_previous_layer, out_channels, kernel_size=kernel_size, padding=1)
        )

        # BatchNorm is skipped only on the first block of a single-channel input.
        if out_channels_previous_layer > 1 or i > 0:
            layers.append(nn.BatchNorm2d(out_channels))

        # Cycle through the configured activations when there are fewer than layers.
        layers.append(activation_functions[i % len(activation_functions)]())

        if i < num_layers - 1:
            # Intermediate blocks halve the spatial resolution.
            layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
        else:
            # The last block pools with stride 1, shrinking each side by one pixel only.
            layers.append(nn.MaxPool2d(kernel_size=2, stride=1))

        out_channels_previous_layer = out_channels

    # Probe the convolutional stack with a dummy batch to learn the flattened size.
    # The probe runs in eval mode and without autograd so that it does NOT update
    # the BatchNorm running statistics of the layers we are about to return, and
    # so that a batch of one sample is always accepted by BatchNorm.
    feature_extractor = nn.Sequential(*layers)
    feature_extractor.eval()
    with torch.no_grad():
        dummy_input = torch.zeros(1, num_channels, px_h, px_w)
        output_size = feature_extractor(dummy_input).flatten(1).shape[1]
    # Restore training mode on the shared layer objects (eval() flipped them in place).
    feature_extractor.train()

    layers.append(nn.Flatten())

    # Hidden fully connected layers keep the flattened width.
    # NOTE(review): there is no non-linearity between these Linear layers, so they
    # compose into a single linear map; kept unchanged to preserve the architecture.
    for _ in range(fully_connected):
        layers.append(nn.Linear(in_features=output_size, out_features=output_size))
        if dropout > 0:
            layers.append(nn.Dropout(dropout))
            # NOTE(review): subtracting 1 pushes any probability in (0, 1] to <= 0,
            # so only the first fully connected layer receives Dropout. Kept as-is
            # to preserve existing behaviour; confirm whether a decay was intended.
            dropout -= 1

    layers.append(nn.Linear(output_size, num_classes))
    return nn.Sequential(*layers)

In [13]:
# Example hyper-parameter set: two 3x3 conv layers with 3 filters each,
# two hidden fully connected layers, dropout probability 0.2.
individual = dict(
    num_conv_layers=2,
    fully_connected=2,
    filters=[3, 3],
    kernel_sizes=[3, 3],
    dropout=0.2,
    activation=['relu', 'sigmoid'],
)

# Input geometry (3-channel 32x32 images) and number of output classes.
num_channels, px_h, px_w = 3, 32, 32
num_classes = 10

In [14]:
# Assemble the network from the example individual, then move it to the selected device.
model = build_cnn_from_individual(
    individual,
    num_channels,
    px_h,
    px_w,
    num_classes,
)
model = model.to(device)
print(model)

Sequential(
  (0): Conv2d(3, 3, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (1): BatchNorm2d(3, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (2): ReLU()
  (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (4): Conv2d(3, 3, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (5): BatchNorm2d(3, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (6): Sigmoid()
  (7): MaxPool2d(kernel_size=2, stride=1, padding=0, dilation=1, ceil_mode=False)
  (8): Flatten(start_dim=1, end_dim=-1)
  (9): Linear(in_features=675, out_features=675, bias=True)
  (10): Dropout(p=0.2, inplace=False)
  (11): Linear(in_features=675, out_features=675, bias=True)
  (12): Linear(in_features=675, out_features=10, bias=True)
)


Train and test the network on the CIFAR-10 dataset

In [15]:
# Train and test the model using the CIFAR-10 dataset
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import torch.optim as optim
from tqdm import tqdm
import numpy as np

# Load the CIFAR-10 dataset (not MNIST): convert images to tensors, then
# normalise with mean 0.5 / std 0.5. The single mean/std value is broadcast
# over all colour channels.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),
])

# Training split first, then the held-out test split, both with the same transform.
trainset, testset = (
    torchvision.datasets.CIFAR10(root='./data', train=is_train, download=True, transform=transform)
    for is_train in (True, False)
)

# Only the training batches are shuffled.
trainloader = DataLoader(trainset, batch_size=4, shuffle=True)
testloader = DataLoader(testset, batch_size=4, shuffle=False)

# Loss function and optimizer for the model built earlier in the notebook.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training / validation loop with early stopping
def train_model(model, train_loader, val_loader, criterion, optimizer, epochs=10, patience=10,
                checkpoint_path='best_model.pth'):
    """
    Train ``model`` with per-epoch validation, checkpointing and early stopping.

    Each epoch runs one pass over ``train_loader`` (with optimisation) and one
    pass over ``val_loader`` (without gradients), printing loss and accuracy for
    both. Whenever the summed validation loss improves on the best value seen so
    far, the model's ``state_dict`` is written to ``checkpoint_path``. Training
    stops early once ``patience`` consecutive epochs bring no improvement.

    Batches are moved to the device that holds the model's parameters, so the
    function does not depend on any notebook-level ``device`` variable.

    Args:
        model (nn.Module): network to train; must already be on its target device.
        train_loader: iterable of ``(images, labels)`` batches that supports ``len()``.
        val_loader: iterable of ``(images, labels)`` batches that supports ``len()``.
        criterion: loss callable taking ``(outputs, labels)``.
        optimizer: optimizer bound to ``model``'s parameters.
        epochs (int): maximum number of epochs.
        patience (int): epochs without validation improvement tolerated before stopping.
        checkpoint_path (str): file the best ``state_dict`` is saved to.

    Returns:
        None. The model is updated in place; note that on return it holds the
        weights of the LAST epoch, not necessarily the best checkpoint.
    """
    # Follow the model's own device instead of relying on a global variable.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    best_val_loss = float("inf")  # any finite first validation loss counts as an improvement
    patience_counter = 0

    for epoch in range(epochs):
        print(f"Epoch {epoch+1}/{epochs}")
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        # Progress bar for the training pass
        train_progress = tqdm(train_loader, desc="Training", leave=False)

        for images, labels in train_progress:
            images, labels = images.to(run_device), labels.to(run_device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            train_progress.set_postfix(loss=loss.item())

        train_acc = 100 * correct / total
        print(f"Train Loss: {running_loss/len(train_loader):.4f}, Train Accuracy: {train_acc:.2f}%")

        # Evaluate on the validation set
        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        val_progress = tqdm(val_loader, desc="Validation", leave=False)
        with torch.no_grad():
            for images, labels in val_progress:
                images, labels = images.to(run_device), labels.to(run_device)
                outputs = model(images)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                _, predicted = outputs.max(1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        val_acc = 100 * correct / total
        print(f"Validation Loss: {val_loss/len(val_loader):.4f}, Validation Accuracy: {val_acc:.2f}%")

        # Early stopping check. The comparison uses the summed (not averaged) loss,
        # which ranks epochs identically because the loader length is constant.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
            # Save the best model so far
            torch.save(model.state_dict(), checkpoint_path)
            print("New best model saved!")
        else:
            patience_counter += 1
            print(f"No improvement in validation loss. Patience counter: {patience_counter}/{patience}")

        if patience_counter >= patience:
            print("Early stopping triggered. Training stopped.")
            break


Files already downloaded and verified
Files already downloaded and verified


In [16]:
# Train for at most 10 epochs, using the CIFAR-10 test split as the validation set;
# stop early after 5 consecutive epochs without a lower validation loss.
train_model(
    model,
    trainloader,
    testloader,
    criterion,
    optimizer,
    epochs=10,
    patience=5,
)

Epoch 1/10


                                                                            

Train Loss: 1.9534, Train Accuracy: 29.81%


                                                                

Validation Loss: 1.7990, Validation Accuracy: 35.02%
New best model saved!
Epoch 2/10


                                                                             

Train Loss: 1.7472, Train Accuracy: 37.27%


                                                               

KeyboardInterrupt: 