In [None]:
# Install the notebook's dependencies via shell escapes (intended for a Colab-style runtime).
!pip install torch torchvision matplotlib
# Upgrade torch/torchvision in place; presumably needed because later cells import
# GradScaler/autocast from torch.amp -- TODO confirm and pin explicit versions.
!pip install --upgrade torch torchvision

In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
import random
import numpy as np
import json
from PIL import Image
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import CosineAnnealingLR
import matplotlib.pyplot as plt
import time
from torch.amp import GradScaler, autocast
import os
from google.colab import drive

In [None]:
# Mount Google Drive; later cells write checkpoints and search results under
# /content/drive/MyDrive.
drive.mount('/content/drive')
# Fail fast if the expected Drive root is missing after mounting.
if not os.path.exists('/content/drive/MyDrive'):
    raise RuntimeError("Google Drive not mounted correctly!")

In [None]:
# Mini-batch size shared by the train, validation and test DataLoaders.
BATCH_SIZE = 64

In [None]:
# Custom Cutout -- DISABLED: the class is wrapped in a string literal, so it is never defined
# (the matching `Cutout(size=8)` entry in transform_train is commented out as well).
# NOTE(review): as written, y1/y2/x1/x2 are computed but never used to zero a region of
# `mask`, so the mask stays all ones and the returned image keeps its pixel values.
"""class Cutout(object):
    def __init__(self, size):
        self.size = size

    def __call__(self, img):
        if isinstance(img, Image.Image):
            img = np.array(img)

        h, w = img.shape[:2]
        mask = np.ones((h, w), np.float32)
        y = np.random.randint(h)
        x = np.random.randint(w)
        y1 = np.clip(y - self.size // 2, 0, h)
        y2 = np.clip(y + self.size // 2, 0, h)
        x1 = np.clip(x - self.size // 2, 0, w)
        x2 = np.clip(x + self.size // 2, 0, w)
        if len(img.shape) == 2:  # Handle grayscale images
            img = img * mask
        else:
            img = img * mask[:, :, np.newaxis]

        return Image.fromarray(np.uint8(img))"""

# Mixup helper -- DISABLED the same way (string literal); the call that would use it in the
# training loop is commented out too.
"""# Mixup function
def mixup_data(x, y, alpha=1.0):
    if alpha > 0.0:
        lam = np.random.beta(alpha, alpha)
    else:
        lam = 1.0
    batch_size = x.size(0)
    index = torch.randperm(batch_size)
    mixed_x = lam * x + (1 - lam) * x[index, :]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam"""

# Training-time preprocessing: light augmentation (random horizontal flip, then a random
# 32x32 crop taken after 4 pixels of padding), conversion to a tensor, and per-channel
# normalisation with mean 0.5 / std 0.5.
transform_train = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(32, padding=4),
    # The augmentations below are currently disabled experiments.
    #transforms.RandomRotation(15),
    #transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    #transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),
    #transforms.RandomGrayscale(p=0.1),
    #transforms.RandomErasing(p=0.5, scale=(0.02, 0.2), ratio=(0.3, 3.3)),
    #Cutout(size=8),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# Evaluation-time preprocessing: no augmentation, same tensor conversion and normalisation
# as the training pipeline.
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# Load CIFAR-100 dataset (downloaded into ./data when requested via download=True) and
# report how long construction of both dataset objects took.
start_time = time.time()
# Training split, read through the augmenting training transform.
train_dataset = torchvision.datasets.CIFAR100(
    root='./data',
    train=True,
    download=True,
    transform=transform_train
)
# Held-out test split, read through the augmentation-free test transform.
test_dataset = torchvision.datasets.CIFAR100(
    root='./data',
    train=False,
    download=True,
    transform=transform_test
)
print(f"Dataset loading time: {time.time() - start_time:.2f} seconds")

