# Project 3

#### Importing libraries

In [1]:
import numpy as np
from torchvision import transforms
from torch.utils.data import DataLoader, TensorDataset
import os
import torch
import torchvision.datasets as datasets
import torch.nn as nn
import torch.nn.functional as F
from torchvision.models import vit_l_16, ViT_L_16_Weights
import torch.optim as optim

In [2]:
# Pick the best available accelerator instead of assuming Apple-silicon MPS:
# on a machine without MPS the first `.to(device)` call would otherwise fail.
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

#### Extracting image embeddings using pre-trained Vision Transformer model

In [3]:
"""
Transform, resize and normalize the images and then use a pretrained model to extract the embeddings.
"""
# Build the image pipeline. The preprocessing comes from the weights enum itself.
batch = 64
vit_weights = ViT_L_16_Weights.IMAGENET1K_SWAG_LINEAR_V1
train_transforms = vit_weights.transforms()
train_dataset = datasets.ImageFolder(root="dataset/", transform=train_transforms)
# shuffle=False: row k of `embeddings` must correspond to train_dataset.samples[k],
# because get_data() later pairs the saved rows with the file listing by position.
train_loader = DataLoader(dataset=train_dataset, batch_size=batch, shuffle=False, pin_memory=True, num_workers=10)

# Extract embeddings using the pre-trained model.
model = vit_l_16(weights=vit_weights)
# Width of the input to the first head layer; also used later to size the classifier.
embedding_size = model.heads[0].in_features
num_images = len(train_dataset)
embeddings = np.zeros((num_images, embedding_size))
# Replace the first head layer with a pass-through so the forward pass yields
# the features that were fed into it.
model.heads[0] = nn.Identity()
model.to(device)
# Feature extraction is inference: switch off training-only behaviour.
model.eval()
with torch.no_grad():
    for i, (inputs, _) in enumerate(train_loader):
        outputs = model(inputs.to(device))
        start = batch * i
        # Size the slice from the actual batch so a short final batch is explicit.
        embeddings[start : start + outputs.shape[0]] = outputs.cpu().numpy()
        print(i, end="--")
np.save('dataset/embeddings-ViT_L_16.npy', embeddings)

#### Preparing training data

In [4]:
def get_data(file, train=True):
    """
    Load the triplets from the file and generate the features and labels.

    Each non-blank line of `file` holds three whitespace-separated image names
    (file names without extension). The three embeddings are concatenated into
    one feature row labelled 1. When `train` is True, a second row with the
    last two images swapped is added right after it, labelled 0.

    input: file: string, the path to the file containing the triplets
           train: boolean, whether the data is for training or testing
    output: X: numpy array, the features (one row per generated sample)
            y: numpy array, the labels (all 1 when train is False)
    raises: ValueError if a line has fewer than three names or the number of
            stored embeddings differs from the number of images found;
            KeyError if a triplet names an image that was not found
    """
    # Keep the line number so a malformed line can be reported precisely.
    triplets = []
    with open(file) as f:
        for line_no, line in enumerate(f, start=1):
            names = line.split()
            if not names:
                continue  # tolerate blank lines (e.g. a trailing newline)
            if len(names) < 3:
                raise ValueError(f"{file}:{line_no}: expected 3 image names, got {len(names)}")
            triplets.append(names)

    # Only the file listing is needed here; its order matches the order in
    # which the embeddings were written (the extraction loader is unshuffled).
    train_dataset = datasets.ImageFolder(root="dataset/", transform=None)
    # basename/splitext is separator-agnostic and strips any image extension.
    filenames = [os.path.splitext(os.path.basename(path))[0] for path, _ in train_dataset.samples]
    embeddings = np.load('dataset/embeddings-ViT_L_16.npy')
    if embeddings.shape[0] != len(filenames):
        raise ValueError(
            f"embedding rows ({embeddings.shape[0]}) do not match number of images ({len(filenames)})"
        )
    # Standardise every embedding vector on its own (per-row mean and std).
    row_mean = np.mean(embeddings, axis=1)[:, np.newaxis]
    row_std = np.std(embeddings, axis=1)[:, np.newaxis]
    embeddings = (embeddings - row_mean) / row_std

    file_to_embedding = dict(zip(filenames, embeddings))
    X = []
    y = []

    for names in triplets:
        emb = [file_to_embedding[a] for a in names]
        X.append(np.hstack([emb[0], emb[1], emb[2]]))
        y.append(1)
        if train:
            # Mirrored sample: swapping the last two images flips the label.
            X.append(np.hstack([emb[0], emb[2], emb[1]]))
            y.append(0)
    X = np.vstack(X)
    y = np.hstack(y)
    return X, y

