## Imports

In [29]:
import numpy as np
from torchvision import transforms
from torch.utils.data import DataLoader, TensorDataset, Dataset
import os
import torch
from torchvision import transforms
import torchvision.datasets as datasets
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler
from torchvision.models import resnet50, ResNet50_Weights
from sklearn.model_selection import train_test_split

# Run on the first CUDA GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")


## Neural Network

In [30]:
class Net(nn.Module):
    """
    Small fully connected network: 2048 -> 512 -> 512 with a ReLU in between.
    """
    def __init__(self):
        """
        Build the two linear layers used by the forward pass.
        """
        super().__init__()
        self.fc1 = nn.Linear(2048, 512)
        self.fc2 = nn.Linear(512, 512)

    def forward(self, x):
        """
        Map the input through fc1, a ReLU, and fc2.

        input: x: torch.Tensor whose last dimension is 2048

        output: torch.Tensor whose last dimension is 512
        """
        hidden = F.relu(self.fc1(x))
        return self.fc2(hidden)

## Generate embeddings

In [31]:
# generate embedding for each image in the dataset
# The result is cached on disk, so the expensive forward passes only run when
# 'dataset/embeddings.npy' does not exist yet.
if not os.path.exists('dataset/embeddings.npy'):
    # Transform, resize and normalize the images and then use a pretrained model
    # to extract the embeddings.
    weights = ResNet50_Weights.DEFAULT
    train_transforms = transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.Normalize([0.6110, 0.5012, 0.3752], [0.2575, 0.2659, 0.2801]),
    ])

    # shuffle=False keeps the embeddings in the same order as ImageFolder's
    # `samples` list, which is what get_data relies on to map names to rows.
    train_dataset = datasets.ImageFolder(root="dataset/", transform=train_transforms)
    train_loader = DataLoader(dataset=train_dataset,
                              batch_size=50,
                              shuffle=False, num_workers=8)

    model = resnet50(weights=weights)
    # Replace the classification head with an identity so the network returns
    # the features that feed it instead of class logits.
    model.fc = nn.Sequential()
    model.to(device)
    model.eval()
    for param in model.parameters():
        param.requires_grad = False

    embeddings = []
    # Pure inference: no autograd bookkeeping is needed.
    with torch.no_grad():
        for i, (features, labels) in enumerate(train_loader):
            print(i)
            # Each entry is stored transposed, i.e. (embedding_dim, batch_size).
            embeddings.append(model(features.to(device)).T.cpu().numpy())

    # np.save stacks the per-batch arrays into one array, which requires every
    # batch to have the same size (dataset size divisible by the batch size).
    np.save('dataset/embeddings.npy', embeddings)

## Load data

In [32]:
class TripletsDataset(Dataset):
    """
    Dataset of (anchor, positive, negative) items looked up by name.

    Each entry of `triplets` is a string holding three whitespace-separated
    names; `file_to_tensor` maps every name to the object returned for it.
    """
    def __init__(self, triplets, file_to_tensor):
        super().__init__()
        self.triplets = triplets
        self.file_to_tensor = file_to_tensor

    def __len__(self):
        """Number of triplets in the dataset."""
        return len(self.triplets)

    def __getitem__(self, index):
        """Return the three looked-up items for the triplet at `index`."""
        names = self.triplets[index].split()
        lookup = self.file_to_tensor
        return tuple(lookup[names[position]] for position in range(3))


In [33]:
def get_data(file, train=True):
    """
    Load the triplets listed in a text file and the image-name -> embedding lookup.

    input: file: str, path to a text file with one whitespace-separated triplet
                 of image names per line
           train: bool, currently unused; kept so existing calls keep working

    output: triplets: list of str, the non-blank lines of `file` with surrounding
                      whitespace removed
            file_to_tensor: dict mapping an image name (file name without its
                            extension) to its embedding as a torch.Tensor

    raises: ValueError if the number of cached embeddings differs from the
            number of images found under 'dataset/'
    """
    # Skip blank lines (e.g. a trailing newline at the end of the file); they
    # would otherwise fail later when the triplet is split into three names.
    with open(file) as f:
        triplets = [line.strip() for line in f if line.strip()]

    # ImageFolder is only used to recover the image order that was used when
    # the embeddings were generated.
    train_dataset = datasets.ImageFolder(root="dataset/", transform=None)
    filenames = [os.path.splitext(os.path.basename(path))[0]
                 for path, _ in train_dataset.samples]

    # The cache is stored as (n_batches, embedding_dim, batch_size). Swap the
    # last two axes and flatten the batch structure to get one row per image,
    # independent of how many batches there are or how large they were.
    embeddings = np.swapaxes(np.load('dataset/embeddings.npy'), 1, 2)
    embeddings = embeddings.reshape(-1, embeddings.shape[-1])

    if len(embeddings) != len(filenames):
        raise ValueError(
            f"dataset/embeddings.npy holds {len(embeddings)} embeddings but "
            f"{len(filenames)} images were found; regenerate the embeddings"
        )

    file_to_tensor = {name: torch.tensor(vector)
                      for name, vector in zip(filenames, embeddings)}

    return triplets, file_to_tensor

