In [None]:
#In the last update gdown has problems, to solve i use a old version of it
!pip install gdown==4.6.0


Collecting gdown==4.6.0
  Downloading gdown-4.6.0-py3-none-any.whl.metadata (4.4 kB)
Downloading gdown-4.6.0-py3-none-any.whl (14 kB)


In [None]:

import gdown
from zipfile import ZipFile
import numpy as np
import torch
from PIL import Image
import os
from os.path import join as pj
import pandas as pd
import logging
from typing import List, Dict
from tqdm import tqdm


In [None]:
!mkdir test_data
!mkdir test_images

In [None]:
# Download the two archives needed by the notebook from Google Drive.
# Test data (share link):
#https://drive.google.com/file/d/1o6FV3XKC0aHxPvLWcY5JH1ttk78PEgi0/view?usp=sharing


# Test images (share link):
# https://drive.google.com/file/d/1Gv_CZNoVKR5rTENnJPH4XwlUUwTodXmd/view?usp=sharing



# Test data archive; `output` is a relative path, so the zip lands in the current working directory.
url = 'https://drive.google.com/uc?id=1o6FV3XKC0aHxPvLWcY5JH1ttk78PEgi0'
output = 'test.data.v1.1.gold.zip'  # Optional: specify the output filename
gdown.download(url, output, quiet=False)


# Test images archive; `url` and `output` are deliberately re-bound for the second download.
url = 'https://drive.google.com/uc?id=1Gv_CZNoVKR5rTENnJPH4XwlUUwTodXmd'
output = 'test_images_resized.zip'  # Optional: specify the output filename
gdown.download(url, output, quiet=False)


Downloading...
From: https://drive.google.com/uc?id=1o6FV3XKC0aHxPvLWcY5JH1ttk78PEgi0
To: /content/test.data.v1.1.gold.zip
100%|██████████| 36.8k/36.8k [00:00<00:00, 64.0MB/s]
Downloading...
From: https://drive.google.com/uc?id=1Gv_CZNoVKR5rTENnJPH4XwlUUwTodXmd
To: /content/test_images_resized.zip
100%|██████████| 600M/600M [00:04<00:00, 146MB/s]


'test_images_resized.zip'

In [None]:
# Extract the test-data archive.
# Relative paths are used so the cell matches where gdown saved the zip and the
# 'test_data' directory used by the rest of the notebook, instead of assuming
# that the working directory is /content.
with ZipFile("test.data.v1.1.gold.zip",'r') as zObject:
  zObject.extractall(path="test_data")

In [None]:
# Extract the test-images archive.
# Relative paths are used so the cell matches where gdown saved the zip and the
# 'test_images' directory used by the rest of the notebook, instead of assuming
# that the working directory is /content.
with ZipFile("test_images_resized.zip",'r') as zObject:
  zObject.extractall(path="test_images")

#UTILITY FUNCTIONS

In [None]:
def load(dirpath, datapath,dirimages):
  """Read one data file and its matching gold file.

  Each non-empty line of the data file is tab-separated:
  ``target_word <TAB> target_phrase <TAB> candidate_1 <TAB> ... <TAB> candidate_k``.
  The gold file is looked up under the same directory, using ``datapath``
  with every occurrence of 'data' replaced by 'gold'; the first tab-separated
  field of each non-empty line is taken as the gold label.

  :param dirpath: directory prefix, concatenated as-is with the file names
      (so it must end with a path separator)
  :param datapath: file name of the data file
  :param dirimages: prefix prepended as-is to every candidate image name
  :return: ``(data, golds)`` where ``data`` is a list of dicts with keys
      'target_word', 'target_phrase' and 'candidate_images', and ``golds``
      is the list of gold labels
  """
  goldpath = datapath.replace('data','gold')

  data = [] # list of dictionary

  # utf-8 is requested explicitly so that reading does not depend on the platform locale
  with open(dirpath+datapath,'r',encoding='utf-8') as file_d:
    for line in file_d:
      # Strip the line terminator BEFORE splitting, so no field ever carries it
      line = line.rstrip('\n')
      if not line: # tolerate blank lines (e.g. a trailing empty line)
        continue
      fields = line.split('\t')
      data.append({
          'target_word': fields[0],
          'target_phrase': fields[1],
          'candidate_images': [str(dirimages) + str(elem) for elem in fields[2:]], #add path for each image
      })

  golds = []
  with open(dirpath+goldpath,'r',encoding='utf-8') as file_g:
    for line in file_g:
      # Only the newline is removed: slicing off the last character blindly would
      # truncate the label when the file has no trailing newline
      line = line.rstrip('\n')
      if not line:
        continue
      golds.append(line.split('\t')[0])

  return data,golds


# dir_path = "test_data/"

# data_paths = ["en.test.data.v1.1.txt", "it.test.data.v1.1.txt","fa.test.data.txt"]
# dir_images = 'test_images/test_images_resized/'



def data_loader(dirpath,dirimages,list_datapaths):
  """Load every data file in `list_datapaths` and index the results by language.

  The language key is the part of the file name before its first '.'.

  :return: (data, gold), two dicts mapping language -> what `load` returned
      for that file (samples and gold labels respectively)
  """
  data, gold = {}, {}
  for path in list_datapaths:
    lang_code = path.partition('.')[0]
    samples, labels = load(dirpath, path, dirimages)
    data[lang_code] = samples
    gold[lang_code] = labels
  return data, gold






