In [None]:
# Mount Google Drive under /content/drive. This cell only works inside Google Colab,
# because it relies on the `google.colab` package.
# NOTE(review): presumably RES_PATH / SAVE_PATH further down point into this mount so
# the outputs outlive the session — confirm against the real (hidden) paths.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
#  %%capture
!pip install datasets
!pip install accelerate
!pip install xlsxwriter

In [None]:
# NOTE(review): `random` is not used by any cell visible in this notebook.
import random
from datasets import load_dataset

# Load the A-OKVQA dataset by its Hugging Face Hub id. Later cells index the result
# by split name ('train', 'validation') and read the 'image', 'question', 'choices'
# and 'correct_choice_idx' fields of each record.
dataset = load_dataset("HuggingFaceM4/A-OKVQA")

In [None]:
from matplotlib import pyplot as plt

def imshow(img, ax=None, caption = ""):
    """Draw ``img`` on a matplotlib axes, title it and hide the axis decorations.

    Args:
        img: image data accepted by ``Axes.imshow``.
        ax: axes to draw on; a fresh single-axes figure is created when omitted.
        caption: text used as the axes title.

    Returns:
        The axes that was drawn on.
    """
    target = ax if ax is not None else plt.subplots()[1]
    target.imshow(img)
    target.set_title(caption)
    target.axis('off')
    return target

In [None]:
def plot_images_with_captions(images, captions, figsize=(10, 10)):
    """Show ``images`` stacked vertically, one per row, each titled with its caption.

    Args:
        images: sequence of images accepted by ``imshow``.
        captions: titles paired positionally with ``images``; pairing stops at the
            shorter of the two sequences.
        figsize: ``(width, height)`` of the figure in inches.

    Returns:
        None. The figure is displayed with ``plt.show()``; nothing is drawn for an
        empty ``images`` sequence.
    """
    images = list(images)
    if not images:
        # matplotlib rejects a zero-row grid, so there is nothing sensible to draw.
        return

    # squeeze=False keeps `axes` two-dimensional even for a single image; with the
    # default squeezing a lone Axes object comes back and cannot be indexed.
    fig, axes = plt.subplots(nrows=len(images), ncols=1, figsize=figsize, squeeze=False)
    for ax, img, caption in zip(axes[:, 0], images, captions):
        imshow(img, ax, caption)
    plt.show()

# Preview the first two samples of each split, using the question as the caption.
for split in ('validation', 'train'):
    images = [dataset[split][i]['image'] for i in range(2)]
    captions = [dataset[split][i]['question'] for i in range(2)]
    plot_images_with_captions(images, captions)


# Load model 🏋

In [None]:
import torch
# Load model directly
from transformers import AutoProcessor, AutoModelForPreTraining, LlavaForConditionalGeneration

# Single source of truth for the checkpoint: the model, the processor and the name of
# the results file written later are all derived from MODEL_ID, so they cannot drift.
MODEL_ID = "llava-hf/llava-1.5-7b-hf"
model = LlavaForConditionalGeneration.from_pretrained(MODEL_ID)
processor = AutoProcessor.from_pretrained(MODEL_ID)

# NOTE(review): the weights are loaded with the library's default dtype. Loading in
# half precision would reduce GPU memory, but the inputs built in `inference` would
# then need the matching dtype — confirm before changing.
# The trailing semicolon stops the notebook from echoing the full module repr.
model.to("cuda");

In [None]:
# Report the model size: the element count summed over every parameter tensor,
# printed in millions with two decimals.
total_params = 0
for parameter in model.parameters():
    total_params += parameter.numel()
human_readable_params = "{:.2f}M".format(total_params / 1e6)
print("Total Parameters: {}".format(human_readable_params))


# Running inference on A-OKVQA 🏃

In [None]:
from typing import List, Dict, Any

