### Import the required packages

In [1]:
import torch
import random
import torchvision
import numpy as np
import pandas as pd
import torch.nn as nn
import matplotlib.pyplot as plt
import torchvision.transforms as transforms
from sklearn.model_selection import StratifiedKFold
from torch.utils.data import DataLoader, random_split

### Load the data and create the Train, Validation and Test datasets

In [3]:
# Used for finding the mean and std of the dataset
#mean = X_train.train_data.float().mean() / 255 # = 0.1307
#std = X_train.train_data.float().std() / 255 # = 0.3081

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

train_dataset = torchvision.datasets.MNIST(
    root='./data',
    train=True,
    transform=transform,
    download=True
)

test_dataset = torchvision.datasets.MNIST(
    root='./data',
    train=False,
    transform=transform,
    download=True
)

train_size = int(0.8 * len(train_dataset))
val_size = len(train_dataset) - train_size
train_dataset, val_dataset = random_split(train_dataset, [train_size, val_size])

### Check the data

In [None]:
print(f'train_dataset data shape: {train_dataset.dataset.data[train_dataset.indices].shape}')
print(f'train_dataset targets shape: {train_dataset.dataset.targets[train_dataset.indices].shape}')

print(f'val_dataset data shape: {val_dataset.dataset.data[val_dataset.indices].shape}')
print(f'val_dataset targets shape: {val_dataset.dataset.targets[val_dataset.indices].shape}')

print(f'test_dataset data shape: {test_dataset.data.shape}')
print(f'test_datset targets shape: {test_dataset.targets.shape}')

print(f'Classes: {train_dataset.dataset.classes}')

for i in range(6):
    plt.subplot(2, 3, i+1)
    plt.xticks([])
    plt.yticks([])
    plt.imshow(train_dataset.dataset.data[train_dataset.indices][i], cmap='gray')
    plt.xlabel(f'Label: {train_dataset.dataset.targets[train_dataset.indices][i]}')

### Check the data distribution

In [None]:
y_train = train_dataset.dataset.targets[train_dataset.indices].numpy()
y_val = val_dataset.dataset.targets[val_dataset.indices].numpy()
y_test = test_dataset.targets.numpy()
    
datasets = {
    'Train': y_train,
    'Val': y_val,
    'Test': y_test
}
    
data = []
for set_name, y_data in datasets.items():
    unique, counts = np.unique(y_data, return_counts=True)
    for digit, count in zip(unique, counts):
        data.append({
            'Dataset': set_name,
            'Digit': digit,
            'Count': count
        })
    
df = pd.DataFrame(data)
    
plt.figure(figsize=(15, 8))
df_pivot = df.pivot(index='Digit', columns='Dataset', values='Count')
df_pivot.plot(kind='bar', width=0.8)
    
plt.title('Digit distribution in MNIST datasets')
plt.xlabel('Digit')
plt.ylabel('Count')
plt.legend(title='Dataset', loc='upper right')
plt.grid(True, alpha=0.3)
    
plt.show()

### Model class and helper functions

In [3]:
class Net(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(Net, self).__init__()

        self.flatten = nn.Flatten()
        self.l1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.l2 = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        x = self.flatten(x)
        out = self.l1(x)
        out = self.relu(out)
        out = self.l2(out)
        return out
    
def create_dataloaders(dataset, batch_size, num_workers, shuffle):
    return DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        num_workers=num_workers,
        pin_memory=True
    )
    
def weights_init(m):
    if isinstance(m, nn.Linear):
        torch.nn.init.xavier_uniform_(m.weight)
        torch.nn.init.zeros_(m.bias) 

def create_model(input_size, n_neurons, num_classes, learning_rate, eta_minus, eta_plus, min_step, max_step, device):
    model = Net(input_size, n_neurons, num_classes).to(device)
    criterion = nn.CrossEntropyLoss()
    
    optimizer = torch.optim.Rprop(
        model.parameters(),
        lr=learning_rate,
        etas=(eta_minus, eta_plus),
        step_sizes=(min_step, max_step)
    )

    model.apply(weights_init)

    return model, criterion, optimizer

