# In [1]:
from openai import OpenAI
import base64
import numpy as np
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
from PIL import Image
import io
import base64
import os
# Shared OpenAI client used by gpt_call_single below. It is constructed with no
# arguments, so credentials come from the SDK's defaults (presumably the
# OPENAI_API_KEY environment variable — confirm for your environment).
client = OpenAI()
import numpy as np
from tqdm.notebook import tqdm
def pil_to_base64(pil_image):
    """Serialize an image as PNG and return the bytes as base64 ASCII text.

    Args:
        pil_image: Object exposing ``save(fp, format=...)``, e.g. a PIL image.

    Returns:
        str: Base64 encoding of the PNG bytes written by ``pil_image.save``.
    """
    with io.BytesIO() as png_buffer:
        pil_image.save(png_buffer, format='PNG')
        png_bytes = png_buffer.getvalue()
    encoded = base64.b64encode(png_bytes)
    return encoded.decode('ascii')


def gpt_call_single(query, model):
    """Send one user message to the chat-completions endpoint and return the reply.

    Args:
        query: Value passed through as the ``content`` of a single ``user``
            message (callers in this file pass a list of text / image_url parts).
        model: Model name forwarded to the API.

    Returns:
        The ``content`` of the first choice's message.
    """
    user_message = {"role": "user", "content": query}
    completion = client.chat.completions.create(
        model=model,
        messages=[user_message],
        max_tokens=2048,
    )
    first_choice = completion.choices[0]
    return first_choice.message.content


def extract_answer_from_model_response(model_response):
    r"""Extract the answer token that follows the first ``\boxed{`` in a reply.

    The first whole-word ``A``-``D``, ``yes`` or ``no`` that appears after
    ``\boxed{`` and is later followed by ``}`` on the same line is returned.
    Matching is case-sensitive.

    Args:
        model_response: Reply text. ``None`` or an empty string is accepted,
            since the caller may hand over a reply with no text content.

    Returns:
        str: The matched token, or the sentinel ``"Z"`` when nothing matches.
    """
    import re

    # A missing reply used to raise TypeError inside re.search; treat it as
    # "no answer" so a single empty completion cannot abort an evaluation run.
    if not model_response:
        return "Z"

    boxed = re.search(r'\\boxed\{.*?\b([A-D]|yes|no)\b.*?\}', model_response)
    return boxed.group(1) if boxed else "Z"


# convert PIL image to base64
def pil_to_base64(pil_image):
    """Return *pil_image* saved as PNG and encoded as base64 ASCII text.

    Args:
        pil_image: Object exposing ``save(fp, format=...)``, e.g. a PIL image.

    Returns:
        str: Base64 text of the bytes produced by saving the image as PNG.
    """
    from base64 import b64encode
    from io import BytesIO

    buffer = BytesIO()
    pil_image.save(buffer, format='PNG')
    raw_png = buffer.getvalue()
    return b64encode(raw_png).decode('ascii')



