In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler
from torch.optim.lr_scheduler import _LRScheduler
import torch.utils.data as data

from torchvision.transforms.functional import to_pil_image
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models
from torchinfo import summary

from vision_models.alexnet import AlexNet
from vision_models.resnet import ResNet, cfgs, resnet50_config
from vision_models.vgg import VGG, get_vgg_layers, vgg_configs

from sklearn import decomposition
from sklearn import manifold
from sklearn.metrics import ConfusionMatrixDisplay, confusion_matrix, roc_auc_score, roc_curve, auc, precision_recall_curve, precision_recall_fscore_support, classification_report
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import label_binarize
from sklearn.multiclass import OneVsRestClassifier
from sklearn.utils.multiclass import unique_labels
from matplotlib.colors import Normalize
from tqdm.notebook import tqdm, trange
import matplotlib.pyplot as plt
import numpy as np
import optuna

from itertools import cycle
from scipy import interp
import copy
from collections import namedtuple
import os
import random
import shutil
import time
import collections
import math
import pandas as pd
import warnings

%matplotlib inline
%config InlineBackend.figure_format = 'retina'

warnings.filterwarnings("ignore")

In [None]:
SEED = 1996

random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.mps.manual_seed(SEED)
torch.use_deterministic_algorithms(True)
torch.backends.cudnn.deterministic = True

In [None]:
batch_size = 64

# Preprocessing settings shared by every split.
# Presumably the per-channel statistics come from the (commented-out)
# mean/std cell further down - TODO confirm.
IMAGE_SIZE = (128, 128)
CHANNEL_MEANS = [0.4992, 0.4839, 0.4827]
CHANNEL_STDS = [0.2325, 0.2332, 0.2327]


def _build_transform():
    """Return a fresh resize -> tensor -> normalise pipeline."""
    return transforms.Compose(
        [
            transforms.Resize(IMAGE_SIZE),
            transforms.ToTensor(),
            transforms.Normalize(mean=CHANNEL_MEANS, std=CHANNEL_STDS),
        ]
    )


# No augmentation is applied, so train and test use identical (but separate)
# pipelines.
transform_train = _build_transform()
transform_test = _build_transform()

In [None]:
# Dataset location and the sub-folder used for each split.
DATA_DIR = "CarDD_release_folders_single"
TRAIN_FOLDERS = ["train"]
VAL_TEST_FOLDERS = ["val", "test"]

try:
    # Training split (uses the training transform)
    train_dataset = datasets.ImageFolder(
        os.path.join(DATA_DIR, TRAIN_FOLDERS[0]), transform=transform_train
    )

    # Validation and test splits (use the evaluation transform)
    val_dataset = datasets.ImageFolder(
        os.path.join(DATA_DIR, VAL_TEST_FOLDERS[0]), transform=transform_test
    )
    test_dataset = datasets.ImageFolder(
        os.path.join(DATA_DIR, VAL_TEST_FOLDERS[1]), transform=transform_test
    )

except FileNotFoundError as e:
    print(f"Error: {e}. Please make sure the data directory is correct.")
    # Re-raise: every later cell needs these datasets, so continuing would only
    # surface as a confusing NameError further down.
    raise

except Exception as e:
    print(f"Error: {e}. An error occurred while creating the datasets.")
    raise

In [None]:
# Mini-batch loaders for the three splits.
# Training batches are reshuffled each epoch; the training and validation
# loaders discard a trailing incomplete batch, the test loader keeps every
# sample.
train_iter = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size=batch_size,
    shuffle=True,
    drop_last=True,
)

valid_iter = torch.utils.data.DataLoader(
    dataset=val_dataset,
    batch_size=batch_size,
    shuffle=False,
    drop_last=True,
)

test_iter = torch.utils.data.DataLoader(
    dataset=test_dataset,
    batch_size=batch_size,
    shuffle=False,
    drop_last=False,
)

