# Convolutional Neural Network

### Libraries

In [None]:
# import libraries

import os
import cv2
import torch

import numpy as np
import matplotlib.pyplot as plt
import torch.nn.functional as F

from google.colab import drive
from xml.etree import ElementTree as ET
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
from torchvision import transforms
from torch import nn
from tqdm import tqdm
from sklearn.metrics import accuracy_score

### Data preparation

In [None]:
# mount Google Drive inside the Colab runtime at /content/drive

drive.mount('/content/drive')

In [None]:
# unzip data files

!unzip "/content/drive/MyDrive/02 - tagged1.zip" -d "/content"

In [None]:
# major variables

# dataset root folders under /content (presumably created by the unzip step - confirm)
# photos_dir is the folder handed to load_data further down
photos_dir = '/content/photos'
# renders_dir is not referenced in the visible part of this notebook
renders_dir = '/content/renders'

In [None]:
def parse_xml(xml_file):
    '''
    Read an annotation xml file and return its bounding boxes.

    Every ``object`` element directly under the root contributes one
    ``[xmin, ymin, xmax, ymax]`` entry read from its ``bndbox`` child.
    Coordinates written as decimals (e.g. ``"12.0"``) are accepted and
    truncated towards zero; objects without a ``bndbox`` child are skipped.

    Parameters:
        xml_file: path or file object accepted by ``ElementTree.parse``.

    Returns:
        List of ``[xmin, ymin, xmax, ymax]`` integer lists in document
        order (empty when the file annotates no boxes).
    '''
    root = ET.parse(xml_file).getroot()
    bounding_boxes = []
    for obj in root.findall('object'):
        bbox = obj.find('bndbox')
        if bbox is None:
            # no box to read for this object
            continue
        # parse through float() so decimal coordinates do not raise ValueError
        bounding_boxes.append(
            [int(float(bbox.find(tag).text)) for tag in ('xmin', 'ymin', 'xmax', 'ymax')]
        )
    return bounding_boxes

In [None]:
def load_data(data_dir):
    '''
    Collect every ``.jpg`` file under ``data_dir`` together with its label.

    The label of an image is the name of the folder that directly contains
    it, parsed as an integer (the number of legos). Samples are sorted by
    image path, so positions in the returned arrays are stable across runs.

    Parameters:
        data_dir: root folder, walked recursively.

    Returns:
        image_paths: numpy array of path strings.
        num_legos: int64 torch tensor with the label of each path.
        Both are empty when no ``.jpg`` file is found.

    Raises:
        ValueError: if a folder holding a ``.jpg`` does not have an integer name.
    '''
    samples = []
    for subdir, _, files in os.walk(data_dir):
        for file in files:
            if file.endswith('.jpg'):
                samples.append((os.path.join(subdir, file), int(os.path.basename(subdir))))
    samples.sort()
    # built from two comprehensions instead of zip(*...) so that an empty
    # folder yields empty results rather than an unpacking error
    image_paths = np.asarray([path for path, _ in samples], dtype=str)
    # create the labels directly as int64 (no float32 round-trip)
    num_legos = torch.tensor([n for _, n in samples], dtype=torch.int64)
    return image_paths, num_legos

In [None]:
# load data

# image_paths (numpy array of file paths) and num_legos (int64 tensor of labels)
# are index-aligned and sorted by path, as produced by load_data
image_paths, num_legos = load_data(photos_dir)

In [None]:
# read the predefined train/test assignment from the csv stored on Drive

train_test_split = np.genfromtxt(
    '/content/drive/MyDrive/train_test_split.csv',
    delimiter=',',
    dtype=None,
    encoding=None,
)

# the second csv column flags the subset of each row: '0' -> train, '1' -> test;
# rows with any other value there are ignored
subset_of_flag = {'1': 'test', '0': 'train'}
train_test_ids = {'train': [], 'test': []}
for row_number, row in enumerate(train_test_split):
    subset = subset_of_flag.get(row[1])
    if subset is not None:
        # stored id is the row number minus one (presumably to account for
        # a header row - confirm against the csv)
        train_test_ids[subset].append(row_number - 1)

len(train_test_ids['train']), len(train_test_ids['test'])

In [None]:
# validation set

# The ids flagged as test in the csv are kept untouched under 'holdout', so
# running this cell again re-derives the same partition instead of splitting
# the already reduced test list a second time.
holdout = train_test_ids.setdefault('holdout', list(train_test_ids['test']))

split_seed = 42  # fixed seed: the valid/test partition is identical on every run
indices = list(holdout)  # copy, the stored hold-out list is never shuffled in place
np.random.default_rng(split_seed).shuffle(indices)

