In [28]:
import torchvision
from numpy import random
from sklearn.model_selection import train_test_split
from torch.utils.data.sampler import SubsetRandomSampler
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset, random_split
import os
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import matplotlib.pyplot as plt
from torchvision import models
from torchvision.models import resnet50, ResNet50_Weights
from torch.nn.modules.loss import BCEWithLogitsLoss

In [29]:
# Device shared by the rest of the notebook: the first CUDA GPU when PyTorch
# reports one as available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

### Image preprocessing and embedding extraction with a pretrained model

In [30]:
def generate_embeddings(batch_size=1, num_workers=8):
    """
    Pre-process every image under ``dataset/`` and store its ResNet-50 embedding.

    The images are resized, center-cropped, converted to tensors and normalized,
    then passed through a pretrained ResNet-50 whose final fully-connected layer
    has been removed. The resulting feature vectors (one row per image, in the
    order of ``ImageFolder.samples``) are written to ``dataset/embeddings.npy``.

    input: batch_size: int, number of images per forward pass (adjust to the
                       available memory)
           num_workers: int, number of DataLoader worker processes

    output: None, the embeddings are saved to 'dataset/embeddings.npy'
    """
    # Resize and crop while the image is still a PIL image and only then convert
    # it to a tensor; this is the usual torchvision order for ImageNet models.
    train_transforms = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    train_dataset = datasets.ImageFolder(root="dataset/", transform=train_transforms)

    # shuffle=False is essential: get_data() matches row i of the saved array
    # with the i-th entry of ImageFolder.samples.
    train_loader = DataLoader(dataset=train_dataset,
                              batch_size=batch_size,
                              shuffle=False,
                              pin_memory=True, num_workers=num_workers)

    model = resnet50(weights=ResNet50_Weights.DEFAULT)
    embedding_size = model.fc.in_features
    num_images = len(train_dataset)
    embeddings = np.zeros((num_images, embedding_size))

    # Drop the classification head so the network outputs the pooled features.
    feature_extractor = torch.nn.Sequential(*(list(model.children())[:-1]))
    feature_extractor.to(device)
    feature_extractor.eval()  # inference mode for the batch-norm layers
    print(feature_extractor)

    start = 0
    with torch.no_grad():
        for batch_idx, (images, _) in enumerate(train_loader):
            images = images.to(device)
            # Flatten everything but the batch axis so any batch size works.
            features = torch.flatten(feature_extractor(images), start_dim=1)
            stop = start + features.shape[0]
            embeddings[start:stop, :] = features.cpu().numpy()
            start = stop
            # Report progress occasionally instead of once per image.
            if batch_idx % 100 == 0 or stop == num_images:
                print(f"{stop}/{num_images} images embedded")

    np.save('dataset/embeddings.npy', embeddings)

### Training data processing (triplets, embedding normalization, data augmentation)

In [31]:
def get_data(file, train=True):
    """
    Load the triplets from the file and generate the features and labels.

    Each non-blank line of `file` holds three whitespace-separated image names.
    The standardized embeddings of the three images are concatenated into one
    feature row with label 1. When `train` is True, a second row with the last
    two images swapped is added with label 0.

    input: file: string, the path to the file containing the triplets
          train: boolean, whether the data is for training or testing

    output: X: numpy array, the features
            y: numpy array, the labels (all ones when train is False)

    raises: ValueError if the cached embeddings do not match the number of
            images in 'dataset/', or if the file contains no triplets
    """
    # Skip blank lines (e.g. a trailing newline) so they do not produce an
    # empty triplet further down.
    with open(file) as f:
        triplets = [line for line in f if line.strip()]

    # ImageFolder is only used to recover the image order that was used when
    # the embeddings were generated.
    train_dataset = datasets.ImageFolder(root="dataset/",
                                         transform=None)
    filenames = [os.path.splitext(os.path.basename(path))[0]
                 for path, _ in train_dataset.samples]
    embeddings = np.load('dataset/embeddings.npy')

    if len(filenames) != len(embeddings):
        raise ValueError(
            f"dataset/embeddings.npy has {len(embeddings)} rows but dataset/ "
            f"contains {len(filenames)} images; regenerate the embeddings")

    # Standardize every feature across the dataset. Constant features have a
    # zero standard deviation; dividing by 1 instead keeps them at 0 rather
    # than turning them into NaN.
    mean = embeddings.mean(axis=0)
    std = embeddings.std(axis=0)
    std[std == 0] = 1.0
    embeddings = (embeddings - mean) / std

    file_to_embedding = dict(zip(filenames, embeddings))
    X = []
    y = []

    # use the individual embeddings to generate the features and labels for triplets
    for t in triplets:
        emb = [file_to_embedding[a] for a in t.split()]
        X.append(np.hstack([emb[0], emb[1], emb[2]]))
        y.append(1)
        # Generating negative samples (data augmentation)
        if train:
            X.append(np.hstack([emb[0], emb[2], emb[1]]))
            y.append(0)

    if not X:
        raise ValueError(f"no triplets found in {file}")

    X = np.vstack(X)
    y = np.hstack(y)
    return X, y