In [None]:
def count_images_per_category(base_dir):
    """
    Count the image files inside each immediate sub-directory of ``base_dir``.

    A file counts as an image when its lower-cased name ends with one of
    ``.jpg``, ``.jpeg``, ``.png``, ``.gif`` or ``.bmp``. Entries of
    ``base_dir`` that are not directories are ignored.

    Args:
        base_dir (str): Directory whose sub-directories are the categories.

    Returns:
        dict: ``{category_name: image_count}`` plus a ``"Total"`` key holding
        the sum over all categories.
    """
    image_suffixes = (".jpg", ".jpeg", ".png", ".gif", ".bmp")
    category_counts = {}

    for entry in os.listdir(base_dir):
        entry_path = os.path.join(base_dir, entry)
        if not os.path.isdir(entry_path):
            continue

        category_counts[entry] = sum(
            1
            for name in os.listdir(entry_path)
            if name.lower().endswith(image_suffixes)
        )

    # Grand total is stored alongside the per-category counts
    category_counts["Total"] = sum(category_counts.values())

    return category_counts

In [None]:
# Class balance of the training split (path built from the dataset constants
# instead of repeating the folder name as a literal).
count_images_per_category(os.path.join(DATA_DIR, TRAIN_FOLDERS[0]))

In [None]:
# Class balance of the validation split (path built from the dataset constants
# instead of repeating the folder name as a literal).
count_images_per_category(os.path.join(DATA_DIR, VAL_TEST_FOLDERS[0]))

In [None]:
# means = torch.zeros(3)
# stds = torch.zeros(3)

# for img, label in train_dataset:
#     means += torch.mean(img, dim = (1,2))
#     stds += torch.std(img, dim = (1,2))

# means /= len(train_dataset)
# stds /= len(train_dataset)

# print(f'Calculated means: {means}')
# print(f'Calculated stds: {stds}')

In [None]:
def normalize_image(image):
    """
    Min-max rescale a tensor into (approximately) the [0, 1] range for display.

    The input is left untouched and a new tensor is returned, so plotting a
    batch does not silently rescale data that is used again afterwards.

    Args:
        image (torch.Tensor): Tensor to rescale.

    Returns:
        torch.Tensor: ``(image - min) / (max - min + 1e-5)``.
    """
    image_min = image.min()
    image_max = image.max()
    # The 1e-5 keeps the division finite when the image is constant
    return (image - image_min) / (image_max - image_min + 1e-5)


def plot_images(images, labels, classes, normalize=True):
    """
    Plot a grid of images with their corresponding labels.

    The grid is as close to square as possible while still holding every
    image; any surplus cells are left blank.

    Args:
        images (list of tensors): Images to plot; each is permuted with
            ``(1, 2, 0)`` before display, i.e. expected channel-first.
        labels (list of int): A list of corresponding labels for each image.
        classes (list of str): A list of class names, indexed by label.
        normalize (bool): Whether to min-max rescale each image for display.

    Returns:
        matplotlib.figure.Figure: The figure containing the grid.

    Raises:
        ValueError: If ``images`` is empty.
    """
    n_images = len(images)
    if n_images == 0:
        raise ValueError("plot_images needs at least one image")

    # Smallest near-square grid that fits all images (e.g. 10 -> 3 x 4)
    cols = math.ceil(math.sqrt(n_images))
    rows = math.ceil(n_images / cols)

    # squeeze=False keeps `axs` a 2-D array even for a 1 x 1 grid
    fig, axs = plt.subplots(rows, cols, figsize=(15, 15), squeeze=False)

    for i, ax in enumerate(axs.flatten()):
        if i < n_images:
            image = images[i]

            if normalize:
                image = normalize_image(image)

            ax.imshow(image.permute(1, 2, 0).cpu().numpy())
            ax.set_title(classes[labels[i]])

        # Hide ticks/frames for drawn and surplus cells alike
        ax.axis('off')

    fig.tight_layout()
    return fig


# Preview a few training images from the first batch, in random order.
N_IMAGES = 10
classes = train_dataset.classes

images, labels = next(iter(train_iter))

# Shuffle the positions once and apply the same order to images and labels
# so that each image keeps its own label.
order = list(range(len(images)))
random.shuffle(order)
shuffled_images = tuple(images[idx] for idx in order)
shuffled_labels = tuple(labels[idx] for idx in order)

fig = plot_images(shuffled_images[:N_IMAGES], shuffled_labels[:N_IMAGES], classes)
plt.show()