# first 40% of the shuffled ids -> test, remaining 60% -> valid
test_size = 0.4 * len(indices)
split = int(np.floor(test_size))
train_test_ids['valid'], train_test_ids['test'] = indices[split:], indices[:split]

len(train_test_ids['train']), len(train_test_ids['valid']), len(train_test_ids['test'])

In [None]:
# class distribution in training data

num_legos_train = num_legos[train_test_ids['train']]
# One unit-wide bin per count: the edges must run from 1 up to max + 1
# inclusive. With edges stopping at max - 1 the largest count fell outside
# the histogram and the second largest was merged into its neighbour.
max_legos = int(num_legos_train.max())
plt.hist(num_legos_train.numpy(), bins=range(1, max_legos + 2), align='left', rwidth=0.8)
plt.xlabel('Number of Legos')
plt.ylabel('Frequency')
plt.title('Number of Legos Distribution')
plt.show()

In [None]:
# undersampling of larger class in training data

# ids of the training samples labelled with exactly one lego (the class being reduced)
indices = [i for i in train_test_ids['train'] if num_legos[i] == 1]
undersample_seed = 42  # fixed seed: the same samples are dropped on every run
np.random.default_rng(undersample_seed).shuffle(indices)

# 80% of that class is discarded, the remaining 20% stays in the training set
leftovers_size = 0.8 * len(indices)
split = int(np.floor(leftovers_size))
leftovers = set(indices[:split])
# single filtering pass with set lookups instead of one list.remove() per id;
# slice assignment keeps the same list object inside train_test_ids
train_test_ids['train'][:] = [i for i in train_test_ids['train'] if i not in leftovers]

num_legos_train = num_legos[train_test_ids['train']]
# one unit-wide bin per count: edges run from 1 to max + 1 inclusive so the
# largest count is not dropped from the histogram
max_legos = int(num_legos_train.max())
plt.hist(num_legos_train.numpy(), bins=range(1, max_legos + 2), align='left', rwidth=0.8)
plt.xlabel('Number of Legos')
plt.ylabel('Frequency')
plt.title('Number of Legos Distribution')
plt.show()

In [None]:
class LegosDataset(Dataset):
    '''
    Dataset class for the legos dataset.

    Each item is an ``(image, label)`` pair: the image is read from disk with
    OpenCV, converted from BGR to RGB and passed through ``transform`` when
    one is given; the label is the entry of ``num_legos`` at the same index.
    '''
    def __init__(self, images_filenames, num_legos, transform=None):
        '''
        Parameters:
            images_filenames: indexable collection of image file paths.
            num_legos: indexable collection of labels, aligned with the paths.
            transform: optional callable applied to the RGB image array.
        '''
        self.images_filenames = images_filenames
        self.transform = transform
        self.labels = num_legos

    def __len__(self):
        '''Number of samples, i.e. number of image paths.'''
        return len(self.images_filenames)

    def __getitem__(self, idx):
        '''
        Load the sample at position ``idx``.

        Raises:
            OSError: if the image file is missing or cannot be decoded.
        '''
        image_filename = str(self.images_filenames[idx])
        label = self.labels[idx]
        image = cv2.imread(image_filename)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising;
            # fail here with the offending path rather than inside cvtColor
            raise OSError(f"Could not read image file: {image_filename}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        if self.transform is not None:
            image = self.transform(image)
        return image, label

In [None]:
# build the train, valid and test datasets and their dataloaders

batch_size = 32
num_workers = 2

# array -> PIL image -> 224x224 -> tensor -> per-channel normalization
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

train_ids = train_test_ids['train']
valid_ids = train_test_ids['valid']
test_ids = train_test_ids['test']

# each dataset gets the paths and labels selected by the ids of its subset
train_dataset = LegosDataset(image_paths[train_ids], num_legos[train_ids], transform=transform)
valid_dataset = LegosDataset(image_paths[valid_ids], num_legos[valid_ids], transform=transform)
test_dataset = LegosDataset(image_paths[test_ids], num_legos[test_ids], transform=transform)

# only the training loader shuffles its samples
loader_options = dict(batch_size=batch_size, num_workers=num_workers)
train_dataloader = DataLoader(train_dataset, shuffle=True, **loader_options)
valid_dataloader = DataLoader(valid_dataset, shuffle=False, **loader_options)
test_dataloader = DataLoader(test_dataset, shuffle=False, **loader_options)

### Model definition

In [None]:
# pick the device used for training: the GPU when CUDA is available, otherwise the CPU

device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

In [None]:
# TODO: change definition

class ConvolutionalNeuralNetwork(nn.Module):
    '''
    Small CNN that outputs a single value per image (regression).

    Three conv(3x3) -> ReLU -> max-pool(2) stages with 16, 32 and 64 output
    channels are followed by a 128-unit hidden layer and a 1-unit output
    layer. The first linear layer expects 64 * 28 * 28 features, which is
    what 224x224 inputs produce after the three poolings.
    '''
    def __init__(self):
        super().__init__()
        stages = []
        in_channels = 3
        for out_channels in (16, 32, 64):
            stages += [
                nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=2, stride=2),
            ]
            in_channels = out_channels
        head = [
            nn.Flatten(),
            nn.Linear(64 * 28 * 28, 128),
            nn.ReLU(),
            nn.Linear(128, 1),
        ]
        # everything lives in one Sequential, so parameters are named layers.<index>
        self.layers = nn.Sequential(*stages, *head)

    def forward(self, x):
        '''Run the batch ``x`` through all layers and return the result.'''
        return self.layers(x)