In [5]:
def create_loader_from_np(X, y=None, train=True, batch_size=batch, shuffle=True, num_workers=10):
    """
    Wrap numpy data in a torch DataLoader.

    input: X: numpy array, the features
           y: numpy array, the labels (only read when train is True)
           train: boolean, whether labels are bundled with the features
           batch_size, shuffle, num_workers: forwarded to the DataLoader
    output: loader: torch.utils.data.DataLoader yielding float tensors,
            (features, labels) when train is True, (features,) otherwise
    """
    # Features always come first; labels are appended only for labelled data.
    tensors = [torch.from_numpy(X).type(torch.float)]
    if train:
        tensors.append(torch.from_numpy(y).type(torch.float))
    return DataLoader(
        dataset=TensorDataset(*tensors),
        batch_size=batch_size,
        shuffle=shuffle,
        pin_memory=True,
        num_workers=num_workers,
    )

In [6]:
TRAIN_TRIPLETS = 'train_triplets.txt'
X, y = get_data(TRAIN_TRIPLETS)
# 80/20 train/validation split. get_data(train=True) emits rows in pairs
# (a triplet, then its mirrored copy), so the cut is forced onto an even index:
# otherwise a triplet could be trained on while its mirror is used to validate.
split_idx = round(0.8 * X.shape[0])
split_idx -= split_idx % 2
train_loader = create_loader_from_np(X[:split_idx], y[:split_idx], train=True, batch_size=batch)
valid_loader = create_loader_from_np(X[split_idx:], y[split_idx:], train=True, batch_size=batch)
# Loader over all labelled data, used for the final fit.
train_loader_final = create_loader_from_np(X, y, train=True, batch_size=batch)
# The loaders hold their own tensor copies; free the numpy arrays.
del X
del y

#### Defining neural network for prediction

In [7]:
# Classifier hyper-parameters: widths of the two hidden layers and the
# dropout probability applied after each of them.
layer1_size = 200
layer2_size = 200
dropout_prop = 0.5

class Net(nn.Module):
    """
    Fully connected classifier over a concatenated triplet of embeddings.

    Two hidden layers (ELU activation followed by dropout) feed a single
    output unit. The output is a raw logit; no sigmoid is applied here.
    """
    def __init__(self):
        """
        Create the layers. The input width is three embeddings side by side.
        """
        super().__init__()
        self.fc1 = nn.Linear(3*embedding_size, layer1_size)
        self.dropout1 = nn.Dropout(dropout_prop)
        self.fc2 = nn.Linear(layer1_size, layer2_size)
        self.dropout2 = nn.Dropout(dropout_prop)
        self.fc3 = nn.Linear(layer2_size, 1)

    def forward(self, x):
        """
        Run the network on a batch.
        input: x: torch.Tensor, the concatenated embeddings
        output: torch.Tensor with one logit per input row
        """
        hidden = self.dropout1(F.elu(self.fc1(x)))
        hidden = self.dropout2(F.elu(self.fc2(hidden)))
        return self.fc3(hidden)

#### Training and evaluating neural network

