In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import datasets, transforms
import torch.optim as optim
from datetime import datetime
import matplotlib.pyplot as plt
import numpy as np
from torch.utils.data import random_split
from collections import Counter
import seaborn as sns
from sklearn.metrics import confusion_matrix
from torchvision.utils import draw_bounding_boxes
from torchvision.ops import box_convert, box_iou

In [None]:
# Global run configuration: RNG seed, default floating-point precision and compute device.
SEED = 265
torch.manual_seed(SEED)
# From here on, tensors/parameters created without an explicit dtype are float64.
torch.set_default_dtype(torch.double)
# Prefer the GPU when one is visible to PyTorch, otherwise run on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

## 2 Object Localization
#### First we load and inspect the `localization_*` datasets (train, validation and test)

In [None]:
# Load the three pre-built localization splits (train / validation / test) from disk.
# NOTE: torch.load unpickles the files, so only load data from a trusted source.
train_data = torch.load('data/localization_train.pt')
val_data = torch.load('data/localization_val.pt')
test_data = torch.load('data/localization_test.pt')

In [None]:
# Report how many samples each split contains (one line per split).
print(
    f'Train data size: {len(train_data)}',
    f'Val data size: {len(val_data)}',
    f'Test data size: {len(test_data)}',
    sep='\n',
)

In [None]:
# Inspect a single training sample to learn how images and labels are laid out.
first_img, first_label = train_data[0]

print(f'Shape of first image: {first_img.shape}')
print(f'Type of first image: {type(first_img)}')

print(f'\nShape of first label: {first_label.shape}')
# Fixed: the original f-string ended with a stray ')' that was printed verbatim.
print(f'Type of first label: {type(first_label)}')
# Bare expression so the notebook renders the label values as the cell output.
first_label

In [None]:
# Print the label vector of every training sample whose first entry (the detection flag) is 0.
for _img, label in train_data:
    if label[0] != 0:
        continue
    print(label)

In [None]:
def count_instances(data, data_name=None) -> None:
    """Print how many samples of each class a dataset contains.

    Args:
        data: Iterable of ``(image, label)`` pairs; the class id is read from
            the last entry of each label and truncated to ``int``.
        data_name: Optional heading printed before the counts.

    Returns:
        None. One ``class: count`` line is printed per class, in ascending
        class order.
    """
    class_counts = Counter(int(label[-1]) for _, label in data)

    if data_name is not None:
        print(f'Class distribution in {data_name}')

    for class_id in sorted(class_counts):
        print(f'{class_id}: {class_counts[class_id]}')

# Print the class distribution of every split so imbalances are easy to spot.
for split, split_name in (
    (train_data, 'Training Data'),
    (val_data, 'Validation Data'),
    (test_data, 'Test Data'),
):
    count_instances(split, split_name)

#### Plotting one image from each class

In [None]:
# Show the first training image of each class 0-9 together with its labelled bounding box.
fig, axes = plt.subplots(nrows=2, ncols=5, figsize=(8,3))

for i, ax in enumerate(axes.flat):
    # First sample whose class id (last label entry) equals the panel index.
    img, label = next((img, label) for img, label in train_data if int(label[-1]) == i)
    img_height, img_width = img.shape[-2], img.shape[-1]

    # draw_bounding_boxes needs a uint8 image; assumes pixel values lie in [0, 1] — TODO confirm.
    img = (img * 255).byte()

    # Scale the normalized (cx, cy, w, h) box to pixels OUT of place. The original
    # `bbox[k] *= ...` wrote through the label slice, which can modify the dataset
    # itself; it also cast to uint8 before conversion, which truncates the values
    # and overflows for images larger than 255 px.
    bbox_scale = torch.tensor(
        [img_width, img_height, img_width, img_height],
        dtype=label.dtype, device=label.device,
    )
    bbox = label[1:5] * bbox_scale

    converted_bbox = box_convert(bbox.unsqueeze(0), in_fmt='cxcywh', out_fmt='xyxy').round()

    img_with_bbox = draw_bounding_boxes(img, converted_bbox, colors='red')
    # (C, H, W) -> (H, W, C), the layout matplotlib expects.
    img_with_bbox = img_with_bbox.numpy().transpose((1, 2, 0))
    ax.imshow(img_with_bbox)
    ax.set_title(str(i))
    ax.axis('off')

