### *Import* libraries

In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from torchvision.utils import save_image
from torchvision import datasets, transforms
import numpy as np
import pandas as pd
from scipy.spatial import distance
from statistics import median
from torch.utils.data import Dataset, DataLoader

# Number of leading rows of the feature matrices used to build the training loader.
train_size = 12000
# Number of rows taken immediately after the training slice for evaluation.
test_size = 300

## Load data from csv
Set the paths below to wherever the image-feature and text-embedding CSV files are stored.

In [None]:
# Location used when saving and restoring the model weights.
model_save_path = './Checkpoints/model_state.pt'

# Both CSV files are read without a header row and turned into NumPy arrays.
image_vectors = np.array(pd.read_csv('./features.csv', header=None))
text_vectors = np.array(pd.read_csv('./word2vec.csv', header=None))

print(image_vectors.shape, text_vectors.shape)

(12305, 512) (12305, 512)


### Encoder
Separate fully connected layers for image vectors and text vectors. Both are brought to a shared vector space of dimension = 50.

In [None]:
class Encoder(nn.Module):
    """Map an image vector and a text vector to one shared representation.

    Each modality has its own three-layer fully connected branch
    (input -> 300 -> 200 -> common_rep_dim) with a ReLU after every layer.
    The two branch outputs are added to form the shared representation.

    Args:
        input_vec_dim: Size of the last dimension of both inputs.
        common_rep_dim: Size of the shared representation. Defaults to 50,
            the value that used to be hard-coded.
    """

    def __init__(self, input_vec_dim, common_rep_dim=50):
        super().__init__()

        # The attribute names below become state_dict keys, so they must stay
        # stable for previously saved checkpoints to load.
        self.img_dims = [input_vec_dim, 300, 200, common_rep_dim]
        self.fc1_img = nn.Linear(self.img_dims[0], self.img_dims[1])
        self.fc2_img = nn.Linear(self.img_dims[1], self.img_dims[2])
        self.fc3_img = nn.Linear(self.img_dims[2], self.img_dims[3])

        self.txt_dims = [input_vec_dim, 300, 200, common_rep_dim]
        self.fc1_txt = nn.Linear(self.txt_dims[0], self.txt_dims[1])
        self.fc2_txt = nn.Linear(self.txt_dims[1], self.txt_dims[2])
        self.fc3_txt = nn.Linear(self.txt_dims[2], self.txt_dims[3])

    def forward(self, img, txt):
        """Return the shared representation of ``img`` and ``txt``.

        Args:
            img: Tensor whose last dimension is ``input_vec_dim``.
            txt: Tensor whose last dimension is ``input_vec_dim``.

        Returns:
            Tensor whose last dimension is ``common_rep_dim``; all entries are
            non-negative because both branches end in a ReLU.
        """
        x = F.relu(self.fc1_img(img))
        x = F.relu(self.fc2_img(x))
        x = F.relu(self.fc3_img(x))

        y = F.relu(self.fc1_txt(txt))
        y = F.relu(self.fc2_txt(y))
        y = F.relu(self.fc3_txt(y))

        # x and y are already non-negative, so this outer ReLU leaves the sum
        # unchanged; it is kept to mirror the original network definition.
        return F.relu(torch.add(x, y))

### Decoder
Separate fully connected layers reconstruct the image vector and the text vector from the latent (shared) representation, restoring each to its original dimension of 512.

