In [1]:
import os
import torch
import json
import argparse
from tqdm import tqdm
import torch.optim as optim
from torchvision.utils import save_image
from PIL import Image

from training_models import MyClipEnsemble

from datasets import collate_fn_image, ImageDataset, EnsembleImageDataset
from poison_utils import L2_norm

from torchvision import transforms

[2025-03-07 07:06:44,639] [INFO] [real_accelerator.py:110:get_accelerator] Setting ds_accelerator to cuda (auto detect)


# Image Training

In [2]:
dataset_name = "mini_MathVista_grid"
method = "i2i_EnsembleAttack"
args = {
    "model": "clip",
    "poison_save_pth": f"./data/poisons/{dataset_name}+{method}",
    "iter_attack": 100,
    "lr_attack": 0.5,
    "base_data_pth": f"data/{dataset_name}/base",
    "target_data_pth": f"data/{dataset_name}/target",
    "questions_pth": f"data/{dataset_name}/questions.json",
    "temperature": 0,
    "max_new_tokens": 200,
    "eps": 8,
}
def read_json(path):
    with open(path, 'r', encoding='utf-8') as f:
        return json.load(f)

# encoder_model = MyClip()
encoder_model = MyClipEnsemble()
device = encoder_model.device

question_json = read_json(args["questions_pth"])
pids = question_json.keys()



base_caps = read_json(os.path.join(args["base_data_pth"], "caps.json"))
target_caps = read_json(os.path.join(args["target_data_pth"], "caps.json"))

In [3]:
# base_dataset = ImageDataset(base_caps["annotations"])
# target_dataset = ImageDataset(target_caps["annotations"])
base_dataset = EnsembleImageDataset(base_caps["annotations"])
target_dataset = EnsembleImageDataset(target_caps["annotations"])

base_dataloader = torch.utils.data.DataLoader(
    base_dataset, batch_size=1, 
    shuffle=False, collate_fn=collate_fn_image
)

target_dataloader = torch.utils.data.DataLoader(
    target_dataset, batch_size=1, 
    shuffle=False, collate_fn=collate_fn_image
)

In [4]:
import matplotlib.pyplot as plt
def show_img(img, caption):
    print(caption)
    plt.imshow(img.squeeze(0).permute(1,2,0))
    plt.axis("off")
    plt.show()

In [5]:
def i2i_attack(model, image_base, image_target, iters=100, lr=1/255, eps=8):
      '''
      optimizing x_adv to minimize emb_dist( img_embed of x_adv, img_embed of image_target ) within Lp constraint using PGD

      model: model class with image embedding functionality (e.g. CLIP, EVA)
      image_base, image_target: images between [0,1] 
      emb_dist: the distance metrics for vision embedding (such as L2): take a batch of bs image pairs as input, \
            and output EACH of pair-wise distances of the whole batch (size = [bs])

      eps: for Lp constraint
      lr: the step size. The update is grad.sign * lr
      diff_aug: using differentiable augmentation, e.g. RandomResizeCrop

      return: X_adv between [0,1]
      '''

      bs = image_base.size(0)
      device = image_base.device

      with torch.no_grad():
            embedding_targets = model.encode_image(image_target)
            embedding_targets = embedding_targets / embedding_targets.norm(dim=-1, keepdim=True)

      X_adv = image_base.clone().detach()
      X_adv.requires_grad_(True) 

      loss_best = 1e8
      X_adv_best = X_adv.clone().detach()

      # for i in tqdm(range(iters), leave=False):
      for i in range(iters):
            embs = model.encode_image(X_adv)
            embs = embs / embs.norm(dim=-1, keepdim=True)  # Normalize embeddings

            loss = L2_norm(embs, embedding_targets)
            # loss = torch.mean(torch.sum(embs * embedding_targets, dim=1))

            if loss < loss_best:
                  loss_best = loss.clone().detach()
                  X_adv_best = X_adv.clone().detach()

            # optimizer.zero_grad()
            loss.backward()

            # print losses at increments
            if i% max(int(iters/20),1) == 0:
                  # print('Iter :{} loss:{:.4f}, lr * 255:{:.4f}'.format(i,loss.item()/bs, scheduler.get_last_lr()[0]*255))
                  print('Iter :{} loss:{:.4f}, lr * 255:{:.4f}'.format(i,loss.item()/bs, lr*255))

            grad = X_adv.grad
            assert grad != None
            grad = grad / torch.mean(torch.abs(grad), dim=(1,2,3), keepdim=True)           
            
            perturbation = lr * grad.sign()
            # print(perturbation.shape)
            X_adv.data = X_adv.data.detach() + perturbation
            X_adv.data = torch.minimum(torch.maximum(X_adv, image_base - eps), image_base + eps) 
            X_adv.data = X_adv.data.clamp(0,255)
            X_adv.grad = None  

            if torch.isnan(loss):
                  print('Encounter nan loss at iteration {}'.format(i))
                  break                 

      print('Best Total loss:{:.4f}'.format(loss.item()))

      return X_adv_best, loss_best