plt.show()

In [None]:
def plot_class(data, class_label:int, start_idx:int=0) -> None:
    """Plot up to 10 images of one class with their labelled bounding boxes.

    Args:
        data: Iterable of ``(image, label)`` pairs. ``label[1:5]`` is read as a
            normalized ``(cx, cy, w, h)`` box and ``label[-1]`` as the class id.
            Images are assumed to have values in [0, 1] — TODO confirm.
        class_label: Class id whose images are shown.
        start_idx: Index, within the images of that class, of the first image
            to show.

    Raises:
        IndexError: If the class has no image at ``start_idx``.

    Notes:
        Fixes relative to the previous version: the box is scaled out of place
        (the old in-place scaling wrote through the label slice and could alter
        ``data``), the image size is taken from each image instead of the global
        ``train_data``, the box is no longer truncated to uint8 before the
        format conversion, and a class with fewer than ``start_idx + 10``
        images leaves the remaining panels empty instead of raising mid-plot.
    """
    # Single pass over the dataset, keeping image and box together.
    class_samples = [(img, label[1:5]) for img, label in data if int(label[-1]) == class_label]
    if start_idx >= len(class_samples):
        raise IndexError(
            f'class {class_label} has {len(class_samples)} images; start_idx={start_idx} is out of range'
        )

    fig, axes = plt.subplots(nrows=2, ncols=5, figsize=(8,3))

    last_idx = start_idx
    for offset, ax in enumerate(axes.flat):
        ax.axis('off')

        idx = start_idx + offset
        if idx >= len(class_samples):
            continue  # fewer than 10 images left: keep the panel blank
        img, bbox_norm = class_samples[idx]
        last_idx = idx

        img_height, img_width = img.shape[-2], img.shape[-1]

        # draw_bounding_boxes needs a uint8 image.
        img_uint8 = (img * 255).byte()

        # Multiplication creates a new tensor, so the dataset labels stay untouched.
        bbox_scale = torch.tensor(
            [img_width, img_height, img_width, img_height],
            dtype=bbox_norm.dtype, device=bbox_norm.device,
        )
        bbox_px = bbox_norm * bbox_scale

        converted_bbox = box_convert(bbox_px.unsqueeze(0), in_fmt='cxcywh', out_fmt='xyxy').round()

        img_with_bbox = draw_bounding_boxes(img_uint8, converted_bbox, colors='red')
        # (C, H, W) -> (H, W, C), the layout matplotlib expects.
        img_with_bbox = img_with_bbox.numpy().transpose((1, 2, 0))
        ax.imshow(img_with_bbox)

    fig.suptitle(f'CLASS {class_label} - Image {start_idx} to {last_idx}')
    plt.show()

# Example: ten training images of class 3, starting at the class's image with index 10.
plot_class(train_data, 3, 10)

#### Defining a normalizer and a preprocessor (TBD: the preprocessor currently only applies the normalizer)

In [None]:
# Stack all training images along a new leading dimension so that statistics
# can be taken over the whole training set at once.
imgs = torch.stack(tuple(sample[0] for sample in train_data))

# Mean / std reduced over dims 0, 2 and 3, i.e. one value per entry of dim 1.
channel_mean = imgs.mean(dim=(0, 2, 3))
channel_std = imgs.std(dim=(0, 2, 3))

# Normalizer built from training-set statistics only.
normalizer_pipe = transforms.Normalize(channel_mean, channel_std)

# Preprocessing pipeline; normalization is currently its only step.
preprocessor = transforms.Compose([normalizer_pipe])

In [None]:
# Load fresh copies of the three splits before preprocessing, so that nothing done to the
# in-memory tensors by the inspection/plotting cells above carries over.
# NOTE(review): presumably added to undo in-place edits to the labels — confirm.
train_data = torch.load('data/localization_train.pt')
val_data = torch.load('data/localization_val.pt')
test_data = torch.load('data/localization_test.pt')

In [None]:
# Run every image of every split through `preprocessor`; labels are passed through unchanged.
# NOTE: the split names are rebound to the processed lists, so re-running this cell
# without re-running the load cell above preprocesses the images a second time.
def _preprocess_split(split):
    """Return a list of (preprocessed image, label) pairs for one split."""
    return [(preprocessor(img), label) for img, label in split]