# concepts_en,context_sentences_en,candidates_en,golds_en = data_loader(dir_path, data_paths[0])
# concepts_it,context_sentences_it,candidates_it,golds_it = data_loader(dir_path, data_paths[1])
# concepts_fa,context_sentences_fa,candidates_fa,golds_fa = data_loader(dir_path, data_paths[2])



# Extract the image file name from its path
def image_name(candidate_image):
  """Return whatever follows the last '/' in `candidate_image` (the whole string if there is no '/')."""
  _, _, file_name = candidate_image.rpartition('/')
  return file_name


# Accuracy of the predictions against the gold labels
def get_accuracy(predictions,gold):
  """Return the percentage (0-100) of predictions equal to the gold label at the same position.

  :param predictions: sequence of predicted labels
  :param gold: sequence of gold labels, compared pairwise with `predictions`
  :return: percentage of matches over ``len(predictions)``; 0.0 when there
      are no predictions (instead of dividing by zero)
  """
  total_images = len(predictions)
  if total_images == 0:
    return 0.0
  correct = sum(1 for pred, gol in zip(predictions, gold) if pred == gol)
  return (correct/total_images)*100


# Write the predictions to <path>/<language>.test.preds.txt
def write_result(predictions,language,path):
  """Write one prediction per line to ``<language>.test.preds.txt`` inside `path`.

  The directory is created if missing and the file name is printed.

  :param predictions: iterable of predictions; each is written via ``str()``
  :param language: language code used as the file-name prefix
  :param path: output directory, with or without a trailing separator
  """
  os.makedirs(path, exist_ok=True)
  file_name = language + '.test' + '.preds' + '.txt'
  print(file_name)
  # os.path.join instead of string concatenation: with a directory given without
  # a trailing '/', concatenation would create the file next to the directory
  with open(os.path.join(path, file_name),'w',encoding='utf-8') as file_write:
    file_write.writelines(str(pred)+'\n' for pred in predictions)




#DEFINED FOR CLIP AND MULTICLIP
def inference_clip(params,model):
  """Predict the best candidate image for every sample of ``params.language``.

  ``params.prompt_type`` selects how the query texts are built:
    - 0 -> every template in ``params.prompt`` filled with every field listed
      in ``params.input_type`` (augmented baseline prompts)
    - 1 -> same, using the templates in ``params.original_prompt``
    - anything else -> target word, target phrase and the WordNet definitions
      returned by ``augment_texts``

  :param params: object exposing data_dir, images_dir, data_paths, language,
      prompt_type, prompt, original_prompt, input_type and batch_size
  :param model: object whose ``get_similarity(texts=, images=, batch_size=)``
      returns a (texts x images) similarity matrix
  :return: (predicted image names, data of all languages, gold labels of
      ``params.language``)
  """
  opt = params
  data, gold = data_loader(opt.data_dir,opt.images_dir,opt.data_paths)
  my_pred = []
  data_lang = data[opt.language]
  for d in tqdm(data_lang, total = len(data_lang), leave = True):

    if opt.prompt_type in (0, 1):
      templates = opt.prompt if opt.prompt_type == 0 else opt.original_prompt
      texts = [p.replace("<>", d[input_type]) for input_type in opt.input_type for p in templates]
    else:
      texts = [d['target_word'], d['target_phrase']] + augment_texts(d['target_word'],d['target_phrase'])

    images = d['candidate_images']

    sim = model.get_similarity(texts=texts,images=images, batch_size=opt.batch_size)

    # argmax() is taken over the flattened (texts x images) matrix, so the column
    # (image) index is the flat index modulo the number of candidates. The real
    # candidate count is used instead of a hard-coded 10.
    best_image_index = int(sim.argmax()) % len(images)

    my_pred.append(image_name(images[best_image_index]))

  return my_pred,data,gold[opt.language]






In [None]:

# for i in result_data['en']:
#   c = i['candidate_images']
#   for item in c:
#     print(item.split('/')[-1])
#     break

#AUGMENTATION

In [None]:
import nltk
nltk.download('wordnet')
from nltk.corpus import wordnet as wn

def augment_texts(target_word,target_phrase):
  """Collect WordNet definitions related to a target word and its context phrase.

  Synsets are looked up for `target_word` and for every whitespace-separated
  word of `target_phrase` that differs from `target_word`; the definition of
  each distinct synset is collected.

  :param target_word: the ambiguous word
  :param target_phrase: the short phrase containing the word
  :return: list of distinct definitions, sorted so the result is the same on
      every run (set iteration order is not)
  """
  synsets_tw = wn.synsets(target_word)
  # Logged at debug level rather than printed: this function runs once per sample during inference
  logging.debug(f'synsets_tw  -> {synsets_tw}')

  synsets_tp = {synset for w in target_phrase.split() if w != target_word for synset in wn.synsets(w)}
  logging.debug(f'synset_tp  -> {synsets_tp}')

  synsets_set = set(synsets_tw) | synsets_tp
  logging.debug(f'synset_set  -> {synsets_set}')

  # A lemma's synset is the synset it was taken from, so one definition per synset is enough
  return sorted({synset.definition() for synset in synsets_set})





  # for synset in def_tw:
  #   for lemma in synset.lemmas():
  #     print(lemma.synset().definition())



