In [None]:
# Change the working directory so that the relative paths used by the later
# cells ("images", "texts/...", "options_*.json", "results/...") resolve
# against the evaluation data folder.
%cd ../../data/evaluation_data

In [None]:
import transformers
from transformers import InstructBlipProcessor, InstructBlipForConditionalGeneration
from transformers import Blip2Processor, Blip2ForConditionalGeneration
import torch
import os
from PIL import Image
import numpy as np
import pandas as pd
import gc
import pickle as pkl
import random
import json
import requests
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import f1_score, accuracy_score

In [None]:
# Load the BLIP-2 (Flan-T5-XL) processor and model once. Both are used as
# module-level globals by `collate_fn` (processor) and `inference`
# (model + processor) below.
# `load_in_4bit=True` requests 4-bit quantized weights and `device_map="auto"`
# leaves device placement to the library.
# NOTE(review): `collate_fn` moves inputs to "cuda" unconditionally, so this
# presumably expects a CUDA device to be available - confirm.
processor = Blip2Processor.from_pretrained("Salesforce/blip2-flan-t5-xl")
model = Blip2ForConditionalGeneration.from_pretrained("Salesforce/blip2-flan-t5-xl", load_in_4bit=True, device_map="auto")

In [None]:
class AVEDataset(Dataset):
    """Samples for attribute value extraction: a prompt, its label, the row id
    and (unless ``text_only``) the product image.

    Args:
        df: DataFrame with the columns ``id``, ``attribute_names`` and
            ``attribute_values``; ``category`` and ``texts`` are also read
            unless ``image_only`` is set. The frame is copied, never modified.
        img_dir: Directory holding one ``<id>.jpg`` per row.
        name_to_value: Mapping from attribute name to the answer options that
            are inserted into the prompt.
        text_only: If True, ``__getitem__`` returns no image.
        image_only: If True, the prompt omits the textual product context.

    Rows whose image file does not exist are skipped (also in text-only mode),
    so ``len(dataset)`` can be smaller than ``len(df)``.
    """

    def __init__(self, df, img_dir, name_to_value, text_only=False, image_only=False):
        # Work on a copy: adding the 'prompt' column must not leak into the
        # DataFrame owned by the caller.
        self.df = df.copy()
        self.img_dir = img_dir
        self.name_to_value = name_to_value
        self.text_only = text_only
        self.image_only = image_only

        self.texts = []
        self.images = []
        self.labels = []
        self.ids = []

        self.df['prompt'] = self.df.apply(self._build_prompt, axis=1)

        for row in self.df.itertuples():
            img_path = os.path.join(self.img_dir, f'{row.id}.jpg')
            if not os.path.exists(img_path):
                # No image on disk for this row -> drop the sample.
                continue

            self.texts.append(row.prompt)
            self.ids.append(row.id)
            self.images.append(img_path)
            self.labels.append(row.attribute_values)

    def _build_prompt(self, row):
        """Return the question prompt for one DataFrame row."""
        attribute = row['attribute_names']
        options = self.name_to_value[attribute]

        if self.image_only:
            return f"Question: What is {attribute} of this product?\nYou must only answer the question with exactly one of the following options {options}. \nAnswer:"

        return f"Question: What is {attribute} of this product?\nContext: [Category] {row['category']} {row['texts']}.\nYou must only answer the question with exactly one of the following options {options}. \nAnswer:"

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``(text, label, id)`` in text-only mode, otherwise
        ``(text, label, id, image)`` with ``image`` as an RGB numpy array."""
        text, img_path = self.texts[idx], self.images[idx]
        label = self.labels[idx]
        sample_id = self.ids[idx]

        if self.text_only:
            return text, label, sample_id

        # Close the file handle deterministically instead of waiting for GC.
        with Image.open(img_path) as image:
            # Convert every non-RGB mode (L, LA, P, RGBA, CMYK, ...) so the
            # resulting array always has three channels.
            if image.mode != 'RGB':
                image = image.convert('RGB')
            pixels = np.array(image)

        return text, label, sample_id, pixels

In [None]:
def collate_fn(batch):
  """Turn a list of dataset samples into model inputs.

  Each sample is ``(prompt, label, id)`` or ``(prompt, label, id, image)``.
  The prompts (and the images, when any sample carries one) are passed through
  the global ``processor`` and the result is moved to "cuda".

  Returns:
    ``(inputs, labels, ids)`` where ``labels`` and ``ids`` are plain lists in
    batch order.
  """
  prompts = [sample[0] for sample in batch]
  labels = [sample[1] for sample in batch]
  ids = [sample[2] for sample in batch]
  # Only four-element samples carry an image.
  images = [sample[3] for sample in batch if len(sample) == 4]

  if images:
    encoded = processor(images=images, text=prompts, padding=True, return_tensors="pt")
  else:
    encoded = processor(text=prompts, padding=True, return_tensors="pt")

  inputs = encoded.to("cuda")
  return inputs, labels, ids

In [None]:
def inference(loader):
  """Run the global ``model`` over every batch produced by ``loader``.

  ``loader`` must yield ``(inputs, labels, ids)`` triples. The generated token
  ids are decoded with the global ``processor`` (special tokens skipped).

  Returns:
    ``(preds, labels, ids)`` - three flat lists accumulated over all batches.
  """
  generation_kwargs = dict(
      num_beams=5,
      max_new_tokens=17,
      min_length=1,
      do_sample=True,
      top_p=0.8,
      repetition_penalty=1.0,
      length_penalty=1.0,
      temperature=1,
  )

  all_preds, all_labels, all_ids = [], [], []

  for inputs, batch_labels, batch_ids in loader:
    output_tokens = model.generate(**inputs, **generation_kwargs)
    all_preds += processor.batch_decode(output_tokens, skip_special_tokens=True)
    all_labels += batch_labels
    all_ids += batch_ids

  return all_preds, all_labels, all_ids

In [None]:
def calculate_metrics(predictions, targets):
  """Score predictions by case-insensitive substring match against targets.

  A prediction counts as correct when the lower-cased target occurs anywhere
  in the lower-cased prediction.

  Args:
    predictions: Generated answer strings.
    targets: Gold answer strings, aligned with ``predictions``.

  Returns:
    ``(accuracy, micro_f1)``. ``accuracy`` is the fraction of correct
    predictions (0.0-1.0); ``micro_f1`` is sklearn's micro-averaged F1 of the
    hit/miss vector against an all-ones target vector. Both are 0.0 when
    there is nothing to score.
  """
  binary_predictions = [
      1 if target.lower() in prediction.lower() else 0
      for target, prediction in zip(targets, predictions)
  ]

  # Nothing to score: avoid dividing by zero / calling sklearn on empty input.
  if not binary_predictions:
    return 0.0, 0.0

  binary_targets = [1] * len(targets)

  # Report a ratio, not the raw number of hits, so the value is comparable
  # across datasets of different size.
  accuracy = sum(binary_predictions) / len(binary_predictions)
  micro_f1 = f1_score(binary_targets, binary_predictions, average='micro')
  return accuracy, micro_f1

In [None]:
def combine_lists(ids, preds):
  """Map each id to the prediction at the same position.

  When an id occurs more than once, the prediction at its last position wins.
  """
  return {sample_id: preds[position] for position, sample_id in enumerate(ids)}

def seed_everything(seed=42):
  """Seed the Python, NumPy and PyTorch (CPU and CUDA) random generators and
  set cuDNN to deterministic, non-benchmark mode."""
  seeders = (
      random.seed,
      np.random.seed,
      torch.manual_seed,
      torch.cuda.manual_seed,
      torch.cuda.manual_seed_all,
  )
  for seed_rng in seeders:
    seed_rng(seed)

  cudnn = torch.backends.cudnn
  cudnn.deterministic = True
  cudnn.benchmark = False


seed_everything(42)

In [None]:
# Annotated evaluation files, one per product domain. The evaluation loop
# reads each of them from the `texts/` folder and uses the name (minus the
# ".tsv" suffix) in the result file name and the printed summary.
# NOTE(review): "Jewlery" is presumably the spelling of the file on disk -
# confirm before correcting it.
file_names = [
    'Clothing_annotated_final.tsv',
    'Jewlery_and_General_Apparel_annotated_final.tsv',
    'Footwear_annotated_final.tsv',
    'Home_annotated_final.tsv',
    'Food_annotated_final.tsv'
]

In [None]:
img_dir = "images"
for dataset in file_names:
  dataset_name = dataset[:-4]  # file name without the ".tsv" suffix

  # Replace with path to save results
  # NOTE(review): the file name says "blip_xxl" while the checkpoint loaded in
  # this notebook is blip2-flan-t5-xl - confirm which name is intended.
  output_path = f'results/blip_xxl_preds_both_{dataset_name}.pkl'

  # Make sure the output folder exists *before* the slow inference pass, so a
  # missing directory cannot throw away finished predictions.
  output_dir = os.path.dirname(output_path)
  if output_dir:
    os.makedirs(output_dir, exist_ok=True)

  # Pick the answer-option file that belongs to this dataset's domain.
  if "Food" in dataset:
    options_file = "options_Food.json"
  elif "Home" in dataset:
    options_file = "options_Home.json"
  else:
    options_file = "options_Clothing_Shoes_and_Jewelry.json"

  with open(options_file, "r") as f:
    name_to_val = json.load(f)

  data = pd.read_csv(f'texts/{dataset}', sep='\t')

  # Evaluate with both modalities (text context + image).
  both_dataset = AVEDataset(data, img_dir, name_to_val, text_only=False, image_only=False)
  both_loader = DataLoader(both_dataset, batch_size=2, collate_fn=collate_fn)
  both_preds, both_labels, both_ids = inference(both_loader)
  both_acc, both_micro_f1 = calculate_metrics(both_preds, both_labels)
  both_output = combine_lists(both_ids, both_preds)

  # Persist the id -> prediction mapping for later analysis.
  with open(output_path, 'wb') as f:
    pkl.dump(both_output, f)

  print(f'For {dataset_name} and both modalities micro_f1 was {both_micro_f1}')


# Prompt Testing

In [None]:
class AVEDataset(Dataset):
    """Attribute value extraction samples with a configurable prompt template.

    Args:
        df: DataFrame with the columns ``id``, ``attribute_names`` and
            ``attribute_values``; ``category`` and ``texts`` are also read
            unless ``image_only`` is set. The frame is copied, never modified.
        img_dir: Directory holding one ``<id>.jpg`` per row.
        name_to_value: Mapping from attribute name to the answer options that
            are inserted into the prompt.
        custom_prompt: ``str.format`` template that may use the placeholders
            ``{attribute_names}``, ``{category}``, ``{texts}`` and
            ``{options}``. ``None`` (the default) selects ``DEFAULT_PROMPT``.
            Ignored when ``image_only`` is True.
        text_only: If True, ``__getitem__`` returns no image.
        image_only: If True, a fixed prompt without textual context is used.

    Rows whose image file does not exist are skipped (also in text-only mode),
    so ``len(dataset)`` can be smaller than ``len(df)``.
    """

    # Template used when no custom prompt is supplied.
    DEFAULT_PROMPT = (
        "Question: What is {attribute_names} of this product?\n"
        "Context: [Category] {category} {texts}.\n"
        "You must only answer the question with exactly one of the following options {options}. \n"
        "Answer:"
    )

    def __init__(self, df, img_dir, name_to_value, custom_prompt=None, text_only=False, image_only=False):
        # Work on a copy: adding the 'prompt' column must not leak into the
        # DataFrame owned by the caller.
        self.df = df.copy()
        self.img_dir = img_dir
        self.name_to_value = name_to_value
        self.custom_prompt = self.DEFAULT_PROMPT if custom_prompt is None else custom_prompt
        self.text_only = text_only
        self.image_only = image_only

        self.texts = []
        self.images = []
        self.labels = []
        self.ids = []

        self.df['prompt'] = self.df.apply(self._build_prompt, axis=1)

        for row in self.df.itertuples():
            img_path = os.path.join(self.img_dir, f'{row.id}.jpg')
            if not os.path.exists(img_path):
                # No image on disk for this row -> drop the sample.
                continue

            self.texts.append(row.prompt)
            self.ids.append(row.id)
            self.images.append(img_path)
            self.labels.append(row.attribute_values)

    def _build_prompt(self, row):
        """Return the prompt for one DataFrame row."""
        attribute = row['attribute_names']
        options = self.name_to_value[attribute]

        if self.image_only:
            return f"Question: What is {attribute} of this product?\nYou must only answer the question with exactly one of the following options {options}. \nAnswer:"

        return self.custom_prompt.format(
            attribute_names=attribute,
            category=row['category'],
            texts=row['texts'],
            options=options,
        )

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``(text, label, id)`` in text-only mode, otherwise
        ``(text, label, id, image)`` with ``image`` as an RGB numpy array."""
        text, img_path = self.texts[idx], self.images[idx]
        label = self.labels[idx]
        sample_id = self.ids[idx]

        if self.text_only:
            return text, label, sample_id

        # Close the file handle deterministically instead of waiting for GC.
        with Image.open(img_path) as image:
            # Convert every non-RGB mode (L, LA, P, RGBA, CMYK, ...) so the
            # resulting array always has three channels.
            if image.mode != 'RGB':
                image = image.convert('RGB')
            pixels = np.array(image)

        return text, label, sample_id, pixels