In [6]:
def SGAImageAttack(model, image, cap, iters=100, lr=1/255, eps=8):
      '''
      optimizing x_adv to maximize emb_dist( img_embed of x_adv, text_embed of captions ) within Lp constraint using PGD

      model: model class with image and text embedding functionality (e.g. CLIP, EVA)
      image, cap: image between [0,255] (float) caption dict for that image
      emb_dist: the distance metrics for vision embedding (such as L2): take a batch of bs image pairs as input, \
            and output EACH of pair-wise distances of the whole batch (size = [bs])

      eps: for Lp constraint
      lr: the step size. The update is grad.sign * lr

      return: X_adv between [0,255]
      '''

      bs = image.size(0)
      device = image.device

      with torch.no_grad():
            embedding_targets = model.encode_text(cap['caption'])
            embedding_targets = embedding_targets / embedding_targets.norm(dim=-1, keepdim=True)

      X_adv = image.clone().detach()
      X_adv.requires_grad_(True) 

      loss_best = 1e8
      X_adv_best = X_adv.clone().detach()

      # for i in tqdm(range(iters), leave=False):
      for i in range(iters):
            embs = model.encode_image(X_adv)
            embs = embs / embs.norm(dim=-1, keepdim=True)  # Normalize embeddings

            loss = -L2_norm(embs, embedding_targets)

            if loss < loss_best:
                  loss_best = loss.clone().detach()
                  X_adv_best = X_adv.clone().detach()

            loss.backward()

            # print losses at increments
            if i% max(int(iters/20),1) == 0:
                  print('Iter :{} loss:{:.4f}, lr * 255:{:.4f}'.format(i,loss.item()/bs, lr*255))

            grad = X_adv.grad
            assert grad != None
            grad = grad / torch.mean(torch.abs(grad), dim=(1,2,3), keepdim=True)           
            
            perturbation = lr * grad.sign()
            # print(perturbation.shape)
            X_adv.data = X_adv.data.detach() - perturbation
            X_adv.data = torch.minimum(torch.maximum(X_adv, image_base - eps), image_base + eps) 
            X_adv.data = X_adv.data.clamp(0,255)
            X_adv.grad = None  

            if torch.isnan(loss):
                  print('Encounter nan loss at iteration {}'.format(i))
                  break                 

      print('Best Total loss:{:.4f}'.format(loss.item()))

      return X_adv_best, loss_best

In [17]:
def i2i_EnsembleAttack(model, image, target, iters=100, lr=1/255, eps=8):
      '''
      optimizing x_adv to maximize emb_dist( img_embed of x_adv, text_embed of captions ) within Lp constraint using PGD

      model: model class with image and text embedding functionality (e.g. CLIP, EVA)
      image, cap: image between [0,255] (float) caption dict for that image
      emb_dist: the distance metrics for vision embedding (such as L2): take a batch of bs image pairs as input, \
            and output EACH of pair-wise distances of the whole batch (size = [bs])

      eps: for Lp constraint
      lr: the step size. The update is grad.sign * lr

      return: X_adv between [0,255]
      '''

      bs = image.size(0)
      device = image.device

      with torch.no_grad():
            embedding_targets = model.encode_image(target)

      X_adv = image.clone().detach()
      X_adv.requires_grad_(True) 

      loss_best = -1e8
      X_adv_best = X_adv.clone().detach()

      # for i in tqdm(range(iters), leave=False):
      for i in range(iters):
            embs = model.encode_image(X_adv, use_grad=True)

            # loss = L2_norm(embs, embedding_targets)
            grad, loss = model.get_gradients(embs, embedding_targets, X_adv) 

            if loss > loss_best:
                  loss_best = loss.clone().detach()
                  X_adv_best = X_adv.clone().detach()

            # print losses at increments
            if i% max(int(iters/20),1) == 0:
                  print('Iter :{} loss:{:.4f}, lr:{:.4f}'.format(i,loss.item()/bs, lr))

            
            perturbation = lr * grad.sign() / 255.0
            # print(perturbation.shape)
            with torch.no_grad():
                  X_adv = X_adv + perturbation
                  X_adv = torch.min(torch.max(X_adv, image_base - eps/255.0), image_base + eps/255.0)
                  X_adv = torch.clamp(X_adv, 0, 1)
            X_adv.requires_grad_(True)

            if torch.isnan(loss):
                  print('Encounter nan loss at iteration {}'.format(i))
                  break                 

      print('Best Total loss:{:.4f}'.format(loss.item()))

      # return X_adv.clone().detach(), loss_best
      return X_adv_best, loss_best

