In [None]:
#connection with google drive
# Mounts the user's Google Drive at /content/drive; the dataset, embedding,
# feature and model paths used in later cells all live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
#set seed and import needed libraries
SEED = 1234
! pip install pyprind
import os
import json
import nltk
import torch
import random
import pyprind
import torchtext
import numpy as np
import pandas as pd
nltk.download('punkt')
import seaborn as sns
import torch.nn as nn
from tqdm import tqdm
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
from torchtext import data
import torch.optim as optim
import matplotlib.pyplot as plt
%matplotlib inline
import torch.nn.functional as F
from torchtext.legacy import data
torch.backends.cudnn.deterministic = True
from sklearn.metrics import classification_report
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from sklearn.metrics import precision_score, recall_score, f1_score
torch.cuda.init()

In [None]:
# Pick the compute device: the first GPU when CUDA is usable, otherwise the CPU.
is_cuda = torch.cuda.is_available()
print("Cuda Status on system is {}".format(is_cuda))
if is_cuda:
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')

In [None]:
#load file pt for images
import tarfile
# The context manager closes the archive handle even if extraction fails.
# NOTE(review): extractall() trusts the member paths stored in the archive; only
# run this on an archive you produced or otherwise trust.
# NOTE(review): files are extracted under .../MyDrive/pt/ while later cells read
# from .../MyDrive/coco_object_features/features — confirm the intended location.
with tarfile.open('/content/drive/MyDrive/coco_object/coco_object_features.tar.gz', 'r') as t:
    t.extractall("/content/drive/MyDrive/pt/")

In [None]:
# Count the files sitting directly inside the features folder (no recursion).
features_walk = os.walk("/content/drive/MyDrive/coco_object_features/features")
# The first item produced by os.walk describes the top-level directory itself.
path, dirs, files = next(features_walk)
file_count = len(files)
print(file_count)

In [None]:
#create a vocab using glove, then add a vector for absent values
vocab = torchtext.vocab.Vectors("/content/drive/MyDrive/embedding/glove.6B.300d.txt")
# One all-zero row is appended after the last pretrained vector; it serves as the
# embedding for unknown words and padding. Its width is taken from the loaded
# matrix instead of being hard-coded.
# NOTE(review): later cells address this row with the literal index 400000, which
# assumes the GloVe file holds exactly 400000 vectors — TODO confirm.
vector_400000 = torch.zeros(vocab.vectors.shape[1])
vocab.vectors = torch.cat((vocab.vectors, vector_400000.view(1, -1)), dim=0)

In [None]:
#load dataset
# Later cells read the `id`, `caption` and `images_id` columns of this frame.
validation1 = pd.read_json("/content/drive/MyDrive/Text_Image_Datasets/Doc1.json")

In [None]:
#create a dataframe which has for every row only one caption_id and one image_id.
#the same caption_id is repeated for n consecutive rows, where n is the number of images in a pool.
POOL_SIZE = 250  # images taken from each caption's pool; the DataLoader batch size relies on it

# Collect plain records first and build the frame once: DataFrame.append inside
# a loop copies the whole frame on every call and was removed in pandas 2.0.
pair_rows = []
for i in range(len(validation1)):
  caption_id = validation1.id[i]
  pool = validation1.images_id[i]
  for y in range(POOL_SIZE):
    image_id = pool[y]  # still fails loudly if a pool is shorter than POOL_SIZE
    pair_rows.append({"cap_id": caption_id, "img_id": image_id})
df = pd.DataFrame(pair_rows, columns=["cap_id", "img_id"])

In [None]:
#Create a dict for captions.
#The keys are the captions_ids and the values are tensors holding the word indexes (from the vocab),
#truncated or padded with index 400000 to exactly 12 entries.
vocabolario1 = {}
for i in range(len(validation1)):
  cap_id = validation1['id'][i]  # not named `id`, so the builtin stays usable
  caption = nltk.word_tokenize(validation1['caption'][i].lower())
  # Only the first 12 tokens survive; words missing from the vocab map to 400000.
  word_idx = [vocab.stoi.get(word, 400000) for word in caption[:12]]
  # Right-pad short captions up to the fixed length of 12.
  word_idx += [400000] * (12 - len(word_idx))
  vocabolario1[cap_id] = torch.tensor(word_idx)