def train_model(model, train_loader, val_loader, criterion, optimizer, device, num_epochs, patience, min_delta, verbose):
    train_losses = []
    val_losses = []
    val_accuracies = []
    patience_counter = 0
    best_val_loss = float('inf')
    best_model_state = None
    n_total_steps = len(train_loader)

    for epoch in range(num_epochs):
        
        total_train_loss = 0.0
        n_train_samples = 0

        # Training phase
        model.train()
        for i, (images, labels) in enumerate(train_loader):
            images = images.to(device)
            labels = labels.to(device)

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_train_loss += loss.item() * labels.size(0)
            n_train_samples += labels.size(0)

            if verbose and (i+1) % 100 == 0:
                print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{n_total_steps}]')

        avg_train_loss = total_train_loss / n_train_samples
        train_losses.append(avg_train_loss)

        # Validation phase
        val_loss, val_accuracy = validate_model(model, val_loader, criterion, device)
        val_losses.append(val_loss)
        val_accuracies.append(val_accuracy)

        # Early stopping check
        if best_val_loss - val_loss > min_delta:
            best_val_loss = val_loss
            patience_counter = 0
            best_model_state = model.state_dict().copy()
        else:
            patience_counter += 1

        print(f'Epoch [{epoch+1}/{num_epochs}], Train loss: {avg_train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.2f}%')

        if patience_counter >= patience:
            print(f'Early stopping triggered at epoch {epoch+1}')
            break

    if best_model_state is not None:
        model.load_state_dict(best_model_state)
    
    return train_losses, val_losses, val_accuracies
    
def validate_model(model, val_loader, criterion, device):
    total_loss = 0.0
    n_samples = 0
    n_correct = 0
    
    model.eval()
    with torch.no_grad():
        for images, labels in val_loader:
            images = images.to(device)
            labels = labels.to(device)

            outputs = model(images)
            loss = criterion(outputs, labels)

            total_loss += loss.item() * labels.size(0)
            n_samples += labels.size(0)

            _, predicted = torch.max(outputs, 1)
            n_correct += (predicted == labels).sum().item()

        avg_loss = total_loss / n_samples
        accuracy = 100.0 * n_correct / n_samples
    
    return avg_loss, accuracy

def plot_predictions(model, test_loader, device, num_images=1):
    model.eval()
    
    dataiter = iter(test_loader)
    images, labels = next(dataiter)
    
    indices = np.random.randint(0, len(images), num_images)
    
    with torch.no_grad():
        images = images.to(device)
        outputs = model(images.reshape(-1, 28*28))
        probabilities = nn.functional.softmax(outputs, dim=1)
        _, predicted = torch.max(outputs.data, 1)
        
        for idx in indices:
            plt.figure(figsize=(12,4))
            
            plt.subplot(1, 2, 1)
            plt.imshow(images[idx].cpu().reshape(28, 28), cmap='gray')
            plt.title(f'True: {labels[idx].item()}\nPredicted: {predicted[idx].item()}')
            plt.axis('off')
            
            plt.subplot(1, 2, 2)
            probs = probabilities[idx].cpu().numpy()
            plt.bar(range(10), probs)
            plt.xticks(range(10))
            plt.ylim([0, 1])
            plt.title('Prediction probabilities')
            
            plt.tight_layout()
            plt.show()

def plot_results(train_losses, val_losses, val_accuracies):
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
    
    # Plot losses
    epochs = range(1, len(train_losses) + 1)
    ax1.plot(epochs, train_losses, '--', label='Train', alpha=0.7)
    ax1.plot(epochs, val_losses, '-', label='Validation')
    
    ax1.set_title('Training and Validation Loss')
    ax1.set_xlabel('Epochs')
    ax1.set_ylabel('Loss')
    ax1.grid(True, alpha=0.3)
    ax1.legend()
    
    # Plot validation accuracy
    ax2.plot(epochs, val_accuracies, label='Validation')
    
    ax2.set_title('Validation Accuracy')
    ax2.set_xlabel('Epochs')
    ax2.set_ylabel('Accuracy (%)')
    ax2.grid(True, alpha=0.3)
    ax2.legend()
    
    plt.tight_layout()
    plt.show()