In [8]:
# loss gets logits and maximizes entropy of each generated word for image caption

# def MaximizeInformationEntropyAttack(model, image, cap, iters=100, lr=1/255, eps=8):


### SGA Image Attack

In [9]:
# import matplotlib.pyplot as plt

# ###### Running SGA Image Attack ######
# original_sizes_list = []
# X_adv_list = []
# all_losses = []
# loss_attack_list = []

# target_path = os.path.join(args['poison_save_pth'], "SGA")

# if not os.path.exists(target_path):
#     os.makedirs(target_path)
# saved_images = set(os.listdir(target_path))
# saved_image_ids = {int(fname.split('.')[0]) for fname in saved_images if fname.endswith(('.png', '.jpg', '.jpeg'))}


# for i, (image_base, base_cap) in  tqdm(enumerate(base_dataloader), desc=f"Processing all images", total=len(base_dataloader)):
#     if base_cap['name'] in saved_image_ids:
#             print(f"{base_cap['name']} already processed for {target_cap['name']}, skipping...")
#             continue

#     print('image name = ', base_cap['name'])
#     image_base = image_base.to(device)

#     X_adv, loss_attack = SGAImageAttack(
#             model=encoder_model,
#             image=image_base,
#             cap=base_cap,
#             iters=args['iter_attack'],
#             lr=args['lr_attack'],
#             eps=args['eps']
#     )


#     ###### Save poisoned images after each batch ######
#     img_pth = os.path.join(target_path, f"{base_cap['name']}.png")
#     image_to_save = X_adv/255.0
#     print()
#     if base_cap['name'] not in saved_image_ids:  # Only save if it doesn't already exist
#         print("Max Pixel Difference between Adversarial Image and Base *255:", round(torch.max(torch.abs(X_adv-image_base)).item(), 4))
#         save_image(image_to_save.cpu(), img_pth)
#         print(f"Saved poisoned image {base_cap['name']} to {img_pth}") # mathvista

#     # show_img(((X_adv-image_base).to(torch.uint8).cpu()*10), ("Adversarial Noise", img_pth))
#     # show_img(image_to_save.cpu(), ("Adversarial Image", img_pth))

        

# print("Done.")

### i2i attack

In [10]:
# ##### Running i2i attack ######
# original_sizes_list = []
# X_adv_list = []
# all_losses = []
# loss_attack_list = []
# start_idx = 0
# end_idx = 0
# # for j, (image_target, target_cap) in  tqdm(enumerate(target_dataloader)):
# for j, (image_target, target_cap) in  tqdm(enumerate(target_dataloader), desc="Target Dataloader", position=0, leave=True):
#     target_name = target_cap["name"]
#     target_path = os.path.join(args["poison_save_pth"], target_cap["name"])
#     if not os.path.exists(target_path):
#         os.makedirs(target_path)

#     ###### Resume by checking already saved images ######
#     saved_images = set(os.listdir(target_path))
#     saved_image_ids = {int(fname.split('.')[0]) for fname in saved_images if fname.endswith(('.png', '.jpg', '.jpeg'))}

#     # show_img(image_target, ("Working on target:", target_name))