In [None]:
# Pick the compute device: Apple MPS first, then CUDA, otherwise the CPU.
if torch.backends.mps.is_available():
    device = torch.device('mps')
elif torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [None]:
# Cross-entropy loss used for training and evaluation, moved to the compute device
criterion = nn.CrossEntropyLoss().to(device)

In [None]:
def count_parameters(model):
    """
    Print the number of trainable parameters of ``model``.

    Only parameters with ``requires_grad=True`` are counted. Nothing is
    returned; the count is printed with thousands separators.
    """
    trainable = 0
    for param in model.parameters():
        if param.requires_grad:
            trainable += param.numel()

    print(f'The model has {trainable:,} trainable parameters')

In [None]:
def calculate_topk_accuracy(y_pred, y, k=2):
    """
    Compute top-1 and top-k accuracy for one batch.

    Args:
        y_pred: Score tensor; the top ``k`` entries are taken along dim 1.
        y: Target class indices, one per sample.
        k (int): How many of the highest-scoring classes count as a hit.

    Returns:
        tuple: ``(acc_1, acc_k)``, each a one-element float tensor holding the
        fraction of samples whose target is the best / among the ``k`` best
        predictions.
    """
    with torch.no_grad():
        n_samples = y.shape[0]

        # Shape (k, N): row r holds every sample's r-th highest-scoring class
        ranked = y_pred.topk(k, dim=1)[1].t()
        targets = y.view(1, -1).expand_as(ranked)
        hits = ranked.eq(targets)

        top1_hits = hits[:1].reshape(-1).float().sum(0, keepdim=True)
        topk_hits = hits[:k].reshape(-1).float().sum(0, keepdim=True)

    return top1_hits / n_samples, topk_hits / n_samples


def calculate_accuracy(y_pred, y):
    """
    Compute the fraction of samples whose highest-scoring class equals the target.

    Args:
        y_pred: Score tensor; the arg-max is taken along dim 1.
        y: Target class indices, one per sample.

    Returns:
        A 0-dim float tensor with the batch accuracy.
    """
    predicted = y_pred.argmax(dim=1)
    n_correct = predicted.eq(y.view_as(predicted)).sum()
    return n_correct.float() / y.shape[0]


In [None]:
def train(model, iterator, optimizer, criterion, scheduler, device):
    """
    Train the model for one pass over the given data iterator.

    Args:
        model: A PyTorch model to train.
        iterator: A PyTorch data iterator that generates (x, y) tuples.
        optimizer: The PyTorch optimizer used for training.
        criterion: A PyTorch loss function to compute the loss.
        scheduler: The learning rate scheduler used for training; it is
            stepped once per batch. If no learning rate scheduler is used,
            set this to None.
        device: The device to use for training (e.g., "cpu" or "cuda" or "mps").

    Returns:
        A tuple ``(epoch_loss, epoch_acc)``: the loss and the top-1 accuracy,
        each averaged over the batches of ``iterator``.
    """

    epoch_loss = 0
    epoch_acc = 0

    model.train()

    for (x, y) in tqdm(iterator, desc="Training", leave=False):
        x = x.to(device)
        y = y.to(device)

        optimizer.zero_grad()

        output = model(x)
        # The project's ResNet returns a 2-tuple whose first item holds the
        # class scores; other models return the scores directly.
        y_pred = output[0] if isinstance(model, ResNet) else output

        loss = criterion(y_pred, y)

        loss.backward()

        optimizer.step()

        if scheduler:
            scheduler.step()

        epoch_loss += loss.item()
        # Only top-1 accuracy is reported, so one code path serves all models
        epoch_acc += calculate_accuracy(y_pred, y).item()

    n_batches = len(iterator)

    return epoch_loss / n_batches, epoch_acc / n_batches