In [None]:
class Decoder(nn.Module):
    """Reconstruct image and text vectors from the shared representation.

    Each modality has its own three-layer fully connected branch
    (latent -> 200 -> 300 -> recon_dim) with a ReLU after every layer.
    The two reconstructions are concatenated along dimension 1.

    Args:
        output_vec_dim: Size of the shared representation fed to the decoder
            (despite the name, this is the decoder's *input* width).
        recon_dim: Size of each reconstructed vector. Defaults to 512, the
            value that used to be hard-coded.
    """

    def __init__(self, output_vec_dim, recon_dim=512):
        super().__init__()

        # The attribute names below become state_dict keys, so they must stay
        # stable for previously saved checkpoints to load.
        self.img_dims = [output_vec_dim, 200, 300, recon_dim]
        self.fc1_img = nn.Linear(self.img_dims[0], self.img_dims[1])
        self.fc2_img = nn.Linear(self.img_dims[1], self.img_dims[2])
        self.fc3_img = nn.Linear(self.img_dims[2], self.img_dims[3])

        self.txt_dims = [output_vec_dim, 200, 300, recon_dim]
        self.fc1_txt = nn.Linear(self.txt_dims[0], self.txt_dims[1])
        self.fc2_txt = nn.Linear(self.txt_dims[1], self.txt_dims[2])
        self.fc3_txt = nn.Linear(self.txt_dims[2], self.txt_dims[3])

    def forward(self, rep):
        """Decode ``rep`` into ``[image reconstruction | text reconstruction]``.

        Args:
            rep: 2-D tensor whose second dimension is ``output_vec_dim``.

        Returns:
            Tensor with ``2 * recon_dim`` columns: the image reconstruction
            followed by the text reconstruction. All entries are non-negative.
        """
        x = F.relu(self.fc1_img(rep))
        x = F.relu(self.fc2_img(x))
        x = F.relu(self.fc3_img(x))

        y = F.relu(self.fc1_txt(rep))
        y = F.relu(self.fc2_txt(y))
        y = F.relu(self.fc3_txt(y))

        # NOTE(review): every output passes through a ReLU, so negative target
        # values can never be reconstructed - confirm this is intended for the
        # text embeddings.
        # The outer ReLU is numerically a no-op (x and y are already >= 0) and
        # is kept to mirror the original network definition.
        combined = F.relu(torch.cat((x, y), 1))
        return combined


## Corrnet Model
Makes use of the Encoder and Decoder Neural Networks.

In [None]:
class Corrnet(nn.Module):
    """Encoder followed by decoder: (img, txt) -> concatenated reconstruction.

    Args:
        input_vec_dim: Forwarded to ``Encoder`` as the input width.
        common_rep_dim: Forwarded to ``Decoder`` as the latent width.
            NOTE(review): the encoder is built from ``input_vec_dim`` only, so
            this value has to match the encoder's output width.
    """

    def __init__(self, input_vec_dim, common_rep_dim):
        super().__init__()
        self.encoder = Encoder(input_vec_dim)
        self.decoder = Decoder(common_rep_dim)

    def forward(self, img, txt):
        """Encode both inputs into the shared space and decode the result."""
        return self.decoder(self.encoder(img, txt))


class corrnet_dataset(Dataset):
    """Dataset that pairs row ``i`` of ``img`` with row ``i`` of ``txt``.

    Raises:
        Exception: If the two arrays differ in their first dimension.
    """

    def __init__(self, img, txt):
        self.img, self.txt = img, txt
        n_img = self.img.shape[0]
        n_txt = self.txt.shape[0]
        if n_img != n_txt:
            raise Exception("Different no. of samples")

    def __len__(self):
        """Number of (image, text) pairs."""
        n_samples = self.img.shape[0]
        return n_samples

    def __getitem__(self, index):
        """Return the ``(image, text)`` pair stored at ``index``.

        This is a plain positional lookup; any shuffling is left to the
        DataLoader wrapped around the dataset.
        """
        return self.img[index], self.txt[index]

### Create Dataloader object
We create a DataLoader object from our dataset. `DataLoader`, from `torch.utils.data`, shuffles the data and creates batches of size 32 for us.