In [None]:
#Create dict for images
#The keys are the images_ids, the values are dictionaries with keys:"classes","features"
#and values are indexes of classes(using vocab) and features tensors.
images_id = set(df['img_id'])
vocabolario2 = {}
for image_id in images_id:
  # Feature files are named after the image id, left-padded with zeros to 12 digits.
  file_name = str(image_id).zfill(12) + '.pt'
  # Map onto the device selected earlier instead of a hard-coded "cuda:0",
  # so the notebook also runs on a CPU-only runtime.
  file_pt = torch.load("/content/drive/MyDrive/coco_object_features/features/" + file_name, map_location=device)

  # Each class tag is stored as bytes; tags missing from the vocab map to 400000.
  classes_list = []
  for tag in file_pt['classes']:
    tag = tag.decode("utf-8").lower()
    classes_list.append([vocab.stoi.get(tag, 400000)])

  # One single-element row per tag; consumers squeeze the trailing axis.
  file_pt['classes'] = torch.tensor(classes_list)
  file_pt["features"] = torch.tensor(file_pt["features"])

  vocabolario2[image_id] = file_pt

In [None]:
#Create Dataset class
class Dataset(torch.utils.data.Dataset):
    """Expose a (cap_id, img_id) table row by row as pairs of scalar tensors.

    The base class is referenced through its full path: the class keeps the name
    `Dataset` for its callers, and spelling the base as `Dataset` would make a
    re-run of this cell inherit from the previous definition of this very class.

    Args:
        tabella: DataFrame with `cap_id` and `img_id` columns.
    """

    def __init__(self, tabella):
        self.samples = tabella

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        # Positional lookup, so the frame's index labels do not matter.
        caption_id = self.samples.cap_id.iloc[idx]
        image_id = self.samples.img_id.iloc[idx]
        return torch.tensor(caption_id), torch.tensor(image_id)

In [None]:
#Start Dataloader
# batch_size=250 mirrors the 250 rows generated per caption when `df` was built,
# and no shuffle is requested, so each batch is presumably exactly one caption's
# image pool in its original order (ranking_tracker relies on that) — TODO confirm.
dataset = Dataset(df)
dataloader = DataLoader(dataset, batch_size=250, num_workers=2)

In [None]:
#define the class for the neural network
class CRM(nn.Module): #cross retrieval match
  """Two-branch recurrent scorer for caption/image pairs.

  A text branch embeds caption word indexes and runs them through an RNN; a
  visual branch concatenates the supplied region features with embeddings of
  the region class indexes and runs them through a second RNN. The final
  hidden states of both branches are concatenated and fed to a linear layer.
  Both embedding tables are initialised from the module-level `vocab.vectors`.
  """

  def __init__(self, vocab_dim, embedding_language_dim, embedding_classes_dim, embedding_features_dim, hidden_dim, output_dim):
    super().__init__()

    # Text branch: pretrained word embeddings followed by an RNN.
    self.embeddings_language = nn.Embedding(vocab_dim, embedding_language_dim)
    self.embeddings_language.load_state_dict({'weight': (vocab.vectors)})
    self.rnn_language = nn.RNN(embedding_language_dim, hidden_dim, batch_first=True)

    # Visual branch: class embeddings are joined with the region features,
    # so the RNN input width is the sum of both sizes.
    visual_input_dim = embedding_classes_dim + embedding_features_dim
    self.embeddings_visual = nn.Embedding(vocab_dim, embedding_classes_dim)
    self.embeddings_visual.load_state_dict({'weight': (vocab.vectors)})
    self.rnn_visual = nn.RNN(visual_input_dim, hidden_dim, batch_first=True)

    self.dropout = nn.Dropout(p=0.2)

    # The classifier sees both final hidden states side by side.
    self.fc = nn.Linear(hidden_dim + hidden_dim, output_dim)

  def forward(self, caption_idx, embedding_features, classes_idx):
    """Score caption/image pairs.

    Args:
      caption_idx: integer tensor of caption word indexes.
      embedding_features: float tensor of region features; its last axis is
        concatenated with the class embeddings.
      classes_idx: integer tensor of region class indexes.

    Returns:
      The linear layer's output after dropout. It keeps the leading axis of
      the RNN hidden state, which the caller strips with `[0]`.
    """
    text_vectors = self.embeddings_language(caption_idx)
    hidden_language = self.rnn_language(text_vectors)[1]

    class_vectors = self.embeddings_visual(classes_idx)
    # Features come first, class embeddings second, along the last axis.
    region_vectors = torch.cat((embedding_features, class_vectors), dim=-1)
    hidden_visual = self.rnn_visual(region_vectors)[1]

    joint_hidden = torch.cat((hidden_language, hidden_visual), dim=-1)
    return self.dropout(self.fc(joint_hidden))