In [None]:
def evaluate(model, iterator, criterion, device):
    """
    Evaluate the model on the given data iterator without updating it.

    Args:
        model: A PyTorch model to evaluate.
        iterator: A PyTorch data iterator that generates (x, y) tuples.
        criterion: A PyTorch loss function to compute the loss.
        device: The device to use for evaluation (e.g., "cpu" or "cuda" or "mps").

    Returns:
        A tuple ``(epoch_loss, epoch_acc)``: the loss and the top-1 accuracy,
        each averaged over the batches of ``iterator``.
    """

    epoch_loss = 0
    epoch_acc = 0

    model.eval()

    with torch.no_grad():

        for (x, y) in tqdm(iterator, desc="Evaluating", leave=False):

            x = x.to(device)
            y = y.to(device)

            output = model(x)
            # The project's ResNet returns a 2-tuple whose first item holds
            # the class scores; other models return the scores directly.
            y_pred = output[0] if isinstance(model, ResNet) else output

            loss = criterion(y_pred, y)

            epoch_loss += loss.item()
            # Only top-1 accuracy is reported, so one code path serves all models
            epoch_acc += calculate_accuracy(y_pred, y).item()

    n_batches = len(iterator)

    return epoch_loss / n_batches, epoch_acc / n_batches


In [None]:
def get_predictions(model, iterator, device=None):
    """
    Run a PyTorch model over a data iterator and collect its predictions.

    Args:
        model: The PyTorch model to get predictions for.
        iterator: The data iterator yielding (x, y) batches.
        device: Device the inputs are moved to before the forward pass.
            Defaults to the device of the model's first parameter (CPU when
            the model has no parameters), so inputs always land where the
            model lives.

    Returns:
        Tuple: ``(images, labels, probs)`` concatenated over all batches on
        the CPU: the input images, the labels yielded by the iterator, and
        the softmax probabilities computed from the model output.
    """

    model.eval()

    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')

    images = []
    labels = []
    probs = []

    with torch.no_grad():

        for (x, y) in tqdm(iterator):

            x = x.to(device)

            output = model(x)
            # The project's ResNet returns a 2-tuple whose first item holds
            # the class scores; other models return the scores directly.
            y_pred = output[0] if isinstance(model, ResNet) else output

            y_prob = F.softmax(y_pred, dim=-1)

            images.append(x.cpu())
            labels.append(y.cpu())
            probs.append(y_prob.cpu())

    images = torch.cat(images, dim=0)
    labels = torch.cat(labels, dim=0)
    probs = torch.cat(probs, dim=0)

    return images, labels, probs

In [None]:
def epoch_time(start_time, end_time):
    """
    Split the interval between two timestamps into whole minutes and seconds.

    Args:
        start_time: Start timestamp in seconds.
        end_time: End timestamp in seconds.

    Returns:
        tuple: ``(minutes, seconds)`` as integers, both truncated.
    """
    duration = end_time - start_time
    whole_minutes = int(duration / 60)
    leftover_seconds = int(duration - (whole_minutes * 60))
    return whole_minutes, leftover_seconds