In [None]:
# Prompt templates compared in the prompt-testing loop. Each one is filled via
# `str.format` inside AVEDataset with the placeholders {attribute_names},
# {category}, {texts} and {options}; not every template uses all of them.
# The first entry has the same wording as the prompt of the main evaluation.
# NOTE(review): the second template has no space after "?" and after
# "{texts}." - presumably an intentional variant; confirm.
custom_prompts = [
    "Question: What is {attribute_names} of this product?\nContext: [Category] {category} {texts}.\nYou must only answer the question with exactly one of the following options {options}. \nAnswer:",
    "What is {attribute_names} of this product?[Category] {category} {texts}.Answer with the option from the given choices directly: {options}. \nAnswer:",
    "[Category] {category} {texts}. What is {attribute_names} of this product? Answer with the option from the given choices directly: {options}.",
    "[Category] {category} {texts}. What is {attribute_names} of this product based on the given information and the given image? Answer with the option from the given choices directly: {options}.",
    "[Category] {category} {texts}. Which one of {options} is the {attribute_names} of this product? Answer with the option from the given choices directly.",
    "{texts}. What is the {attribute_names} of this product? Answer with the option from the given choices directly: {options}.",
    "{texts}. Based on the description and the image, what is the {attribute_names} of this product? Answer with the option from the given choices directly: {options}.",
    "What is the {attribute_names} of this product: {texts}? Answer with the option from the given choices directly: {options}."
]

In [None]:
img_dir = "images"
dataset = "texts/Clothing_annotated_final.tsv"
with open("options_Clothing_Shoes_and_Jewelry.json", "r") as f:
    name_to_val = json.load(f)

# The TSV is identical for every prompt, so read it once instead of once per
# iteration.
data = pd.read_csv(dataset, sep='\t')

for i, prompt in enumerate(custom_prompts):
  # Each run gets its own copy of the frame, exactly as if the file had been
  # re-read, so nothing a dataset does to it can carry over to the next prompt.
  both_dataset = AVEDataset(data.copy(), img_dir, name_to_val, prompt, text_only=False, image_only=False)
  both_loader = DataLoader(both_dataset, batch_size=2, collate_fn=collate_fn)
  both_preds, both_labels, both_ids = inference(both_loader)
  both_acc, both_micro_f1 = calculate_metrics(both_preds, both_labels)

  print(f'For prompt {i} the micro_f1 was {both_micro_f1}')

  # Collect unreachable Python objects first so the CUDA memory they held can
  # actually be returned by empty_cache().
  gc.collect()
  torch.cuda.empty_cache()