# Split the official training set into training (80%) and validation (20%) subsets.
# A plain random_split over `train_dataset` would make the validation subset inherit
# `transform_train`, i.e. validation images would be randomly flipped/cropped on every
# pass. Instead, both subsets share one index permutation, and the validation subset reads
# the same underlying images through the augmentation-free `transform_test`.
train_size = int(0.8 * len(train_dataset))
val_size = len(train_dataset) - train_size

# Fixed seed so every run (and every hyperparameter trial) is scored on the same split.
split_generator = torch.Generator().manual_seed(42)
shuffled_indices = torch.randperm(len(train_dataset), generator=split_generator).tolist()

# Second view of the training images without augmentation; the files were already
# downloaded when `train_dataset` was built above.
val_source = torchvision.datasets.CIFAR100(
    root='./data',
    train=True,
    download=False,
    transform=transform_test
)
# Build the validation subset first: `train_dataset` is rebound on the next line.
val_dataset = torch.utils.data.Subset(val_source, shuffled_indices[train_size:])
train_dataset = torch.utils.data.Subset(train_dataset, shuffled_indices[:train_size])

# Data loaders: all three share batch size, worker count and pinned-memory settings;
# only the training loader reshuffles its samples every epoch.
common_loader_args = dict(batch_size=BATCH_SIZE, num_workers=2, pin_memory=True)

train_loader = torch.utils.data.DataLoader(train_dataset, shuffle=True, **common_loader_args)
val_loader = torch.utils.data.DataLoader(val_dataset, shuffle=False, **common_loader_args)
test_loader = torch.utils.data.DataLoader(test_dataset, shuffle=False, **common_loader_args)

# Debugging: pull the first 10 training batches and print their shapes to confirm the
# loader works before starting a long run.
for i, (inputs, labels) in enumerate(train_loader):
    print(f"Batch {i}: inputs shape: {inputs.shape}, labels shape: {labels.shape}")
    if i == 9:  # indices 0-9 -> exactly 10 batches (the old `i == 10` check printed 11)
        break
print("Data loading for 10 batches completed.")


In [None]:
class LeNet5(nn.Module):
    """LeNet-5-style CNN for 3-channel 32x32 images with 100 output classes.

    Two 5x5 convolutions with 64 filters each, every one followed by ReLU and
    2x2 max-pooling, feed a three-layer fully connected head (1600 -> 384 ->
    192 -> 100).

    ``forward`` returns log-probabilities (log-softmax over the class
    dimension), not raw logits and not probabilities.
    """

    def __init__(self):
        super().__init__()

        # Convolutional feature extractor.
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=64, kernel_size=5)
        self.conv2 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=5)

        # Fully connected head. Spatial size for a 32x32 input:
        # 32 -conv5-> 28 -pool-> 14 -conv5-> 10 -pool-> 5, hence 64 * 5 * 5 features.
        self.fc1 = nn.Linear(in_features=64 * 5 * 5, out_features=384)
        self.fc2 = nn.Linear(in_features=384, out_features=192)
        self.fc3 = nn.Linear(in_features=192, out_features=100)  # linear classifier (100 classes)

    def forward(self, x):
        """Return per-class log-probabilities of shape (batch, 100) for input ``x``."""
        # Each convolution is followed by ReLU and 2x2 max-pooling.
        for conv in (self.conv1, self.conv2):
            x = F.max_pool2d(F.relu(conv(x)), 2)

        # Flatten everything except the batch dimension for the dense layers.
        features = x.flatten(start_dim=1)

        hidden = F.relu(self.fc1(features))
        hidden = F.relu(self.fc2(hidden))

        # Linear classifier followed by log-softmax over the class dimension.
        return F.log_softmax(self.fc3(hidden), dim=1)