In [None]:
def train_eval(EPOCHS, model, optimizer, scheduler, restore_best=True):
    """
    Train ``model`` with early stopping on the validation loss.

    Uses the notebook-level ``train_iter``, ``valid_iter``, ``criterion`` and
    ``device``. Whenever the validation loss improves, the model weights are
    checkpointed to ``<ModelClass>_<OptimizerClass>.pt``; training stops once
    the loss has not improved for 8 consecutive epochs.

    Args:
        EPOCHS (int): Maximum number of epochs.
        model: The PyTorch model to train (moved to ``device``).
        optimizer: Optimizer updating the model parameters.
        scheduler: Learning-rate scheduler passed to ``train``, or None.
        restore_best (bool): If True (default), reload the best checkpoint
            into ``model`` after training so that later evaluation uses the
            weights with the lowest validation loss rather than those of the
            last epoch.

    Returns:
        tuple: ``(train_accuracy, valid_accuracy, train_losses, valid_losses)``,
        one entry per completed epoch (including the epoch that triggers
        early stopping).
    """

    model = model.to(device)
    patience = 8
    epochs_without_improvement = 0
    best_valid_loss = float('inf')
    best_val_acc = 0.0
    best_epoch = 0
    best_checkpoint = None  # set on the first improvement
    train_accuracy = []
    valid_accuracy = []
    train_losses = []
    valid_losses = []

    for epoch in trange(EPOCHS, desc="EPOCHS"):
        start_time = time.monotonic()

        train_loss, train_acc = train(model, train_iter, optimizer, criterion, scheduler, device)
        valid_loss, valid_acc = evaluate(model, valid_iter, criterion, device)

        end_time = time.monotonic()
        epoch_mins, epoch_secs = epoch_time(start_time, end_time)

        # Record and report BEFORE the early-stopping check so the final
        # epoch is not missing from the history or the log.
        train_losses.append(train_loss)
        valid_losses.append(valid_loss)
        train_accuracy.append(train_acc)
        valid_accuracy.append(valid_acc)

        print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
        print(f"\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:6.2f}% |")
        print(f"\tValid Loss: {valid_loss:.3f} | Valid Acc: {valid_acc*100:6.2f}% |")

        if valid_loss < best_valid_loss:
            best_valid_loss = valid_loss
            best_val_acc = valid_acc
            best_epoch = epoch
            # NOTE: the name depends only on the class names, so runs that use
            # the same model and optimizer classes overwrite each other's file.
            best_checkpoint = f"{model.__class__.__name__}_{optimizer.__class__.__name__}.pt"
            torch.save(model.state_dict(), best_checkpoint)
            epochs_without_improvement = 0
        else:
            # increase patience counter on no improvement
            epochs_without_improvement += 1
            if epochs_without_improvement >= patience:
                print(f"Validation loss hasn't improved in {patience} epochs. Stopping early.")
                break

    print(f"Best epoch: {best_epoch+1}, Best validation accuracy: {best_val_acc}, Best validation loss: {best_valid_loss}")

    if restore_best and best_checkpoint is not None:
        # `model` is the same object the caller holds, so this also updates
        # the caller's model.
        model.load_state_dict(torch.load(best_checkpoint, map_location=device))

    return train_accuracy, valid_accuracy, train_losses, valid_losses


In [None]:
# Ground-truth table for the test split, read from the dataset directory.
test = pd.read_csv(os.path.join(DATA_DIR, 'ground_truth.csv'))
# Everything except the 'Title' column is used as the label matrix.
# NOTE(review): presumably one indicator column per class, with rows in the
# same order as `test_iter` yields samples - confirm against the CSV.
y_test = np.array(test.drop(columns=['Title']))

In [None]:
def plot_roc_curve(y_test, probs, class_labels, NN):
    """
    Plot per-class, micro-average and macro-average ROC curves.

    The figure is shown and also saved as ``<NN>_roc_curve.png``.

    Args:
        y_test (array-like): True class indicators of shape (n_samples, n_classes).
        probs (array-like): Predicted probabilities of shape (n_samples, n_classes);
            anything ``np.asarray`` accepts (e.g. a CPU tensor).
        class_labels (list): List of class labels; its length sets the number
            of classes plotted.
        NN (str): Model name used in the plot title and the output file name.
    """
    y_test = np.asarray(y_test)
    probs = np.asarray(probs)

    n_classes = len(class_labels)
    fpr = dict()
    tpr = dict()
    roc_auc = dict()

    # Compute ROC curve and ROC area for each class
    for i in range(n_classes):
        fpr[i], tpr[i], _ = roc_curve(y_test[:, i], probs[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])

    # Compute micro-average ROC curve and ROC area
    fpr["micro"], tpr["micro"], _ = roc_curve(y_test.ravel(), probs.ravel())
    roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])

    # Plot ROC curves
    plt.figure(figsize=(10, 10))
    plt.plot([0, 1], [0, 1], 'k--', label='Random Guess')

    colors = ['darkorange', 'green', 'blue', 'red', 'purple', 'yellow']
    # cycle() reuses the palette instead of failing when there are more
    # classes than colors
    for i, color in zip(range(n_classes), cycle(colors)):
        plt.plot(fpr[i], tpr[i], color=color, lw=2,
                 label=f'ROC curve of {class_labels[i]} (area = {roc_auc[i]:.2f})')

    # Compute macro-average ROC curve and ROC area: interpolate every class
    # curve onto the union of all FPR points, then average the TPRs
    all_fpr = np.unique(np.concatenate([fpr[i] for i in range(n_classes)]))
    mean_tpr = np.zeros_like(all_fpr)
    for i in range(n_classes):
        mean_tpr += np.interp(all_fpr, fpr[i], tpr[i])
    mean_tpr /= n_classes
    fpr["macro"] = all_fpr
    tpr["macro"] = mean_tpr
    roc_auc["macro"] = auc(fpr["macro"], tpr["macro"])

    # Plot macro-average ROC curve
    plt.plot(fpr["macro"], tpr["macro"], color='navy', lw=2, linestyle=':',
             label=f'macro-average ROC curve (area = {roc_auc["macro"]:.2f})')

    # Plot micro-average ROC curve
    plt.plot(fpr["micro"], tpr["micro"], color='deeppink', lw=2, linestyle=':',
             label=f'micro-average ROC curve (area = {roc_auc["micro"]:.2f})')

    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(f'{NN} ROC Curves')
    plt.legend(loc="lower right")
    plt.savefig(f"{NN}_roc_curve.png", bbox_inches='tight')
    plt.show()