train_data = _preprocess_split(train_data)
val_data = _preprocess_split(val_data)
test_data = _preprocess_split(test_data)

#### Defining the loss function

In [None]:
class LossFn(nn.Module):
    """Combined detection + localization + classification loss.

    Column layout read by ``forward``:

    * ``y_pred[:, 0]``   detection logit, ``y_pred[:, 1:5]`` box,
      ``y_pred[:, 5:]`` class logits;
    * ``y_true[:, 0]``   detection flag, ``y_true[:, 1:5]`` box,
      ``y_true[:, -1]``  class index.

    The detection term is computed on every sample. The box and class terms
    are computed only on samples whose true detection flag equals 1.
    """
    def __init__(self):
        super().__init__()
        self.L_a = nn.BCEWithLogitsLoss()  # detection loss
        self.L_b = nn.MSELoss()  # localization loss
        self.L_c = nn.CrossEntropyLoss()  # classification loss

    def forward(self, y_pred, y_true):
        """Return the scalar sum of the three loss terms for one batch.

        Args:
            y_pred: Model output, one row per sample (layout in the class docstring).
            y_true: Targets, one row per sample (layout in the class docstring).

        Returns:
            0-dim tensor ``L_a + L_b + L_c``. When the batch contains no object,
            the box and class terms are 0 instead of NaN.
        """
        det_pred = y_pred[:, 0]
        bbox_pred = y_pred[:, 1:5]
        class_pred = y_pred[:, 5:]

        det_true = y_true[:, 0]
        bbox_true = y_true[:, 1:5]
        class_true = y_true[:, -1].long()

        L_a = self.L_a(det_pred, det_true)

        object_detected = det_true == 1

        if object_detected.any():
            L_b = self.L_b(bbox_pred[object_detected], bbox_true[object_detected])
            L_c = self.L_c(class_pred[object_detected], class_true[object_detected])
        else:
            # Mean-reduced losses over an empty selection evaluate to NaN, which
            # would poison the gradients; a batch without objects simply
            # contributes nothing to the box and class terms.
            L_b = L_c = y_pred.new_zeros(())

        return L_a + L_b + L_c

#### Function to compute the input size of the fully connected layer

In [None]:
def get_output_size(input_size, layer):
    """Number of output elements of a 2-D convolution for one input sample.

    Uses the output-shape formula from the ``torch.nn.Conv2d`` documentation,
    per spatial dimension:

        out = floor((in + 2*padding - dilation*(kernel - 1) - 1) / stride) + 1

    The previous version omitted the floor and the ``+ 1`` (so a 3x3 kernel with
    padding 1 and stride 1 appeared to shrink the input by one pixel) and
    ignored dilation.

    Args:
        input_size: ``(H_in, W_in)`` spatial size of the input.
        layer: Object exposing ``out_channels`` and 2-tuples ``kernel_size``,
            ``padding`` and ``stride`` (e.g. ``nn.Conv2d`` built with numeric
            padding). ``dilation`` is used when present, otherwise taken as 1.

    Returns:
        int: ``H_out * W_out * C_out``.
    """
    height_in, width_in = input_size[0], input_size[1]
    kernel = layer.kernel_size
    padding = layer.padding
    stride = layer.stride
    dilation = getattr(layer, 'dilation', (1, 1))

    height_out = (height_in + 2 * padding[0] - dilation[0] * (kernel[0] - 1) - 1) // stride[0] + 1
    width_out = (width_in + 2 * padding[1] - dilation[1] * (kernel[1] - 1) - 1) // stride[1] + 1

    return height_out * width_out * layer.out_channels

In [None]:
# Stand-alone copies of the three convolution layers (same hyper-parameters as the ones
# used in MyCNN), handy for experimenting with output sizes.
_conv_kwargs = dict(kernel_size=3, stride=1, padding=1, device=device, dtype=torch.double)
conv1 = nn.Conv2d(in_channels=1, out_channels=16, **_conv_kwargs)
conv2 = nn.Conv2d(in_channels=16, out_channels=32, **_conv_kwargs)
conv3 = nn.Conv2d(in_channels=32, out_channels=64, **_conv_kwargs)