In [None]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
def inference(image, question, mode = "qa", hyperparams = None, max_new_tokens=40):
    """Run the global ``model`` on one image/prompt pair and return the decoded text.

    Args:
        image: image passed to the global ``processor`` as ``images``.
        question: complete text prompt passed to ``processor`` as ``text``.
        mode: name of the decoding preset, one of ``"qa"``, ``"rationale"`` or
            ``"qa+r"`` (question answering plus rationale in one step).
        hyperparams: currently unused; kept so existing callers keep working.
        max_new_tokens: upper bound on the number of generated tokens.

    Returns:
        The first generated sequence, decoded with special tokens skipped.
        NOTE(review): callers in this notebook strip the prompt from the result, so
        the decoded text presumably still contains the prompt — confirm.

    Raises:
        ValueError: if ``mode`` is not one of the known presets. This is checked
            before any preprocessing so a typo fails fast.
    """
    generation_presets = {
        "qa": {"num_beams": 5, "length_penalty": -1},
        "rationale": {"num_beams": 5, "length_penalty": 1.1},  # choose from [1, 1.5, 2]
        # NOTE(review): sampling is not enabled here, so `temperature` may have no
        # effect on the output — confirm against the generate() documentation.
        "qa+r": {"temperature": 1.0},
    }
    if mode not in generation_presets:
        raise ValueError(
            f"Unknown mode {mode!r}; expected one of {sorted(generation_presets)}"
        )

    inputs = processor(text=question, images=image, return_tensors="pt").to("cuda")
    outputs = model.generate(**inputs,
                             **generation_presets[mode],
                             max_new_tokens=max_new_tokens)

    # Decode the first returned sequence into text
    answer = processor.decode(outputs[0], skip_special_tokens=True)
    return answer

def make_prompt(x):
    """Build the question text shown to the model: the question followed by its choices.

    Args:
        x: mapping with a ``'question'`` entry and a ``'choices'`` sequence of strings.

    Returns:
        ``"<question> Choices: <choice 1>, <choice 2>, ..."``. Kept as a separate
        function so question preprocessing can be added here later.
    """
    question_text = x['question']
    joined_choices = ', '.join(x['choices'])
    return "{} Choices: {}".format(question_text, joined_choices)


# Helper methods

# Generating Sheets for Annotation

In [None]:
import re

_ANSWER_MARKER = "Thus, the answer is "
_NO_MATCH_MESSAGE = "Incorrect answer: No matches found"
# A comma, or one of these connectives as a WHOLE word, starts the justification that
# follows the answer phrase. The word boundaries matter: a plain substring split on
# "as" would cut answers such as "basketball" or "glasses" in half.
_JUSTIFICATION_START = re.compile(r",|\b(?:as|because|since|and not)\b")
_NON_ALPHANUMERIC = re.compile(r"[^a-zA-Z0-9\s]")


def _normalize(text):
    """Drop punctuation, trim surrounding whitespace and lowercase ``text``."""
    return _NON_ALPHANUMERIC.sub("", text).strip().lower()


def _resolve_matches(matches, prefer_superset):
    """Turn the list of matching choices into the final verdict string."""
    if not matches:
        return _NO_MATCH_MESSAGE
    if len(matches) == 1:
        return f"{matches[0]}"
    if prefer_superset:
        # If one match contains all the others (e.g. "afternoon" and "noon"),
        # the longer one is the intended answer.
        for candidate in matches:
            lowered = candidate.lower()
            if all(other.lower() in lowered for other in matches):
                return candidate
    return f"Indeterminate answer: Multiple matches found {matches}"