In [None]:
def plot_confusion_matrix(y_true: np.ndarray, y_pred: np.ndarray, class_names=None) -> None:
    """
    Plot raw and row-normalized confusion matrices and print a classification report.

    Both inputs are reduced to class indices with ``argmax`` along axis 1.

    Args:
    - y_true (np.ndarray): true labels, shape (n_samples, n_classes)
    - y_pred (np.ndarray): predicted scores/probabilities, shape (n_samples, n_classes)
    - class_names (list of str, optional): names used in the printed report,
      indexed by class. Defaults to the five damage classes used in this
      notebook.

    Returns:
    - None
    """
    if class_names is None:
        class_names = ['dent', 'glass shatter', 'lamp broken', 'scratch', 'tire flat']

    y_pred = np.argmax(np.asarray(y_pred), axis=1)
    y_true = np.argmax(np.asarray(y_true), axis=1)

    cm = confusion_matrix(y_true, y_pred)
    present_labels = unique_labels(y_true, y_pred)

    # Plot raw confusion matrix
    disp = ConfusionMatrixDisplay(cm, display_labels=present_labels)
    disp.plot(cmap=plt.cm.Blues)

    # Plot normalized confusion matrix
    disp = ConfusionMatrixDisplay.from_predictions(
        y_true, y_pred, normalize="true", cmap=plt.cm.Blues)

    # Per-class precision / recall / F1. Passing `labels` keeps the report
    # aligned with `class_names` even when a class is absent from the data.
    print(classification_report(y_true, y_pred,
                                labels=list(range(len(class_names))),
                                target_names=class_names))


In [None]:
def plot_loss_acc(NN, train_losses, valid_losses, train_acc, valid_acc):
    """
    Plot training/validation loss and accuracy curves side by side.

    The figure is saved as ``<NN>_plot.png`` and then shown.

    Args:
        NN (str): Model name used in the figure title and the file name.
        train_losses, valid_losses: Loss values plotted in the left panel.
        train_acc, valid_acc: Accuracy values plotted in the right panel.
    """
    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 6))

    fig.suptitle(f"Loss and Accuracy plots for {NN}")

    # (axis, y-label, training series + legend text, validation series + legend text)
    panels = (
        (loss_ax, "Loss", train_losses, 'Training loss', valid_losses, 'Validation loss'),
        (acc_ax, "Accuracy", train_acc, 'Training Accuracy', valid_acc, 'Validation Accuracy'),
    )

    for ax, y_label, train_series, train_label, valid_series, valid_label in panels:
        ax.plot(train_series, label=train_label)
        ax.plot(valid_series, label=valid_label)
        ax.set_xlabel("Epochs")
        ax.set_ylabel(y_label)
        ax.legend(frameon=False)

    plt.savefig(f"{NN}_plot.png", bbox_inches='tight')
    plt.show()

### Download pretrained models

In [None]:
# Torchvision backbones created with their default pretrained weights.
vgg11_bn, vgg16_bn, vgg19_bn = (
    build_model(weights='DEFAULT')
    for build_model in (models.vgg11_bn, models.vgg16_bn, models.vgg19_bn)
)

alexnet = models.alexnet(weights='DEFAULT')