def test_gpt_on_VisSim_va(dataset_name, output_dir, model, index, max_tokens=2048, debug=False):
    """Evaluate a chat model on a VisSim visual-analogy dataset and print accuracies.

    For every example in the ``train`` split of ``VisSim/{dataset_name}`` the
    question text is split at the ``<question_image>``, ``<image_for_B>`` and
    ``<answer_choices>`` placeholders and the corresponding images are
    interleaved with the text. The model is queried through
    ``gpt_call_single`` and the boxed answer is compared case-insensitively
    with the example's ``answer``.

    Answers are written with ``np.save`` to
    ``./{model}_{dataset_name}_{index}.json`` (NumPy appends ``.npy``) after
    every 10th answer and once more when the loop finishes.

    Args:
        dataset_name: Name appended to ``VisSim/`` when loading the dataset.
        output_dir: Currently unused; results go to the working directory.
        model: Model name forwarded to ``gpt_call_single``.
        index: Run identifier used in the output file name and printed header.
        max_tokens: Currently unused; ``gpt_call_single`` sets its own limit.
        debug: Currently unused.

    Returns:
        None. Accuracies are printed and answers are saved to disk.
    """
    # ``json`` was used below without ever being imported in this file.
    import json
    from collections import defaultdict

    from datasets import load_dataset
    from tqdm import tqdm

    def text_part(text):
        # One text element of a multimodal chat message.
        return {"type": "text", "text": text}

    def image_part(image):
        # pil_to_base64 emits PNG bytes, so the data URL must declare image/png.
        return {
            "type": "image_url",
            "image_url": {
                "url": f"data:image/png;base64,{pil_to_base64(image)}",
            },
        }

    dataset = load_dataset(f"VisSim/{dataset_name}")['train']
    output_path = f"./{model}_{dataset_name}_{index}.json"

    acc_by_type = defaultdict(float)
    acc_by_difficulty = defaultdict(float)
    counts_by_difficulty = defaultdict(int)
    counts_by_type = defaultdict(int)

    answer_dict = {}
    for row in tqdm(range(len(dataset)), total=len(dataset)):
        example = dataset[row]
        qid = example['qid']
        difficulty_level = example['difficulty_level']
        # The qid's middle underscore-separated fields name the question variant.
        variant = " ".join(qid.split("_")[1:-1])

        question_info = json.loads(example['question_info'])

        # Walk through the placeholders in order, emitting the text before each
        # one followed by the image that replaces it.
        prefix, remainder = question_info['question'].strip().split("<question_image>")
        query = [text_part(prefix), image_part(example['A_image'])]
        prefix, remainder = remainder.split("<image_for_B>")
        query += [text_part(prefix), image_part(example['B_image'])]
        prefix, remainder = remainder.split("<answer_choices>")
        query += [text_part(prefix), image_part(example['choices'])]
        if remainder:
            query.append(text_part(remainder))
        query.append(text_part(
            "Please first solve the problem step by step, then put your final answer or a single letter (if it is a multiple choice question) in one \"\\boxed{}\""
        ))

        response = gpt_call_single(query, model=model)
        pred = extract_answer_from_model_response(response)
        gt_ans = example['answer']

        answer_dict[qid] = {
            "pred": pred,
            "gt_ans": gt_ans.lower(),
            "response": response,
        }

        is_correct = pred.lower() == gt_ans.lower()
        # Variants whose name contains "ans" are left out of the per-difficulty
        # (and therefore overall) statistics.
        if "ans" not in variant:
            acc_by_difficulty[difficulty_level] += is_correct
            counts_by_difficulty[difficulty_level] += 1
        acc_by_type[variant] += is_correct
        counts_by_type[variant] += 1

        # Checkpoint periodically so a crash loses at most a few answers.
        if len(answer_dict) % 10 == 0:
            np.save(output_path, answer_dict)

    # Persist before reporting so a failure while printing cannot lose results.
    np.save(output_path, answer_dict)

    print(dataset, index)
    print("Accuracy by difficulty level:")
    for level, hits in acc_by_difficulty.items():
        print(f"{level}: {hits/counts_by_difficulty[level]}")

    print("Accuracy by variants:")
    for name, hits in acc_by_type.items():
        print(f"{name}: {hits/counts_by_type[name]}")

    scored = sum(counts_by_difficulty.values())
    if scored:
        overall_acc = sum(acc_by_difficulty.values())/scored
        print(f"Overall accuracy: {overall_acc}")
    else:
        # Empty dataset, or every variant was excluded: avoid dividing by zero.
        print("Overall accuracy: n/a (no examples were scored)")