augmented_texts = augment_texts('goal','football goal')
print(f'augmented_texts {augmented_texts}')




[nltk_data] Downloading package wordnet to /root/nltk_data...


synsets_tw  -> [Synset('goal.n.01'), Synset('finish.n.04'), Synset('goal.n.03'), Synset('goal.n.04')]
synset_tp  -> {Synset('football.n.01'), Synset('football.n.02')}
synset_set  -> {Synset('football.n.02'), Synset('goal.n.01'), Synset('goal.n.04'), Synset('football.n.01'), Synset('goal.n.03'), Synset('finish.n.04')}
augmented_texts ['the state of affairs that a plan is intended to achieve and that (when achieved) terminates behavior intended to achieve it', 'the place designated as the end (as of a race or journey)', 'game equipment consisting of the place toward which players of a game try to advance a ball or puck in order to score points', "any of various games played with a ball (round or oval) in which two teams try to kick or carry or propel the ball into each other's goal", 'a successful attempt at scoring', 'the inflated oblong ball used in playing American football']


In [None]:
# Print the collected definitions as a numbered list.
# The printed messages are in Italian: "Definitions of '<word>':" / "No definition found for '<word>'."
word_to_search = 'goal'
if augmented_texts:
    print(f"Definizioni di '{word_to_search}':")
    for i, definition in enumerate(augmented_texts, 1):
        print(f"{i}. {definition}")
else:
    print(f"Nessuna definizione trovata per '{word_to_search}'.")


Definizioni di 'goal':
1. the inflated oblong ball used in playing American football
2. the state of affairs that a plan is intended to achieve and that (when achieved) terminates behavior intended to achieve it
3. a successful attempt at scoring
4. game equipment consisting of the place toward which players of a game try to advance a ball or puck in order to score points
5. any of various games played with a ball (round or oval) in which two teams try to kick or carry or propel the ball into each other's goal
6. the place designated as the end (as of a race or journey)


In [None]:
#test for goal and football goal

# PARAMETERS


In [None]:
# Plain namespace of run parameters, used instead of parsing them from the command line.
# It is passed around as `opt` / `params` and read attribute by attribute.
class parameters:
  data_dir = 'test_data/' # directory prefix of the data/gold text files (concatenated as-is, hence the trailing '/')
  images_dir = 'test_images/test_images_resized/' # prefix prepended to every candidate image name
  data_paths = ["en.test.data.v1.1.txt", "it.test.data.v1.1.txt","fa.test.data.txt"] # one data file per language; the text before the first '.' becomes the language key
  prompt_type = 0 # 0 = augmented baseline prompts, 1 = original prompt, 2 = WordNet-augmented prompts (see inference_clip)
  language = 'en' # language key whose samples are evaluated
  output_dir = "result" # NOTE(review): only referenced by commented-out code in the visible cells
  original_prompt = ['<>'] # templates used when prompt_type == 1: only the target word or only the target phrase
  prompt = ['<>', 'This is <>.', 'Example of an image caption that explains <>.'] # templates for the text embedding; '<>' is the placeholder
  input_type = ['target_word', 'target_phrase'] # sample fields substituted into the templates
  batch_size = None # forwarded to model.get_similarity
  plot = True # NOTE(review): only referenced by commented-out code in the visible cells
  print_sleep = 100 # NOTE(review): not referenced by the visible cells
  device = 'cuda' if torch.cuda.is_available() else 'cpu'

# The other (commented-out) parameter classes below are redundant: only one parameter changes between them.

# class params_it:
#   data_dir = 'test_data/'
#   images_dir = 'test_images/test_images_resized/'
#   language = 'it'
#   model_load = None
#   output_dir = "result"
#   original_prompt = ['<>'] #if we want use only target_word or only target_phrase
#   prompt = ['<>', 'This is <>.', 'Example of an image caption that explains <>.'] #prompt to be used in text embedding (specify the placeholder by <>)
#   input_type = ['target_word', 'target_phrase'] # input text type
#   batch__size = None
#   plot = True
#   print_sleep = 100
#   device = 'cuda' if torch.cuda.is_available() else 'cpu'

# # class params_fa:
#   data_dir = 'test_data/'
#   images_dir = 'test_images/test_images_resized/'
#   data_paths = ["en.test.data.v1.1.txt", "it.test.data.v1.1.txt","fa.test.data.txt"]
#   language = 'fa'
#   model_load = None
#   output_dir = "result"
#   original_prompt = ['<>'] #if we want use only target_word or only target_phrase
#   prompt = ['<>', 'This is <>.', 'Example of an image caption that explains <>.'] #prompt to be used in text embedding (specify the placeholder by <>)
#   input_type = ['target_word', 'target_phrase'] # input text type
#   batch_size = None
#   plot = True
#   print_sleep = 100
#   device = 'cuda' if torch.cuda.is_available() else 'cpu'

#CLIP

In [None]:
from transformers import CLIPProcessor, CLIPModel