In [8]:
"""
The training procedure of the model; it accepts the training data, defines the model and then trains it.
input: train_loader: torch.data.util.DataLoader, the object containing the training data
compute: model: torch.nn.Module, the trained model
"""
# Fit a fresh classifier on the training split and monitor the validation split.
# Training stops early once the validation loss has failed to improve by at
# least `min_delta` for `patience` consecutive epochs.
model = Net()
model.to(device)

n_epochs = 25
patience = 3
min_delta = 0.001
best_val_loss = float('inf')
best_epoch = 0  # 1-based epoch with the lowest validation loss so far
epochs_no_improve = 0
loss_function = nn.BCEWithLogitsLoss()
learn_rate = 0.0003
optimizer = optim.Adam(model.parameters(), lr=learn_rate)

for epoch in range(n_epochs):

    # ---- training phase (dropout active) ----
    model.train()
    train_loss = 0.0
    train_corrects = 0
    for batch_id, (X, y) in enumerate(train_loader):
        X = X.to(device)
        y = y.to(device)
        optimizer.zero_grad()
        logits = torch.flatten(model(X))
        loss = loss_function(logits, y)
        loss.backward()
        optimizer.step()
        # Hard 0/1 predictions, used for the accuracy readout only.
        preds = torch.round(torch.sigmoid(logits.detach()))
        # The loss is a batch mean; weight by batch size to get a per-sample average.
        train_loss += loss.item() * X.size(0)
        train_corrects += (preds == y).sum().item()
        if batch_id % 300 == 0:
            print('Epoch {}, Batch {}'.format(epoch+1, batch_id), end=" -- ")
    epoch_train_loss = train_loss / len(train_loader.dataset)
    epoch_train_acc = 100 * train_corrects / len(train_loader.dataset)
    print('Epoch {}, training loss {}, accuracy: ({:.0f}%)'.format(epoch+1, epoch_train_loss, epoch_train_acc))

    # ---- validation phase (dropout disabled, no gradients) ----
    model.eval()
    valid_loss = 0.0
    valid_corrects = 0
    with torch.no_grad():
        for X, y in valid_loader:
            X = X.to(device)
            y = y.to(device)
            logits_valid = torch.flatten(model(X))
            loss_valid = loss_function(logits_valid, y)
            preds_valid = torch.round(torch.sigmoid(logits_valid))
            valid_loss += loss_valid.item() * X.size(0)
            valid_corrects += (preds_valid == y).sum().item()
    epoch_valid_loss = valid_loss / len(valid_loader.dataset)
    epoch_valid_acc = 100 * valid_corrects / len(valid_loader.dataset)
    print('Epoch {}, validation loss {}, accuracy: ({:.0f}%)'.format(epoch+1, epoch_valid_loss, epoch_valid_acc))

    # ---- early-stopping bookkeeping ----
    if epoch_valid_loss < best_val_loss - min_delta:
        best_val_loss = epoch_valid_loss
        best_epoch = epoch + 1
        epochs_no_improve = 0
    else:
        epochs_no_improve += 1

    if epochs_no_improve >= patience:
        print(f'Early stopping after {epoch+1} epochs.')
        break

# Report where validation was best; useful when choosing the epoch budget of the final fit.
print(f'Best validation loss {best_val_loss} at epoch {best_epoch}.')

