# Zadanie

Tym razem zadanie polega na stworzeniu klasyfikatora obrazków działającego na 50 klasach z różnymi przedmiotami/zwierzętami itp. Do dyspozycji mają Państwo zbiór treningowy podzielony na odpowiednie podfoldery z klasami oraz zbiór testowy - bez podziału. Zbiór treningowy jest przygotowany w taki sposób by można go było łatwo załadować za pomocą klasy torchvision.ImageFolder wywołując np:
trainset = ImageFolder("data/train/", transform=train_transform)
Wówczas wszystkie przykłady zostaną przypisane do odpowiedniej klasy w zależności od tego w jakim podfolderze się znajdowały.
Jako że dane są bardzo duże to umieściłem je na OneDrive:
(train.zip i test_all.zip)

Proszę zwrócić szczególną uwagę na formę zwracanego rozwiązania, bo ostatnio większość z państwa zrobiła to byle jak i miałem bardzo dużo problemów z dodawaniem/usuwaniem niepotrzebnych wierszy itp. Tym razem nie będę poprawiał przesyłanych przez Państwa plików!
W ramach rozwiązania, proszę oddać poprzez teamsy plik archiwum .zip z kodem (w formie notebooka, lub skryptu/skryptów .py) oraz plikiem .csv z predykcjami na zbiorze testowym. BEZ dodatkowych podfolderów i BEZ danych. W ramach predykcji proszę zapisać tym razem dwie kolumny (bez nagłówków):
- Pierwszą kolumnę z nazwami plików testowych (uwaga pliki nazywają się np. 850043533599308.JPEG a nie 850043533599308.jpeg, 850043533599308.jpg czy 850043533599308). Proszę zwrócić na to uwagę bo mój skrypt ewaluacyjny inaczej nie zadziała.
- Drugą kolumnę z wartościami oznaczającymi predykcję numeru klasy. Klasy ponumerowane są zgodnie z numeracją ze zbioru treningowego (startując od zera). Po utworzeniu datasetu mogą to Państwo sprawdzić wywołując trainset.classes.

Uwaga: W zadaniu proszę nie wykorzystywać gotowych architektur o których wspominałem na zajęciach, poświęcimy temu zagadnieniu całe ćwiczenia.

# Importing libraries and modules

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import os
import csv
import natsort

from collections import Counter
from tqdm import tqdm
from PIL import Image

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.utils.data import Dataset

from torchvision.models import resnet18
from torchvision.utils import make_grid
from torchvision.datasets import ImageFolder
from torchvision.transforms import Compose, ToTensor, Normalize, RandomRotation, RandomAffine, RandomErasing, Grayscale, RandomHorizontalFlip

# Helper functions

In [None]:
class EarlyStopping():
    """Signal that training should stop when validation loss drifts above train loss.

    Each call compares the gap ``validation_loss - train_loss`` with
    ``min_delta``. Every call whose gap is larger than ``min_delta`` bumps
    ``counter`` (the counter is cumulative, it is never reset), and as soon as
    ``counter`` reaches ``tolerance`` the ``early_stop`` flag becomes True.

    Args:
        tolerance: number of offending calls after which ``early_stop`` is set.
        min_delta: largest gap that is still considered acceptable.
    """

    def __init__(self, tolerance=5, min_delta=0):
        # Configuration
        self.tolerance, self.min_delta = tolerance, min_delta
        # Running state
        self.counter, self.early_stop = 0, False

    def __call__(self, train_loss, validation_loss):
        gap = validation_loss - train_loss
        if not (gap > self.min_delta):
            # Gap is acceptable: nothing to record for this epoch.
            return
        self.counter += 1
        if self.counter >= self.tolerance:
            self.early_stop = True

In [None]:
def get_device():
    """Returns the available device for computation.

    CUDA is preferred, then Apple MPS, then the CPU. The MPS backend is
    looked up defensively because ``torch.backends.mps`` does not exist in
    every torch build; a plain attribute access would raise AttributeError
    there instead of falling back to the CPU.

    Returns:
        torch.device: available device for computation
    """
    mps_backend = getattr(torch.backends, 'mps', None)

    if torch.cuda.is_available():
        compute_device = torch.device('cuda')
    elif mps_backend is not None and mps_backend.is_available():
        compute_device = torch.device('mps')
    else:
        compute_device = torch.device('cpu')

    print(f'device is {compute_device}')
    return compute_device