### Training and Test Data definition and tensor creation 

In [32]:
# Hint: adjust batch_size and num_workers to your PC configuration, so that you don't run out of memory
def create_loader_from_np(X, y = None, train = True, batch_size=64, shuffle=True, num_workers = 8):
    """
    Create torch.utils.data.DataLoader object(s) from numpy arrays containing the data.

    input: X: numpy array, the features
           y: numpy array, the labels (required when train is True)
           train: boolean, True to build a train/validation pair, False to
                  build a single loader over X only
           batch_size: int, batch size of the training loader or of the single
                       test loader
           shuffle: boolean, only used when train is False
           num_workers: int, only used when train is False

    output: when train is True: (train_loader, val_loader), an 80/20 split of
            the data; the validation loader yields its whole split as one batch
            when train is False: loader, a DataLoader yielding 1-tuples (X,)

    raises: ValueError if train is True and y is None
    """
    if train:
        if y is None:
            raise ValueError("y must be provided when train=True")

        dataset = TensorDataset(torch.from_numpy(X).type(torch.float),
                                torch.from_numpy(y).type(torch.long))

        # Fixed random_state keeps the train/validation split reproducible.
        train_indices, val_indices = train_test_split(
            list(range(len(dataset))), test_size=0.2, random_state=42)

        # The samplers draw their subset in random order, so no shuffle flag
        # is passed to these loaders.
        train_sampler = SubsetRandomSampler(train_indices)
        val_sampler = SubsetRandomSampler(val_indices)

        train_loader = DataLoader(dataset, batch_size=batch_size, sampler=train_sampler)
        val_loader = DataLoader(dataset, batch_size=len(val_indices), sampler=val_sampler)

        return train_loader, val_loader

    dataset = TensorDataset(torch.from_numpy(X).type(torch.float))
    # Pinned memory is only requested when a CUDA device is available.
    loader = DataLoader(dataset=dataset,
                        batch_size=batch_size,
                        shuffle=shuffle,
                        pin_memory=torch.cuda.is_available(),
                        num_workers=num_workers)
    return loader

### Definition of Neural Network model class

In [33]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):
    """
    Fully-connected binary classifier for concatenated triplet embeddings.

    Three hidden blocks (Linear -> BatchNorm1d -> ReLU -> Dropout) of widths
    256, 128 and 64 are followed by a single-unit linear layer and a sigmoid,
    so the output is one value in [0, 1] per input row.
    """

    def __init__(self, input_dim=6144):
        """
        input: input_dim: int, number of input features per sample; the default
                          6144 is three concatenated 2048-dimensional embeddings
        """
        super().__init__()

        # Define network architecture
        self.fc1 = nn.Linear(input_dim, 256)
        self.bn1 = nn.BatchNorm1d(256)
        self.dropout1 = nn.Dropout(p=0.7)

        self.fc2 = nn.Linear(256, 128)
        self.bn2 = nn.BatchNorm1d(128)
        self.dropout2 = nn.Dropout(p=0.5)

        self.fc3 = nn.Linear(128, 64)
        self.bn3 = nn.BatchNorm1d(64)
        self.dropout3 = nn.Dropout(p=0.5)

        self.fc4 = nn.Linear(64, 1)

    def forward(self, x):
        """
        input: x: float tensor of shape (batch, input_dim)

        output: tensor of shape (batch, 1) with sigmoid outputs in [0, 1]
        """
        x = F.relu(self.bn1(self.fc1(x)))
        x = self.dropout1(x)

        x = F.relu(self.bn2(self.fc2(x)))
        x = self.dropout2(x)

        x = F.relu(self.bn3(self.fc3(x)))
        x = self.dropout3(x)

        # The sigmoid turns the single logit into a value usable with nn.BCELoss.
        x = self.fc4(x)
        x = torch.sigmoid(x)

        return x

### Model Training function, performance evaluation

In [34]:
def _mean_loss(model, loader, criterion):
    """Return the sample-weighted mean of `criterion` over `loader`; the model is put in eval mode and not updated."""
    model.eval()
    total_loss = 0.0
    n_samples = 0
    with torch.no_grad():
        for X, y in loader:
            X = X.to(device)
            # BCELoss needs float targets with the same (batch, 1) shape as the output.
            y = y.to(device).view(-1, 1).float()
            total_loss += criterion(model(X), y).item() * y.size(0)
            n_samples += y.size(0)
    return total_loss / n_samples if n_samples else float('nan')