In [None]:
#MODEL CLIP from github (only english)
# not so useful, maybe delete
def to_batch(inputs: Dict, batch_size: int = None):
    """Split a dict of equally sized sequences into a list of per-batch dicts.

    The length of the first value defines the total size. When `batch_size`
    is None or larger than that size, a single batch holding everything is
    returned.

    :param inputs: mapping from name to a sliceable sequence
    :param batch_size: maximum number of items per batch
    :return: list of dicts with the same keys, each holding one slice
    """
    total = len(list(inputs.values())[0])
    if batch_size is None or batch_size > total:
        batch_size = total
    batches = []
    for start in range(0, total, batch_size):
        stop = min(start + batch_size, total)
        batches.append({key: value[start:stop] for key, value in inputs.items()})
    return batches


class CLIP:
    """Thin wrapper around a Hugging Face CLIP checkpoint that scores texts against images."""

    def __init__(self, model: str = 'openai/clip-vit-large-patch14-336'):
        """ Huggingface CLIP Wrapper

        :param model: CLIP model on huggingface
            - 'openai/clip-vit-large-patch14'
            - 'openai/clip-vit-base-patch32'
            - 'openai/clip-vit-large-patch14-336'
            - 'openai/clip-vit-base-patch16'
        """
        self.model = CLIPModel.from_pretrained(model).eval()
        self.processor = CLIPProcessor.from_pretrained(model)
        self.config = self.model.config.to_dict()
        self.device = 'cuda' if torch.cuda.device_count() > 0 else 'cpu'
        self.parallel = torch.cuda.device_count() > 1
        assert not self.parallel, "Processing on multiple GPUs is not supported"
        self.model.to(self.device)
        # dim=2: both operands are expanded to (images, texts, dim) before comparison
        self.cos = torch.nn.CosineSimilarity(dim=2, eps=1e-6)

        logging.info('** LOAD MODEL ** ')
        logging.info(f'\tDevice: {self.device} ({torch.cuda.device_count()} gpus)')
        logging.info(f"\tModel parameters: {np.sum([int(np.prod(p.shape)) for p in self.model.parameters()]):,}")
        logging.info(f"\tInput resolution: {self.config['vision_config']['image_size']}")
        logging.info(f"\tContext length: {self.config['text_config']['max_position_embeddings']}")
        logging.info(f"\tVocab size: {self.config['text_config']['vocab_size']}")

    def get_similarity(self, images: List or str, texts: List or str, batch_size: int = None):
        """ Compute the similarity between every text and every image

        :param images: an image path or a list of image paths
        :param texts: a text or a list of texts
        :param batch_size: number of items embedded per forward pass
            (None = everything in one batch)
        :return: a numpy array of shape (text size x image size) holding the
            cosine similarity of the embeddings multiplied by 100
        """
        images = [images] if isinstance(images, str) else images
        texts = [texts] if isinstance(texts, str) else texts

        logging.debug(f'model inference on images: {len(images)}')
        pil_images = [Image.open(i).convert("RGB") for i in images]
        image_inputs = self.processor(images=pil_images, return_tensors="pt", padding=True)
        batch_image_inputs = to_batch(image_inputs, batch_size=batch_size)

        with torch.no_grad():
            output_image_embedding = []
            for i in batch_image_inputs:
                output_image_embedding.append(
                    self.model.get_image_features(**{k: v.to(self.device) for k, v in i.items()})
                )
            output_image_embedding = torch.cat(output_image_embedding)

        logging.debug(f'model inference on texts: {len(texts)}')
        # truncation=True: texts longer than the model's context length (e.g. long
        # dictionary definitions) are cut instead of making the text encoder fail
        text_inputs = self.processor(text=texts, return_tensors="pt", padding=True, truncation=True)
        batch_text_inputs = to_batch(text_inputs, batch_size=batch_size)

        with torch.no_grad():
            output_text_embedding = []
            for i in batch_text_inputs:
                output_text_embedding.append(
                    self.model.get_text_features(**{k: v.to(self.device) for k, v in i.items()})
                )
        output_text_embedding = torch.cat(output_text_embedding)

        logging.debug('compute similarity')
        sim = self.cos(
            output_image_embedding.unsqueeze(1).repeat((1, len(output_text_embedding), 1)),
            output_text_embedding.unsqueeze(0).repeat((len(output_image_embedding), 1, 1))
        ) * 100  # image size x text size
        # Transposed on return: callers receive text size x image size
        return sim.cpu().numpy().T

In [None]:

# opt class substitute the various command prompts parsed
# opt = params_en
# def baseline_clip(opt):

#   #TO DO: update opt adding these 3 elements
#   data, gold = data_loader(opt.data_dir,opt.images_dir,opt.data_paths)





#   # load model
#   if opt.language == 'en':
#       clip = CLIP(opt.model_clip if opt.model_clip is not None else 'openai/clip-vit-large-patch14-336')
#   else:
#       clip = MultilingualCLIP(
#           opt.model_clip if opt.model_clip is not None else 'sentence-transformers/clip-ViT-B-32-multilingual-v1')


#   #run inference
#   result = []
#   for n,d in enumerate(data[opt.language]):
#     prompt_list = []
#     for input_type in opt.input_type:
#       prompt_list += [(p.replace("<>", d[input_type]),input_type, p) for p in opt.prompt]

#     sim = clip.get_similarity(texts=[p[0] for p in prompt_list], images=d['candidate_images'], batch_size=opt.batch_size)