In [None]:
# instantiate the network, move it to the selected device and print its layers

model = ConvolutionalNeuralNetwork().to(device)
print(model)

### Model training

In [None]:
def epoch_iter(dataloader, model, loss_fn, optimizer=None, is_train=True):
    '''
    Run one pass over ``dataloader``, training or evaluating the model.

    Batches are moved to the module-level ``device`` before the forward pass.

    Parameters:
        dataloader: iterable of ``(X, y)`` batches; its ``len`` is the
            number of batches.
        model: network called as ``model(X)``.
        loss_fn: callable ``loss_fn(pred, y)`` returning a scalar tensor.
        optimizer: optimizer stepped after every batch; required when
            ``is_train`` is True.
        is_train: when True the model is put in train mode and updated,
            otherwise it is put in eval mode and gradients are disabled.

    Returns:
        Tuple ``(mean_loss, mse)``: the average of the per-batch loss values
        and the mean squared error computed over all samples of the pass.
    '''
    if is_train:
        assert optimizer is not None, "When training, please provide an optimizer"
    num_batches = len(dataloader)
    model.train(is_train)  # train(False) is the same as eval()
    total_loss = 0.0
    preds = []
    labels = []
    with torch.set_grad_enabled(is_train):
        for X, y in tqdm(dataloader):
            X, y = X.float().to(device), y.float().to(device)
            # Flatten explicitly to one value per sample. A bare squeeze()
            # turns a single-sample batch into a 0-d tensor whose shape no
            # longer matches y.
            pred = model(X).reshape(-1)
            loss = loss_fn(pred, y)
            if is_train:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
            total_loss += loss.item()
            preds.extend(pred.detach().cpu().numpy())
            labels.extend(y.view(-1).cpu().numpy())
    return total_loss / num_batches, np.mean((np.array(labels) - np.array(preds))**2)

In [None]:
def train(model, model_name, num_epochs, train_dataloader, validation_dataloader, loss_fn, optimizer):
    '''
    Train the model for ``num_epochs`` epochs, checkpointing after each one.

    After every epoch the model and optimizer states are written to
    ``<model_name>_latest_model.pth``; when the validation loss is the lowest
    seen so far they are also written to ``<model_name>_best_model.pth``.

    Parameters:
        model: network to train.
        model_name: prefix of the checkpoint file names.
        num_epochs: number of passes over the training data.
        train_dataloader: batches used to update the model.
        validation_dataloader: batches used only for evaluation.
        loss_fn: loss passed on to ``epoch_iter``.
        optimizer: optimizer passed on to ``epoch_iter`` and saved in the
            checkpoints.

    Returns:
        ``(train_history, val_history)``: dicts with one value per epoch
        under the keys ``'loss'`` and ``'accuracy'``. The ``'accuracy'`` key
        is kept for existing consumers, but it holds the second value
        returned by ``epoch_iter``, which is a mean squared error.
    '''
    train_history = {'loss': [], 'accuracy': []}
    val_history = {'loss': [], 'accuracy': []}
    best_val_loss = np.inf
    print("Start training...")

    for t in range(num_epochs):
        print(f"Epoch {t+1}/{num_epochs}")
        # the second value returned by epoch_iter is an MSE, so it is reported as such
        train_loss, train_mse = epoch_iter(train_dataloader, model, loss_fn, optimizer)
        print(f"Train loss: {train_loss:.3f}, Train MSE: {train_mse:.3f}")
        val_loss, val_mse = epoch_iter(validation_dataloader, model, loss_fn, is_train=False)
        print(f"Validation loss: {val_loss:.3f}, Validation MSE: {val_mse:.3f}")

        # one snapshot per epoch, shared by both checkpoint files
        save_dict = {
            'model': model.state_dict(),
            'optimizer': optimizer.state_dict(),
            'epoch': t
        }

        # save latest model
        torch.save(save_dict, model_name + '_latest_model.pth')

        # save model when val loss improves
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(save_dict, model_name + '_best_model.pth')

        # save training history
        train_history['loss'].append(train_loss)
        train_history['accuracy'].append(train_mse)
        val_history['loss'].append(val_loss)
        val_history['accuracy'].append(val_mse)

    print("Finished")
    return train_history, val_history

