# Deep Learning Project: CNN

## Import

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Subset, TensorDataset
from torchvision.datasets import ImageFolder
from sklearn.metrics import accuracy_score, confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import numpy as np
import random
import os
import shutil
import kagglehub
import gc

## Configuration

In [None]:
# --- Experiment switches -----------------------------------------------
use_fewshot = False                # cap the number of training samples per class
use_cutmix = False                 # mix image patches inside the training loop
use_standard_augmentation = False  # flip / crop / colour-jitter training transform

# --- Reproducibility and optimisation hyper-parameters ------------------
seed = 1
learning_rate = 1e-3
batch_size = 64

# --- Directory that receives one weight file per binary classifier ------
model_dir = "/content/ovr_models"
os.makedirs(name=model_dir, exist_ok=True)

## Device

In [None]:
# Prefer the GPU whenever CUDA is usable; otherwise everything runs on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Use device: {device}")

## Seed

In [None]:
def set_seed(seed):
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and all CUDA devices)
    with the same value, then switches cuDNN to deterministic mode with
    benchmarking disabled.

    Args:
        seed: integer seed shared by all generators.
    """
    for seed_fn in (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    ):
        seed_fn(seed)

    # Benchmark mode is turned off together with enabling deterministic mode.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

# Apply the configured seed once, before any data or model is created.
set_seed(seed=seed)

## Augmentation

In [None]:
# Build the per-image training transform. CutMix is not a per-image
# transform: it is applied to whole batches inside the training loop, so on
# its own it only needs ToTensor here.
if use_standard_augmentation:
    transform_train = transforms.Compose([
        transforms.RandomHorizontalFlip(),
        transforms.RandomCrop(32, padding=4),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
        transforms.ToTensor(),
    ])
    print("Using standard augmentation.")
else:
    transform_train = transforms.ToTensor()

# Reported independently of the branch above: the training loop checks
# use_cutmix on its own, so both augmentations can be active at once and the
# log must say so.
if use_cutmix:
    print("Using cutmix augmentation.")

if not (use_standard_augmentation or use_cutmix):
    print("Using no augmentation.")

# Validation and test images are never augmented.
transform_test = transforms.ToTensor()

## Download dataset

In [None]:
# Fetch the CINIC-10 dataset through kagglehub. The returned value is used
# below as the root directory that contains the train/valid/test folders.
cinic_path = kagglehub.dataset_download("mengcius/cinic10")

## Init sets

In [None]:
# Each split lives in its own sub-folder; only the training split receives
# the (possibly augmenting) training transform.
_split_transforms = {
    "train": transform_train,
    "valid": transform_test,
    "test": transform_test,
}
train_set, valid_set, test_set = (
    ImageFolder(f"{cinic_path}/{split_name}", transform=split_transform)
    for split_name, split_transform in _split_transforms.items()
)

# Class names come from the training split and define the class indices
# used by the one-vs-rest models below.
classes = train_set.classes

## Model definition

In [None]:
class ConvolutionalNeuralNetwork(nn.Module):
    """Configurable CNN: a stack of conv blocks followed by a dense head.

    Every conv block is ``Conv2d(padding=1) -> BatchNorm2d -> ReLU ->
    MaxPool2d(2)``; every hidden dense block is ``Linear -> ReLU -> Dropout``.
    The last ``Linear`` layer returns raw logits (no activation).

    Args:
        conv_layers: iterable of ``(out_channels, kernel_size)`` pairs, one
            per conv block.
        fc_layers: iterable with the width of each hidden dense layer.
        n_classes: number of output logits.
        dropout: dropout probability used after every hidden dense layer.
        input_shape: ``(channels, height, width)`` of one input image. It is
            used to size the first conv layer and the first dense layer.
            Defaults to 3x32x32.
    """

    def __init__(self, conv_layers, fc_layers, n_classes=1, dropout=0.4,
                 input_shape=(3, 32, 32)):
        super().__init__()
        layers = []
        in_channels = input_shape[0]
        for out_channels, kernel_size in conv_layers:
            layers.append(nn.Conv2d(in_channels, out_channels, kernel_size, padding=1))
            layers.append(nn.BatchNorm2d(out_channels))
            layers.append(nn.ReLU())
            layers.append(nn.MaxPool2d(2))
            in_channels = out_channels
        self.conv = nn.Sequential(*layers)

        # Probe the flattened feature size with a dummy image. The probe runs
        # in eval mode so that BatchNorm does not fold the all-zero dummy
        # batch into its running statistics; the previous mode is restored
        # afterwards.
        was_training = self.conv.training
        self.conv.eval()
        with torch.no_grad():
            probe = self.conv(torch.zeros(1, *input_shape))
        self.conv.train(was_training)
        in_features = probe.flatten(1).shape[1]

        fc = []
        for size in fc_layers:
            fc.append(nn.Linear(in_features, size))
            fc.append(nn.ReLU())
            fc.append(nn.Dropout(dropout))
            in_features = size
        fc.append(nn.Linear(in_features, n_classes))
        self.fc = nn.Sequential(*fc)

    def forward(self, x):
        """Return logits of shape ``(batch, n_classes)`` for an image batch."""
        x = self.conv(x)
        x = torch.flatten(x, 1)
        return self.fc(x)


## Cut-mix implementation

In [None]:
def apply_cutmix(inputs, labels, alpha=1.0):
    """Apply CutMix to a batch: paste one random patch from shuffled partners.

    A mixing ratio is drawn from ``Beta(alpha, alpha)``, a random box is
    sampled with ``rand_bbox_torch`` and the same box of every image is
    overwritten with the box of a randomly chosen partner image from the
    batch. ``inputs`` is modified in place.

    Args:
        inputs: image batch; the box is cut along dimensions 2 and 3.
        labels: labels of the batch, indexable along dimension 0.
        alpha: concentration parameter of the Beta distribution.

    Returns:
        Tuple ``(inputs, target_a, target_b, lam)`` where ``target_a`` are
        the original labels, ``target_b`` the labels of the partner images
        and ``lam`` the fraction of each image that was left untouched.
    """
    mix_ratio = torch.distributions.Beta(alpha, alpha).sample().item()
    partner = torch.randperm(inputs.size(0), device=inputs.device)

    x1, y1, x2, y2 = rand_bbox_torch(inputs.size(), mix_ratio, device=inputs.device)
    # Indexing with `partner` produces a copy, so every patch is read before
    # any image of the batch is overwritten.
    inputs[:, :, x1:x2, y1:y2] = inputs[partner, :, x1:x2, y1:y2]

    # The box may have been clipped at the border, so the effective ratio is
    # recomputed from the real patch area.
    patch_area = (x2 - x1) * (y2 - y1)
    image_area = inputs.size(-1) * inputs.size(-2)
    lam = 1 - (patch_area / image_area)

    return inputs, labels, labels[partner], lam

def rand_bbox_torch(size, lam, device='cpu'):
    """Sample a random box covering roughly ``1 - lam`` of the image area.

    Args:
        size: batch size tuple; entries 2 and 3 are the two spatial extents.
        lam: mixing ratio; each box side is ``sqrt(1 - lam)`` of the extent.
        device: device on which the random box centre is drawn.

    Returns:
        ``(bbx1, bby1, bbx2, bby2)`` as Python ints, clipped to the image so
        that ``0 <= bbx1 <= bbx2 <= size[2]`` and likewise for ``bby``.
    """
    extent_x, extent_y = size[2], size[3]

    side_ratio = torch.tensor(1. - lam).sqrt()
    half_x = (extent_x * side_ratio).int().item() // 2
    half_y = (extent_y * side_ratio).int().item() // 2

    # The centre is uniform over the image; x is drawn before y.
    centre_x = torch.randint(extent_x, (1,), device=device).item()
    centre_y = torch.randint(extent_y, (1,), device=device).item()

    lower = (max(centre_x - half_x, 0), max(centre_y - half_y, 0))
    upper = (min(centre_x + half_x, extent_x), min(centre_y + half_y, extent_y))
    return lower + upper

## OvR functions

In [None]:
def create_ovr_dataset(dataset, target_class, max_per_class=None):
    """Build an in-memory binary (one-vs-rest) dataset for ``target_class``.

    Every sample is read from ``dataset`` exactly once and stacked into one
    tensor, so whatever per-sample processing the dataset performs on access
    is frozen into the returned tensors.

    Args:
        dataset: iterable of ``(sample_tensor, class_label)`` pairs.
        target_class: label that becomes the positive class (1); every other
            label becomes 0.
        max_per_class: few-shot limit. When given, at most this many samples
            of EACH original class are kept. The limit is applied per class,
            not to the pooled negatives, so that a dataset ordered by class
            does not fill the whole negative budget from its first class.
            ``None`` keeps everything.

    Returns:
        ``TensorDataset(samples, binary_labels)`` with int64 labels.

    Raises:
        ValueError: if no sample was selected.
    """
    data, targets = [], []
    kept_per_class = {}
    for x, y in dataset:
        # Tensor labels are converted to plain numbers so they work as keys.
        class_key = y.item() if isinstance(y, torch.Tensor) else y
        kept = kept_per_class.get(class_key, 0)

        # If few-shot mode is active and this class reached its limit, skip.
        if max_per_class is not None and kept >= max_per_class:
            continue

        kept_per_class[class_key] = kept + 1
        data.append(x)
        targets.append(1 if class_key == target_class else 0)

    if not data:
        raise ValueError("create_ovr_dataset: no samples were selected")

    return TensorDataset(torch.stack(data), torch.tensor(targets))

def evaluate_binary_model(model, dataloader):
    """Return the accuracy of a single-logit binary classifier.

    The model is switched to eval mode for the measurement and put back into
    whatever mode it was in before the call, so calling this between training
    epochs does not silently disable dropout / BatchNorm updates afterwards.

    Args:
        model: module returning one logit per sample.
        dataloader: iterable of ``(inputs, labels)`` batches with 0/1 labels.

    Returns:
        Fraction of samples whose thresholded sigmoid output (> 0.5) equals
        the label, as a float; ``nan`` if the loader yields no samples.
    """
    # Run on the device the model lives on; the notebook-level `device` is
    # only consulted for a model without parameters.
    first_param = next(model.parameters(), None)
    eval_device = first_param.device if first_param is not None else device

    was_training = model.training
    model.eval()
    correct, total = 0, 0
    try:
        with torch.no_grad():
            for inputs, labels in dataloader:
                inputs = inputs.to(eval_device)
                labels = labels.to(eval_device).reshape(-1)
                # reshape(-1) keeps one entry per sample even when the batch
                # holds a single sample (a bare squeeze() would return 0-d).
                probs = torch.sigmoid(model(inputs).reshape(-1))
                preds = (probs > 0.5).to(labels.dtype)
                correct += (preds == labels).sum().item()
                total += labels.numel()
    finally:
        model.train(was_training)

    return correct / total if total else float("nan")

def train_ovr_model(model, data_loader, valid_loader, test_loader, criterion, optimizer, epochs=5):
    """Train one binary (one-vs-rest) model and track its accuracy per epoch.

    Reads the notebook-level ``device`` and ``use_cutmix`` settings. When
    ``use_cutmix`` is set, every batch goes through ``apply_cutmix`` and the
    loss is the ``lam``-weighted sum of the losses against both label sets.

    Args:
        model: module returning one logit per sample.
        data_loader: training batches of ``(inputs, labels)``.
        valid_loader: validation batches, evaluated after every epoch.
        test_loader: test batches, evaluated after every epoch.
        criterion: loss taking ``(logits, float_targets)``.
        optimizer: optimizer over the parameters of ``model``.
        epochs: number of passes over ``data_loader``.

    Returns:
        Tuple ``(train_accs, valid_accs, test_accs)``: three lists with one
        accuracy value per epoch.
    """
    train_accs, valid_accs, test_accs = [], [], []
    for epoch in range(epochs):
        # Enter train mode at the start of EVERY epoch: the evaluation at the
        # end of an epoch may leave the model in eval mode, which would turn
        # off dropout and BatchNorm statistics for all following epochs.
        model.train()
        total_loss = 0.0
        for inputs, labels in data_loader:
            inputs = inputs.to(device, non_blocking=True)
            labels = labels.float().to(device, non_blocking=True)

            if use_cutmix:
                inputs, target_a, target_b, lam = apply_cutmix(inputs, labels)

            optimizer.zero_grad()
            # reshape(-1) keeps the batch dimension for a batch of one sample
            # (a bare squeeze() would yield a 0-d tensor and a shape mismatch
            # with the targets).
            outputs = model(inputs).reshape(-1)

            if use_cutmix:
                loss = lam * criterion(outputs, target_a) + (1 - lam) * criterion(outputs, target_b)
            else:
                loss = criterion(outputs, labels)

            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        train_acc = evaluate_binary_model(model, data_loader)
        train_accs.append(train_acc)

        valid_acc = evaluate_binary_model(model, valid_loader)
        valid_accs.append(valid_acc)

        test_acc = evaluate_binary_model(model, test_loader)
        test_accs.append(test_acc)

        # Guard the average against an empty training loader.
        mean_loss = total_loss / max(len(data_loader), 1)
        print(f"Epoch {epoch+1}, loss: {mean_loss:.4f}, train acc: {train_acc:.4f}, valid acc: {valid_acc:.4f}, test acc: {test_acc:.4f}")

    return train_accs, valid_accs, test_accs

def predict_ovr(models, dataloader):
    """Predict class indices with a one-vs-rest ensemble.

    The loader is traversed once and every batch is scored by all models, so
    the scores of one sample always come from the same batch, and the data is
    loaded only once instead of once per model.

    Args:
        models: sequence of binary models; position ``k`` scores class ``k``.
            All models are switched to eval mode.
        dataloader: iterable of ``(inputs, labels)`` batches; labels are
            ignored.

    Returns:
        1-D CPU tensor holding, for each sample, the index of the model with
        the highest sigmoid output.
    """
    model_devices = []
    for model in models:
        model.eval()
        # Score each model on the device it lives on; the notebook-level
        # `device` is only consulted for a model without parameters.
        first_param = next(model.parameters(), None)
        model_devices.append(first_param.device if first_param is not None else device)

    batch_scores = []
    with torch.no_grad():
        for inputs, _ in dataloader:
            # reshape(-1) keeps one score per sample even for a batch of one
            # (a bare squeeze() would return a 0-d tensor that cannot be
            # concatenated).
            per_model = [
                torch.sigmoid(model(inputs.to(dev, non_blocking=True)).reshape(-1)).cpu()
                for model, dev in zip(models, model_devices)
            ]
            batch_scores.append(torch.stack(per_model, dim=1))

    return torch.cat(batch_scores, dim=0).argmax(dim=1)


## Save model function

In [None]:
def save_ovr_model(model, class_index):
    """Store the weights of one binary model inside ``model_dir``.

    Args:
        model: module whose ``state_dict`` is written.
        class_index: index of the class the model detects; it becomes part of
            the file name ``model_class_<class_index>.pt``.
    """
    file_name = f"model_class_{class_index}.pt"
    weights = model.state_dict()
    torch.save(weights, os.path.join(model_dir, file_name))

## Train

In [None]:
print("Training OvR Ensemble...")
ovr_models = []
ovr_stats = []  # one (train_accs, valid_accs, test_accs) tuple per class
conv_arch = [(32, 3), (64, 3)]  # (out_channels, kernel_size) per conv block
fc_arch = [128]                 # hidden widths of the dense head

dataset_to_use = train_set
max_per_class = None
if use_fewshot:
    print("Using few-shot: 100 samples per each class")
    max_per_class = 100

# Train one independent binary classifier per class, one after another.
for i, cls in enumerate(classes):
    print(f"Training model for class {cls}...")
    binary_train = create_ovr_dataset(dataset_to_use, i, max_per_class)
    binary_valid = create_ovr_dataset(valid_set, i, None)
    binary_test  = create_ovr_dataset(test_set, i, None)

    loader = DataLoader(binary_train, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=True)
    vloader = DataLoader(binary_valid, batch_size=batch_size, shuffle=False, num_workers=2, pin_memory=True)
    tloader = DataLoader(binary_test,  batch_size=batch_size, shuffle=False, num_workers=2, pin_memory=True)

    model = ConvolutionalNeuralNetwork(conv_arch, fc_arch, n_classes=1, dropout=0.4).to(device)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.BCEWithLogitsLoss()

    train_accs, valid_accs, test_accs = train_ovr_model(model, loader, vloader, tloader, criterion, optimizer, epochs=25)
    ovr_stats.append((train_accs, valid_accs, test_accs))

    # Save the model, then drop every object that keeps memory alive before
    # the next class is prepared:
    #  * the optimizer still references the model parameters (plus its own
    #    per-parameter state), so deleting the model alone frees nothing;
    #  * the three in-memory datasets would otherwise coexist with the ones
    #    built for the next class.
    save_ovr_model(model, i)
    del model, optimizer, criterion
    del loader, vloader, tloader
    del binary_train, binary_valid, binary_test
    gc.collect()
    torch.cuda.empty_cache()

## Load models

In [None]:
# Make sure the weights directory exists even when this cell is run on its own.
os.makedirs(name=model_dir, exist_ok=True)

def load_ovr_models(num_classes, conv_arch, fc_arch):
    """Rebuild the one-vs-rest ensemble from the weight files in ``model_dir``.

    Args:
        num_classes: number of binary models to load; model ``i`` is read
            from ``model_class_<i>.pt``.
        conv_arch: conv configuration the models were created with.
        fc_arch: dense-head configuration the models were created with.

    Returns:
        List of models placed on the notebook-level ``device`` and set to
        eval mode, ordered by class index.
    """
    def _restore(class_index):
        # Rebuild the architecture first, then fill in the stored weights.
        net = ConvolutionalNeuralNetwork(conv_arch, fc_arch, n_classes=1, dropout=0.4).to(device)
        weights_file = os.path.join(model_dir, f"model_class_{class_index}.pt")
        net.load_state_dict(torch.load(weights_file, map_location=device))
        return net.eval()

    return [_restore(class_index) for class_index in range(num_classes)]

# Restore one binary classifier per class with the architecture used in training.
ovr_models = load_ovr_models(
    num_classes=len(classes),
    conv_arch=conv_arch,
    fc_arch=fc_arch,
)
print("Models loaded for evaluation.")

## Evaluation

In [None]:
print("\n Evaluation:")

# shuffle=False keeps the prediction order identical to the dataset order.
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=2, pin_memory=True)

# The ground-truth labels are taken from the dataset index instead of
# iterating the loader, which would decode every test image a second time
# only to read its label.
y_true = list(test_set.targets)

# Converted to a NumPy array so the metrics below receive array data.
y_pred = predict_ovr(ovr_models, test_loader).numpy()

acc = accuracy_score(y_true, y_pred)
print(f"Accuracy OvR ensemble: {acc:.4f}")

## Charts

In [None]:
# One figure per class with the accuracy recorded after every training epoch.
for class_idx, class_name in enumerate(classes):
    train_curve, valid_curve, test_curve = ovr_stats[class_idx]
    for curve, legend_name in (
        (train_curve, 'Train'),
        (valid_curve, 'Valid'),
        (test_curve, 'Test'),
    ):
        plt.plot(curve, label=legend_name)

    plt.title(f'Accuracy for class: {class_name}')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.grid(True)
    plt.show()


## Confusion matrix

In [None]:
# Confusion matrix of the ensemble predictions against the true test labels,
# with the class names as tick labels.
cm = confusion_matrix(y_true=y_true, y_pred=y_pred)
disp = ConfusionMatrixDisplay(cm, display_labels=classes)
disp.plot(xticks_rotation=45, cmap="Blues")
disp.ax_.set_title("Confusion matrix")
plt.show()