def save_predictions_to_csv(predictions, file_path):
    """Write predictions as a header-less two-column CSV file.

    Args:
        predictions: mapping ``file name -> predicted class``; one row is
            written per item, in the mapping's iteration order.
        file_path: destination path; an existing file is overwritten.
    """
    # newline="" is required by the csv module: without it the writer's own
    # "\r\n" terminator is translated again on Windows and every data row is
    # followed by an empty one.
    with open(file_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerows(predictions.items())

# Params and constants

In [None]:
# Seed used for both the torch and the numpy random generators.
random_seed = 100
# Compute device chosen by get_device() (which also prints its choice).
device = get_device()
# device = torch.device('cpu')

# Data locations, relative to the notebook's working directory.
train_data_path = 'train/'
# NOTE(review): "opath" looks like a typo for "path"; the name is used when
# the test dataset is built, so rename both places together.
test_data_opath = 'test_all/'
# NOTE(review): not referenced anywhere in the visible notebook.
confusion_matrix_path = 'conf_matrix.png'

# Augmentation / split hyper-parameters.
ROTATION_ANGLE = 15      # argument of RandomRotation
RANDOM_AFFINE = 20       # argument of RandomAffine
VAL_SET_FRACTION = 0.15  # share of the labelled data held out for validation
# NOTE(review): not referenced anywhere in the visible notebook.
CROP_FACTOR = 0.9

In [None]:
# Seed both RNGs with the same value so the split, the augmentations and the
# weight initialisation draw from seeded generators.
torch.manual_seed(random_seed)
np.random.seed(random_seed)

# Loading data

In [None]:
# Preprocessing followed by random augmentation, applied in the listed order.
# NOTE(review): the random steps run on every sample of every dataset built
# with this pipeline, not only on training samples.
img_transformations = Compose([
    ToTensor(),                      # PIL image -> float tensor
    Grayscale(),                     # single-channel input for the model
    Normalize((0.5), (0.5)),         # mean 0.5 / std 0.5 (undone in imshow)
    RandomHorizontalFlip(0.2),       # presumably p=0.2 - confirm in torchvision docs
    RandomErasing(0.2),              # presumably p=0.2 - confirm in torchvision docs
    RandomRotation(ROTATION_ANGLE),
    RandomAffine(RANDOM_AFFINE)
])

In [None]:
# Labelled dataset: ImageFolder assigns labels from the sub-folder names.
img_dataset = ImageFolder(train_data_path, transform=img_transformations)

In [None]:
# Notebook display of the dataset summary.
img_dataset

In [None]:
# Reuse the class list and the name -> index mapping that ImageFolder already
# built when the dataset was created, instead of scanning the training
# directory on disk a second time with find_classes().
class_names, class_idx = img_dataset.classes, img_dataset.class_to_idx
class_names

In [None]:
# Number of target classes; sizes the model output and the plot ticks.
num_classes = len(class_names)
num_classes

# Split data to train and validation sets

In [None]:
# Absolute number of validation samples (int() truncates towards zero).
VAL_SET_SIZE = int(len(img_dataset) * VAL_SET_FRACTION)

### Datasets

In [None]:
# Random, non-overlapping split of the labelled data.
# NOTE(review): both subsets wrap the same img_dataset, so the validation
# samples go through the same random augmentations as the training samples.
train_dataset, val_dataset = torch.utils.data.random_split(img_dataset, [len(img_dataset) - VAL_SET_SIZE, VAL_SET_SIZE])

In [None]:
# Report the split sizes and make sure no sample was lost or duplicated.
n_train, n_val = len(train_dataset), len(val_dataset)
print(f'Train set size: {n_train}')
print(f'Val set size: {n_val}')

assert n_train + n_val == len(img_dataset)

### DataLoaders

In [None]:
# Training batches are reshuffled every epoch; the validation loader keeps
# the default (unshuffled) order and uses a larger batch size.
train_dataloader = DataLoader(train_dataset, batch_size=512, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=1024)

# EDA

In [None]:
# Number of samples per class index over the full labelled dataset.
class_counts_dataset_full = Counter(img_dataset.targets)

In [None]:
def plot_class_distribution(class_labels, class_counts, class_names, label):
    """Draw a bar chart with the number of samples per class.

    Args:
        class_labels: iterable of integer class indices (x positions of the bars).
        class_counts: iterable of sample counts, aligned with ``class_labels``.
        class_names: sequence of class names; tick ``i`` is labelled
            ``class_names[i]``.
        label: title of the plot.
    """
    # Dict views (e.g. Counter.keys()/values()) are materialised so that
    # matplotlib always receives plain sequences.
    class_labels = list(class_labels)
    class_counts = list(class_counts)

    plt.figure(figsize=(15, 5))
    plt.bar(class_labels, class_counts)

    plt.title(label)
    plt.ylabel('Count')
    plt.xlabel('Class')
    # Tick positions come from the argument itself rather than from the
    # notebook-level ``num_classes`` global, so the function is self-contained.
    plt.xticks(range(len(class_names)), class_names, rotation=90)

    plt.tight_layout()
    plt.show()

In [None]:
# Bars are positioned by class index; ticks are labelled with the class names.
plot_class_distribution(class_counts_dataset_full.keys(), class_counts_dataset_full.values(), class_names, label='Class distribution for the whole dataset')

The class distribution is slightly imbalanced: the `bread` and `carbon` classes have a different number of samples than the other classes.

In [None]:
def imshow(img):
    """Show a channel-first image tensor that was normalised with mean 0.5 / std 0.5."""
    img = img / 2 + 0.5     # unnormalize
    npimg = img.numpy()
    # matplotlib expects channel-last data, the tensor is channel-first.
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()


# Use the built-in next(): Python 3 iterators implement __next__, and the
# DataLoader iterator is not guaranteed to expose a .next() method.
sample_images, sample_labels = next(iter(train_dataloader))

plt.figure(figsize=(10, 10))
imshow(make_grid(sample_images))

In [None]:
# Shape of one transformed sample tensor (first element of the first item).
input_shape = train_dataset[0][0].shape
input_shape

# Model

In [None]:
class ClassificationModelTemplate(nn.Module):
    """Base class bundling a training loop, validation and simple metrics.

    Subclasses override ``forward`` so that it returns one row of class
    scores per sample, i.e. a ``(batch, num_classes)`` tensor.

    Args:
        num_classes: number of target classes (size of the confusion matrix).
        device: torch device every batch is moved to before the forward pass.
        callbacks: optional dict of callbacks. Only the ``'early_stopping'``
            entry is used: it is called as ``cb(train_loss, val_loss)`` after
            each epoch and training stops once ``cb.early_stop`` is truthy.
    """

    def __init__(self, num_classes, device, callbacks) -> None:
        super().__init__()
        self.num_classes = num_classes
        self.device = device
        self.callbacks = callbacks
        # Average loss per epoch, appended by fit().
        self.train_report = {
            'train_loss_history': [],
            'val_loss_history': []
        }

    def forward(self, X):
        """Identity placeholder; subclasses return class scores here."""
        return X

    def predict(self, X):
        """Return the predicted class index for every sample in ``X``.

        Switches the model to eval mode (and leaves it there). The result is
        a 1-D integer tensor on the CPU.
        """
        self.eval()
        with torch.no_grad():
            y_pred = self.forward(X)
            # One vectorised argmax over the class dimension instead of a
            # Python loop over the rows.
            return torch.argmax(y_pred, dim=1).cpu()

    def fit(self, train_dataloader, loss_func, optimizer, epochs, val_dataloader=None):
        """Train for up to ``epochs`` epochs and record the average losses.

        Args:
            train_dataloader: iterable of ``(samples, labels)`` batches.
            loss_func: callable ``loss_func(scores, labels)`` returning a scalar tensor.
            optimizer: optimizer updating this model's parameters.
            epochs: maximum number of epochs.
            val_dataloader: optional iterable of validation batches. Early
                stopping needs a validation loss, so it is only evaluated
                when this is given.
        """
        # dict.get is a method: it has to be called, not subscripted.
        early_stopping = (self.callbacks or {}).get('early_stopping')

        for epoch in range(epochs):
            print('================================')
            print(f'Epoch {epoch}')
            # Re-enter train mode every epoch: validation and predict() may
            # have switched the model to eval mode in the meantime.
            self.train()
            train_epoch_loss = self._train_one_epoch(train_dataloader, loss_func, optimizer)
            avg_epoch_loss = train_epoch_loss / len(train_dataloader)
            self._log_training_loss(avg_epoch_loss, epoch)

            avg_val_loss = None
            if val_dataloader is not None:
                val_loss = self.perform_validation(loss_func, val_dataloader)
                avg_val_loss = val_loss / len(val_dataloader)
                self._log_validation_loss(avg_val_loss)

            # Without a validation loss there is nothing to compare against.
            if early_stopping is not None and avg_val_loss is not None:
                early_stopping(avg_epoch_loss, avg_val_loss)
                if early_stopping.early_stop:
                    print('Stopping early')
                    break

    def _train_one_epoch(self, train_dataloader, loss_func, optimizer):
        """Run one optimisation pass; return the sum of the batch losses."""
        epoch_loss = 0.0
        for batch_samples, batch_labels in tqdm(train_dataloader):
            X = batch_samples.to(self.device)
            y = batch_labels.to(self.device)

            y_pred = self.forward(X)
            loss = loss_func(y_pred, y)
            self._optimize_params(loss, optimizer)
            epoch_loss += loss.item()
        return epoch_loss

    def _optimize_params(self, loss, optimizer):
        """Single gradient step for an already computed loss."""
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    def perform_validation(self, loss_func, val_dataloader):
        """Return the sum of the batch losses over ``val_dataloader``.

        The model is evaluated in eval mode (dropout off, batch-norm using
        running statistics) and its previous train/eval mode is restored
        afterwards.
        """
        was_training = self.training
        self.eval()
        val_loss = 0.0
        try:
            with torch.no_grad():
                for batch_samples, batch_labels in val_dataloader:
                    X = batch_samples.to(self.device)
                    y = batch_labels.to(self.device)

                    y_pred = self.forward(X)
                    loss = loss_func(y_pred, y)
                    val_loss += loss.item()
        finally:
            self.train(was_training)
        return val_loss

    def _log_training_loss(self, epoch_loss, epoch):
        self.train_report['train_loss_history'].append(epoch_loss)
        print(f'Train Loss: {epoch_loss}')

    def _log_validation_loss(self, epoch_loss):
        self.train_report['val_loss_history'].append(epoch_loss)
        print(f'Val Loss: {epoch_loss}')

    def get_conf_matrix(self, dataloader):
        """Return a ``(num_classes, num_classes)`` float confusion matrix.

        Rows are true labels, columns are predicted labels. Calls
        ``predict``, so the model is left in eval mode.
        """
        n = self.num_classes
        confusion_matrix = torch.zeros(n, n)
        with torch.no_grad():
            for batch_samples, batch_labels in dataloader:
                X = batch_samples.to(self.device)
                y_true = batch_labels.reshape(-1).long().cpu()
                y_pred = self.predict(X).reshape(-1).long()
                # Encode every (true, predicted) pair as one flat cell index
                # and count the whole batch at once.
                cell_counts = torch.bincount(y_true * n + y_pred, minlength=n * n)
                confusion_matrix += cell_counts.reshape(n, n).to(confusion_matrix.dtype)
        return confusion_matrix

    def class_accuracy(self, dataloader):
        """Return per-class accuracy (correct / samples of that class).

        Classes without any sample in ``dataloader`` yield NaN (0 / 0).
        """
        conf_matrix = self.get_conf_matrix(dataloader)
        return conf_matrix.diag()/conf_matrix.sum(1)


class ClassificationModel(ClassificationModelTemplate):
    """Plain CNN classifier: three conv/pool stages followed by an MLP head.

    The first dense layer expects ``256 * 6 * 6`` features, i.e. a 6x6
    feature map after the third pooling step.

    Args:
        input_channels: number of channels of the input images.
        num_classes: number of output classes.
        device: device batches are moved to by the training helpers.
        callbacks: optional callbacks dict forwarded to the base class.
    """

    def __init__(self, input_channels, num_classes, device, callbacks: dict=None) -> None:
        super().__init__(num_classes, device, callbacks)

        drop_p = 0.6

        feature_layers = [
            # Stage 1: 5x5 conv keeps the spatial size, pooling halves it.
            nn.Conv2d(input_channels, 64, kernel_size=5, padding='same'),
            nn.ReLU(),
            nn.BatchNorm2d(64),
            nn.Dropout(drop_p),
            nn.MaxPool2d(2, 2),
            # Stage 2: unpadded 3x3 conv, 64 -> 128 channels.
            nn.Conv2d(64, 128, kernel_size=3),
            nn.ReLU(),
            nn.BatchNorm2d(128),
            nn.Dropout(drop_p),
            nn.MaxPool2d(2, 2),
            # Stage 3: unpadded 3x3 conv, 128 -> 256 channels, no batch norm.
            nn.Conv2d(128, 256, kernel_size=3),
            nn.ReLU(),
            nn.Dropout(drop_p),
            nn.MaxPool2d(2, 2),
        ]
        self.conv_pooling_stack = nn.Sequential(*feature_layers)
        self.flatten = nn.Flatten(1)

        # Dense head: Linear -> ReLU -> Dropout for every width transition,
        # except that the last transition is not followed by dropout.
        widths = (256*6*6, 2048, 512, 128, 64)
        head_layers = []
        for n_in, n_out in zip(widths, widths[1:]):
            head_layers += [nn.Linear(n_in, n_out), nn.ReLU(), nn.Dropout(drop_p)]
        head_layers.pop()
        self.fully_connected_stack = nn.Sequential(*head_layers)

        self.output_layer = nn.Linear(widths[-1], self.num_classes)

    def forward(self, X):
        features = self.flatten(self.conv_pooling_stack(X))
        hidden = self.fully_connected_stack(features)
        return self.output_layer(hidden)
    

# Transfer learning for tests
class ResNetPretrained(ClassificationModelTemplate):
    """Pretrained torchvision ResNet-18 with a new classification head.

    Args:
        num_classes: number of output classes of the replaced ``fc`` layer.
        device: device batches are moved to by the training helpers.
        callbacks: optional callbacks dict forwarded to the base class.
    """

    def __init__(self, num_classes, device, callbacks: dict=None) -> None:
        super().__init__(num_classes, device, callbacks)
        self.resnet_model = resnet18(pretrained=True)
        # Swap the final layer for one sized for this task's classes.
        num_features = self.resnet_model.fc.in_features
        self.resnet_model.fc = nn.Linear(num_features, num_classes)

    def forward(self, X):
        return self.resnet_model(X)

    def predict(self, X):
        """Return the predicted class index per sample as a 1-D CPU tensor."""
        self.resnet_model.eval()
        with torch.no_grad():
            y_pred = self.forward(X)
            # Vectorised argmax over the class dimension.
            return torch.argmax(y_pred, dim=1).cpu()

    def parameters(self, recurse: bool = True):
        """Return the wrapped network's parameters.

        Accepts ``recurse`` like ``nn.Module.parameters`` so that callers
        passing that argument keep working with this override.
        """
        return self.resnet_model.parameters(recurse=recurse)

In [None]:
# https://paperswithcode.com/sota/incremental-learning-on-cifar-100-50-classes-2


# ResNet for performance testing
class ResidualBlock(nn.Module):
    """Two 3x3 conv + batch-norm layers with a skip connection.

    Args:
        input_channels: channels of the incoming feature map.
        out_channels: channels produced by both convolutions.
        stride: stride of the first convolution.
        downsample: optional module applied to the input before it is added
            back; when None the input itself is used as the shortcut.
    """

    def __init__(self, input_channels, out_channels, stride=1, downsample = None):
        super().__init__()

        first_conv = nn.Conv2d(input_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.conv1 = nn.Sequential(first_conv, nn.BatchNorm2d(out_channels), nn.ReLU())

        second_conv = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Sequential(second_conv, nn.BatchNorm2d(out_channels))

        self.downsample = downsample
        self.relu = nn.ReLU()
        self.out_channels = out_channels

    def forward(self, X):
        main_path = self.conv2(self.conv1(X))
        # Shortcut: the raw input, or its projection when one was supplied.
        shortcut = X if self.downsample is None else self.downsample(X)
        main_path += shortcut
        return self.relu(main_path)


class ResNetImplementation(ClassificationModelTemplate):
    """Hand-written ResNet-style classifier built from residual blocks.

    Args:
        residual_block: block class, called as
            ``residual_block(in_channels, out_channels, stride, downsample)``.
        layers: four integers, the number of blocks in each of the four stages.
        num_classes: number of output classes.
        device: device batches are moved to by the training helpers.
        callbacks: optional callbacks dict forwarded to the base class.
        input_channels: channels of the input images. Defaults to 3, the
            value that used to be hard-coded; pass 1 for grayscale inputs.
    """

    def __init__(self, residual_block, layers, num_classes, device, callbacks: dict=None,
                 input_channels: int = 3) -> None:
        super().__init__(num_classes, device, callbacks)
        # Channel count entering the next stage; updated by _make_layer.
        self.inplanes = 64

        self.conv1 = nn.Sequential(
                        nn.Conv2d(input_channels, 64, kernel_size=7, stride=2, padding=3),
                        nn.BatchNorm2d(64),
                        nn.ReLU())

        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.residual_block_stack = nn.Sequential(
            self._make_layer(residual_block, 64, layers[0], stride=1),
            self._make_layer(residual_block, 128, layers[1], stride=2),
            self._make_layer(residual_block, 256, layers[2], stride=2),
            self._make_layer(residual_block, 512, layers[3], stride=2)
        )

        # Global average pooling always yields a 1x1 map, so the 512-feature
        # linear layer works for any input resolution. For the 2x2 final map
        # that the fixed 2x2 average pool required, the result is identical.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.output_layer = nn.Linear(512, num_classes)

    def _make_layer(self, residual_block, planes, residual_blocks, stride=1):
        """Build one stage made of ``residual_blocks`` blocks with ``planes`` channels."""
        downsample = None
        if stride != 1 or self.inplanes != planes:
            # The shortcut needs a projection whenever the first block
            # changes the resolution or the channel count.
            downsample = nn.Sequential(
                nn.Conv2d(self.inplanes, planes, kernel_size=1, stride=stride),
                nn.BatchNorm2d(planes),
            )
        layers = []
        layers.append(residual_block(self.inplanes, planes, stride, downsample))
        self.inplanes = planes
        for _ in range(1, residual_blocks):
            layers.append(residual_block(self.inplanes, planes))

        return nn.Sequential(*layers)

    def forward(self, X):
        X = self.conv1(X)
        X = self.maxpool(X)
        X = self.residual_block_stack(X)

        X = self.avgpool(X)
        X = X.view(X.size(0), -1)
        return self.output_layer(X)

    def predict(self, X):
        """Return the predicted class index per sample as a 1-D CPU tensor."""
        self.eval()
        with torch.no_grad():
            y_pred = self.forward(X)
            # Vectorised argmax over the class dimension.
            return torch.argmax(y_pred, dim=1).cpu()
    
    

# Training setup

In [None]:
# Early stopping with tolerance=5 and min_delta=0.3 (positional arguments).
early_stopping = EarlyStopping(5, 0.3)

# Callback registry; the training loop looks up the 'early_stopping' key.
callbacks = {'early_stopping': early_stopping}

In [None]:
# Alternative models kept for experiments.
# NOTE(review): no class named ResNetClassificationModel is defined in the
# visible notebook - presumably ResNetPretrained is meant.
# model = ResNetClassificationModel(num_classes, device).to(device)
# model = ResNetImplementation(ResidualBlock, [3, 4, 6, 3], num_classes, device).to(device)
# Single input channel: the transform pipeline converts images to grayscale.
# NOTE(review): the `callbacks` dict is not passed here, so the model's
# callbacks stay None and early stopping never runs.
model = ClassificationModel(1, num_classes, device).to(device)

model

In [None]:
# Cross-entropy on the model's raw output scores; Adam with weight decay.
loss_func = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=0.001)

# Training

In [None]:
# Train for up to 35 epochs, computing the validation loss after each one.
model.fit(train_dataloader, loss_func, optimizer, epochs=35, val_dataloader=val_dataloader)

In [None]:
# Learning curves: average loss per epoch recorded by fit().
plt.plot(model.train_report['train_loss_history'], label='Train')
plt.plot(model.train_report['val_loss_history'], label='Validation')
plt.legend()


# Validation

### Accuracy per class

In [None]:
# Per-class accuracy (confusion-matrix diagonal divided by the row sums).
# NOTE(review): both loaders apply the random augmentations of the shared
# transform pipeline, so these numbers are measured on augmented images.
train_acc_per_class = model.class_accuracy(train_dataloader)
val_acc_per_class = model.class_accuracy(val_dataloader)


In [None]:
# These are unweighted means of the per-class accuracies, not the share of
# correctly classified samples; the two differ when classes are imbalanced.
print(f'Train acc: {torch.mean(train_acc_per_class)}')
print(f'Val acc: {torch.mean(val_acc_per_class)}')

# Testing

In [None]:
class TestDataSet(Dataset):
    """Unlabelled image dataset over every entry of a single directory.

    Entries are kept in natural sort order, so index ``i`` always refers to
    ``images[i]``.

    Args:
        main_dir: directory containing the image files.
        transform: callable applied to each RGB PIL image.
    """

    def __init__(self, main_dir, transform):
        self.main_dir, self.transform = main_dir, transform
        directory_entries = os.listdir(main_dir)
        self.images = natsort.natsorted(directory_entries)

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        file_path = os.path.join(self.main_dir, self.images[idx])
        # Always decode to 3-channel RGB before handing over to the transform.
        rgb_image = Image.open(file_path).convert("RGB")
        return self.transform(rgb_image)

    def getImageName(self, idx):
        """Return the file name stored at position ``idx``."""
        return self.images[idx]

In [None]:
def predict_for_test_dataset(test_dataset, test_dataloader, model):
    """Predict a class for every test image and key the result by file name.

    Args:
        test_dataset: object exposing ``images``, the file names in the same
            order in which ``test_dataloader`` yields the samples.
        test_dataloader: iterable of sample batches (no labels).
        model: object exposing ``device`` and ``predict(batch)``.

    Returns:
        dict mapping file name -> predicted class as a plain Python number.
    """
    predicted_labels = []
    with torch.no_grad():
        for image_batch in test_dataloader:
            batch_prediction = model.predict(image_batch.to(model.device))
            # Unwrap the tensor elements into plain Python numbers.
            predicted_labels.extend(label.item() for label in batch_prediction)
    return dict(zip(test_dataset.images, predicted_labels))

In [None]:
# Inference must be deterministic: keep only the preprocessing steps of the
# training pipeline (tensor conversion, grayscale, normalisation) and leave
# out the random flip / erasing / rotation / affine augmentations, which
# would otherwise distort the test images and make predictions vary between runs.
test_transformations = Compose([
    ToTensor(),
    Grayscale(),
    Normalize((0.5), (0.5)),
])

test_dataset = TestDataSet(test_data_opath, transform=test_transformations)
# shuffle=False keeps the batches aligned with test_dataset.images.
test_dataloader = DataLoader(test_dataset , batch_size=24, shuffle=False, drop_last=False)

In [None]:
# Mapping of test file name -> predicted class index.
predictions = predict_for_test_dataset(test_dataset, test_dataloader, model)

In [None]:
# Submission file: two header-less columns (file name, predicted class).
save_predictions_to_csv(predictions, 'poniedzialek_Kulesza_Tomaszewski_results.csv')