def test_gpt_on_VisSim_text_inst(dataset_name, output_dir, model, index, max_tokens=2048, debug=False):
    """Evaluate a chat model on a VisSim text-instruction dataset and print accuracies.

    For every example in the ``train`` split of ``VisSim/{dataset_name}`` all
    but the last entry of ``images`` are spliced into the question at the
    ``<shapeB_image>`` / ``<shapeB_step_N>`` placeholders. The ``choices``
    image and an answer-format instruction are appended, the model is queried
    through ``gpt_call_single`` and the boxed answer is compared
    case-insensitively with the example's ``answer``.

    Answers are cached with ``np.save`` under
    ``./{model}_{dataset_name}_{index}.json`` (NumPy appends ``.npy``). If that
    file exists it is loaded first and already-answered qids are not sent to
    the model again; their cached predictions still count in the statistics.

    Args:
        dataset_name: Name appended to ``VisSim/`` when loading the dataset.
        output_dir: Currently unused; results go to the working directory.
        model: Model name forwarded to ``gpt_call_single``.
        index: Run identifier used in the output file name.
        max_tokens: Currently unused; ``gpt_call_single`` sets its own limit.
        debug: Currently unused.

    Returns:
        None. Accuracies are printed and answers are saved to disk.
    """
    import re
    from collections import defaultdict

    from datasets import load_dataset
    from tqdm import tqdm

    def text_part(text):
        # One text element of a multimodal chat message.
        return {"type": "text", "text": text}

    def image_part(image):
        # pil_to_base64 emits PNG bytes, so the data URL must declare image/png.
        return {
            "type": "image_url",
            "image_url": {
                "url": f"data:image/png;base64,{pil_to_base64(image)}",
            },
        }

    def build_query(example):
        # Interleave question text and images in placeholder order.
        question = example['question']
        parts = []
        for step, image in enumerate(example['images'][:-1]):
            if step == 0:
                prefix, question = question.strip().split("<shapeB_image>")
            else:
                prefix, question = question.split(f"<shapeB_step_{step-1}>")
            parts.append(text_part(prefix))
            parts.append(image_part(image))
        # Drop any step placeholders that had no image to go with them.
        parts.append(text_part(re.sub(r'<shapeB_step_\d+>', '', question)))
        parts.append(image_part(example['choices']))
        parts.append(text_part(
            "Please first solve the problem step by step, then put your final answer or a single letter (if it is a multiple choice question) in one \"\\boxed{}\""
        ))
        return parts

    dataset = load_dataset(f"VisSim/{dataset_name}")['train']

    acc_by_type = defaultdict(float)
    acc_by_difficulty = defaultdict(float)
    counts_by_difficulty = defaultdict(int)
    counts_by_type = defaultdict(int)

    output_path = f"./{model}_{dataset_name}_{index}.json"
    saved_path = output_path + ".npy"
    if os.path.exists(saved_path):
        # NOTE: allow_pickle=True unpickles the file; only load caches that
        # this script wrote itself.
        answer_dict = np.load(saved_path, allow_pickle=True).item()
    else:
        answer_dict = {}

    for row in tqdm(range(len(dataset)), total=len(dataset)):
        example = dataset[row]
        qid = example['qid']
        difficulty_level = example['difficulty_level']
        # The qid's middle underscore-separated fields name the question variant.
        variant = " ".join(qid.split("_")[1:-1])

        if qid not in answer_dict:
            response = gpt_call_single(build_query(example), model=model)
            answer_dict[qid] = {
                "pred": extract_answer_from_model_response(response),
                "gt_ans": example['answer'].lower(),
                "response": response,
            }
            # Checkpoint periodically so a crash loses at most a few answers.
            if len(answer_dict) % 10 == 0:
                np.save(output_path, answer_dict)

        # Score cached and fresh answers alike; previously resumed examples were
        # skipped, so a resumed run reported accuracy on a subset only (and a
        # fully cached run divided by zero).
        record = answer_dict[qid]
        is_correct = record["pred"].lower() == record["gt_ans"].lower()
        if "ans" not in variant:
            acc_by_difficulty[difficulty_level] += is_correct
            counts_by_difficulty[difficulty_level] += 1
        acc_by_type[variant] += is_correct
        counts_by_type[variant] += 1

    np.save(output_path, answer_dict)

    print("Accuracy by difficulty level:")
    for level, hits in acc_by_difficulty.items():
        print(f"{level}: {hits/counts_by_difficulty[level]}")

    print("Accuracy by variants:")
    for name, hits in acc_by_type.items():
        print(f"{name}: {hits/counts_by_type[name]}")

    scored = sum(counts_by_difficulty.values())
    if scored:
        overall_acc = sum(acc_by_difficulty.values())/scored
        print(f"Overall accuracy: {overall_acc}")
    else:
        # Empty dataset, or every variant was excluded: avoid dividing by zero.
        print("Overall accuracy: n/a (no examples were scored)")