#     for (text, input_type, prompt_type), s in zip(prompt_list, sim):
#       tmp = sorted(zip(s, d['candidate_images']), key=lambda x: x[0], reverse=True)
#       result.append({
#           'language': opt.language,
#           'data': n,
#           'candidate': [os.path.basename(i[1]) for i in tmp],
#           'relevance': sorted(s, reverse=True),
#           'text': text,
#           'input_type': input_type,
#           'prompt': prompt_type
#       })
#   #use pandas

#   df = pd.DataFrame(result)
#   for (prompt, input_type), g in df.groupby(['prompt','input_type']):
#     path = pj(opt.output_dir, f'{prompt.replace("<>", "mask")}.{input_type}'.replace(" ", "_"))
#     os.makedirs(path, exist_ok=True)
#     with open(pj(path, f'prediction.{opt.language}.txt'), 'w') as f:
#       f.write('\n'.join(['\t'.join(x) for x in g.sort_values(by=['data'])['candidate'].to_list()]))
#     g.to_csv(pj(path, f'full_result.{opt.language}.csv'), index=False)


#   return 0

  #TO DECOMMENT IMPORT PLOT
  # if opt.plot:
  #   plot(
  #       similarity=sim,
  #       texts=[p[0] for p in prompt_list],
  #       images=d['candidate images'],
  #       export_file=pj(opt.output_dir, "visualization", opt.language, f'similarity.{n}.png')
  #   )










In [None]:
 # Load the CLIP checkpoint (the weights are about 3 GB)

 clip = CLIP('openai/clip-vit-large-patch14-336')

`text_config_dict` is provided which will be used to initialize `CLIPTextConfig`. The value `text_config["id2label"]` will be overriden.
`text_config_dict` is provided which will be used to initialize `CLIPTextConfig`. The value `text_config["bos_token_id"]` will be overriden.
`text_config_dict` is provided which will be used to initialize `CLIPTextConfig`. The value `text_config["eos_token_id"]` will be overriden.


In [None]:
# Choose the prompt strategy and run inference with CLIP.
# prompt_type: 0 = augmented baseline prompts, 1 = original prompt, 2 = WordNet-augmented prompts.
# `gold` receives the gold labels of opt.language only (a list), `data` the samples of every language.
opt = parameters
opt.prompt_type = 0
clip_predictions, data , gold = inference_clip(opt,clip)

100%|██████████| 463/463 [10:49<00:00,  1.40s/it]


In [None]:
# Save the predictions and print the accuracy of the model

path_prediction_model = 'clip_result/'
write_result(clip_predictions,'en',path_prediction_model)
# inference_clip already returns the gold labels of the selected language as a
# list, so they must not be indexed by language again (that raises TypeError).
print(get_accuracy(clip_predictions,gold))


# MULTICLIP

In [None]:
pip install -U sentence-transformers

