# Task 3
This serves as a template which will guide you through the implementation of this task. It is advised to first read the whole template and get a sense of the overall structure of the code before trying to fill in any of the TODO gaps.
This is the jupyter notebook version of the template. For the python file version, please refer to the file `template_solution.py`.

First, we import necessary libraries:

In [1]:
import numpy as np
from torchvision import transforms
from torch.utils.data import DataLoader, TensorDataset
import os
import torch
import torchvision
from torchvision import transforms
import torchvision.datasets as datasets
import torch.nn as nn
import torch.nn.functional as F
from torchvision.models import resnet50, ResNet50_Weights, inception_v3, Inception_V3_Weights
import torch.optim as optim

from sklearn.model_selection import train_test_split
# Add any other imports you need here

In [2]:
# The device is automatically set to GPU if available, otherwise CPU
# If you want to force the device to CPU, you can change the line to
# device = torch.device("cpu")
# When using the GPU, it is important that your model and all data are on the 
# same device.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# List available models
all_models = torchvision.models.list_models()
classification_models = torchvision.models.list_models(module=torchvision.models)

cpu


In [4]:
"""
Transform, resize and normalize the images and then use a pretrained model to extract 
the embeddings.
"""

# TODO: define a model for extraction of the embeddings (Hint: load a pretrained model,
# more info here: https://pytorch.org/vision/stable/models.html)


# Resnet50 - 0.62-0.63
# weights = ResNet50_Weights.IMAGENET1K_V1
# model = resnet50(weights=weights)
# model_reduced = torch.nn.Sequential(*list(model.children())[:-1])
# model_reduced.to(device)
# embedding_size = 2048 # Dummy variable, replace with the actual embedding size once you 
# train_transforms = weights.transforms()


# Inception V3
weights=Inception_V3_Weights.IMAGENET1K_V1
model = inception_v3(weights = weights)  # Set aux_logits=False to disable auxiliary classifier
model.fc = nn.Identity()
model_reduced = model
embedding_size = 2048 # Dummy variable, replace with the actual embedding size once you 
model_reduced.to(device)
train_transforms = weights.transforms()


# TODO: define a transform to pre-process the images
# The required pre-processing depends on the pre-trained model you choose 
# below. 
# See https://pytorch.org/vision/stable/models.html#using-the-pre-trained-models

train_dataset = datasets.ImageFolder(root="dataset/", transform=train_transforms)

num_images = len(train_dataset)
embeddings = np.zeros((num_images, embedding_size))

# Hint: adjust batch_size and num_workers to your PC configuration, so that you don't 
# run out of memory (VRAM if on GPU, RAM if on CPU)
train_loader = DataLoader(dataset=train_dataset,
                          batch_size=64,
                          shuffle=False,
                          pin_memory=True, num_workers=12)

model_reduced.eval()
# print(model_reduced)
# Use the model to extract the embeddings
start = 0
with torch.no_grad():  # Disable gradient calculation
    for images, _ in train_loader:
        # Move images to the device
        images = images.to(device)
        output = model_reduced(images)
        output = np.squeeze(output.cpu().numpy())
        end = start + len(images)
        embeddings[start:end] = output
        start = end

np.save('dataset/embeddings.npy', embeddings)

In [5]:
def get_data(file, train=True):
    """
    Load the triplets from the file and generate the features and labels.

    input: file: string, the path to the file containing the triplets
          train: boolean, whether the data is for training or testing

    output: X: numpy array, the features
            y: numpy array, the labels
    """
    triplets = []
    with open(file) as f:
        for line in f:
            triplets.append(line)

    # generate training data from triplets
    train_dataset = datasets.ImageFolder(root="dataset/",
                                         transform=None)
    filenames = [s[0].split('/')[-1].replace('.jpg', '') for s in train_dataset.samples]
    embeddings = np.load('dataset/embeddings.npy')
    # TODO: Normalize the embeddings
    embeddings /= np.linalg.norm(embeddings, axis=1, keepdims=True)
    
    file_to_embedding = {}
    for i in range(len(filenames)):
        file_to_embedding[filenames[i]] = embeddings[i]
    X = []
    y = []
    # use the individual embeddings to generate the features and labels for triplets
    for t in triplets:
        emb = [file_to_embedding[a] for a in t.split()]
        X.append(np.hstack([emb[0], emb[1], emb[2]]))
        y.append(1)
        # Generating negative samples (data augmentation)
        if train:
            X.append(np.hstack([emb[0], emb[2], emb[1]]))
            y.append(0)
    X = np.vstack(X)
    y = np.hstack(y)
    return X, y

Hint: adjust batch_size and num_workers to your PC configuration, so that you don't run out of memory (VRAM if on GPU, RAM if on CPU)

In [6]:
def create_loader_from_np(X, y = None, train = True, batch_size=128, shuffle=True, num_workers = 16):
    """
    Create a torch.utils.data.DataLoader object from numpy arrays containing the data.

    input: X: numpy array, the features
           y: numpy array, the labels
    
    output: loader: torch.data.util.DataLoader, the object containing the data
    """
    if train:
        # Attention: If you get type errors you can modify the type of the
        # labels here
        dataset = TensorDataset(torch.from_numpy(X).type(torch.float), 
                                torch.from_numpy(y).type(torch.long))
    else:
        dataset = TensorDataset(torch.from_numpy(X).type(torch.float))
    loader = DataLoader(dataset=dataset,
                        batch_size=batch_size,
                        shuffle=shuffle,
                        pin_memory=True, num_workers=num_workers) 
    return loader