Epoch 1, Batch 0 -- Epoch 1, Batch 300 -- Epoch 1, Batch 600 -- Epoch 1, Batch 900 -- Epoch 1, Batch 1200 -- Epoch 1, training loss 0.5557088509613873, accuracy: (71%)
Epoch 1, validation loss 0.4968480159182173, accuracy: (75%)
Epoch 2, Batch 0 -- Epoch 2, Batch 300 -- Epoch 2, Batch 600 -- Epoch 2, Batch 900 -- Epoch 2, Batch 1200 -- Epoch 2, training loss 0.5037619577208897, accuracy: (75%)
Epoch 2, validation loss 0.4866225994991313, accuracy: (77%)
Epoch 3, Batch 0 -- Epoch 3, Batch 300 -- Epoch 3, Batch 600 -- Epoch 3, Batch 900 -- Epoch 3, Batch 1200 -- Epoch 3, training loss 0.4780532561019119, accuracy: (77%)
Epoch 3, validation loss 0.4714181385780997, accuracy: (77%)
Epoch 4, Batch 0 -- Epoch 4, Batch 300 -- Epoch 4, Batch 600 -- Epoch 4, Batch 900 -- Epoch 4, Batch 1200 -- Epoch 4, training loss 0.4530173528347658, accuracy: (78%)
Epoch 4, validation loss 0.44862223434906934, accuracy: (79%)
Epoch 5, Batch 0 -- Epoch 5, Batch 300 -- Epoch 5, Batch 600 -- Epoch 5, Batch 900 

#### Training final neural network

In [None]:
# Final fit: a fresh classifier trained on every labelled sample for a fixed
# number of epochs. `loss_function` and `learn_rate` are reused from the
# training/validation cell, so that cell must have been run first.
model = Net()
model.train()
model.to(device)
n_epochs = 15
optimizer = optim.Adam(model.parameters(), lr=learn_rate)
for epoch in range(n_epochs):
    train_loss = 0.0
    train_corrects = 0
    for batch_id, (X, y) in enumerate(train_loader_final):
        X, y = X.to(device), y.to(device)
        optimizer.zero_grad()
        output = model(X)
        flat_logits = torch.flatten(output)
        loss = loss_function(flat_logits, y)
        loss.backward()
        optimizer.step()
        # Hard 0/1 predictions, used for the accuracy readout only.
        preds = torch.round(F.sigmoid(flat_logits))
        # Batch-mean loss weighted by batch size, averaged over the dataset below.
        train_loss += loss.item() * X.size(0)
        train_corrects += (preds == y.data).sum()
        if batch_id % 400 == 0:
            print('Epoch {}, Batch {}'.format(epoch+1, batch_id), end=" -- ")
    n_samples = len(train_loader_final.dataset)
    epoch_train_loss = train_loss / n_samples
    epoch_train_acc = 100 * train_corrects.cpu() / n_samples
    print('Epoch {}, training loss {}, accuracy: ({:.0f}%)'.format(epoch+1, epoch_train_loss, epoch_train_acc))

#### Making predictions on test data

In [10]:
# Build the test loader. With train=False, get_data emits one row per triplet
# and labels every row 1; those labels are placeholders and never reach the loader.
# shuffle=False keeps predictions in the same order as the lines of the triplet file.
TEST_TRIPLETS = 'test_triplets.txt'
X_test, y_test = get_data(TEST_TRIPLETS, train=False)
test_loader = create_loader_from_np(
    X_test,
    train=False,
    batch_size=2048,
    shuffle=False,
)
del X_test, y_test

In [13]:
"""
The testing procedure of the model; it accepts the testing data and the trained model and then tests the model on it.
input: model: torch.nn.Module, the trained model
       loader: torch.data.util.DataLoader, the object containing the testing data     
compute: None, the function saves the predictions to a results.txt file
"""
# Score the test triplets with the current `model` and write one 0/1 label per line.
model.eval()
batch_probs = []
with torch.no_grad():
    for (x_batch,) in test_loader:
        logits = model(x_batch.to(device))
        probs = F.sigmoid(torch.flatten(logits)).cpu().numpy()
        # Keep each batch as a column so the batches stack into an (n, 1) array.
        batch_probs.append(probs[:, np.newaxis])
    predictions = np.vstack(batch_probs)
# Threshold at 0.5, in the same order as before: first mark the 1s, then the 0s.
predictions[predictions >= 0.5] = 1
predictions[predictions < 0.5] = 0
np.savetxt("results.txt", predictions, fmt='%i')
print("Results saved to results.txt")

Results saved to results.txt