def test_gpt_on_folding_nets(dataset_name, output_dir, model, index, max_tokens=2048, debug=False):
    """Evaluate a chat model on a VisSim folding-nets dataset and print F1 scores.

    For every example in the ``train`` split of ``VisSim/{dataset_name}`` each
    entry of ``images`` is spliced into the question at its ``<image_N>``
    placeholder and an answer-format instruction is appended. The model is
    queried through ``gpt_call_single`` and the boxed answer is extracted from
    the lower-cased reply. The ground-truth text is looked up in ``choices``
    using the letter stored in ``answer``.

    Answers are cached with ``np.save`` under
    ``./{model}_{dataset_name}_{index}.json`` (NumPy appends ``.npy``). If that
    file exists it is loaded first and already-answered qids are not sent to
    the model again; their cached predictions still count in the scores.

    Weighted F1 per ``type`` is printed, followed by the F1 of a uniformly
    random yes/no guesser on the same ground truth.

    Args:
        dataset_name: Name appended to ``VisSim/`` when loading the dataset.
        output_dir: Currently unused; results go to the working directory.
        model: Model name forwarded to ``gpt_call_single``.
        index: Run identifier used in the output file name.
        max_tokens: Currently unused; ``gpt_call_single`` sets its own limit.
        debug: Currently unused.

    Returns:
        None. Scores are printed and answers are saved to disk.
    """
    import random
    from collections import defaultdict

    from datasets import load_dataset
    # Imported up front so a missing scikit-learn fails before any API calls.
    from sklearn.metrics import f1_score
    from tqdm import tqdm

    def text_part(text):
        # One text element of a multimodal chat message.
        return {"type": "text", "text": text}

    def image_part(image):
        # pil_to_base64 emits PNG bytes, so the data URL must declare image/png.
        return {
            "type": "image_url",
            "image_url": {
                "url": f"data:image/png;base64,{pil_to_base64(image)}",
            },
        }

    dataset = load_dataset(f"VisSim/{dataset_name}")['train']

    pred_by_type = defaultdict(list)
    gt_by_type = defaultdict(list)

    output_path = f"./{model}_{dataset_name}_{index}.json"
    saved_path = output_path + ".npy"
    if os.path.exists(saved_path):
        # NOTE: allow_pickle=True unpickles the file; only load caches that
        # this script wrote itself.
        answer_dict = np.load(saved_path, allow_pickle=True).item()
    else:
        answer_dict = {}

    for row in tqdm(range(len(dataset)), total=len(dataset)):
        example = dataset[row]
        qid = example['qid']
        variant = example['type']

        if qid not in answer_dict:
            question = example['question']
            query = []
            for position, image in enumerate(example['images']):
                prefix, question = question.split(f"<image_{position}>")
                query.append(text_part(prefix))
                # The original referenced an undefined ``choice_image`` here.
                query.append(image_part(image))
            # Whatever text is left (possibly empty) precedes the instruction.
            query.append(text_part(
                question + "Think step-by-step, and then put your final answer in \"\\boxed{}\"."
            ))

            response = gpt_call_single(query, model=model)
            # The reply may carry no text content; treat that as an empty reply.
            pred = extract_answer_from_model_response((response or "").lower())

            gt_choice = example['answer'].lower()
            # ``answer`` is a letter; map it to the corresponding choice text.
            gt_ans = example['choices'][ord(gt_choice) - ord('a')]

            answer_dict[qid] = {
                "pred": pred,
                "gt_choice": gt_choice,
                "gt_ans": gt_ans.lower(),
                "response": response,
                # Log the prompt text with images replaced by a marker. The
                # parts are dicts, so the old isinstance(str) test never
                # matched and every entry was logged as '<image>'.
                "question": [
                    part["text"] if part["type"] == "text" else '<image>'
                    for part in query
                ],
            }
            # Checkpoint periodically so a crash loses at most a few answers.
            if len(answer_dict) % 10 == 0:
                np.save(output_path, answer_dict)

        # Score cached and fresh answers alike so a resumed run covers the
        # whole dataset rather than only the newly queried examples.
        record = answer_dict[qid]
        pred_by_type[variant].append(record["pred"].lower())
        gt_by_type[variant].append(record["gt_ans"].lower())

    np.save(output_path, answer_dict)

    print("F1 by variants:")
    for name, preds in pred_by_type.items():
        print(f"{name}: {f1_score(gt_by_type[name], preds, average='weighted')}")

    print("Random Chance F1 by variants:")
    for name in pred_by_type:
        random_pred = [random.choice(["yes", "no"]) for _ in range(len(gt_by_type[name]))]
        print(f"{name}: {f1_score(gt_by_type[name],random_pred, average='weighted')}")


# Model name, presumably meant to be passed as the `model` argument of the
# evaluation functions above — no call site is visible in this cell.
model = "gpt-4o"
import os


# [IPython console output, not code] ERROR! Session/line number was not unique in database. History logging moved to new session 79
