First, we import necessary libraries:

In [1]:
import numpy as np
from torchvision import transforms
from torchvision.models import resnet50, ResNet50_Weights, feature_extraction
from torch.utils.data import DataLoader, TensorDataset
import os
import torch
from torchvision import transforms
import torchvision.datasets as datasets
import torch.nn as nn
import torch.nn.functional as F

import torch.optim as optim
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import accuracy_score
from tqdm import tqdm

In [2]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device

device(type='cuda', index=0)

In [114]:
"""
Transform, resize and normalize the images and then use a pretrained model to extract 
the embeddings.
"""

weights = ResNet50_Weights.DEFAULT
train_transforms = weights.transforms()

train_dataset = datasets.ImageFolder(root="dataset/", transform=train_transforms)

train_loader = DataLoader(dataset=train_dataset,
                          batch_size=64,
                          shuffle=False,
                          pin_memory=True, num_workers=8)


model = resnet50(weights=weights)
model.eval()
model.to(device)

embedding_size = 1000
num_images = len(train_dataset)
embeddings = np.zeros((num_images, embedding_size))


train_nodes, eval_nodes = feature_extraction.get_graph_node_names(resnet50()) # to see what kind of layers there are in resnet50
return_nodes = {'fc': 'fc'} # take the fc layer to get the embeddings
feature_extractor = feature_extraction.create_feature_extractor(model, return_nodes=return_nodes)

In [115]:
index = 64
for i, (loaded_features, loaded_labels) in enumerate(tqdm(train_loader)):
    features = feature_extractor(loaded_features)
    flatten_fts = features["fc"].squeeze()
    embeddings[i*index: (i + 1)*index] = flatten_fts.detach().numpy()

np.save('dataset/embeddings_fc.npy', embeddings)

100%|██████████| 157/157 [32:44<00:00, 12.51s/it]


In [110]:
img, label = train_dataset[0]
batch = img.unsqueeze(0)

features = feature_extractor(batch)
flatten_fts_2 = features["fc"].squeeze()
print(flatten_fts_2.size())

torch.Size([1000])


In [116]:
def get_data(file, train=True):
    """
    Load the triplets from the file and generate the features and labels.

    input: file: string, the path to the file containing the triplets
          train: boolean, whether the data is for training or testing

    output: X: numpy array, the features
            y: numpy array, the labels
    """
    triplets = []
    with open(file) as f:
        for line in f:
            triplets.append(line)

    # generate training data from triplets
    train_dataset = datasets.ImageFolder(root="dataset/",
                                         transform=None)
    filenames = [s[0].split('/')[-1].replace('.jpg', '') for s in train_dataset.samples]
    embeddings = np.load('dataset/embeddings_fc.npy')
    
    # Normalize the embeddings
    norm = np.linalg.norm(embeddings, axis=1).reshape(-1,1)
    embeddings = embeddings / norm

    file_to_embedding = {}
    for i in range(len(filenames)):
        file_to_embedding[filenames[i]] = embeddings[i]
    X = []
    y = []
    # use the individual embeddings to generate the features and labels for triplets
    for t in triplets:
        emb = [file_to_embedding[a] for a in t.split()]
        X.append(np.hstack([emb[0], emb[1], emb[2]]))
        y.append(1)
        # Generating negative samples (data augmentation)
        if train:
            X.append(np.hstack([emb[0], emb[2], emb[1]]))
            y.append(0)
    X = np.vstack(X)
    y = np.hstack(y)
    return X, y

In [117]:
def create_loader_from_np(X, y = None, train = True, batch_size=64, shuffle=True, num_workers=4):
    """
    Create a torch.utils.data.DataLoader object from numpy arrays containing the data.

    input: X: numpy array, the features
           y: numpy array, the labels
    
    output: loader: torch.data.util.DataLoader, the object containing the data
    """
    if train:
        dataset = TensorDataset(torch.from_numpy(X).type(torch.float), 
                                torch.from_numpy(y).type(torch.long))
    else:
        dataset = TensorDataset(torch.from_numpy(X).type(torch.float))
    loader = DataLoader(dataset=dataset,
                        batch_size=batch_size,
                        shuffle=shuffle,
                        pin_memory=True, num_workers=num_workers)
    return loader