In [None]:
def get_data_loader():
    """Build the shuffled training DataLoader (batch size 32).

    The first ``train_size`` rows of the module-level ``image_vectors`` and
    ``text_vectors`` arrays are cast to float32 and wrapped in a
    ``corrnet_dataset``.
    """
    def _train_tensor(rows):
        return torch.from_numpy(rows[:train_size].astype(np.float32))

    train_pairs = corrnet_dataset(
        _train_tensor(image_vectors),
        _train_tensor(text_vectors),
    )
    return DataLoader(train_pairs, batch_size=32, shuffle=True)

### Call to make the Model

In [None]:
def make_model(load_pretrained=True):
    """Create the Corrnet model (512-d inputs, 50-d shared space) and its Adam optimizer.

    Args:
        load_pretrained: When true and a file exists at ``model_save_path``,
            the saved state dict is loaded into the new model.

    Returns:
        Tuple ``(model, optimizer)``; the optimizer is Adam with lr=0.001.
    """
    net = Corrnet(512, 50)
    adam = optim.Adam(net.parameters(), lr=0.001)

    if not load_pretrained:
        return net, adam

    if os.path.exists(model_save_path):
        net.load_state_dict(torch.load(model_save_path))

    return net, adam

### Correlation Part of Loss
Given image vector 'x' and text vector 'y', find their correlation.

In [None]:
def correlation(x, y, lamda=0.02, eps=1e-12):
    """Return the negated, scaled correlation between two batches of vectors.

    Both inputs are centred column-wise (mean over dimension 0). A single
    normalised cross-correlation is then computed over *all* entries, i.e. the
    sums run over every row and column rather than per feature dimension.

    Args:
        x: 2-D tensor (n x d), e.g. the shared-space image representations.
        y: 2-D tensor with the same shape as ``x``, e.g. the text representations.
        lamda: Weight of the correlation term in the total loss.
        eps: Lower bound applied to each sum of squares before the square
            root. Without it, a batch with zero variance (for example all
            units inactive after ReLU) yields 0/0 = NaN in both the loss and
            its gradient. Inputs whose sums of squares exceed ``eps`` are
            unaffected.

    Returns:
        Scalar tensor ``-lamda * corr``; it decreases as the correlation grows.
    """
    x_centered = x - torch.mean(x, dim=0)
    y_centered = y - torch.mean(y, dim=0)

    corr_nr = torch.sum(x_centered * y_centered)

    # Clamp *before* the square root: the derivative of sqrt at 0 is infinite,
    # so clamping afterwards would still produce NaN gradients.
    x_sq_sum = torch.clamp(torch.sum(torch.square(x_centered)), min=eps)
    y_sq_sum = torch.clamp(torch.sum(torch.square(y_centered)), min=eps)
    corr_dr = torch.sqrt(x_sq_sum) * torch.sqrt(y_sq_sum)

    return -lamda * corr_nr / corr_dr

In [None]:
# Own implementation of criterion loss
def own_mse_loss(inp, target):
    """Return the mean of the element-wise squared differences of two tensors."""
    squared_error = torch.square(inp - target)
    return squared_error.mean()

In [None]:
# Built-in mean-squared-error loss; train() uses it for the three reconstruction terms.
criterion = nn.MSELoss()

In [None]:
# Sanity check: evaluate the hand-written MSE and nn.MSELoss on the same pair of tensors.
# NOTE(review): `e` and `f` are not defined in any visible cell - define them before running this cell.
own_mse_loss(e, f)  # element-wise differences are squared, then averaged over all elements
criterion(e, f)
# Same definition as the textbook mean squared error.

tensor(1.3333, dtype=torch.float64)

### Training
Set number of epochs as required.