In [None]:
# loss function: mean squared error, used for training and for evaluation

loss_fn = nn.MSELoss()

In [None]:
# optimizer: plain SGD over all model parameters with a learning rate of 1e-3

optimizer = torch.optim.SGD(model.parameters(), lr=1e-3) # TODO: change optimizer (regression task)

In [None]:
# train model

num_epochs = 3 # TODO: change number of epochs to 50 or so (low value is for testing)

# 'lego_counter' is the checkpoint prefix: train() writes
# lego_counter_best_model.pth and lego_counter_latest_model.pth
train_history, val_history = train(model, 'lego_counter', num_epochs, train_dataloader, valid_dataloader, loss_fn, optimizer)

### Training evolution analysis

In [None]:
def plotTrainingHistory(train_history, val_history):
    '''
    Plot the training history of the model.

    Draws two stacked panels with one point per epoch: the loss on top and
    the values stored under ``'accuracy'`` below. In this notebook the
    ``'accuracy'`` entries are mean squared errors (see the values returned
    by ``epoch_iter``), so the panels are titled accordingly.

    Parameters:
        train_history: dict with per-epoch lists under ``'loss'`` and
            ``'accuracy'`` for the training data.
        val_history: same structure for the validation data.
    '''
    plt.subplot(2, 1, 1)
    plt.title('Loss (mean over batches)')
    plt.plot(train_history['loss'], label='train')
    plt.plot(val_history['loss'], label='val')
    plt.xlabel('Epoch')
    plt.legend(loc='best')

    plt.subplot(2, 1, 2)
    plt.title('Mean Squared Error (over samples)')
    plt.plot(train_history['accuracy'], label='train')
    plt.plot(val_history['accuracy'], label='val')
    plt.xlabel('Epoch')
    plt.legend(loc='best')

    plt.tight_layout()
    plt.show()

In [None]:
# visualize training history: per-epoch train and validation curves

plotTrainingHistory(train_history, val_history)

### Model testing

In [None]:
# load best model

model = ConvolutionalNeuralNetwork().to(device)
# map_location remaps the stored tensors to the current device, so a
# checkpoint written on a GPU runtime can also be loaded on a CPU-only one
checkpoint = torch.load('lego_counter_best_model.pth', map_location=device)
model.load_state_dict(checkpoint['model'])

In [None]:
# evaluate model on test data

# epoch_iter returns (mean batch loss, mean squared error over all samples);
# the name test_acc is kept for any later cell that refers to it, but the
# value is an MSE and is printed as such
test_loss, test_acc = epoch_iter(test_dataloader, model, loss_fn, is_train=False)
print(f"Test loss: {test_loss:.3f}, Test MSE: {test_acc:.3f}")

In [None]:
def show_predictions(model, dataloader, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_images=None):
    '''
    Display images along with their true and predicted labels.

    Images coming out of the dataloader are normalized tensors, so the
    normalization is undone (``image * std + mean``) and the result clipped
    to [0, 1] before plotting; otherwise the colors shown are wrong.

    Parameters:
        model: network called as ``model(images)``; put in eval mode here.
        dataloader: iterable of ``(images, labels)`` batches with images in
            channel-first layout.
        mean: per-channel mean used by the normalization step of the
            dataloader's transform (defaults match this notebook's transform).
        std: per-channel standard deviation used by that normalization.
        max_images: stop after this many figures; ``None`` shows every sample.
    '''
    model.eval()
    mean = np.asarray(mean, dtype=np.float32)
    std = np.asarray(std, dtype=np.float32)
    shown = 0
    with torch.no_grad():
        for images, labels in dataloader:
            # one scalar prediction per image
            preds = model(images.float().to(device)).reshape(-1).cpu().numpy()
            # plot batch by batch instead of buffering the whole dataset first
            for image, label, pred in zip(images.cpu().numpy(), labels.cpu().numpy(), preds):
                if max_images is not None and shown >= max_images:
                    return
                # channels last for imshow, then undo the normalization
                picture = np.clip(image.transpose((1, 2, 0)) * std + mean, 0.0, 1.0)
                plt.imshow(picture)
                plt.title(f'True label: {label}, Predicted label: {pred:.2f}')
                plt.show()
                shown += 1

In [None]:
# view predictions: one figure per test image with its true and predicted label

show_predictions(model, test_dataloader)