TRAIN_TRIPLETS = 'train_triplets.txt'
TEST_TRIPLETS = 'test_triplets.txt'
# Seed for the train/validation split so the partition is the same on every run.
SPLIT_SEED = 42

# load the training and testing data
triplets, file_to_tensor = get_data(TRAIN_TRIPLETS)
triplets_test, file_to_tensor_test = get_data(TEST_TRIPLETS, train=False)

# Hold out 10% of the training triplets for validation.
full_dataset = TripletsDataset(triplets, file_to_tensor)
train_size = int(0.9 * len(full_dataset))
validation_size = len(full_dataset) - train_size

train_dataset, validation_dataset = torch.utils.data.random_split(
    full_dataset,
    [train_size, validation_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
validation_loader = DataLoader(validation_dataset, batch_size=64, shuffle=False)

# The test order must be preserved: predictions are written out line by line.
test_dataset = TripletsDataset(triplets_test, file_to_tensor_test)
test_loader = DataLoader(test_dataset, batch_size=2048, shuffle=False)


## Training

In [None]:
# Training
model = Net()
model.to(device)
n_epochs = 5
optimizer = optim.SGD(model.parameters(), lr=0.01)


def cosine_distance(x, y):
    """Cosine distance: 0 when x and y point the same way, 2 when opposite."""
    return 1.0 - F.cosine_similarity(x, y)


# The triplet loss expects a *distance* (smaller = closer). Passing the cosine
# similarity directly would push the anchor away from the positive instead.
criterion = nn.TripletMarginWithDistanceLoss(distance_function=cosine_distance)
dist = nn.CosineSimilarity()

last_score = 0
checkpoint_saved = False
for epoch in range(n_epochs):
    print(f'epoch={epoch}')
    running_loss = 0.0
    model.train()
    for i, (a, p, n) in enumerate(train_loader, start=1):
        # The model lives on `device`, so every batch has to be moved there too.
        a, p, n = a.to(device), p.to(device), n.to(device)
        optimizer.zero_grad()
        a_out = model(a)
        p_out = model(p)
        n_out = model(n)
        loss = criterion(a_out, p_out, n_out)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        if i % 500 == 0:    # print every 500 mini-batches
            print(f'[{epoch + 1}, {i:5d}] loss: {running_loss / 500:.3f}')
            running_loss = 0.0

    model.eval()
    running_loss = 0.0
    total = 0
    n_correct = 0
    with torch.no_grad():  # no gradients are needed for validation
        for a, p, n in validation_loader:
            a, p, n = a.to(device), p.to(device), n.to(device)
            a_out = model(a)
            p_out = model(p)
            n_out = model(n)

            # A triplet is correct when the anchor is at least as similar to the
            # positive as to the negative (same rule as the testing cell).
            res_cos = dist(a_out, p_out) - dist(a_out, n_out)
            n_correct += (res_cos >= 0).sum().item()

            # Weight by batch size so the average is per triplet.
            running_loss += criterion(a_out, p_out, n_out).item() * len(a_out)
            total += len(a_out)
    score = n_correct / total
    print(f'validation accuracy: {score:.4f}, loss: {running_loss / total:.3f}')
    # Stop as soon as the validation accuracy no longer improves.
    if last_score >= score:
        break
    else:
        last_score = score
        # state_dict must be *called*; saving the bound method pickles the
        # whole module instead of just the weights.
        torch.save(model.state_dict(), './model_saved')
        checkpoint_saved = True

# After early stopping the in-memory weights are from the epoch that did not
# improve; restore the best checkpoint so later cells use the best model.
if checkpoint_saved:
    model.load_state_dict(torch.load('./model_saved', map_location=device))


## Testing

In [35]:
# Testing
model.eval()
dist = nn.CosineSimilarity()
predictions = []
# Iterate over the test data
with torch.no_grad(): # We don't need to compute gradients for testing
    for a, p, n in test_loader:
        # The model lives on `device`, so the inputs must be moved there as well.
        a, p, n = a.to(device), p.to(device), n.to(device)
        predicted_a = model(a)
        predicted_p = model(p)
        predicted_n = model(n)

        # Predict 1 when the anchor is at least as similar to the second image
        # as to the third one, otherwise 0.
        prediction = (dist(predicted_a, predicted_p) - dist(predicted_a, predicted_n)) >= 0
        # Move back to the CPU before converting: .numpy() fails on CUDA tensors.
        predictions.append(prediction.int().cpu().numpy())

# One flat array with a prediction per test triplet (empty if there were none).
predictions = np.concatenate(predictions) if predictions else np.empty(0, dtype=int)

np.savetxt("results.txt", predictions, fmt='%i')
print("Results saved to results.txt")

Results saved to results.txt