def check_answer(response, choices):
    """Work out which of ``choices`` a free-text model ``response`` settled on.

    If the response contains "Thus, the answer is ", the phrase after the first
    occurrence (up to the first period, comma or connective such as "because") is
    compared with the choices, ignoring case and punctuation: an exact match wins,
    otherwise choices contained in the phrase are collected. Without that marker the
    sentence after "Thus, " (or else the first sentence of the response) is searched
    for the choices, ignoring case only.

    Args:
        response: text generated by the model.
        choices: candidate answers (strings).

    Returns:
        The matching choice exactly as given in ``choices``; or
        "Indeterminate answer: Multiple matches found [...]" when several choices
        match and none can be preferred; or "Incorrect answer: No matches found".
    """
    marked = response.split(_ANSWER_MARKER)
    if len(marked) > 1:
        phrase = marked[1].split(".")[0]
        phrase = _JUSTIFICATION_START.split(phrase, maxsplit=1)[0]
        phrase = _normalize(phrase)

        # Compare normalized forms but always hand back the original choice text,
        # so callers can compare the result with the dataset's own answer strings.
        normalized = [(choice, _normalize(choice)) for choice in choices]
        for choice, norm in normalized:
            if norm and norm == phrase:
                return choice

        # An empty normalized choice would be "contained" in anything; skip it.
        matches = [choice for choice, norm in normalized if norm and norm in phrase]
        return _resolve_matches(matches, prefer_superset=True)

    # No explicit answer marker: fall back to a looser sentence-level search.
    after_thus = response.split("Thus, ")
    source = after_thus[1] if len(after_thus) > 1 else response
    sentence = source.split(".")[0].lower()
    matches = [choice for choice in choices if choice.lower() in sentence]
    # Ambiguity in this fallback is reported as-is, as the original code did.
    return _resolve_matches(matches, prefer_superset=False)

# Checked example: the response names 'airport workers' after the answer marker and
# then mentions two other choices in its justification, which must be ignored.
response_text = """
Response: In the image, there is a large China Airlines airplane parked on the tarmac, and several cars are parked nearby. This suggests that the cars belong to airport workers who are responsible for servicing the airplane or performing other tasks related to airport operations. As a result, they have access to designated parking areas within the airport premises.
Thus, the drivers of the cars were able to park here because they are airport workers.
Thus, the answer is 'airport workers', as they are working collaborately with police and firemen.
"""
choices = ["firemen", "workers", "airport workers", "police", "postal workers"]

result = check_answer(response_text, choices)
print(result)
# Fail loudly on Restart & Run All if the answer extraction ever regresses.
assert result == "airport workers", result

In [None]:
# Display one raw validation record to see which fields each sample provides.
dataset['validation'][1]

In [None]:
import pandas as pd
from tqdm.auto import tqdm
from PIL import Image
import numpy as np

RES_PATH = "<hidden>"
SAVE_PATH = "<hidden>"

import os
# Create every output folder up front. exist_ok=True makes the cell safe to re-run,
# and os.path.join replaces the Windows-only "\\" separator, which produced oddly
# named directories on Linux/Colab.
for directory in (RES_PATH,
                  os.path.join(RES_PATH, "img"),
                  os.path.join(RES_PATH, "result"),
                  SAVE_PATH):
    os.makedirs(directory, exist_ok=True)

meta_prompt =  "Please explain the reasoning behind your answer?"
# new_meta_prompt = "Identify and describe the visual markers in the image that are critical to your answer."
# one_step_prompt = "Generate the rationale for the question. End with a short sentence 'Thus, the answer is ' and then add the most probable answer. Make sure you give only one answer."

# Tag that ends every prompt built below; the model's reply is whatever follows it.
ASSISTANT_TAG = "ASSISTANT:"


def _assistant_reply(decoded):
    """Return the text after the first "ASSISTANT:" tag of a decoded generation.

    Splitting on the tag does not depend on how the tokenizer renders the prompt
    (e.g. whether "<image>" survives decoding), unlike a fixed character offset.
    The whole text is returned if the tag is absent.
    """
    _, tag, reply = decoded.partition(ASSISTANT_TAG)
    return reply if tag else decoded


val_500_dataset = [dataset['validation'][i] for i in range(500)]