In [None]:
# Early Stopping Class
class EarlyStopping:
    """Track validation loss, checkpoint improvements and flag when to stop.

    Call the instance once per epoch with the current validation loss and the
    model. A call counts as an improvement when ``-val_loss`` is at least
    ``best_score + delta``; the model's ``state_dict`` is then saved to
    ``path`` and the patience counter is reset. Otherwise the counter is
    incremented, and ``early_stop`` becomes True once it reaches ``patience``.

    Args:
        patience: Number of consecutive non-improving calls tolerated.
        delta: Minimum increase of the score (negative loss) to count as an
            improvement.
        path: File the best model's ``state_dict`` is written to.
        verbose: Print a message on every counter increment and checkpoint.
    """

    def __init__(self, patience=10, delta=0, path='/content/drive/MyDrive/Early1checkpoint.pt', verbose=False):
        # Configuration.
        self.patience = patience
        self.delta = delta
        self.path = path
        self.verbose = verbose
        # Running state.
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = float('inf')

    def __call__(self, val_loss, model):
        """Update the stopping state with this epoch's validation loss."""
        score = -val_loss  # higher is better

        # No improvement: bump the counter and possibly request a stop.
        if self.best_score is not None and score < self.best_score + self.delta:
            self.counter += 1
            if self.verbose:
                print(f"EarlyStopping counter: {self.counter} out of {self.patience}")
            if self.counter >= self.patience:
                self.early_stop = True
            return

        # First call or improvement: remember the score and checkpoint the model.
        is_first_call = self.best_score is None
        self.best_score = score
        self.save_checkpoint(val_loss, model)
        if not is_first_call:
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        """Write ``model.state_dict()`` to ``self.path`` and record ``val_loss`` as the new minimum."""
        if self.verbose:
            print(f"Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...")
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss



In [None]:

# Training and evaluation function with gradient clipping and label smoothing
# def train_and_evaluate(model, optimizer, scheduler, criterion, train_loader, val_loader, num_epochs, device,      early_stopping, alpha=1.0):
def train_and_evaluate(model, optimizer, scheduler, criterion, train_loader, val_loader, num_epochs, device, type_optimezer):

    model.train()
    scaler = GradScaler(device='cuda')  # Initialize mixed precision scaler
    accumulation_steps = 4  # Gradient accumulation

    for epoch in range(num_epochs):
        epoch_start = time.time()

        # Training loop
        train_loss_total = 0
        train_correct, train_total = 0, 0
        for batch_idx, (inputs, labels) in enumerate(train_loader):
            inputs, labels = inputs.to(device, non_blocking=True), labels.to(device, non_blocking=True)
            # inputs, labels_a, labels_b, lam = mixup_data(inputs, labels, alpha)
            optimizer.zero_grad()

            with autocast(device_type='cuda'):  # Mixed precision forward pass
                outputs = model(inputs)
                loss = criterion(outputs, labels) / accumulation_steps

            scaler.scale(loss).backward()

            if (batch_idx + 1) % accumulation_steps == 0:
                scaler.unscale_(optimizer)
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)  # Gradient clipping
                scaler.step(optimizer)
                scaler.update()

            train_loss_total += loss.item() * accumulation_steps
            _, predicted = outputs.max(1)
            train_total += labels.size(0)
            train_correct += predicted.eq(labels).sum().item()

        train_loss = train_loss_total / len(train_loader)
        train_acc = 100. * train_correct / train_total

        # Validation loop
        model.eval()
        val_loss, correct, total = 0, 0, 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device, non_blocking=True), labels.to(device, non_blocking=True)
                with autocast(device_type='cuda'):
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)

                val_loss += loss.item()
                _, predicted = outputs.max(1)
                total += labels.size(0)
                correct += predicted.eq(labels).sum().item()

        val_acc = 100. * correct / total
        val_loss /= len(val_loader)

        if epoch == 2 and train_acc < 1.0:
            print(f"Epoch {epoch+1}: Train accuracy {train_acc:.2f}% too low, skipping to next trial...")
            return None, None, None
        if epoch == 3 and train_acc < 5.0:
            print(f"Epoch {epoch+1}: Train accuracy {train_acc:.2f}% too low, skipping to next trial...")
            return None, None, None

        epoch_end = time.time()
        print(f"Epoch {epoch+1}/{num_epochs} | Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2f}% | Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.2f}%")

        # Adjust learning rate
        scheduler.step()

        # # Early stopping
        # early_stopping(val_loss, model)
        # if early_stopping.early_stop:
        #     print("Early stopping at epoch:", epoch + 1)
        #     break



    return train_loss, val_loss, val_acc

