# Task 3
This template guides you through the implementation of this task. We advise reading the whole template first to get a sense of the overall code structure before filling in any of the TODO gaps.
This is the Jupyter notebook version of the template. For the Python file version, refer to `template_solution.py`.

First, we import necessary libraries:

In [6]:
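# Note: torchvision 0.13.1 depends on torch 1.12.1, so the second command
# replaces torch 2.2.2 with torch 1.12.1 (see the output below).
!pip install torch==2.2.2
!pip install torchvision==0.13.1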

Collecting torchvision==0.13.1
  Downloading torchvision-0.13.1-cp310-cp310-win_amd64.whl (1.1 MB)
     ---------------------------------------- 1.1/1.1 MB 2.3 MB/s eta 0:00:00
Collecting torch==1.12.1
  Downloading torch-1.12.1-cp310-cp310-win_amd64.whl (162.2 MB)
     -------------------------------------- 162.2/162.2 MB 2.2 MB/s eta 0:00:00
Installing collected packages: torch, torchvision
  Attempting uninstall: torch
    Found existing installation: torch 2.2.2
    Uninstalling torch-2.2.2:
      Successfully uninstalled torch-2.2.2
Successfully installed torch-1.12.1 torchvision-0.13.1


In [29]:
import os

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.datasets as datasets
from torch.utils.data import DataLoader, TensorDataset
from torchvision import models, transforms
from torchvision.models import ResNet50_Weights
from tqdm import tqdm
# Add any other imports you need here

In [9]:
# The device is automatically set to GPU if available, otherwise CPU
# If you want to force the device to CPU, you can change the line to
# device = torch.device("cpu")
# When using the GPU, it is important that your model and all data are on the 
# same device.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cpu


In [78]:
"""
Transform, resize and normalize the images and then use a pretrained model to extract 
the embeddings.
"""
# TODO: define a transform to pre-process the images
# The required pre-processing depends on the pre-trained model you choose 
# below. 
# See https://pytorch.org/vision/stable/models.html#using-the-pre-trained-models
train_transforms = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

train_dataset = datasets.ImageFolder(root="dataset/", transform=train_transforms)

# Hint: adjust batch_size and num_workers to your PC configuration, so that you don't 
# run out of memory (VRAM if on GPU, RAM if on CPU)
batch_size = 50
# shuffle=False keeps the embeddings in the same order as train_dataset.samples
train_loader = DataLoader(dataset=train_dataset,
                          batch_size=batch_size,
                          shuffle=False,
                          pin_memory=True, num_workers=4)

# TODO: define a model for extraction of the embeddings (Hint: load a pretrained model,
# more info here: https://pytorch.org/vision/stable/models.html)


In [79]:
# Load a pretrained ResNet-50 and drop its final classification layer, so that
# the model outputs the pooled features as embeddings.
model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
model = torch.nn.Sequential(*list(model.children())[:-1])
model.to(device)
model.eval()  # use the stored batch-norm statistics instead of updating them

# Determine the embedding size by passing a single image through the model
x = train_dataset[0][0].unsqueeze(0).to(device)
with torch.no_grad():
    embedding_size = model(x).flatten().shape[0]
    print(embedding_size)
num_images = len(train_dataset)
embeddings = np.zeros((num_images, embedding_size))
# TODO: Use the model to extract the embeddings. Hint: remove the last layers of the 
# model to access the embeddings the model generates. 

Downloading: "https://download.pytorch.org/models/resnet50-11ad3fa6.pth" to C:\Users\excelsior/.cache\torch\hub\checkpoints\resnet50-11ad3fa6.pth


  0%|          | 0.00/97.8M [00:00<?, ?B/s]

2048


In [80]:
model.eval()
model.to(device)
batch_size = train_loader.batch_size
with torch.no_grad():
    for batch_index, (X, _) in enumerate(tqdm(train_loader)):
        X = X.to(device)
        # Flatten (batch, C, 1, 1) to (batch, C); unlike squeeze(), this keeps
        # the batch dimension even if the last batch holds a single image
        yp = model(X).flatten(1).cpu().numpy()
        start = batch_index * batch_size
        embeddings[start:start + len(yp), :] = yp
np.save('dataset/embeddings.npy', embeddings)

100%|██████████| 200/200 [10:11<00:00,  3.06s/it]
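The batch-wise filling of the preallocated `embeddings` array can be illustrated in isolation. The following is a minimal sketch with made-up sizes and a toy array standing in for the model output (`features`, `filled` and the sizes are hypothetical names and values, not part of the template). It shows that slicing by the actual batch length also handles a smaller final batch:

```python
import numpy as np

# Toy stand-ins (hypothetical): 7 items with 3 features each, batches of 3
num_items, dim, batch_size = 7, 3, 3
features = np.arange(num_items * dim, dtype=float).reshape(num_items, dim)

filled = np.zeros((num_items, dim))
for batch_index, offset in enumerate(range(0, num_items, batch_size)):
    # Stand-in for the model output of one batch; the last batch has 1 row
    batch = features[offset:offset + batch_size]
    start = batch_index * batch_size
    # Use the real batch length so the final, smaller batch fits exactly
    filled[start:start + len(batch)] = batch

print(np.array_equal(filled, features))
```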


In [81]:
def get_data(file, train=True):
    """
    Load the triplets from the file and generate the features and labels.

    input: file: string, the path to the file containing the triplets
           train: boolean, whether the data is for training or testing

    output: X: numpy array, the features
            y: numpy array, the labels
    """
    triplets = []
    with open(file) as f:
        for line in f:
            triplets.append(line)

    # generate training data from triplets
    train_dataset = datasets.ImageFolder(root="dataset/",
                                         transform=None)
    # Extract the image IDs from the file paths in an OS-independent way
    filenames = [os.path.splitext(os.path.basename(s[0]))[0] for s in train_dataset.samples]
    embeddings = np.load('dataset/embeddings.npy')

    # TODO: Normalize the embeddings
    # Standardize with the mean and standard deviation taken over all entries
    embeddings = (embeddings - np.mean(embeddings)) / np.std(embeddings)
    print(embeddings.shape)

    file_to_embedding = {}
    for i in range(len(filenames)):
        file_to_embedding[filenames[i]] = embeddings[i]
    X = []
    y = []

    # use the individual embeddings to generate the features and labels for triplets
    for t in triplets:
        emb = [file_to_embedding[a] for a in t.split()]
        X.append(np.hstack([emb[0], emb[1], emb[2]]))
        y.append(1)
        # Generating negative samples (data augmentation)
        if train:
            X.append(np.hstack([emb[0], emb[2], emb[1]]))
            y.append(0)
    X = np.vstack(X)
    y = np.hstack(y)
    return X, y
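To make the feature construction in `get_data` concrete, here is a minimal sketch on made-up 2-dimensional embeddings. The image IDs, the values and the helper name `triplet_to_rows` are hypothetical and only serve as illustration. Each triplet line yields one row labeled 1 and, for training data, a second row with the last two embeddings swapped and labeled 0:

```python
import numpy as np

# Hypothetical 2-dimensional embeddings keyed by image ID (toy values)
toy_embeddings = {
    "00001": np.array([1.0, 2.0]),
    "00002": np.array([3.0, 4.0]),
    "00003": np.array([5.0, 6.0]),
}

def triplet_to_rows(triplet_line, train=True):
    """Return (feature rows, labels) for one 'A B C' triplet line."""
    a, b, c = (toy_embeddings[name] for name in triplet_line.split())
    rows, labels = [np.hstack([a, b, c])], [1]
    if train:
        # Swapping the second and third image gives the row labeled 0
        rows.append(np.hstack([a, c, b]))
        labels.append(0)
    return np.vstack(rows), np.array(labels)

X_toy, y_toy = triplet_to_rows("00001 00002 00003\n")
print(X_toy)
print(y_toy)
```

With the real 2048-dimensional embeddings, each row has 3 × 2048 = 6144 entries, which is the input size of the first layer of the classifier.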

Hint: adjust batch_size and num_workers to your PC configuration, so that you don't run out of memory (VRAM if on GPU, RAM if on CPU)

In [82]:
def create_loader_from_np(X, y = None, train = True, batch_size=32, shuffle=True, num_workers = 4):
    """
    Create a torch.utils.data.DataLoader object from numpy arrays containing the data.

    input: X: numpy array, the features
           y: numpy array, the labels
    
    output: loader: torch.utils.data.DataLoader, the object containing the data
    """
    if train:
        # Attention: If you get type errors you can modify the type of the
        # labels here
        dataset = TensorDataset(torch.from_numpy(X).type(torch.float), 
                                torch.from_numpy(y).type(torch.long))
    else:
        dataset = TensorDataset(torch.from_numpy(X).type(torch.float))
    loader = DataLoader(dataset=dataset,
                        batch_size=batch_size,
                        shuffle=shuffle,
                        pin_memory=True, num_workers=num_workers)
    return loader

TODO: define a model. Here, the basic structure is defined, but you need to fill in the details

In [88]:
class Net(nn.Module):
    """
    The model class, which defines our classifier.
    """
    def __init__(self):
        """
        The constructor of the model.
        """
        super().__init__()
        # Input: three concatenated embeddings (one per image in the triplet)
        self.fc1 = nn.Linear(embedding_size * 3, 2048)
        self.fc4 = nn.Linear(2048, 1)
        self.dropout = nn.Dropout(0.2)

    def forward(self, x):
        """
        The forward pass of the model.

        input: x: torch.Tensor, the input to the model

        output: x: torch.Tensor, the output of the model
        """
        x = self.fc1(x)
        x = torch.sigmoid(x)
        x = self.dropout(x)
        x = self.fc4(x)
        # The sigmoid maps the output to a probability in (0, 1)
        x = torch.sigmoid(x)
        return x
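Because the network ends in a sigmoid, its output can be read as a probability and scored with binary cross-entropy. As a rough sanity check, the sketch below computes the mean binary cross-entropy by hand in plain Python (the helper name `binary_cross_entropy` and the toy inputs are hypothetical). A model that outputs 0.5 for every input, as an untrained model roughly does, scores ln 2:

```python
import math

def binary_cross_entropy(probs, labels):
    """Mean binary cross-entropy for predicted probabilities and 0/1 labels."""
    total = 0.0
    for p, y in zip(probs, labels):
        total += -(y * math.log(p) + (1 - y) * math.log(1 - p))
    return total / len(probs)

# Chance-level predictions on balanced toy labels
chance = binary_cross_entropy([0.5, 0.5, 0.5, 0.5], [1, 0, 1, 0])
print(f"{chance:.3f}")  # 0.693

# More confident, correct predictions give a lower loss
better = binary_cross_entropy([0.9, 0.1, 0.8, 0.2], [1, 0, 1, 0])
print(better < chance)  # True
```

A training loss that stays near 0.693 therefore suggests that the model has not yet learned anything beyond guessing.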

In [84]:
TRAIN_TRIPLETS = 'train_triplets.txt'

# load the training data
X, y = get_data(TRAIN_TRIPLETS)
# Create data loaders for the training data
train_loader = create_loader_from_np(X, y, train = True, batch_size=32)
# Delete the loaded training data to save memory; the data loader keeps its own tensors
del X
del y


(10000, 2048)


In [85]:
TEST_TRIPLETS = 'test_triplets.txt'

# repeat for testing data
X_test, y_test = get_data(TEST_TRIPLETS, train=False)
test_loader = create_loader_from_np(X_test, train = False, batch_size=2048, shuffle=False)
del X_test
del y_test

(10000, 2048)


In [91]:
"""
The training procedure of the model; it accepts the training data, defines the model 
and then trains it.

input: train_loader: torch.utils.data.DataLoader, the object containing the training data
    
compute: model: torch.nn.Module, the trained model
"""
model = Net()
model.train()
model.to(device)
n_epochs = 20

criterion = nn.BCELoss()
# Named `optimizer` so that it does not shadow the imported `torch.optim` module
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# TODO: define a loss function, optimizer and proceed with training. Hint: use the part 
# of the training data as a validation split. After each epoch, compute the loss on the 
# validation split and print it out. This enables you to see how your model is performing 
# on the validation data before submitting the results on the server. After choosing the 
# best model, train it on the whole training data.
for epoch in range(n_epochs):
    print(f'Current epoche:{epoch}')
    run_loss = 0
    for i, [X, y] in enumerate(train_loader):
        X = X.to(device)
        y = y.to(device)

        optimizer.zero_grad()
        # squeeze(1) turns (batch, 1) into (batch,) and keeps the batch
        # dimension even when the last batch holds a single sample
        y_pred = model(X).squeeze(1)
        # nn.BCELoss expects float targets with the same shape as the predictions
        loss = criterion(y_pred, y.float())
        loss.backward()
        optimizer.step()

        run_loss = run_loss + loss.item()
        if i % 1000 == 999:    # print every 1000 mini-batches
            print(f'[{epoch + 1}, {i + 1:5d}] loss: {run_loss / 1000:.3f}')
            run_loss = 0


Current epoche:0
[1,  1000] loss: 0.693
[1,  2000] loss: 0.688
[1,  3000] loss: 0.684
Current epoche:1
[2,  1000] loss: 0.663
[2,  2000] loss: 0.630
[2,  3000] loss: 0.599
Current epoche:2
[3,  1000] loss: 0.567
[3,  2000] loss: 0.555
[3,  3000] loss: 0.556
Current epoche:3
[4,  1000] loss: 0.542
[4,  2000] loss: 0.534
[4,  3000] loss: 0.529
Current epoche:4
[5,  1000] loss: 0.515
[5,  2000] loss: 0.518
[5,  3000] loss: 0.514
Current epoche:5
[6,  1000] loss: 0.500
[6,  2000] loss: 0.494
[6,  3000] loss: 0.502
Current epoche:6
[7,  1000] loss: 0.484
[7,  2000] loss: 0.482
[7,  3000] loss: 0.486
Current epoche:7
[8,  1000] loss: 0.466
[8,  2000] loss: 0.473
[8,  3000] loss: 0.472
Current epoche:8
[9,  1000] loss: 0.453
[9,  2000] loss: 0.456
[9,  3000] loss: 0.452
Current epoche:9
[10,  1000] loss: 0.438
[10,  2000] loss: 0.437
[10,  3000] loss: 0.441
Current epoche:10
[11,  1000] loss: 0.423
[11,  2000] loss: 0.431
[11,  3000] loss: 0.422
Current epoche:11
[12,  1000] loss: 0.415
[12, 
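The TODO hint in the training cell suggests holding out part of the training data for validation. One possible approach, sketched here as an assumption rather than as the template's intended solution, is to split the triplet lines themselves before the swapped negative samples are generated, so that a triplet and its swapped counterpart never end up on different sides of the split. The helper `split_triplets`, its parameters and the toy lines are hypothetical:

```python
import random

def split_triplets(lines, val_fraction=0.1, seed=0):
    """Shuffle triplet lines and split them into (train, validation) lists."""
    lines = list(lines)
    rng = random.Random(seed)  # fixed seed for a reproducible split
    rng.shuffle(lines)
    n_val = int(len(lines) * val_fraction)
    return lines[n_val:], lines[:n_val]

# Toy triplet lines in the same 'A B C' format as the triplet files
toy_lines = [f"{i:05d} {i + 1:05d} {i + 2:05d}" for i in range(20)]
train_lines, val_lines = split_triplets(toy_lines, val_fraction=0.2)
print(len(train_lines), len(val_lines))  # 16 4
```

Since `get_data` reads triplets from a file path, the two lists could, for example, be written to separate text files and loaded with two `get_data` calls. The validation loss would then be computed after each epoch under `model.eval()` and `torch.no_grad()`.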

In [92]:
"""
The testing procedure of the model; it accepts the testing data and the trained model and 
then tests the model on it.

input: model: torch.nn.Module, the trained model
       loader: torch.utils.data.DataLoader, the object containing the testing data
        
compute: None, the function saves the predictions to a results.txt file
"""
model.eval()
predictions = []
# Iterate over the test data
with torch.no_grad(): # We don't need to compute gradients for testing
    for [x_batch] in test_loader:
        x_batch= x_batch.to(device)
        predicted = model(x_batch)
        predicted = predicted.cpu().numpy()
        # Rounding the predictions to 0 or 1
        predicted[predicted >= 0.5] = 1
        predicted[predicted < 0.5] = 0
        predictions.append(predicted)
    predictions = np.vstack(predictions)
np.savetxt("results.txt", predictions, fmt='%i')
print("Results saved to results.txt")

Results saved to results.txt