def train_model(train_loader, val_loader):
    """
    The training procedure of the model; it accepts the training data, defines the model
    and then trains it.

    After every epoch the mean training loss and the validation loss are
    printed. When training is finished both curves are plotted, saved to
    'loss_results/loss_plot.pdf' and shown, and the final validation loss is
    printed.

    input: train_loader: torch.data.util.DataLoader, the object containing the training data
           val_loader: torch.data.util.DataLoader, the object containing the validation data

    output: model: torch.nn.Module, the trained model (left in eval mode)
    """
    model = Net()
    model.to(device)
    n_epochs = 10

    criterion = nn.BCELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.005)

    train_loss_per_epoch = []
    val_loss_per_epoch = []

    for epoch in range(n_epochs):
        # _mean_loss() switches to eval mode, so re-enable dropout and
        # batch-norm updates at the start of every epoch.
        model.train()
        epoch_losses = []  # reset each epoch so the curve is a true per-epoch mean

        for X, y in train_loader:
            # The model lives on `device`; the batch has to be moved there too.
            X = X.to(device)
            y = y.to(device).view(-1, 1).float()

            optimizer.zero_grad()
            y_pred = model(X)
            loss = criterion(y_pred, y)
            loss.backward()
            optimizer.step()

            epoch_losses.append(loss.item())

        train_loss = np.mean(epoch_losses) if epoch_losses else float('nan')
        val_loss = _mean_loss(model, val_loader, criterion)
        train_loss_per_epoch.append(train_loss)
        val_loss_per_epoch.append(val_loss)
        print(f'Epoch {epoch + 1}/{n_epochs} - train loss: {train_loss:.4f} - val loss: {val_loss:.4f}')

    # Plot the loss
    os.makedirs('loss_results', exist_ok=True)

    fig, ax = plt.subplots()
    epochs = list(range(n_epochs))
    ax.plot(epochs, train_loss_per_epoch, label='train')
    ax.plot(epochs, val_loss_per_epoch, label='validation')
    ax.set_xlabel('epoch')
    ax.set_ylabel('BCE loss')
    ax.set_title('Loss per epoch')
    ax.legend()
    fig.savefig("loss_results/loss_plot.pdf")
    plt.show()

    # Evaluate model performance
    val_loss = _mean_loss(model, val_loader, criterion)
    print(f'Validation Loss: {val_loss:.4f}')

    return model

### Applying the model to the test data and saving the predictions

In [35]:
def test_model(model, loader):
    """
    The testing procedure of the model; it accepts the testing data and the trained model and 
    then tests the model on it.

    input: model: torch.nn.Module, the trained model
           loader: torch.data.util.DataLoader, the object containing the testing data
        
    output: None, the function saves the predictions to a results.txt file
    """
    model.eval()
    predictions = []
    # Iterate over the test data
    with torch.no_grad(): # We don't need to compute gradients for testing
        for [x_batch] in loader:
            x_batch= x_batch.to(device)
            predicted = model(x_batch)
            predicted = predicted.cpu().numpy()
            # Rounding the predictions to 0 or 1
            predicted[predicted >= 0.5] = 1
            predicted[predicted < 0.5] = 0
            predictions.append(predicted)
        predictions = np.vstack(predictions)
    np.savetxt("results.txt", predictions, fmt='%i')

In [36]:
# Main function. You don't have to change this
if __name__ == '__main__':
    import shutil
    import subprocess

    def _announce(message):
        """Speak `message` with the `say` command when it is installed; otherwise do nothing."""
        if shutil.which('say'):
            subprocess.run(['say', message], check=False)

    TRAIN_TRIPLETS = 'train_triplets.txt'
    TEST_TRIPLETS = 'test_triplets.txt'

    # generate embedding for each image in the dataset (skipped when a cached
    # embeddings file already exists)
    _announce("embeddings generation start")
    if not os.path.exists('dataset/embeddings.npy'):
        generate_embeddings()
    _announce("embeddings generation done")

    # load the training and testing data
    X, y = get_data(TRAIN_TRIPLETS)
    X_test, _ = get_data(TEST_TRIPLETS, train=False)

    # Create data loaders for the training and testing data
    train_loader, val_loader = create_loader_from_np(X, y, train = True, batch_size=64)
    test_loader = create_loader_from_np(X_test, train = False, batch_size=2048, shuffle=False)

    # define a model and train it
    _announce("start training")
    model = train_model(train_loader,val_loader)
    _announce("training done")

    # test the model on the test data
    _announce("start testing")
    test_model(model, test_loader)
    _announce("testing done")

    # save a textual description of the trained network next to the loss plot
    os.makedirs('loss_results', exist_ok=True)
    with open('loss_results/net_info.txt', 'w') as f:
        f.write(str(model))

    print("Results saved to results.txt")
    _announce("finished")

RuntimeError: mat1 and mat2 shapes cannot be multiplied (64x6144 and 512x256)