In [None]:
def random_search(train_loader, val_loader, model_class, device, num_trials, num_epochs,
                  output_path='/content/drive/MyDrive/best_hyperparams_AdamW.json'):
    """Random search over AdamW hyperparameters, keeping the best validation accuracy.

    Each trial samples ``lr``, ``weight_decay``, ``betas`` and ``eps`` uniformly
    from fixed candidate lists, builds a fresh ``model_class()`` on ``device``
    and trains it with ``train_and_evaluate`` using AdamW, a cosine-annealing
    schedule over ``num_epochs`` and cross-entropy with label smoothing 0.1.
    Trials that ``train_and_evaluate`` abandons (it returns ``None`` values)
    are skipped.

    Args:
        train_loader: Training batches, passed through to ``train_and_evaluate``.
        val_loader: Validation batches, passed through to ``train_and_evaluate``.
        model_class: Zero-argument callable returning a new model.
        device: Device the model is moved to and trained on.
        num_trials: Number of hyperparameter samples to try.
        num_epochs: Epochs per trial (also the cosine schedule's ``T_max``).
        output_path: JSON file the best configuration is written to.

    Returns:
        The result dict (``lr``, ``weight_decay``, ``betas``, ``eps``,
        ``val_loss``, ``val_acc``) of the trial with the highest ``val_acc``,
        or ``None`` when no trial completed, in which case nothing is written.
    """
    param_space = {
        'lr': [1e-4, 5e-4, 1e-3, 1e-2, 5e-2],
        'weight_decay': [1e-6, 1e-5, 1e-4, 1e-3, 1e-2],
        'betas': [(0.8, 0.9), (0.9, 0.999), (0.95, 0.98)],
        'eps': [1e-8, 1e-4, 1e-6]
    }

    results = []

    for trial in range(num_trials):

        lr = random.choice(param_space['lr'])
        weight_decay = random.choice(param_space['weight_decay'])
        betas = random.choice(param_space['betas'])
        eps = random.choice(param_space['eps'])

        print(f"Trial {trial + 1}/{num_trials} | lr: {lr} | weight_decay: {weight_decay} | betas: {betas} | eps: {eps}")

        # Fresh model/optimizer/scheduler per trial so trials do not influence each other.
        model = model_class().to(device)
        optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay, betas=betas, eps=eps)
        scheduler = CosineAnnealingLR(optimizer, T_max=num_epochs)
        criterion = nn.CrossEntropyLoss(label_smoothing=0.1)

        # `type_optimezer` is part of train_and_evaluate's signature; pass it explicitly
        # so the call does not fail with a missing-argument TypeError.
        train_loss, val_loss, val_acc = train_and_evaluate(
            model, optimizer, scheduler, criterion, train_loader, val_loader, num_epochs,
            device=device, type_optimezer='AdamW',
        )

        if train_loss is None:
            print(f"Skipping trial {trial + 1} due to low accuracy.")
            continue

        results.append({
            'lr': lr,
            'weight_decay': weight_decay,
            'betas': betas,
            'eps': eps,
            'val_loss': val_loss,
            'val_acc': val_acc
        })

    # Every trial may have been skipped (or num_trials may be 0); report that instead of
    # failing with an IndexError on an empty result list.
    if not results:
        print("No trial completed successfully; no hyperparameters were saved.")
        return None

    # max() returns the first maximal entry, matching a stable descending sort's head.
    best_hyperparams = max(results, key=lambda result: result['val_acc'])

    with open(output_path, 'w') as f:
        json.dump(best_hyperparams, f)
    print(f"\nBest Hyperparameters saved to {output_path}")

    return best_hyperparams