def plot_fold_results(fold_train_losses, fold_val_losses, fold_val_accuracies):
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
    
    # Plot losses
    for fold_idx, (train_losses, val_losses) in enumerate(zip(fold_train_losses, fold_val_losses)):
        epochs = range(1, len(train_losses) + 1)
        ax1.plot(epochs, train_losses, '--', label=f'Fold {fold_idx+1} (Train)', alpha=0.7)
        ax1.plot(epochs, val_losses, '-', label=f'Fold {fold_idx+1} (Val)')
    
    ax1.set_title('Training and Validation Loss per Fold')
    ax1.set_xlabel('Epochs')
    ax1.set_ylabel('Loss')
    ax1.grid(True, alpha=0.3)
    ax1.legend()
    
    # Plot validation accuracies
    for fold_idx, acc in enumerate(fold_val_accuracies):
        epochs = range(1, len(acc) + 1)
        ax2.plot(epochs, acc, label=f'Fold {fold_idx+1}')
    
    ax2.set_title('Validation Accuracy per Fold')
    ax2.set_xlabel('Epochs')
    ax2.set_ylabel('Accuracy (%)')
    ax2.grid(True, alpha=0.3)
    ax2.legend()
    
    plt.tight_layout()
    plt.show()

def get_device():
    if torch.cuda.is_available():
        return torch.device('cuda')
    elif torch.backends.mps.is_available():
        return torch.device('mps')
    else:
        return torch.device('cpu')

def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)

    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

### Model definition, training and testing (no kfold for fast evaluation)

In [None]:
seed = 42

set_seed(seed)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Hyperparameters for the neural network
input_size = 784
n_neurons = 128
num_classes = 10

# Hyperparameters for the training
num_epochs = 300
batch_size = len(train_dataset)
learning_rate = 0.01

# Hyperparameters for the Rprop optimizer
eta_minus = 0.5
eta_plus = 1.2
min_step = 1e-6
max_step = 50

# Hyperparameters for early stopping
patience = 3
min_delta = 0.040

train_loader = create_dataloaders(train_dataset, batch_size, num_workers=8, shuffle=True)
val_loader = create_dataloaders(val_dataset, batch_size, num_workers=8, shuffle=False)
test_loader = create_dataloaders(test_dataset, batch_size, num_workers=8, shuffle=False)

model, criterion, optimizer = create_model(
    input_size, 
    n_neurons, 
    num_classes, 
    learning_rate, 
    eta_minus, 
    eta_plus, 
    min_step, 
    max_step, 
    device
)

train_losses, val_losses, val_accuracies = train_model(
    model, 
    train_loader, 
    val_loader, 
    criterion, 
    optimizer, 
    device, 
    num_epochs, 
    patience,
    min_delta, 
    verbose=True
)

plot_results(train_losses, val_losses, val_accuracies)

### Model testing

In [None]:
_, acc = validate_model(model, test_loader, criterion, device)
print(f'Test accuracy: {acc:.2f}%')

plot_predictions(model, test_loader, device)

### KFold cross validation with random search

In [None]:
seed = 42

set_seed(seed)

device = get_device()

# Hyperparameters for the random search
n_trials = 10

# Hyperparameters for the k-fold cross-validation
k_folds = 5

# Hyperparameters for the neural network
input_size = 28 * 28
num_classes = 10

# Hyperparameters for the training
num_epochs = 300
train_batch_size = None # degined after splitting the dataset (batch learning)
val_batch_size = None # defined after splitting the dataset (batch learning)
learning_rate = 0.01

# Hyperparameters for the Early Stopping
patience = 3
min_delta = 0.040

trial_results = []

param_space = {
    'n_neurons': [32, 64, 128, 256, 512],
    'eta_minus': [0.7, 0.6, 0.5],
    'eta_plus': [1.4, 1.3, 1.2],
    'min_step': [1e-7, 1e-6, 1e-5],
    'max_step': [70, 60, 50]
}