#     for i, (image_base, base_cap) in  tqdm(enumerate(base_dataloader), desc=f"Processing {target_name}", total=len(base_dataloader), position=1, leave=False):
#         if base_cap['name'] in saved_image_ids:
#                 print(f"{base_cap['name']} already processed for {target_cap['name']}, skipping...")
#                 continue

#         print('image name = ', base_cap['name'])
#         image_base, image_target = image_base.to(device), image_target.to(device)

#         X_adv, loss_attack = i2i_attack(
#                 model=encoder_model,
#                 image_base=image_base,
#                 image_target=image_target,
#                 iters=args['iter_attack'],
#                 lr=args['lr_attack'],
#                 eps=args['eps']
#         )


#         ###### Save poisoned images after each batch ######
#         img_pth = os.path.join(target_path, f"{base_cap['name']}.png")
#         image_to_save = X_adv/255.0
#         print()
#         if base_cap['name'] not in saved_image_ids:  # Only save if it doesn't already exist
#             print("Max Pixel Difference between Adversarial Image and Base *255:", round(torch.max(torch.abs(X_adv-image_base)).item(), 4))
#             save_image(image_to_save.cpu(), img_pth)
#             print(f"Saved poisoned image {base_cap['name']} to {img_pth}") # mathvista

#         # show_img(((X_adv-image_base).to(torch.uint8).cpu()*10), ("Adversarial Noise", img_pth))
#         # show_img(image_to_save.cpu(), ("Adversarial Image", img_pth))

        

# print("Done.")

## i2i Ensemble attack

In [18]:
##### Running i2i attack ######
original_sizes_list = []
X_adv_list = []
all_losses = []
loss_attack_list = []
start_idx = 0
end_idx = 0
# for j, (image_target, target_cap) in  tqdm(enumerate(target_dataloader)):
for j, (image_target, target_cap) in  tqdm(enumerate(target_dataloader), desc="Target Dataloader", position=0, leave=True):
    target_name = target_cap["name"]
    target_path = os.path.join(args["poison_save_pth"], target_cap["name"])
    if not os.path.exists(target_path):
        os.makedirs(target_path)

    ###### Resume by checking already saved images ######
    saved_images = set(os.listdir(target_path))
    saved_image_ids = {int(fname.split('.')[0]) for fname in saved_images if fname.endswith(('.png', '.jpg', '.jpeg'))}

    # show_img(image_target, ("Working on target:", target_name))

    for i, (image_base, base_cap) in  tqdm(enumerate(base_dataloader), desc=f"Processing {target_name}", total=len(base_dataloader), position=1, leave=False):
        if base_cap['name'] in saved_image_ids:
                print(f"{base_cap['name']} already processed for {target_cap['name']}, skipping...")
                continue

        print('image name = ', base_cap['name'])
        image_base, image_target = image_base.to(device), image_target.to(device)

        X_adv, loss_attack = i2i_EnsembleAttack(
                model=encoder_model,
                image=image_base,
                target=image_target,
                iters=args['iter_attack'],
                lr=args['lr_attack'],
                eps=args['eps']
        )


        ###### Save poisoned images after each batch ######
        img_pth = os.path.join(target_path, f"{base_cap['name']}.png")
        image_to_save = X_adv
        print()
        if base_cap['name'] not in saved_image_ids:  # Only save if it doesn't already exist
            print("Max Pixel Difference between Adversarial Image and Base *255:", torch.max(torch.abs(X_adv-image_base)).item()*255)
            save_image(image_to_save.cpu(), img_pth)
            print(f"Saved poisoned image {base_cap['name']} to {img_pth}") # mathvista

        # show_img(((X_adv-image_base).to(torch.uint8).cpu()*10), ("Adversarial Noise", img_pth))
        # show_img(image_to_save.cpu(), ("Adversarial Image", img_pth))

        

print("Done.")

Target Dataloader: 0it [00:00, ?it/s]