N_SAMPLES = len(val_500_dataset)

# One dict per sample; turned into a DataFrame / Excel sheet by the next cell.
results = []

for idx, x in enumerate(tqdm(val_500_dataset, total = N_SAMPLES)):
    image = x['image']

    # Show the image
    plt.imshow(image)
    plt.axis('off')
    plt.show()

    # Save the image once and reuse the very same path in the results table, so the
    # sheet always points at the file that was actually written.
    image_path = os.path.join(SAVE_PATH, f"{idx}.jpg")
    image.save(image_path)

    question = make_prompt(x)
    correct_answer = x['choices'][x['correct_choice_idx']]

    # Step 1: ask for the answer only.
    qa_question = f"<image>\nUSER:Question: {question}.\nASSISTANT:"
    qa_answer = inference(image, qa_question, mode = "qa", max_new_tokens=15)
    qa_answer = _assistant_reply(qa_answer).strip().strip('.')

    print(f"Question: {qa_question}\nPredicted Answer: {qa_answer} \tCorrect Answer: {correct_answer}")

    # Step 2: feed the predicted answer back and ask the model to justify it.
    ra_question = f"<image>\nUSER:Question: {question}. Answer: {qa_answer}. {meta_prompt}\nASSISTANT:"
    answer = inference(image, ra_question, mode="rationale", max_new_tokens=100)
    answer = _assistant_reply(answer).strip()
    print()
    print(f"question: {ra_question}\nrationale: {answer}")

    results.append({
        'question': question,
        'correct_answer' : correct_answer,
        'predicted_answer' : qa_answer,
        # Exact, case-insensitive string equality with the ground-truth choice.
        'is_correct': 1 if qa_answer.lower() == correct_answer.lower() else 0,
        # 'groundtruth_rationale' : x['rationales'],
        # 'direct_answer' : x['direct_answers'],
        # 'rationale_prompt' : ra_question,
        'generated_rationale' : answer,
        'image_path': image_path,
    })

    # print('-'*100)

In [None]:
import os
import xlsxwriter

RES_NAME = f"{MODEL_ID.split('/')[-1].strip()}_val_500_one_step.xlsx"
df = pd.DataFrame(results)
print(RES_PATH, RES_NAME)

# Worksheet column that receives the pictures.
IMAGE_COLUMN = 'U'

# os.path.join keeps the output inside RES_PATH whether or not it ends with a separator.
with pd.ExcelWriter(os.path.join(RES_PATH, RES_NAME), engine='xlsxwriter') as writer:
    # Write the results table; the 'image_path' column is kept so every picture can
    # be traced back to its file.
    df.to_excel(writer, sheet_name='Sheet1', index=False)

    # XlsxWriter worksheet object behind the sheet pandas just wrote.
    worksheet = writer.sheets['Sheet1']

    # Anchor each sample's image next to its row. Rows are counted by position, not
    # by DataFrame index label: Excel rows start at 1 and row 1 holds the header, so
    # the first record sits on row 2.
    image_paths = df['image_path'] if 'image_path' in df else []
    for position, image_path in enumerate(image_paths):
        cell = f'{IMAGE_COLUMN}{position + 2}'

        # NOTE(review): images are inserted at full size while the rows keep their
        # default height, so consecutive pictures will overlap in the sheet. Consider
        # passing {'x_scale': ..., 'y_scale': ...} and enlarging the rows with
        # worksheet.set_row() — confirm the layout the annotators need.
        worksheet.insert_image(cell, image_path)
        # worksheet.insert_image(cell, image_path, {'x_scale': 0.5, 'y_scale': 0.5})


In [None]:
# Terminate this colab session
# os._exit ends the kernel process immediately with exit status 0 (`00` is simply the
# integer zero) and skips normal interpreter cleanup, so this must stay the last cell:
# nothing placed after it will run.
import os
os._exit(00)