for trial in range(n_trials):
    params = {
        'n_neurons': random.choice(param_space['n_neurons']),
        'eta_minus': float(np.random.choice(param_space['eta_minus'])),
        'eta_plus': float(np.random.choice(param_space['eta_plus'])),
        'min_step': float(np.random.choice(param_space['min_step'])),
        'max_step': float(np.random.choice(param_space['max_step']))
    }

    print(f"\nTrial {trial+1}/{n_trials}")
    print(f"Testing parameters: {params}")
    
    fold_train_losses = []
    fold_val_losses = []
    fold_val_accuracies = []
    
    kfold = StratifiedKFold(n_splits=k_folds, shuffle=True, random_state=seed)
        
    targets = train_dataset.dataset.targets[train_dataset.indices]
    
    for fold, (train_ids, val_ids) in enumerate(kfold.split(train_dataset, targets)):
        print(f'FOLD {fold+1}/{k_folds}')
        
        train_subset = torch.utils.data.Subset(train_dataset, train_ids)
        val_subset = torch.utils.data.Subset(train_dataset, val_ids)
        
        train_batch_size = len(train_subset)
        val_batch_size = len(val_subset)

        train_loader = create_dataloaders(train_subset, train_batch_size, num_workers=8, shuffle=True)
        val_loader = create_dataloaders(val_subset, val_batch_size, num_workers=8, shuffle=False)

        model, criterion, optimizer = create_model(
            input_size, params['n_neurons'], num_classes, 
            learning_rate, params['eta_minus'], params['eta_plus'], 
            params['min_step'], params['max_step'], device
        )
        
        train_losses, val_losses, val_accuracies = train_model(
            model, train_loader, val_loader, 
            criterion, optimizer, device, 
            num_epochs, patience, min_delta,
            verbose=False
        )

        fold_train_losses.append(train_losses)
        fold_val_losses.append(val_losses)
        fold_val_accuracies.append(val_accuracies)
    
    plot_fold_results(fold_train_losses, fold_val_losses, fold_val_accuracies)

    best_fold_accuracies = [max(acc_list) for acc_list in fold_val_accuracies]
    avg_accuracy = np.mean(best_fold_accuracies)
    std_accuracy = np.std(best_fold_accuracies)
    trial_results.append((params, avg_accuracy, std_accuracy))
    
    print(f"\nTrial {trial + 1} completed:")
    print(f"Parameters: {params}")
    print(f"Average accuracy across folds: {avg_accuracy:.4f} ± {std_accuracy:.4f}")

best_trial = max(trial_results, key=lambda x: x[1])
print("\nBest trial results:")
print(f"Parameters: {best_trial[0]}")
print(f"Accuracy: {best_trial[1]:.4f} ± {best_trial[2]:.4f}")

### Defenition and training of a model with the best hyperparameter found

In [None]:
best_params = best_trial[0]

device = get_device()

# Hyperparameters for the neural network
input_size = 28 * 28
num_classes = 10

# Hyperparameters for the training
num_epochs = 300
train_batch_size = len(train_dataset) 
val_batch_size = len(val_dataset)
learning_rate = 0.01

# Hyperparameters for the Early Stopping
patience = 3
min_delta = 0.040

train_loader = create_dataloaders(train_dataset, train_batch_size, num_workers=8, shuffle=True) 
test_loader = create_dataloaders(test_dataset, val_batch_size, num_workers=8, shuffle=False)

best_model, criterion, optimizer = create_model(
    input_size=input_size,
    n_neurons=best_params['n_neurons'],
    num_classes=num_classes, 
    learning_rate=learning_rate,
    eta_minus=best_params['eta_minus'],
    eta_plus=best_params['eta_plus'],
    min_step=best_params['min_step'],
    max_step=best_params['max_step'],
    device=device
)

train_losses, val_losses, val_accuracies = train_model(
    best_model, train_loader, val_loader, 
    criterion, optimizer, device, 
    num_epochs, patience, min_delta,
    verbose=False
)

plot_results(train_losses, val_losses, val_accuracies)

### Best model testing

In [None]:
_, acc = validate_model(best_model, test_loader, criterion, device)
print(f'Test accuracy: {acc:.2f}%')

plot_predictions(best_model, test_loader, device)