In [None]:
#load classifier
model_name="model_50_lesspadding.pt"
# The file holds a whole pickled module (it is used directly as a model below),
# so the CRM class must already be defined when this runs.
# NOTE(review): torch.load unpickles arbitrary objects — only load trusted files.
# map_location lets a checkpoint saved on a GPU load on the device chosen above,
# including the CPU fallback.
model = torch.load("/content/drive/MyDrive/models/"+model_name, map_location=device)

In [None]:
#set RNN features
# Hyper-parameters named after the CRM constructor arguments.
# NOTE(review): no cell visible here uses them, because the model is loaded from
# disk rather than constructed — presumably kept for reference or training.
VOCAB_DIM = len(vocab.vectors)  # rows of the embedding table, including the appended zero row
EMBEDDING_CLASSES_DIM = 300
EMBEDDING_FEATURES_DIM = 2048
EMBEDDING_LANGUAGE_DIM = 300
HIDDEN_DIM = 450
OUTPUT_DIM = 2

In [None]:
#set weights and criterion
# Equal weight for both classes. The tensor is created on `device` rather than
# through .cuda(), which raises on a CPU-only runtime.
class_weights = torch.tensor([1.0, 1.0], device=device)
model = model.to(device)
criterion = nn.CrossEntropyLoss(weight=class_weights)
criterion = criterion.to(device)

In [None]:
#Function that uses batch results to convert ids into their values using the dictionaries created above
def multi_dict(ids_list, dictionary_to_use, work_on_images=True, target_device=None):
  """Look up a batch of ids and stack the stored tensors on one device.

  Args:
    ids_list: iterable of scalar tensors (each must support `.item()`).
    dictionary_to_use: mapping from id to stored value. With
      `work_on_images` set, every value must expose 'features' and 'classes'
      tensors; otherwise every value is itself a tensor.
    work_on_images: any truthy value selects the image lookup, any falsy
      value the caption lookup (previously a flag that was neither True nor
      False made the function silently return None).
    target_device: device for the result; defaults to the module-level
      `device`.

  Returns:
    `(features, classes)` stacked tensors for images, or one stacked tensor
    for captions.

  Raises:
    KeyError: if an id is missing from `dictionary_to_use`.
  """
  if target_device is None:
    target_device = device
  # Convert each id once; the plain number is what the dictionaries are keyed on.
  keys = [simple_id.item() for simple_id in ids_list]
  if work_on_images:
    entries = [dictionary_to_use[key] for key in keys]
    features_for_CRM = torch.stack([entry['features'] for entry in entries])
    classes_for_CRM = torch.stack([entry['classes'] for entry in entries])
    return features_for_CRM.to(target_device), classes_for_CRM.to(target_device)
  captions_for_CRM = torch.stack([dictionary_to_use[key] for key in keys])
  return captions_for_CRM.to(target_device)