In [None]:
def output_pipe(layers, input_size, num_outputs):
    """Flattened feature count after passing an image through a stack of layers.

    The previous version was unfinished: it fed the scalar returned for one
    layer back in as the next layer's ``(H, W)`` input size, which fails on the
    second layer, and it never returned anything. This version tracks the
    spatial size layer by layer and returns the number of input features a
    following fully connected layer needs.

    Args:
        layers: Iterable of layers exposing ``kernel_size``, ``stride`` and
            numeric ``padding`` as an int or a 2-tuple (e.g. ``nn.Conv2d``,
            ``nn.MaxPool2d``). ``dilation`` and ``out_channels`` are used when
            present. String padding modes and ``ceil_mode`` are not handled.
        input_size: ``(H, W)`` spatial size of the input image.
        num_outputs: Kept for interface compatibility; not used in the
            computation. NOTE(review): presumably the output width of the
            fully connected layer — confirm the intended use.

    Returns:
        int: ``H_out * W_out * C_out`` after the last layer, where ``C_out`` is
        the ``out_channels`` of the last layer that defines it (1 if none does).
    """
    def _pair(value):
        # Pooling layers may store plain ints where conv layers store 2-tuples.
        return (value, value) if isinstance(value, int) else tuple(value)

    height, width = input_size[0], input_size[1]
    channels = 1
    for layer in layers:
        k_h, k_w = _pair(layer.kernel_size)
        s_h, s_w = _pair(layer.stride)
        p_h, p_w = _pair(layer.padding)
        d_h, d_w = _pair(getattr(layer, 'dilation', 1))

        # Output-shape formula from the torch.nn.Conv2d / MaxPool2d documentation.
        height = (height + 2 * p_h - d_h * (k_h - 1) - 1) // s_h + 1
        width = (width + 2 * p_w - d_w * (k_w - 1) - 1) // s_w + 1
        # Layers without out_channels (pooling) keep the channel count.
        channels = getattr(layer, 'out_channels', channels)

    return height * width * channels

#### Defining models

In [None]:
class MyCNN(nn.Module):
    """Small CNN: three 3x3 convolutions (16, 32, 64 channels) and one linear head.

    The head emits 15 values per sample. The flattened size ``12*15*64`` is
    hard-coded; with two 2x2 poolings this presumably corresponds to 48x60
    single-channel inputs — TODO confirm against the dataset.
    """
    def __init__(self):
        super().__init__()
        # All convolutions share kernel/stride/padding and live on the global `device`.
        conv_kwargs = dict(kernel_size=3, stride=1, padding=1, device=device, dtype=torch.double)
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, **conv_kwargs)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, **conv_kwargs)
        self.conv3 = nn.Conv2d(in_channels=32, out_channels=64, **conv_kwargs)
        self.fc1 = nn.Linear(12*15*64, 15, device=device, dtype=torch.double)
        self.flatten = nn.Flatten()

    def forward(self, x):
        """Run conv -> ReLU -> pool twice, then conv -> ReLU -> flatten -> linear."""
        x = F.max_pool2d(F.relu(self.conv1(x)), 2, stride=2)
        x = F.max_pool2d(F.relu(self.conv2(x)), 2, stride=2)
        x = F.relu(self.conv3(x))
        return self.fc1(self.flatten(x))

In [None]:
def _paired_iou_cxcywh(boxes_a, boxes_b):
    """Row-wise IoU of two ``(N, 4)`` box tensors given as ``(cx, cy, w, h)``.

    Row ``i`` of ``boxes_a`` is compared with row ``i`` of ``boxes_b`` only.
    Negative widths/heights are treated as 0, and a pair whose union is empty
    gets IoU 0 instead of NaN.
    """
    def _corners(boxes):
        half_extent = boxes[:, 2:4].clamp(min=0) / 2
        return boxes[:, 0:2] - half_extent, boxes[:, 0:2] + half_extent

    a_min, a_max = _corners(boxes_a)
    b_min, b_max = _corners(boxes_b)

    overlap = (torch.minimum(a_max, b_max) - torch.maximum(a_min, b_min)).clamp(min=0)
    intersection = overlap[:, 0] * overlap[:, 1]
    area_a = (a_max - a_min).prod(dim=1)
    area_b = (b_max - b_min).prod(dim=1)
    union = area_a + area_b - intersection

    return torch.where(union > 0, intersection / union, torch.zeros_like(union))