image name =  85
Iter :0 loss:0.4771, lr:0.5000
Iter :5 loss:0.6007, lr:0.5000
Iter :10 loss:0.6810, lr:0.5000
Iter :15 loss:0.7259, lr:0.5000
Iter :20 loss:0.7715, lr:0.5000
Iter :25 loss:0.7914, lr:0.5000
Iter :30 loss:0.8071, lr:0.5000
Iter :35 loss:0.8280, lr:0.5000
Iter :40 loss:0.8331, lr:0.5000
Iter :45 loss:0.8428, lr:0.5000
Iter :50 loss:0.8529, lr:0.5000
Iter :55 loss:0.8580, lr:0.5000
Iter :60 loss:0.8668, lr:0.5000
Iter :65 loss:0.8723, lr:0.5000
Iter :70 loss:0.8705, lr:0.5000
Iter :75 loss:0.8700, lr:0.5000
Iter :80 loss:0.8803, lr:0.5000
Iter :85 loss:0.8779, lr:0.5000
Iter :90 loss:0.8851, lr:0.5000
Iter :95 loss:0.8897, lr:0.5000




Best Total loss:0.8904

Max Pixel Difference between Adversarial Image and Base *255: 8.00000712275505
Saved poisoned image 85 to ./data/poisons/mini_MathVista_grid+i2i_EnsembleAttack/bar/85.png
image name =  54
Iter :0 loss:0.5550, lr:0.5000
Iter :5 loss:0.5976, lr:0.5000
Iter :10 loss:0.6423, lr:0.5000
Iter :15 loss:0.6705, lr:0.5000
Iter :20 loss:0.6905, lr:0.5000
Iter :25 loss:0.7063, lr:0.5000
Iter :30 loss:0.7240, lr:0.5000
Iter :35 loss:0.7280, lr:0.5000
Iter :40 loss:0.7399, lr:0.5000
Iter :45 loss:0.7490, lr:0.5000
Iter :50 loss:0.7591, lr:0.5000
Iter :55 loss:0.7684, lr:0.5000
Iter :60 loss:0.7769, lr:0.5000
Iter :65 loss:0.7748, lr:0.5000
Iter :70 loss:0.7802, lr:0.5000
Iter :75 loss:0.7788, lr:0.5000
Iter :80 loss:0.7824, lr:0.5000
Iter :85 loss:0.7900, lr:0.5000
Iter :90 loss:0.7931, lr:0.5000
Iter :95 loss:0.7971, lr:0.5000




Best Total loss:0.7998

Max Pixel Difference between Adversarial Image and Base *255: 8.00000712275505
Saved poisoned image 54 to ./data/poisons/mini_MathVista_grid+i2i_EnsembleAttack/bar/54.png
image name =  28
Iter :0 loss:0.5532, lr:0.5000
Iter :5 loss:0.5857, lr:0.5000
Iter :10 loss:0.6195, lr:0.5000
Iter :15 loss:0.6400, lr:0.5000
Iter :20 loss:0.6523, lr:0.5000
Iter :25 loss:0.6661, lr:0.5000
Iter :30 loss:0.6760, lr:0.5000
Iter :35 loss:0.6923, lr:0.5000
Iter :40 loss:0.6990, lr:0.5000
Iter :45 loss:0.7090, lr:0.5000
Iter :50 loss:0.7182, lr:0.5000
Iter :55 loss:0.7209, lr:0.5000
Iter :60 loss:0.7247, lr:0.5000
Iter :65 loss:0.7249, lr:0.5000
Iter :70 loss:0.7361, lr:0.5000
Iter :75 loss:0.7364, lr:0.5000
Iter :80 loss:0.7432, lr:0.5000
Iter :85 loss:0.7449, lr:0.5000
Iter :90 loss:0.7441, lr:0.5000
Iter :95 loss:0.7417, lr:0.5000




Best Total loss:0.7537

Max Pixel Difference between Adversarial Image and Base *255: 8.00000712275505
Saved poisoned image 28 to ./data/poisons/mini_MathVista_grid+i2i_EnsembleAttack/bar/28.png
image name =  82
Iter :0 loss:0.4966, lr:0.5000
Iter :5 loss:0.5589, lr:0.5000
Iter :10 loss:0.5917, lr:0.5000
Iter :15 loss:0.6045, lr:0.5000
Iter :20 loss:0.6194, lr:0.5000
Iter :25 loss:0.6345, lr:0.5000
Iter :30 loss:0.6481, lr:0.5000
Iter :35 loss:0.6541, lr:0.5000
Iter :40 loss:0.6651, lr:0.5000
Iter :45 loss:0.6718, lr:0.5000
Iter :50 loss:0.6808, lr:0.5000
Iter :55 loss:0.6904, lr:0.5000
Iter :60 loss:0.6844, lr:0.5000
Iter :65 loss:0.7012, lr:0.5000
Iter :70 loss:0.6940, lr:0.5000
Iter :75 loss:0.7125, lr:0.5000
Iter :80 loss:0.7121, lr:0.5000
Iter :85 loss:0.7181, lr:0.5000
Iter :90 loss:0.7202, lr:0.5000
Iter :95 loss:0.7242, lr:0.5000