In [None]:
def train(epochs=100):
    """Train the global ``corrnet`` on the global ``dataset`` loader.

    For every batch the loss is the sum of four terms:

    * err1 - MSE between the reconstruction from (img, txt) and the inputs,
    * err2 - MSE between the reconstruction from the image alone and the inputs,
    * err3 - MSE between the reconstruction from the text alone and the inputs,
    * err4 - the ``correlation`` term between the image-only and text-only
      shared representations.

    A "missing" modality is represented by an all-zero tensor. The weights are
    written to ``model_save_path`` every 10 epochs and after the final epoch.

    Relies on the module-level names ``corrnet``, ``opt``, ``dataset``,
    ``criterion``, ``correlation`` and ``model_save_path``.

    Args:
        epochs: Number of passes over ``dataset``. Defaults to 100.
    """
    # torch.save does not create missing parent directories.
    save_dir = os.path.dirname(model_save_path)
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)

    for epoch in range(epochs):
        losses = []
        err = [[], [], [], []]
        for img, txt in dataset:
            # img and txt are batches of pre-computed feature vectors
            # (one row per sample), not raw images or strings.
            concat_inputs = torch.cat((img, txt), 1)
            no_img = torch.zeros_like(img)
            no_txt = torch.zeros_like(txt)

            opt.zero_grad()

            # Encode each single-modality view once and reuse the result for
            # both its reconstruction and the correlation term. Corrnet's
            # forward is decoder(encoder(...)), so this matches corrnet(...).
            rep_img = corrnet.encoder(img, no_txt)
            rep_txt = corrnet.encoder(no_img, txt)

            res_combined_input = corrnet(img, txt)
            res_img_input = corrnet.decoder(rep_img)
            res_txt_input = corrnet.decoder(rep_txt)

            err1 = criterion(res_combined_input, concat_inputs)
            err2 = criterion(res_img_input, concat_inputs)
            err3 = criterion(res_txt_input, concat_inputs)
            err4 = correlation(rep_img, rep_txt)

            loss = err1 + err2 + err3 + err4
            loss.backward()
            opt.step()

            losses.append(loss.item())
            for bucket, term in zip(err, (err1, err2, err3, err4)):
                bucket.append(term.item())

        print("Epoch: {}:, Loss: {}".format(epoch, np.mean(losses)))
        for i, terms in enumerate(err):
            print("err{}: {}".format(i + 1, np.mean(terms)), end="\t")
        print("\n")

        # Also save after the last epoch so the final weights are not lost
        # when (epochs - 1) is not a multiple of 10.
        if epoch % 10 == 0 or epoch == epochs - 1:
            torch.save(corrnet.state_dict(), model_save_path)


### Prediction
Given image and text vectors, project each into the shared space and compute the Euclidean and cosine distances between every corresponding pair.

In [None]:
def predict(img, txt):
    """Compute row-wise distances between image and text projections.

    Each image row is encoded with a zeroed text input and each text row with
    a zeroed image input, using the global ``corrnet`` encoder. Row ``k`` of
    the image projections is then compared with row ``k`` of the text
    projections.

    Args:
        img: 2-D tensor of image vectors.
        txt: 2-D tensor of text vectors with the same number of rows as ``img``.

    Returns:
        Tuple ``(euclidean, cosine)`` of 1-D NumPy arrays, one entry per row.
    """
    # Inference only: skip autograd bookkeeping and convert to NumPy once
    # instead of once per row and per metric.
    with torch.no_grad():
        img_vecs = corrnet.encoder(img, torch.zeros_like(txt)).cpu().numpy()
        txt_vecs = corrnet.encoder(torch.zeros_like(img), txt).cpu().numpy()

    euc = [distance.euclidean(i_vec, t_vec) for i_vec, t_vec in zip(img_vecs, txt_vecs)]
    cos = [distance.cosine(i_vec, t_vec) for i_vec, t_vec in zip(img_vecs, txt_vecs)]

    return np.array(euc), np.array(cos)

In [None]:
def _rank_of_true_pair(img_batch, txt_batch, true_index):
    """Return the 0-based rank of pair ``true_index`` by ascending cosine distance."""
    # predict() takes (img, txt) in that order; index 1 is the cosine distance.
    predictions = list(predict(img_batch, txt_batch)[1])
    pred_true = predictions[true_index]
    predictions.sort()
    return predictions.index(pred_true)