def compute_performance(model, loader):
    """Evaluate a localization model: accuracy, mean IoU and their average.

    A sample counts as correct when the model predicts "no object" for a sample
    without an object, or predicts "object" with the right class for a sample
    with an object. The detection threshold is 0.5 on the sigmoid of output
    column 0.

    The mean IoU is taken over the samples that truly contain an object. An
    object the model fails to detect contributes an IoU of 0.

    Fixes relative to the previous version:

    * ``box_iou`` returns the IoU of *every* prediction with *every* target and
      expects corner-format boxes; it was summed over all pairs on
      centre-format boxes. IoU is now computed row-wise on (cx, cy, w, h) boxes.
    * IoU was masked by the predicted detection only (so false positives were
      compared with non-existent boxes) and averaged over all samples.
    * The per-batch debug ``print`` is gone.
    * Inputs are moved to the device/dtype of the model's own parameters rather
      than to a notebook-level global (falls back to CPU / float64 for a model
      without parameters).

    Args:
        model: Module mapping an image batch to rows laid out as
            ``[detection logit, cx, cy, w, h, class logits...]``.
        loader: Iterable of ``(images, labels)`` batches with label rows
            ``[detection flag, cx, cy, w, h, class index]``.

    Returns:
        tuple[float, float, float]: ``(accuracy, mean_iou, (accuracy + mean_iou) / 2)``.
        All three are 0.0 for an empty loader.
    """
    model.eval()

    first_param = next(model.parameters(), None)
    if first_param is not None:
        target_device, target_dtype = first_param.device, first_param.dtype
    else:
        target_device, target_dtype = torch.device('cpu'), torch.double

    n_samples = 0
    n_correct = 0
    n_objects = 0
    iou_sum = 0.0

    with torch.inference_mode():
        for imgs, labels in loader:
            imgs = imgs.to(device=target_device, dtype=target_dtype)
            labels = labels.to(device=target_device, dtype=target_dtype)

            outputs = model(imgs)

            object_detected = torch.sigmoid(outputs[:, 0]) > 0.5
            class_pred = outputs[:, 5:].argmax(dim=1)

            det_flag = labels[:, 0].int()
            has_object = det_flag == 1
            class_true = labels[:, -1].long()

            true_negative = ~object_detected & (det_flag == 0)
            true_positive = object_detected & has_object & (class_pred == class_true)

            n_samples += labels.shape[0]
            n_correct += int((true_negative | true_positive).sum())

            # IoU is only meaningful where a ground-truth box exists.
            n_objects += int(has_object.sum())
            found = object_detected & has_object
            if found.any():
                iou_sum += float(_paired_iou_cxcywh(outputs[found, 1:5], labels[found, 1:5]).sum())

    acc = n_correct / n_samples if n_samples else 0.0
    iou = iou_sum / n_objects if n_objects else 0.0

    performance = (acc + iou) / 2

    return acc, iou, performance

In [None]:
# model 2

In [None]:
# model 3

In [None]:
# model 4

In [None]:
# model 5

In [None]:
def plot_loss(train_loss:list, val_loss:list, title:str) -> None:
    """Draw training and validation loss curves on one set of axes.

    Args:
        train_loss: Per-epoch training losses.
        val_loss: Per-epoch validation losses.
        title: Title of the figure.

    The x axis counts epochs starting at 1.
    """
    _, ax = plt.subplots()

    curves = ((train_loss, 'Training loss'), (val_loss, 'Validation loss'))
    for losses, legend_label in curves:
        epochs = np.arange(1, len(losses) + 1)
        ax.plot(epochs, losses, label=legend_label)

    ax.set_title(title)
    ax.set_xlabel('Epoch')
    ax.set_ylabel('Loss')
    ax.legend()