TODO: define a model. Here, the basic structure is defined, but you need to fill in the details

In [7]:
class Net(nn.Module):
    """
    The model class, which defines our classifier.
    """
 
    def __init__(self):
        super().__init__()  
        self.fc1 = nn.Linear(6144, 256)   
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 64)   
        self.fc4 = nn.Linear(64, 1)   

        self.bn1 = nn.BatchNorm1d(256)
        self.bn2 = nn.BatchNorm1d(128)
        self.bn3 = nn.BatchNorm1d(64)


    def forward(self, x):
        # print(x.shape)
        x = self.fc1(x)
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.bn1(x)
        x = F.relu(x)

        x = self.fc2(x)
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.bn2(x)
        x = F.relu(x)

        x = self.fc3(x)
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.bn3(x)
        x = F.relu(x)  

        x = self.fc4(x)

        return x

In [8]:
TRAIN_TRIPLETS = 'train_triplets.txt'

# load the training data
X, y = get_data(TRAIN_TRIPLETS)
# Create data loaders for the training data

X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)


train_loader_train = create_loader_from_np(X_train, y_train, train = True, batch_size=64)
train_loader_valid = create_loader_from_np(X_val, y_val, train = True, batch_size=64)
# delete the loaded training data to save memory, as the data loader copies
del X
del y

KeyError: '02461'

In [None]:
TEST_TRIPLETS = 'test_triplets.txt'

# repeat for testing data
X_test, y_test = get_data(TEST_TRIPLETS, train=False)
test_loader = create_loader_from_np(X_test, train = False, batch_size=64, shuffle=False)
del X_test
del y_test

In [None]:
"""
The training procedure of the model; it accepts the training data, defines the model 
and then trains it.

input: train_loader: torch.data.util.DataLoader, the object containing the training data
    
compute: model: torch.nn.Module, the trained model
"""
# Define loss function and optimizer


model = Net()
model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

n_epochs = 5
# TODO: define a loss function, optimizer and proceed with training. Hint: use the part 
# of the training data as a validation split. After each epoch, compute the loss on the 
# validation split and print it out. This enables you to see how your model is performing 
# on the validation data before submitting the results on the server. After choosing the 
# best model, train it on the whole training data.
for epoch in range(n_epochs):   
    running_loss = 0.0 
    model.train()    
    for [X, y] in train_loader_train:
        
        # print(y.shape)
        X, y = X.to(device), y.to(device)
        optimizer.zero_grad()
        # X = X.squeeze(1)
        outputs = model(X)
        # print(outputs.shape)
        # print(outputs)
        # print(y.shape)
        loss = criterion(outputs.flatten(), y.to(torch.float))   
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item() * X.size(0)
    
    epoch_loss = running_loss / len(train_loader_train.dataset)
    # print(f"Epoch [{epoch+1}/{n_epochs}], Loss: {epoch_loss:.4f}")


    # Validation loop
    model.eval()  
    val_running_loss = 0.0

    correct = 0
    total = 0
    with torch.no_grad():
        for X_val, y_val in train_loader_valid:
            X_val, y_val = X_val.to(device), y_val.to(device)
            val_outputs = model(X_val)
            val_loss = criterion(val_outputs.flatten(), y_val.to(torch.float))
            val_running_loss += val_loss.item() * X_val.size(0)
            
            val_outputs[val_outputs >= 0.5] = 1
            val_outputs[val_outputs < 0.5] = 0  
            val_outputs = val_outputs.view(-1)  
          
            total += y_val.size(0)
            # print(total)
            # print((val_outputs == y_val).shape)
            correct += (val_outputs == y_val).sum().item()
            # print(correct)
            # print("next")


    accuracy = correct / total
    val_epoch_loss = val_running_loss / len(train_loader_valid.dataset)
    # print(f"Validation Loss: {val_epoch_loss:.4f}")
    print(f"Epoch [{epoch+1}/{n_epochs}], Loss: {epoch_loss:.4f}, Validation Loss : {val_epoch_loss:.4f}, Accuracy {accuracy:.2%} ")


In [None]:
"""
The testing procedure of the model; it accepts the testing data and the trained model and 
then tests the model on it.

input: model: torch.nn.Module, the trained model
       loader: torch.data.util.DataLoader, the object containing the testing data
        
compute: None, the function saves the predictions to a results.txt file
"""
model.eval()
predictions = []
# Iterate over the test data
with torch.no_grad(): # We don't need to compute gradients for testing
    for [x_batch] in test_loader:
        x_batch= x_batch.to(device)
        # x_batch = x_batch.unsqueeze(1)

        predicted = model(x_batch)
        predicted = predicted.cpu().numpy()
        # Rounding the predictions to 0 or 1
        predicted[predicted >= 0.5] = 1
        predicted[predicted < 0.5] = 0
        predictions.append(predicted)
    predictions = np.vstack(predictions)
np.savetxt("results.txt", predictions, fmt='%i')
print("Results saved to results.txt")