In [None]:
# Function to calculate in_features into the classifier and AdaptivePool output_size

def calculate_out_features_and_adaptivepool(feature_extractor, input_size):
    """
    Probe a feature extractor with one random square RGB image.

    Args:
        feature_extractor: Callable (typically an ``nn.Module``) mapping a
            ``(1, 3, input_size, input_size)`` tensor to a feature map.
        input_size (int): Side length of the probe image.

    Returns:
        tuple: ``(out_features, hxw)`` where ``out_features`` is the number of
        values in the flattened feature map and ``hxw`` is the side length of
        its spatial grid. For a 4-D output the side is read from the map
        itself, so the result is correct for any channel count (512-channel
        VGG, 256-channel AlexNet, ...).
    """
    probe = torch.randn(1, 3, input_size, input_size)

    # Probe in eval mode and without autograd so that layers such as
    # BatchNorm do not update their running statistics; the original mode
    # is restored afterwards.
    is_module = isinstance(feature_extractor, nn.Module)
    was_training = feature_extractor.training if is_module else False
    if is_module:
        feature_extractor.eval()
    try:
        with torch.no_grad():
            feature_map = feature_extractor(probe)
    finally:
        if is_module:
            feature_extractor.train(was_training)

    out_features = feature_map.reshape(1, -1).size(1)

    if feature_map.dim() == 4:
        # (batch, channels, height, width): width is the side length
        hxw = int(feature_map.shape[-1])
    else:
        # Output is not a 4-D map: fall back to assuming 512 channels
        hxw = int(np.sqrt((out_features / 2) / 256))

    return out_features, hxw


# Probe the VGG11 convolutional stack at the notebook's 128 x 128 input size.
# `weights='DEFAULT'` replaces the deprecated `pretrained=True` flag and
# matches how the other torchvision models in this notebook are created.
feature_extractor = models.vgg11(weights='DEFAULT').features
input_size = 128

out_features, hxw = calculate_out_features_and_adaptivepool(feature_extractor, input_size)

print("out_features:", out_features)
print("hxw:", hxw)