Collecting sentence-transformers
  Downloading sentence-transformers-2.2.2.tar.gz (85 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m86.0/86.0 kB[0m [31m1.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting sentencepiece (from sentence-transformers)
  Downloading sentencepiece-0.1.99-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m10.5 MB/s[0m eta [36m0:00:00[0m
Building wheels for collected packages: sentence-transformers
  Building wheel for sentence-transformers (setup.py) ... [?25l[?25hdone
  Created wheel for sentence-transformers: filename=sentence_transformers-2.2.2-py3-none-any.whl size=125923 sha256=5303d9f8f6762e6b992e6e098cc723ba3861c5207cb21cf40f17b10d4fecbbe0
  Stored in directory: /root/.cache/pip/wheels/62/f2/10/1e606fd5f02395388f74e7462910fe851042f97238cbbd902f
Successfully built sentence-tr

In [None]:
import logging
from typing import List
from sentence_transformers import SentenceTransformer, util
os.environ["TOKENIZERS_PARALLELISM"] = "false"

def to_batch_multi(inputs: List, batch_size: int = None):
    """Split a list into consecutive chunks of at most `batch_size` items.

    When `batch_size` is None or larger than the list, a single chunk with
    every item is returned.
    """
    total = len(inputs)
    step = total if batch_size is None or batch_size > total else batch_size
    return [inputs[start:min(start + step, total)] for start in range(0, total, step)]



def cosine_similarity(a, b, zero_vector_mask: float = -100):
    """Cosine similarity of two numeric sequences, computed in pure Python.

    :param a: first vector
    :param b: second vector
    :param zero_vector_mask: value returned when either vector has zero norm
    :return: dot(a, b) / (|a| * |b|), or `zero_vector_mask` when that product is 0
    """
    norm_a = sum(x * x for x in a) ** 0.5
    norm_b = sum(y * y for y in b) ** 0.5
    denominator = norm_a * norm_b
    if denominator == 0:
        return zero_vector_mask
    dot_product = sum(x * y for x, y in zip(a, b))
    return dot_product / denominator


class MultilingualCLIP:
    """ Wrapper pairing a sentence-transformers image encoder with a separate text encoder """

    def __init__(self, model: str = 'sentence-transformers/clip-ViT-B-32-multilingual-v1'):
        """ Load the image encoder ('clip-ViT-B-32') and the text encoder

        :param model: sentence-transformers model name used for the text encoder
        """
        self.img_model = SentenceTransformer('clip-ViT-B-32')
        self.text_model = SentenceTransformer(model)
        self.device = 'cuda' if torch.cuda.device_count() > 0 else 'cpu'
        self.parallel = torch.cuda.device_count() > 1
        assert not self.parallel, "Processing on multiple GPUs is not supported"
        # The loop variable re-binds the `model` parameter; the name is no longer needed at this point
        for model in [self.img_model, self.text_model]:
            model.eval()
            model.to(self.device)

        logging.info('** LOAD MODEL ** ')
        logging.info(f'\tDevice: {self.device} ({torch.cuda.device_count()} gpus)')

    def get_similarity(self, images: List or str, texts: List or str, batch_size: int = None):
        """ Compute the cosine similarity between every text and every image

        :param images: an image path or a list of image paths
        :param texts: a text or a list of texts
        :param batch_size: number of items passed to each encode() call
            (None = everything in one call)
        :return: a numpy array of shape (text size x image size); entries
            involving a zero-norm embedding hold cosine_similarity's default
            mask value
        """
        images = [images] if type(images) is str else images
        texts = [texts] if type(texts) is str else texts

        logging.debug(f'model inference on images: {len(images)}')
        batch = to_batch_multi([Image.open(i).convert("RGB") for i in images], batch_size=batch_size)
        with torch.no_grad():
            output_image_embedding = []
            for i in batch:
                # embeddings are accumulated as plain Python lists
                output_image_embedding += self.img_model.encode(i).tolist()

        logging.debug(f'model inference on texts: {len(texts)}')
        batch = to_batch_multi(texts, batch_size=batch_size)
        with torch.no_grad():
            output_text_embedding = []
            for i in batch:
                output_text_embedding += self.text_model.encode(i).tolist()

        logging.debug('compute similarity')
        # text size x image size
        sim = [[cosine_similarity(i, t) for i in output_image_embedding] for t in output_text_embedding]
        return np.array(sim)

In [None]:
 opt = parameters
 m_clip = MultilingualCLIP('sentence-transformers/clip-ViT-B-32-multilingual-v1')

.gitattributes:   0%|          | 0.00/690 [00:00<?, ?B/s]

0_CLIPModel/config.json:   0%|          | 0.00/4.03k [00:00<?, ?B/s]

0_CLIPModel/merges.txt:   0%|          | 0.00/525k [00:00<?, ?B/s]

0_CLIPModel/preprocessor_config.json:   0%|          | 0.00/316 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/605M [00:00<?, ?B/s]

0_CLIPModel/special_tokens_map.json:   0%|          | 0.00/389 [00:00<?, ?B/s]

0_CLIPModel/tokenizer_config.json:   0%|          | 0.00/604 [00:00<?, ?B/s]

0_CLIPModel/vocab.json:   0%|          | 0.00/961k [00:00<?, ?B/s]

README.md:   0%|          | 0.00/1.88k [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

modules.json:   0%|          | 0.00/122 [00:00<?, ?B/s]

.gitattributes:   0%|          | 0.00/690 [00:00<?, ?B/s]

1_Pooling/config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

2_Dense/config.json:   0%|          | 0.00/115 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/1.57M [00:00<?, ?B/s]

README.md:   0%|          | 0.00/5.63k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/572 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/122 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/539M [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.96M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/371 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/996k [00:00<?, ?B/s]

modules.json:   0%|          | 0.00/341 [00:00<?, ?B/s]

In [None]:
m_clip_prediction, m_data, m_gold = inference_clip(opt,m_clip)

100%|██████████| 463/463 [01:53<00:00,  4.09it/s]


In [None]:
# NOTE(review): `result` is not assigned anywhere above this cell (a later cell binds it to a list),
# so on a fresh kernel this line presumably raises NameError; otherwise it overwrites the values
# returned by inference_clip in the previous cell. Confirm whether this cell is still needed.
m_clip_prediction, m_data, m_gold = result

In [None]:
# inference_clip returns the gold labels of the selected language as a list, which cannot be
# indexed with 'en'. The per-language dict form is still accepted, so the cell works whichever
# way m_gold was produced.
m_gold_labels = m_gold['en'] if isinstance(m_gold, dict) else m_gold
print(get_accuracy(m_clip_prediction,m_gold_labels))

42.98056155507559


# BLIP2 INTEGRATION



In [None]:
!pip3 install salesforce-lavis

Collecting salesforce-lavis
  Downloading salesforce_lavis-1.0.2-py3-none-any.whl (1.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m8.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting contexttimer (from salesforce-lavis)
  Downloading contexttimer-0.3.3.tar.gz (4.9 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting decord (from salesforce-lavis)
  Downloading decord-0.6.0-py3-none-manylinux2010_x86_64.whl (13.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.6/13.6 MB[0m [31m50.6 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting einops>=0.4.1 (from salesforce-lavis)
  Downloading einops-0.7.0-py3-none-any.whl (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.6/44.6 kB[0m [31m5.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting fairscale==0.4.4 (from salesforce-lavis)
  Downloading fairscale-0.4.4.tar.gz (235 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m235.4/

In [None]:
# TODO: drop unused imports

import csv
import numpy as np
import torch
from PIL import Image
from torch.utils.data import DataLoader, SubsetRandomSampler
from tqdm import tqdm
# from utils.dataset_albef import SemevalDataset, collate_fn_eval
from lavis.models import load_model_and_preprocess
# import pytorch_lightning as pl
import os
import statistics
import tensorflow as tf

In [None]:
# Run parameters for the BLIP-2 experiments (same role as `parameters`).
class params_en_blip2:
  data_dir = 'test_data/' # directory prefix of the data/gold text files
  images_dir = 'test_images/test_images_resized/' # prefix prepended to every candidate image name
  data_paths = ["en.test.data.v1.1.txt", "it.test.data.v1.1.txt","fa.test.data.txt"]
  # prompt_type and original_prompt are read by inference_clip; they are defined here as well
  # (with the same values as in `parameters`) so this class can be passed to it.
  prompt_type = 0
  language = 'en'
  model_clip = None
  model_name = 'blip2' # name handed to lavis' load_model_and_preprocess
  output_dir = "result"
  original_prompt = ['<>'] #if we want to use only target_word or only target_phrase
  prompt = ['<>', 'This is <>.', 'Example of an image caption that explains <>.'] #prompt to be used in text embedding (specify the placeholder by <>)
  input_type = ['target_word', 'target_phrase'] # input text type
  batch_size = None
  plot = True
  print_sleep = 100
  device = 'cuda' if torch.cuda.is_available() else 'cpu'



The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/28.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

100%|██████████| 1.89G/1.89G [00:53<00:00, 38.0MB/s]


Position interpolate from 16x16 to 26x26


model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

100%|██████████| 4.37G/4.37G [02:16<00:00, 34.3MB/s]


In [None]:
class BLIP2:
    """Wrapper around a LAVIS BLIP-2 model exposing the same get_similarity API as the CLIP wrappers."""

    def __init__(self, model:str='blip2'):
        """ Load the model and its image/text preprocessors

        :param model: model name passed to lavis' load_model_and_preprocess
            (always loaded with model_type="coco" in eval mode)
        """
        # The device is stored on the instance so that inference does not depend on a global `opt`
        self.device = 'cuda' if torch.cuda.device_count() > 0 else 'cpu'
        model_, vis_processor_, txt_processor_ = load_model_and_preprocess(
            name=model, model_type="coco", is_eval=True, device=self.device)

        self.model = model_.eval()
        self.vis_processor = vis_processor_ #for the images
        self.txt_processor = txt_processor_ #for the texts
        self.cos = torch.nn.CosineSimilarity(dim=-1, eps=1e-6) # eps guards against a zero denominator

    def get_similarity(self, images: List or str, texts: List or str, batch_size: int = None):
        """ Compute the cosine similarity between every text and every image

        :param images: an image path or a list of image paths
        :param texts: a text or a list of texts
        :param batch_size: accepted for API compatibility; not used, all
            items are processed in a single forward pass
        :return: a torch tensor of shape (text size x image size), on the
            model's device
        """
        images = [images] if isinstance(images, str) else images
        texts = [texts] if isinstance(texts, str) else texts

        logging.debug(f'model inference on images: {len(images)}')
        open_images = [Image.open(i).convert("RGB") for i in images]

        with torch.no_grad():
            # (images, channels, height, width)
            pixel_batch = torch.stack(
                [self.vis_processor["eval"](image) for image in open_images]
            ).to(self.device)
            images_embedded = self.model.forward_image(pixel_batch)

            # Keep position 0 along the second axis of the first returned tensor as the
            # per-image vector.
            # NOTE(review): confirm that position 0 is the intended image representation
            # and that these features are comparable with the text features without a
            # further projection.
            images_embedded_ = images_embedded[0][:,0,:]

            processed_text = [self.txt_processor["eval"](txt) for txt in texts]
            processed_text_ = self.model.tokenizer(
                processed_text, padding="max_length", truncation=True, max_length=30, return_tensors='pt'
            ).to(self.device)
            text_embedding = self.model.forward_text(processed_text_)

            # Broadcast (texts, 1, dim) against (1, images, dim) -> (texts, images),
            # replacing the element-by-element Python double loop
            sim = self.cos(text_embedding.unsqueeze(1), images_embedded_.unsqueeze(0))

        return sim





In [None]:
# Load BLIP-2 ("coco" checkpoint) together with its image/text preprocessors;
# `model`, `vis_processor` and `txt_processor` are used by the step-by-step cells further down.
opt = params_en_blip2
model_name = 'blip2'
model, vis_processor, txt_processor = load_model_and_preprocess(name=opt.model_name, model_type="coco", is_eval=True,device=opt.device)
model.eval() # eval mode: disables the layers that perform regularization

In [None]:
# NOTE(review): BLIP2() loads its own copy of the model while the copy loaded in the previous cell
# is still referenced by `model`. The CUDA out-of-memory error recorded for this cell is presumably
# caused by that duplication (plus any earlier models still on the GPU) — confirm and free what is unused.
opt = parameters
blip_model = BLIP2()

Position interpolate from 16x16 to 26x26


OutOfMemoryError: CUDA out of memory. Tried to allocate 24.00 MiB. GPU 0 has a total capacty of 14.75 GiB of which 5.06 MiB is free. Process 2365 has 14.74 GiB memory in use. Of the allocated memory 14.28 GiB is allocated by PyTorch, and 338.90 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting max_split_size_mb to avoid fragmentation.  See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF

In [None]:

blip_predictions, data, gold = inference_clip(opt, blip_model)

In [None]:
# Test cell: build the texts and candidate images for the FIRST sample only
# (the loop stops at its first iteration); `texts` and `images` are used by the next cells.

data, gold = data_loader(opt.data_dir,opt.images_dir,opt.data_paths)

#run inference
result = []
data_lang = data[opt.language]
print(f'len -> {len(data_lang)}')
for n,d in tqdm(enumerate(data_lang), total = len(data_lang), leave = True):
  # every prompt template filled with every input field: (filled text, field name, template)
  prompt_list = []
  for input_type in opt.input_type:
    prompt_list += [(p.replace("<>", d[input_type]),input_type, p) for p in opt.prompt]


  texts = [p[0] for p in prompt_list]
  images= d['candidate_images']

  #BLIP2 MODEL

  # print(f'texts = {texts}')
  # print(f'len texts = {len(texts)}')
  break # stop after the first sample

In [None]:
# Apply the model to the images of the selected sample
print(type(texts))
print(images)
# Images part
open_images = [Image.open(image).convert("RGB") for image in images]

# Inference only: eval() does not switch off gradient tracking, so the forward pass is wrapped in
# no_grad (as BLIP2.get_similarity does) to avoid keeping an autograd graph in GPU memory.
with torch.no_grad():
  processed_images = [vis_processor["eval"](image).unsqueeze(0).to(opt.device) for image in open_images]
  processed__images_stacked = torch.stack(processed_images,dim=0).squeeze(1) #[10, 3, 364, 364])
  images_embedded = model.forward_image(processed__images_stacked)


print(f'processed__images_stacked = {processed__images_stacked.shape}')






<class 'list'>
['test_images/test_images_resized/image.4418.jpg', 'test_images/test_images_resized/image.4416.jpg', 'test_images/test_images_resized/image.4417.jpg', 'test_images/test_images_resized/image.4413.jpg', 'test_images/test_images_resized/image.4412.jpg', 'test_images/test_images_resized/image.4415.jpg', 'test_images/test_images_resized/image.4419.jpg', 'test_images/test_images_resized/image.4414.jpg', 'test_images/test_images_resized/image.2166.jpg', 'test_images/test_images_resized/image.1150.jpg']




processed__images_stacked = torch.Size([10, 3, 364, 364])


In [None]:
#EXPLANATION OF THE MODEL BLIP2
'''
Image Preprocessing: The images are first preprocessed. This usually involves
resizing, normalization, and potentially other transformations to make them
compatible with the model's expectations. This preprocessing is often handled by vis_processor,
which you've loaded with load_model_and_preprocess.

Feature Extraction: The model then extracts features from these processed images.
This is done by passing the images through a series of convolutional layers (in a CNN-based model)
or other relevant layers depending on the architecture.

Output: The output of model.forward_image(images) is a tensor representing the extracted features from the images.
These features are a high-level, compressed representation of the input images,
capturing the essential information that the model needs for further tasks like
image classification, object detection, or in your case, likely some form of
multimodal analysis involving both images and text.
'''

# Print the type and shape of each item returned by forward_image
for elem in images_embedded:
  print(type(elem))
  print(elem.shape)

<class 'torch.Tensor'>
torch.Size([10, 32, 768])
<class 'torch.Tensor'>
torch.Size([10, 677, 1408])


In [None]:

print(images_embedded[0].shape)
# (author's note, translated from Italian; partly unclear) line 236: no `ind`, because we do not use the double loop / dot product
images_embedded_ = images_embedded[0][:,0,:] # every image in the batch, position 0 of the second axis, every feature
# (author's note, translated) "[cls] ... [sep]": two extra tokens;
# [cls] is the tag marking where the input starts and is meant to represent the whole image that follows.
# NOTE(review): the printed shape shows 32 positions on that axis — confirm that position 0 really is a [CLS]-style summary.
print(images_embedded_.shape)
# End of the images part
#FINISH IMAGES PART

torch.Size([10, 32, 768])
torch.Size([10, 768])


In [None]:
# Text part
processed_text = [txt_processor["eval"](txt) for txt in texts]


processed_text_ = model.tokenizer(processed_text, padding="max_length", truncation=True, max_length=30,return_tensors='pt').to(opt.device)


# Inference only: wrapped in no_grad (as BLIP2.get_similarity does) so no autograd graph is kept.
with torch.no_grad():
  text_embedding = model.forward_text(processed_text_)




In [None]:
print(text_embedding.shape)

torch.Size([6, 768])


In [None]:
# Cosine similarity between every text embedding and every image embedding
cos = torch.nn.CosineSimilarity(dim=-1, eps=1e-6) # eps guards against a zero denominator
# One row per text, one column per image
result_matrix = [
  torch.stack([cos(image_vector, text_vector) for image_vector in images_embedded_])
  for text_vector in text_embedding
]

sim = torch.stack(result_matrix)

In [None]:
# `result_matrix` is a Python list (it has no .shape); the stacked tensor `sim` is the similarity matrix
print(sim.shape)
print(sim)
# Flat argmax over (texts x images) -> image index; the real number of candidates replaces the hard-coded 10
best_image = int(sim.argmax()) % len(open_images)
display(open_images[best_image])