In [None]:
def train(n_epochs, optimizer, model, loss_fn, train_loader, val_loader):
    """Train ``model`` and report loss per epoch plus final performance.

    Each epoch runs one optimization pass over ``train_loader`` and one
    gradient-free pass over ``val_loader``; the mean batch loss of both is
    printed and recorded. After the last epoch ``compute_performance`` is run
    on both loaders and its results are printed.

    Fixed: the validation metrics were printed under the label
    "Training performance".

    Args:
        n_epochs: Number of passes over ``train_loader``.
        optimizer: Optimizer holding the parameters of ``model``.
        model: Network to train; batches are moved to the global ``device``.
        loss_fn: Callable ``loss_fn(outputs, labels)`` returning a scalar tensor.
        train_loader: Iterable of ``(images, labels)`` training batches.
        val_loader: Iterable of ``(images, labels)`` validation batches.

    Returns:
        tuple: ``(losses_train, losses_val, train_performance, val_performance)``
        where the first two are lists with one mean batch loss per epoch and
        the last two are the overall scores from ``compute_performance``.
    """
    n_batch_train = len(train_loader)
    n_batch_val = len(val_loader)

    losses_train = []
    losses_val = []

    # Start from clean gradients in case the optimizer was used before.
    optimizer.zero_grad(set_to_none=True)

    for epoch in range(1, n_epochs + 1):

        loss_train = 0
        loss_val = 0

        model.train()

        for imgs, labels in train_loader:

            imgs = imgs.to(device=device, dtype=torch.double)
            labels = labels.to(device=device, dtype=torch.double)

            outputs = model(imgs)

            loss = loss_fn(outputs, labels)
            loss.backward()

            optimizer.step()
            optimizer.zero_grad()

            loss_train += loss.item()

        model.eval()

        # inference_mode disables autograd tracking for the validation pass.
        with torch.inference_mode():
            for imgs, labels in val_loader:

                imgs = imgs.to(device=device, dtype=torch.double)
                labels = labels.to(device=device, dtype=torch.double)

                outputs = model(imgs)

                loss = loss_fn(outputs, labels)
                loss_val += loss.item()

        losses_train.append(loss_train / n_batch_train)
        losses_val.append(loss_val / n_batch_val)

        # Progress is logged every epoch.
        print('{}  |  Epoch {}  |  Training loss {:.3f}'.format(datetime.now().strftime('%H:%M:%S'), epoch, loss_train / n_batch_train))
        print('{}  |  Epoch {}  |  Validation loss {:.3f}'.format(datetime.now().strftime('%H:%M:%S'), epoch, loss_val / n_batch_val))

    train_acc, train_iou, train_performance = compute_performance(model, train_loader)
    val_acc, val_iou, val_performance = compute_performance(model, val_loader)
    print(f'Training performance: Accuracy = {train_acc}, IOU = {train_iou}, Overall = {train_performance}')
    print(f'Validation performance: Accuracy = {val_acc}, IOU = {val_iou}, Overall = {val_performance}')

    return losses_train, losses_val, train_performance, val_performance

In [None]:
# Hyper-parameters of this run, named so they are easy to find and tune.
BATCH_SIZE = 512
LEARNING_RATE = 0.01

# The training loader now reshuffles every epoch (it used shuffle=False, so SGD saw
# the batches in the same order each epoch). A dedicated, seeded generator keeps the
# shuffle order reproducible. Evaluation does not need shuffling.
train_loader = torch.utils.data.DataLoader(
    train_data,
    batch_size=BATCH_SIZE,
    shuffle=True,
    generator=torch.Generator().manual_seed(SEED),
)
val_loader = torch.utils.data.DataLoader(val_data, batch_size=BATCH_SIZE, shuffle=False)

# Re-seed right before building the model so its initial weights are reproducible.
torch.manual_seed(SEED)
model = MyCNN()
model.to(device=device)

# Plain SGD: no momentum, no weight decay.
optimizer = optim.SGD(model.parameters(), lr=LEARNING_RATE, momentum=0, weight_decay=0)
loss_fn = LossFn()

In [None]:
# Train for a few epochs, then plot the recorded loss curves.
training_run = train(
    n_epochs=5,
    optimizer=optimizer,
    model=model,
    loss_fn=loss_fn,
    train_loader=train_loader,
    val_loader=val_loader,
)
# train() returns (per-epoch training losses, per-epoch validation losses,
# overall training performance, overall validation performance).
loss_train, loss_val, train_perform, val_perform = training_run

plot_loss(loss_train, loss_val, 'Model')

In [None]:
# Re-evaluate the trained model on the training loader (train() already printed these
# metrics); the individual values are kept for inspection in the following cell.
acc, iou, performance = compute_performance(model, train_loader)

In [None]:
# Display the IoU value returned by the evaluation in the previous cell.
iou

# 3 Object Detection