In [None]:
#function that calculates the position of the correct image after ordering by score
def ranking_tracker(prediction, batch):
  """Return the 1-based rank of the ground-truth image inside its pool.

  Candidates are ordered by descending positive-class score
  (`prediction[i][1]`). Ties keep their original pool order, so the result is
  deterministic. The pool size is taken from `prediction` instead of being
  fixed at 250.

  Args:
    prediction: per-candidate scores, indexable as `prediction[i][1]`.
    batch: pair `(caption_ids, image_ids)`; only `batch[1]` is read.

  Returns:
    int: position (starting at 1) of the ground-truth image.

  Raises:
    ValueError: if the ground-truth id is not among the scored candidates.
  """
  image_ids = batch[1]
  # Assumes the ground-truth image is the first entry of the pool — the original
  # author flagged this as still to be verified.
  y_true = int(image_ids[0])
  pool_size = len(prediction)
  scores = [prediction[i][1].item() for i in range(pool_size)]
  candidates = [image_ids[i].item() for i in range(pool_size)]
  # sorted() is stable, also with reverse=True: equal scores keep pool order.
  order = sorted(range(pool_size), key=scores.__getitem__, reverse=True)
  for position_true, row in enumerate(order, start=1):
    if candidates[row] == y_true:
      return position_true
  raise ValueError("ground-truth image id not found among the scored candidates")

In [None]:
#function that calculates scores (probability of correct association (image-text))
def positive_scores(model, iterator, criterion):
  """Rank the ground-truth image of every batch served by `iterator`.

  Each batch is a `(caption_ids, image_ids)` pair. The ids are turned into
  model inputs through `vocabolario1` / `vocabolario2`, the model output is
  soft-maxed over its last axis, and `ranking_tracker` extracts the position
  of the ground-truth image.

  Args:
    model: module called as `model(captions, features, classes)`.
    iterator: iterable of batches (the DataLoader).
    criterion: unused; kept so existing calls stay valid.

  Returns:
    list with one rank per batch.
  """
  model.eval()
  positions = []
  with torch.no_grad():
    for batch in tqdm(iterator):
      captions = multi_dict(batch[0], vocabolario1, work_on_images=False)  # caption ids
      # One lookup yields both image tensors; doing it twice stacked and
      # transferred the whole pool a second time.
      features, classes = multi_dict(batch[1], vocabolario2)  # image ids
      prediction = model(captions, features, classes.squeeze(-1))
      # The model output carries a leading axis of the RNN hidden state; drop it.
      prediction = F.softmax(prediction, dim=-1)[0]
      positions.append(ranking_tracker(prediction, batch))
  return positions

In [None]:
# Rank of the ground-truth image for every batch served by the dataloader.
positions_ys_true = positive_scores(model,dataloader,criterion)

In [None]:
#define recall at k using positions
def recall_at_k(positions, k):
  """Share of ranks that fall within the top `k`.

  Args:
    positions: sized collection of 1-based ranks.
    k: cut-off; a rank counts as a hit when it is <= k.

  Returns:
    float: hits / len(positions), or 0.0 for an empty collection (this used
    to raise ZeroDivisionError). The value is also printed.
  """
  total = len(positions)
  if total == 0:
    recall = 0.0
  else:
    hits = sum(1 for position in positions if position <= k)
    recall = hits / total
  print(f"Recall at {k} is {recall}")
  return recall


In [None]:
# Recall within the top 125 ranks, i.e. the top half of a 250-image pool.
recall_at_k(positions_ys_true,125)

In [None]:
#create dataframe to see results
# Starts empty; the following cell adds one row of recall values per evaluated dataset.
df_recall = pd.DataFrame(columns =["dataset", "recall_1", "recall_5", "recall_10"])

In [None]:
# Recall@1/5/10 for dataset 1. DataFrame.append was removed in pandas 2.0, so the
# row is added with pd.concat.
# NOTE: re-running this cell adds the same row again.
recall_row = {
    "dataset": 1,
    "recall_1": recall_at_k(positions_ys_true, 1),
    "recall_5": recall_at_k(positions_ys_true, 5),
    "recall_10": recall_at_k(positions_ys_true, 10),
}
df_recall = pd.concat([df_recall, pd.DataFrame([recall_row])], ignore_index=True)