Best Total loss:0.7253

Max Pixel Difference between Adversarial Image and Base *255: 8.00000712275505
Saved poisoned image 82 to ./data/poisons/mini_MathVista_grid+i2i_EnsembleAttack/bar/82.png
image name =  53
Iter :0 loss:0.3440, lr:0.5000
Iter :5 loss:0.4472, lr:0.5000
Iter :10 loss:0.5204, lr:0.5000


Target Dataloader: 0it [00:56, ?it/s]

Iter :15 loss:0.5458, lr:0.5000





KeyboardInterrupt: 

In [None]:
raise Exception("Here to stop execution for this section")

# Mathvista Evaluation Pipeline

### Generate Response

In [None]:
attacking_model="internvl"
captioning_model = "gpt-3.5-turbo"

In [None]:
import os
import io
import time
import argparse

from tqdm import tqdm

import sys
from poison_utils import *

from eval_models import internlm, internvl
from eval_models import gpt

from build_query import create_query_data



  warn("The installed version of bitsandbytes was compiled without GPU support. "


/home/users/mxy/miniconda3/envs/VLM_Poisoning/lib/python3.10/site-packages/bitsandbytes/libbitsandbytes_cpu.so: undefined symbol: cadam32bit_grad_fp32
[2025-01-10 13:40:15,497] [INFO] [real_accelerator.py:110:get_accelerator] Setting ds_accelerator to cuda (auto detect)


In [None]:


def verify_response(response):
    if isinstance(response, str):
        response = response.strip() 
    if response == "" or response == None:
        return False
    if "Response Error" in response:
        return False
    return True


def evaluate_code(code_string):
    # execute_code_and_capture_output
    # Backup the original stdout
    old_stdout = sys.stdout
    
    # Redirect stdout to capture the output
    new_stdout = io.StringIO()
    sys.stdout = new_stdout
    
    # Try executing the code and capture any exception
    error = None
    try:
        exec(code_string)
    except Exception as e:
        error = e
    
    # Restore the original stdout
    sys.stdout = old_stdout
    
    # Get the captured output
    captured_output = new_stdout.getvalue()
    if isinstance(captured_output, str):
        captured_output = captured_output.strip()
    
    # Return the captured output or error
    return captured_output, error


In [None]:
task_name = "mini_MathVista_grid"
class Args:
    def __init__(self, task_name, attacking_model, captioning_model):
        # Input
        self.poison_data_dir = f'data/poisons/{task_name}'
        self.input_file = 'questions.json'
        self.task_data_pth = f'data/{task_name}'

        # Output
        self.output_dir = f'results/{attacking_model}'
        self.output_file = f'output_{attacking_model}.json'

        # Model
        self.model = captioning_model
        self.key = ""

        # Query
        self.query_file = None
        # self.caption_file = '../data/texts/captions_bard.json'
        # self.ocr_file = '../data/texts/ocrs_easyocr.json'
        self.shot_type = 'solution'
        self.shot_num = 0
        self.use_caption = False
        self.use_ocr = False

        # Other settings
        self.rerun = False
        self.debug = False

    def __repr__(self):
        return f"Args({self.__dict__})"

args = Args(task_name, attacking_model, captioning_model)

In [None]:

    # load data
    input_file = os.path.join(args.task_data_pth, args.input_file)
    print(f"Reading {input_file}...")
    data = read_json(input_file)
    # load or create query data
    if args.query_file:
        query_file = os.path.join(args.task_data_pth, args.query_file)
        if os.path.exists(query_file):
            print(f"Loading existing {query_file}...")
            query_data = read_json(query_file)
    else:
        print("\nCreating new query...")
        # load caption
        caption_data = {}
        if args.use_caption:
            caption_file = args.caption_file
            if os.path.exists(caption_file):
                print(f"Reading {caption_file}...")
                try:
                    caption_data = read_json(caption_file)["texts"]
                    print("Caption data loaded.")
                except:
                    print("Caption data not found!! Please Check.")                    
        # load ocr
        ocr_data = {}
        if args.use_ocr:
            ocr_file = args.ocr_file
            if os.path.exists(ocr_file):
                print(f"Reading {ocr_file}...")
                try:
                    ocr_data = read_json(ocr_file)["texts"]
                    print("OCR data loaded.")
                except:
                    print("OCR data not found!! Please Check.")
        # create query
        query_data = create_query_data(data, caption_data, ocr_data, args)

    # output file
    os.makedirs(args.output_dir, exist_ok=True)
    output_file = os.path.join(args.output_dir, args.output_file)
    
    # load results
    if os.path.exists(output_file):
        print("\nResults already exist.")
        print(f"Reading {output_file}...")
        results = read_json(output_file)
    else:
        results = {}

    # load model
    print(f"\nLoading {args.model}...")
    if args.model == 'bard':
        if args.key == '':
            print("Loading key from environment variable")
            key = os.environ['_BARD_API_KEY']
        else:
            key = args.key
        model = bard.Bard_Model(key)
    
    elif "gpt" in args.model:
        if args.key == '':
            print("Loading token from environment variable")
            key = os.getenv("OPENAI_API_KEY")
        else:
            key = args.key
        model = gpt.GPT_Model(args.model, key)
    
    elif "claude" in args.model:
        if args.key == '':
            print("Loading token from environment variable")
            key = os.environ.get("ANTHROPIC_API_KEY")
        else:
            key = args.key
        model = claude.Claude_Model(args.model, key)
    elif "internlm" in args.model:
        model = internlm.InternLM_Model()
    elif "llava_one_v" in args.model:
        model = llava_one_v.Llava_One_V()
    elif "internvl" in args.model:
        model = internvl.InternVL_Model()
            
    
    print(f"Model loaded.")
    
    # build final test pid list
    test_pids = list(data.keys())
    print("\nNumber of test problems in total:", len(test_pids))

    available_directories = [d for d in os.listdir(args.poison_data_dir) if os.path.isdir(os.path.join(args.poison_data_dir, d))]
    target_names = read_json(os.path.join(args.task_data_pth, "target/caps.json"))
    if not all([name in available_directories for name in target_names]):
        print("Not all targets have directories. Working with:", available_directories)
    target_names = [item["name"] for item in target_names["annotations"] if item["name"] in available_directories]

    skip_pids = []
    if not args.rerun:
        print("\nRemoving problems with existing valid response...")
        for i, name in enumerate(target_names):
            skip_pids.append([])
            for pid in test_pids:
                # print(f"Checking {pid}...")
                if pid in results and 'response' in results[pid]:
                    response = results[pid][name]['response']
                    if verify_response(response):
                        # print(f"Valid response found for {pid}.")
                        skip_pids[i].append(pid)
    else:
        print("\nRerun answer extraction for all problems...")

    test_pids = [[pid for pid in test_pids if pid not in target_skip_pids] for target_skip_pids in skip_pids]
    print("Number of test problems to run for each target:", {target_names[i]: len(target_pids) for i, target_pids in enumerate(test_pids)})
    # print(test_pids)

    

    # tqdm, enumerate results
    for i, target_name in enumerate(target_names):
        for _, pid in enumerate(tqdm(test_pids[i])):
            problem = data[pid]
            query = query_data[pid]
            image = problem['image']
            image_path = os.path.join(args.poison_data_dir, target_name, image)

            if args.debug:
                print("--------------------------------------------------------------")
            print(f"\nGenerating response for {pid}...")
            try:
                response = model.get_response(image_path, query)
                new_caption = model.get_response(image_path, "describe what is in this image")
                # print(f"Response: {response}")
                if pid not in results:
                    results[pid] = problem
                if "targets" not in results[pid]:
                    results[pid]["targets"] = {}
                if target_name not in results[pid]["targets"]:
                    results[pid]["targets"][target_name] = {}

                results[pid]["targets"][target_name]['query'] = query
                results[pid]["targets"][target_name]['model_description'] = new_caption
                if args.shot_type == 'solution':
                    results[pid]["targets"][target_name]['response'] = response
                else:
                    output, error = evaluate_code(response)
                    results[pid]["targets"][target_name]['response'] = response
                    results[pid]["targets"][target_name]['execution'] = output
                    results[pid]["targets"][target_name]['error'] = str(error)
                if args.debug:
                    print(f"\n#Query: \n{query}")
                    print(f"\n#Response: \n{response}")
            except Exception as e:
                print(e)
                print(f"Error in extracting answer for {pid}")
                results[pid][target_name]['error'] = e
        
            try:
                print(f"Saving results to {output_file}...")
                save_json(results, output_file)
                print(f"Results saved.")
            except Exception as e:
                print(e)
                print(f"Error in saving {output_file}")

### Extract Answer

In [None]:
import os
import re
import time
import argparse

from tqdm import tqdm
from poison_utils import *

# OpenAI
import openai
from dotenv import load_dotenv
from prompts.ext_ans import demo_prompt

In [None]:
load_dotenv()
openai.api_key = os.getenv("OPENAI_API_KEY")

In [None]:
def verify_extraction(extraction):
    extraction = extraction.strip()
    if extraction == "" or extraction == None:
        return False
    return True


def create_test_prompt(demo_prompt, query, response):
    demo_prompt = demo_prompt.strip()
    test_prompt = f"{query}\n\n{response}"
    full_prompt = f"{demo_prompt}\n\n{test_prompt}\n\nExtracted answer: "
    return full_prompt


def extract_answer(response, problem, target_name, model=None, quick_extract=False):
    question_type = problem['question_type']
    answer_type = problem['answer_type']
    choices = problem['choices']
    query = problem[target_name]['query']
    pid = problem['pid']

    if response == "":
        return ""
    
    if question_type == 'multi_choice' and response in choices:
        return response
    
    if answer_type == "integer":
        try:
            extraction = int(response)
            return str(extraction)
        except:
            pass

    if answer_type == "float":
        try:
            extraction = str(float(response))
            return extraction
        except:
            pass

    # quick extraction
    if quick_extract:
        print("Quickly extracting answer...")
        # The answer is "text". -> "text"
        try:
            result = re.search(r'The answer is "(.*)"\.', response)
            if result:
                extraction = result.group(1)
                return extraction
        except:
            pass

    # general extraction
    try:
        full_prompt = create_test_prompt(demo_prompt, query, response)
        # extraction = model.get_chat_response(full_prompt)
        extraction = get_chat_response(full_prompt, openai.api_key)
        return extraction
    except Exception as e:
        print(e)
        print(f"Error in extracting answer for {pid}")

    return ""

    

In [None]:
args = {
    "output_dir": f"results/{model}",
    "output_file": f"output_{model}.json",
    "response_label": ,
    "llm_engine": ,
    "number": ,
    "quick_extract": ,
    "rerun": , 
    "save_every": ,
    "output_label": 
}

In [None]:
# args
label = args.response_label
result_file = os.path.join(args.output_dir, args.output_file)

if args.output_label != '':
    output_file = result_file.replace('.json', f'_{args.output_label}.json')
else:
    output_file = result_file

# read results
print(f"Reading {result_file}...")
results = read_json(result_file)

# model = llama_3_1.LLaMA()

# full pids
full_pids = list(results.keys())
if args.number > 0:
    full_pids = full_pids[:min(args.number, len(full_pids))]
print("Number of testing problems:", len(full_pids))

# test pids
if args.rerun:
    test_pids = full_pids
else:
    test_pids = []
    for pid in full_pids:
        # print(pid)
        if 'extraction' not in results[pid] or not verify_extraction(results[pid]['extraction']):
            test_pids.append(pid)


test_num = len(test_pids)
print("Number of problems to run:", test_num)
# print(test_pids)

# tqdm, enumerate results
for i, pid in enumerate(tqdm(test_pids)):
    target_names = results[pid]["targets"].keys()
    for name in target_names:
        problem = results[pid]

        assert label in problem["targets"][name]
        response = problem["targets"][name][label]       

        
        # extraction  = extract_answer(response, problem, name, model, args.quick_extract)
        extraction  = extract_answer(response, problem, name, None, args.quick_extract)
        results[pid]["targets"][name]['extraction'] = extraction

        if i % args.save_every == 0 or i == test_num - 1:
            print(f"Saving results to {output_file}...")
            save_json(results, output_file)
            print(f"Results saved.")