In [126]:
class Net_2FClayers(nn.Module):

    def __init__(self):
        super().__init__()

        # Linear functions
        #self.fc1 = nn.Linear(6144, 128)
        self.fc1 = nn.Linear(3000, 128)
        self.fc2 = nn.Linear(128, 1)

        # Activation function
        self.relu = nn.ReLU()

        # Batch normalization
        self.bn1 = nn.BatchNorm1d(128)

    def forward(self, x):

        x = self.fc1(x)
        #x = self.bn1(x)
        x = self.relu(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = x.view(-1)
        
        return x

In [119]:
TRAIN_TRIPLETS = 'train_triplets.txt'

# load the training data
X, y = get_data(TRAIN_TRIPLETS)
# Create data loaders for the training data
train_loader = create_loader_from_np(X, y, train = True, batch_size=64)
# delete the loaded training data to save memory, as the data loader copies
del X
del y

In [120]:
TEST_TRIPLETS = 'test_triplets.txt'

# repeat for testing data
X_test, y_test = get_data(TEST_TRIPLETS, train=False)
test_loader = create_loader_from_np(X_test, train = False, batch_size=2048, shuffle=False)
del X_test
del y_test

In [127]:
"""
The training procedure of the model; it accepts the training data, defines the model 
and then trains it.

input: train_loader: torch.data.util.DataLoader, the object containing the training data
    
compute: model: torch.nn.Module, the trained model
"""
model = Net_2FClayers()
model.train()
model.to(device)

n_epochs = 10

criterion = nn.BCEWithLogitsLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

for epoch in range(n_epochs):    
    valid_loss = 0.0
    running_loss = 0.0

    for i, ([X, y]) in enumerate(train_loader):
        y = y.type(torch.FloatTensor)
        if i == epoch: # split for validation
            model.eval()
            outputs_valid = model(X)
            loss_valid = criterion(outputs_valid, y)
            valid_loss += loss_valid.item()

        else: # training
            model.train()
            outputs = model(X)
            loss = criterion(outputs, y)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
    print(f'Epoch {epoch+1} \t\t Training Loss: {running_loss / (len(train_loader)-1)} \t\t Validation Loss: {valid_loss}')

Epoch 1 		 Training Loss: 0.6932483560427214 		 Validation Loss: 0.6921693086624146
Epoch 2 		 Training Loss: 0.6931471824325368 		 Validation Loss: 0.6931471824645996


KeyboardInterrupt: 

In [85]:
model = Net_2FClayers()
model.train()
model.to(device)

n_epochs = 20

criterion = nn.BCEWithLogitsLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

for epoch in range(n_epochs):     
    running_loss = 0.0

    for i, ([X, y]) in enumerate(train_loader):
        y = y.type(torch.FloatTensor)
        model.train()
        outputs = model(X)
        print(outputs)
        loss = criterion(outputs, y)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
    print(f'Epoch {epoch+1} \t\t Training Loss: {running_loss / (len(train_loader))}')


tensor([0.6167, 0.4203, 0.5884, 0.5120, 0.6517, 0.5977, 0.6719, 0.4861, 0.6359,
        0.5463, 0.6855, 0.5879, 0.5768, 0.5983, 0.7123, 0.6376, 0.4609, 0.7284,
        0.5629, 0.5957, 0.7609, 0.7034, 0.5750, 0.6600, 0.6882, 0.6491, 0.5554,
        0.5312, 0.6748, 0.5579, 0.5594, 0.6157, 0.6510, 0.5599, 0.7235, 0.6695,
        0.6605, 0.6418, 0.6019, 0.5702, 0.6476, 0.5775, 0.6654, 0.6434, 0.6474,
        0.5884, 0.5491, 0.5158, 0.5668, 0.8033, 0.4032, 0.5291, 0.5896, 0.5817,
        0.5631, 0.6190, 0.6053, 0.7628, 0.6669, 0.5606, 0.5153, 0.4577, 0.7028,
        0.7351], grad_fn=<ViewBackward0>)
tensor([0.6448, 0.6719, 0.5976, 0.6372, 0.5175, 0.5407, 0.5089, 0.6671, 0.6937,
        0.5744, 0.6136, 0.6321, 0.6172, 0.6805, 0.4073, 0.6457, 0.6347, 0.5876,
        0.6865, 0.6651, 0.7010, 0.6391, 0.5260, 0.5651, 0.6088, 0.6776, 0.6530,
        0.5476, 0.4927, 0.6128, 0.6061, 0.6868, 0.6687, 0.5984, 0.6623, 0.4918,
        0.5270, 0.6050, 0.7321, 0.5329, 0.6987, 0.6883, 0.5744, 0.6344, 0.5218

KeyboardInterrupt: 

In [86]:
"""
The testing procedure of the model; it accepts the testing data and the trained model and 
then tests the model on it.

input: model: torch.nn.Module, the trained model
       loader: torch.data.util.DataLoader, the object containing the testing data
        
compute: None, the function saves the predictions to a results.txt file
"""
model.eval()
predictions = []
# Iterate over the test data
with torch.no_grad(): # We don't need to compute gradients for testing
    for [x_batch] in test_loader:
        x_batch= x_batch.to(device)
        predicted = model(x_batch).view(-1,1)
        predicted = predicted.cpu().numpy()
        # Rounding the predictions to 0 or 1
        predicted[predicted >= 0.5] = 1
        predicted[predicted < 0.5] = 0
        predictions.append(predicted)
    predictions = np.vstack(predictions)
np.savetxt("results.txt", predictions, fmt='%i')
print("Results saved to results.txt")

Results saved to results.txt
