In [None]:
# coding=utf-8
# Copyright 2023-present the HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch
from datasets import load_dataset
from torch.utils.data import DataLoader, Dataset
from transformers import AutoModelForVision2Seq, AutoProcessor
from PIL import Image

from tqdm import tqdm
import csv

In [None]:
import os 
os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"]="0"

In [None]:
dataset = load_dataset("csv", data_files={"train" : "./ambiguous_questions_train.csv", "test" : "./ambiguous_questions_test.csv"})

In [None]:
from open_flamingo import create_model_and_transforms

model, image_processor, tokenizer = create_model_and_transforms(
    clip_vision_encoder_path="ViT-L-14",
    clip_vision_encoder_pretrained="openai",
    lang_encoder_path="anas-awadalla/mpt-7b",
    tokenizer_path="anas-awadalla/mpt-7b",
    cross_attn_every_n_layers=4
)

In [None]:
# grab model checkpoint from huggingface hub
from huggingface_hub import hf_hub_download
import torch

checkpoint_path = hf_hub_download("openflamingo/OpenFlamingo-9B-vitl-mpt7b", "checkpoint.pt")
model.load_state_dict(torch.load(checkpoint_path), strict=False)

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"

model.to(device)
model.eval()


In [None]:
from collections import Counter

with open("./ambiguous_questions_train.csv", 'r') as f:
    reader = csv.reader(f)
    train_lines = [line for line in reader]


with open("./ambiguous_questions_test.csv", 'r') as f:
    reader = csv.reader(f)
    test_lines = [line for line in reader]
    
    
  
num_few_shot_examples = 32
#promt = f"Instructions : Classify the following into ambigous and definite question. An ambiguous question is unanswerable question considering image. "

few_shot_image_ids = [train_lines[idx][5] for idx in range(1,num_few_shot_examples + 1)]
few_shot_images = [Image.open("./images/" + str(image_id) + ".jpg").convert('RGB') for image_id in few_shot_image_ids]

vision_x = [image_processor(demo_image).unsqueeze(0) for demo_image in few_shot_images]

labels = ["ambiguous" if train_lines[idx][6] == "O" else "definite" for idx in range(1, 101)]

few_shot_examples_ids = []
counter_list = []
a_count = 0
d_count = 0
for idx, label in enumerate(labels):
    if a_count + d_count == num_few_shot_examples:
        break
    
    if label == "ambiguous":
        if a_count < 16:
            few_shot_examples_ids.append(idx)
            a_count += 1
            counter_list.append(label)
    else:
        
        if d_count < 16:
            few_shot_examples_ids.append(idx)
            d_count += 1
            
            counter_list.append(label)
            
assert num_few_shot_examples == len(few_shot_examples_ids)

print(Counter(counter_list))
    

tokenizer.padding_side = "left" # For generation padding tokens should be on the left

text = [f"The question of <image> is '{train_lines[idx+1][1]}'. Is the question is ambiguous or definite? Answer: {labels[idx]} <|endofchunk|>"  for idx in few_shot_examples_ids]
text = "".join(text)

print(text)


In [None]:
predictions = []
for idx, line in tqdm(enumerate(test_lines)):
    if idx == 0:
        continue
    question = line[1]
    image_id = line[5]
    image = Image.open("./images/" + str(image_id) + '.jpg').convert('RGB')
    input_prompt = text + f"The question of <image> is '{question}'. Is the question is ambiguous or definite? Answer:"
    
    # print(input_prompt)
    
    input_images = vision_x + [image]
    input_images = torch.cat(vision_x, dim=0)
    input_images = input_images.unsqueeze(1).unsqueeze(0)
    
    lang_x = tokenizer(
        [input_prompt] ,
        return_tensors="pt",
    )
    
    generated_text = model.generate(
    vision_x=input_images.to(device),
    lang_x=lang_x["input_ids"].to(device),
    attention_mask=lang_x["attention_mask"].to(device),
    max_new_tokens=2,
    num_beams=4,
    )
    
    #print(tokenizer.batch_decode(lang_x["input_ids"],skip_special_tokens=False))
    #print("Input prompt: ", input_prompt)
    
    decoded_text =  tokenizer.decode(generated_text[0], skip_special_tokens=True)
    if idx < 10:
        print("question: ", question)
        print("Generated text: ",decoded_text)
        
    answer = decoded_text.split("Answer:")[-1].strip()

    #inputs = processor(image, input_promt, return_tensors="pt").to(device)
    
    #out = model.generate(**inputs)
    #prediction = processor.decode(out[0], skip_special_tokens=True)
    #print(prediction)
    predictions.append(answer)
    
with open ("./test_fewshot_flamingo_balance.csv", 'w') as f:
        
    writer = csv.writer(f)
    for idx, line in enumerate(test_lines):
        if idx == 0:
            writer.writerow(line)
        else:
            line.append(predictions[idx-1])
            writer.writerow(line)
                
            