def _report_retrieval(direction, ranks, n_queries):
    """Print median rank and recall@1/5/10 as percentages of ``n_queries``."""
    mr = [rank + 1 for rank in ranks]
    print('Median Rank({}):'.format(direction), median(mr)*100/n_queries, '%')
    for k in (1, 5, 10):
        hits = sum(1 for rank in ranks if rank < k)
        print('R@{}({}):'.format(k, direction), hits*100/n_queries, '%')


def print_metrics():
    """Print cross-modal retrieval metrics on the held-out test rows.

    The test set is the ``test_size`` rows that follow the first
    ``train_size`` rows of the module-level ``image_vectors`` and
    ``text_vectors``. For each query the matching item of the other modality
    is ranked among all test candidates by cosine distance in the shared
    space; the median rank and recall@1/5/10 are printed for image->text and
    text->image retrieval.

    Raises:
        ValueError: If the test slice is empty.
    """
    img_test = torch.from_numpy(image_vectors[train_size: train_size+test_size].astype(np.float32))
    txt_test = torch.from_numpy(text_vectors[train_size: train_size+test_size].astype(np.float32))

    n_queries = img_test.shape[0]
    if n_queries == 0:
        raise ValueError("No test samples available after the training slice")

    # img -> txt: one image repeated against every candidate text.
    ranks = [
        _rank_of_true_pair(img_test[i].repeat(n_queries, 1), txt_test, i)
        for i in range(n_queries)
    ]
    _report_retrieval('img->txt', ranks, n_queries)

    # txt -> img: one text repeated against every candidate image.
    ranks = [
        _rank_of_true_pair(img_test, txt_test[i].repeat(n_queries, 1), i)
        for i in range(n_queries)
    ]
    _report_retrieval('txt->img', ranks, n_queries)

In [None]:
if __name__ == "__main__":
    # Start from freshly initialised weights rather than restoring a checkpoint.
    corrnet, opt = make_model(load_pretrained=False)
    # The loader is built from the module-level embedding arrays.
    dataset = get_data_loader()
    train()

Epoch: 0:, Loss: 0.45059243845939634
err1: 0.15434641776482264	err2: 0.15449427553017933	err3: 0.1594334157705307	err4: -0.017681670394105217	

Epoch: 1:, Loss: 0.3435710775852203
err1: 0.11604118392864864	err2: 0.11724428355693817	err3: 0.12859890355666478	err4: -0.018313292883336545	

Epoch: 2:, Loss: 0.31804039669036865
err1: 0.10606649239857992	err2: 0.10808032415310542	err3: 0.12238489532470703	err4: -0.018491314599911372	

Epoch: 3:, Loss: 0.30300241525967914
err1: 0.10008199085791905	err2: 0.10277518971761068	err3: 0.11873472032944361	err4: -0.01858948708573977	

Epoch: 4:, Loss: 0.2925619030793508
err1: 0.09614095455408096	err2: 0.09936030350128809	err3: 0.11572767208019892	err4: -0.018667027155558267	

Epoch: 5:, Loss: 0.2851675442059835
err1: 0.09341880738735199	err2: 0.0970039712190628	err3: 0.11348733776807785	err4: -0.018742571656902633	

Epoch: 6:, Loss: 0.2796692452430725
err1: 0.09140770405530929	err2: 0.09530282364288965	err3: 0.11174346117178599	err4: -0.0187847442080

### Find out metrics for the Model on the given Data.

In [None]:
# Report retrieval metrics in both directions on the held-out test rows.
print_metrics()

Median Rank(img->txt): 50.0 %
R@1(img->txt): 0.3333333333333333 %
R@5(img->txt): 2.6666666666666665 %
R@10(img->txt): 4.333333333333333 %
Median Rank(txt->img): 48.833333333333336 %
R@1(txt->img): 0.3333333333333333 %
R@5(txt->img): 2.6666666666666665 %
R@10(txt->img): 4.666666666666667 %