In [None]:
def run_experiment(model, model_name, output_size, in_features, architecture='VGG', num_classes=5, lr=1e-4, l2=False, epochs=10, device=device, optimizer='Adamax'):
    """
    Run the entire pipeline: model modification, training, evaluation, and plotting.

    The experiment works on a deep copy of ``model``, so the object passed in
    is left untouched and can be reused as a fresh starting point for another
    run (e.g. the same backbone with and without L2 regularisation).

    Args:
        model: Torchvision-style model exposing ``features``, ``avgpool`` and
            ``classifier``.
        model_name (str): Name used in plot titles and output file names.
        output_size: Output size of the replacement ``AdaptiveAvgPool2d``.
        in_features (int): Input width of the new classifier's first layer.
        architecture (str): ``'VGG'`` or ``'AlexNet'``; selects the classifier layout.
        num_classes (int): Number of output classes.
        lr (float): Learning rate of the classifier; the ``features`` group
            uses ``lr / 10``.
        l2 (bool): If True, pass ``weight_decay=0.01`` to the optimizer.
        epochs (int): Maximum number of training epochs.
        device: Device the model is moved to before predicting on the test set.
        optimizer (str): ``'AdamW'``, ``'Adamax'`` or ``'SGD'``.

    Raises:
        ValueError: If ``architecture`` or ``optimizer`` is not supported.
    """
    optimizer_classes = {
        'AdamW': optim.AdamW,
        'Adamax': optim.Adamax,
        'SGD': optim.SGD,
    }
    if optimizer not in optimizer_classes:
        raise ValueError(f"Unsupported optimizer: {optimizer}")

    if architecture == 'VGG':
        new_classifier = nn.Sequential(
            nn.Linear(in_features, 4096),
            nn.ReLU(True),
            nn.Dropout(0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(True),
            nn.Dropout(0.5),
            nn.Linear(4096, num_classes)
        )
    elif architecture == 'AlexNet':
        new_classifier = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(in_features, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Linear(4096, num_classes),
        )
    else:
        raise ValueError(f"Unsupported architecture: {architecture}")

    # Modify a private copy: training the caller's object in place would make
    # a later experiment on the same model start from fine-tuned weights.
    model = copy.deepcopy(model)
    model.avgpool = nn.AdaptiveAvgPool2d(output_size=output_size)
    model.classifier = new_classifier

    count_parameters(model)

    # Prepare parameters: the backbone is tuned 10x more gently than the new head
    params = [
        {'params': model.features.parameters(), 'lr': lr / 10},
        {'params': model.classifier.parameters()}
    ]

    # Only the requested optimizer is constructed.
    # NOTE(review): AdamW's own default weight_decay is non-zero, so with
    # optimizer='AdamW' the `l2` flag may make no difference - confirm.
    optimizer_kwargs = {'lr': lr}
    if l2:
        optimizer_kwargs['weight_decay'] = 0.01
    chosen_optimizer = optimizer_classes[optimizer](params, **optimizer_kwargs)

    # Train and evaluate the model
    train_accuracy, valid_accuracy, train_losses, valid_losses = train_eval(epochs, model, chosen_optimizer, None)

    # Plotting and evaluation
    plot_loss_acc(model_name, train_losses, valid_losses, train_accuracy, valid_accuracy)
    model.to(device)
    # Only the probabilities are needed below; the returned images and labels
    # stay on the CPU and are discarded.
    _, _, probs = get_predictions(model, test_iter)
    plot_roc_curve(y_test, probs, val_dataset.classes, model_name)
    plot_confusion_matrix(y_test, probs)


### AlexNet

In [None]:
# AlexNet with a 3 x 3 pooled map; 2304 is presumably 256 channels x 3 x 3.
run_experiment(
    model=alexnet,
    model_name='AlexNet',
    output_size=(3, 3),
    in_features=2304,
    architecture='AlexNet',
)

### VGG11

In [None]:
# VGG11-BN with a 4 x 4 pooled map; 8192 is presumably 512 channels x 4 x 4.
run_experiment(
    model=vgg11_bn,
    model_name='VGG11_bn',
    output_size=(4, 4),
    in_features=8192,
)

### VGG16

In [None]:
# VGG16-BN with a 4 x 4 pooled map; 8192 is presumably 512 channels x 4 x 4.
run_experiment(
    model=vgg16_bn,
    model_name='VGG16_bn',
    output_size=(4, 4),
    in_features=8192,
)

### With L2 Regularisation

In [None]:
# AlexNet again, this time with weight decay enabled (l2=True).
run_experiment(
    model=alexnet,
    model_name='AlexNet',
    output_size=(3, 3),
    in_features=2304,
    architecture='AlexNet',
    l2=True,
)

In [None]:
# VGG11-BN again, this time with weight decay enabled (l2=True).
run_experiment(
    model=vgg11_bn,
    model_name='VGG11_bn',
    output_size=(4, 4),
    in_features=8192,
    l2=True,
)

In [None]:
# VGG16-BN again, this time with weight decay enabled (l2=True).
run_experiment(
    model=vgg16_bn,
    model_name='VGG16_bn',
    output_size=(4, 4),
    in_features=8192,
    l2=True,
)

In [None]:
# # Predict Images

# predicted_labels = torch.argmax(probs, dim=1)

# images = images.cpu()
# labels = labels.cpu()
# random_indices = random.sample(range(len(images)), 10)
# # random_indices = list(range(len(images)))[:10]

# fig, axes = plt.subplots(nrows=2, ncols=5, figsize=(12, 6))
# axes = axes.ravel()

# mean, std = [0.4992, 0.4839, 0.4827], [0.2260, 0.2268, 0.2264]

# for i, idx in enumerate(random_indices):
#     image = images[idx].cpu()  # Move image tensor to CPU

#     # Apply normalization
#     for channel in range(3):
#         image[channel] = (image[channel] * std[channel]) + mean[channel]

#     image = to_pil_image(image)  # Convert tensor to PIL format
#     axes[i].imshow(image)
#     axes[i].set_title(f"Predicted: {val_dataset.classes[predicted_labels[idx]]}\nTrue: {val_dataset.classes[labels[idx]]}")
#     axes[i].axis('off')

# plt.tight_layout()
# plt.show()