In [None]:
###### FOR PACE ICE - replace GT username below ######
# Switch the working directory to the scratch space so that the relative paths
# used later in this notebook (image folder, JSONL output) resolve under it.
%cd /home/hice1/nbalakrishna3/scratch
# Print the working directory to confirm the change took effect.
!pwd

In [None]:
import os
import json
import base64
from openai import OpenAI
from anthropic import Anthropic, HUMAN_PROMPT, AI_PROMPT
from dotenv import load_dotenv
from tqdm import tqdm

In [None]:
# Read credentials from the environment (optionally populated from a local
# .env file) instead of pasting secrets into the notebook, where they would be
# saved with the file and could leak when it is shared or committed.
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")

In [None]:
# Fail fast when either credential is missing: the OpenAI key is needed to
# build the client that generates captions/questions, and the Anthropic key
# to build the client that judges the answers.
_missing_api_keys = [
    key_name
    for key_name, key_value in (
        ("OPENAI_API_KEY", OPENAI_API_KEY),
        ("ANTHROPIC_API_KEY", ANTHROPIC_API_KEY),
    )
    if not key_value
]
if _missing_api_keys:
    raise ValueError("Missing API key(s): " + ", ".join(_missing_api_keys))

In [None]:
# Folder scanned for input images (.jpg/.jpeg/.png); relative to the working directory.
IMAGE_FOLDER = "datasets/coco/images/train2017"          
# JSONL file written by generate_BLIP_outputs(), one record per processed image.
BLIP_OUTPUT_PATH = "blip_multi_exp2_responses.jsonl"  
# OpenAI model used by generate_questions() for the caption and the L0-L4 questions.
GPT_MODEL = "gpt-4.1-mini"
# NOTE(review): eval_answer() hard-codes a different model id and does not read
# this constant - confirm which judge model is intended.
CLAUDE_MODEL = "claude-3-5-sonnet-20241022"
# Upper bound on output tokens for the question-generation request.
MAX_OUTPUT = 200               

In [None]:
# API clients shared by the rest of the notebook: OpenAI for question
# generation, Anthropic for judging answers.
openai_client = OpenAI(api_key=OPENAI_API_KEY)
anthropic_client = Anthropic(api_key=ANTHROPIC_API_KEY)
# Prints the models visible to the Anthropic key. This issues a network request,
# so it presumably doubles as a credential check - TODO confirm it is still needed.
print(anthropic_client.models.list())

In [None]:
def encode_image(image_path):
    """
    Read a file in binary mode and return its contents as a base64 string.

    Args:
        image_path: path of the file to read.

    Returns:
        The base64 encoding of the file bytes, decoded as UTF-8 text.
    """
    with open(image_path, "rb") as image_file:
        raw_bytes = image_file.read()

    encoded_bytes = base64.b64encode(raw_bytes)
    return str(encoded_bytes, "utf-8")

In [None]:
def _parse_json_object(raw_text):
    """Parse a JSON object from model text, tolerating markdown fences or stray prose around it."""
    text = raw_text.strip()
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Models frequently wrap JSON in ```json fences or add a short preamble;
        # retry on the outermost {...} span before giving up.
        start = text.find("{")
        end = text.rfind("}")
        if start == -1 or end <= start:
            raise
        return json.loads(text[start:end + 1])


def generate_questions(base64_image):
    """
    Ask the OpenAI model for a caption and the L0-L4 question ladder for one image.

    Args:
        base64_image: base64-encoded image data. It is embedded in a
            ``data:image/jpeg;base64,...`` URL, so the request always declares
            JPEG regardless of the actual file type.

    Returns:
        The parsed JSON reply. The prompt requests the keys ``correct_caption``,
        ``L0``, ``L1``, ``L2``, ``L3`` and ``L4``; their presence is not
        validated here.

    Raises:
        json.JSONDecodeError (a ValueError): if no JSON object can be parsed
            from the reply, e.g. when it was truncated by the output-token limit.
    """
    prompt = """
You are preparing controlled experimental materials for multimodal evaluation.

Given the IMAGE (provided separately), generate the following:

============================================================
1. Correct Caption
============================================================
• Accurately describe the visible scene.
• 9–15 words, objective, simple, and factual.
• Should mention main objects; avoid inference beyond evidence.

============================================================
2. Visual Necessity Question Ladder (VNL): Levels L0 → L4
============================================================

GENERAL RULES:
• L1–L4 MUST require looking at the image to answer.
• All questions MUST be answerable using only the given image.
• Do NOT include the answers.
• No question should exceed 14 words.
• Return concise, natural wording.

------------------------------------------------------------
L0 – Baseline Question (Language-prior only)
------------------------------------------------------------
• A question humans can answer **without seeing the image**.
• May refer to the world generally (NOT the specific image).
• Purpose: control for language-only biases.
• 6–12 words.
Examples:
– “What season often has the coldest weather?”  
– “Which animal is larger, a dog or an elephant?”  
– “What do people usually use to take photographs?”

------------------------------------------------------------
L1 – Basic Visual Recognition
------------------------------------------------------------
• Requires the image.
• Ask about a **primary object** or its basic property.
• No reasoning, no inference.
Examples:
– “What object is the person holding?”  
– “What color is the animal?”  
– “How many people are visible?”

------------------------------------------------------------
L2 – Intermediate Visual Detail
------------------------------------------------------------
• Also requires the image.
• Ask about a **secondary property** of a main object.
• Slightly more specific than L1.
Examples:
– “What pattern is on the person’s shirt?”  
– “What type of hat is the man wearing?”  
– “What material is the table made of?”

------------------------------------------------------------
L3 – Relational / Spatial Reasoning
------------------------------------------------------------
• Requires image + spatial relations + relational understanding.
Examples:
– “Where is the dog positioned relative to the child?”  
– “What object is behind the bicycle?”  
– “Which person is closest to the camera?”

------------------------------------------------------------
L4 – High-Level Visual Reasoning
------------------------------------------------------------
• Hardest level; requires the entire scene.
• Ask about interactions, goals, implied roles, or multi-object context.
• Still must be answerable from the image alone (no external inference).
Examples:
– “What activity are the people engaged in?”  
– “Why is the man extending his arm?”  
– “What is the group collectively doing?”

============================================================
Return EXACTLY this JSON structure:
{
  "correct_caption": "<string>",
  "L0": "<string>",
  "L1": "<string>",
  "L2": "<string>",
  "L3": "<string>",
  "L4": "<string>"
}
============================================================


"""
    # One user message carrying the instructions plus the image as a data URL.
    response = openai_client.responses.create(
        model=GPT_MODEL,
        max_output_tokens=MAX_OUTPUT,
        input=[
            {
                "role": "user",
                "content": [
                    {"type": "input_text", "text": prompt},
                    {
                        "type": "input_image",
                        "image_url": f"data:image/jpeg;base64,{base64_image}"
                    }
                ]
            }
        ]
    )

    return _parse_json_object(response.output_text)

In [None]:
# V1 ADDED - in version to compute attention-based MDI 

def compute_mdi(attentions, n_img_tokens=32):
    """
    Compute the Modality Dominance Index (MDI) from attention tensors.

    Args:
        attentions: a tensor, or a list/tuple whose items are tensors or
            lists/tuples of tensors (one nesting level is flattened). Each
            tensor is treated as [batch, heads, tgt_len, src_len]; a 3-D
            tensor is treated as [heads, tgt_len, src_len].
        n_img_tokens: number of leading source positions counted as image
            tokens. It is clamped to the range [0, src_len] per tensor.

    Returns:
        float: visual_attn / (visual_attn + textual_attn + 1e-9), where each
        term is the mean attention over the image / non-image source
        positions, averaged over all usable tensors. An empty image or text
        slice contributes 0.0 instead of NaN.
        None: if no usable (non-empty, 3-D or 4-D) attention tensor is found.
    """
    # ---- 1. Flatten nested tuples ----
    if isinstance(attentions, torch.Tensor):
        candidates = [attentions]
    elif isinstance(attentions, (list, tuple)):
        candidates = []
        for item in attentions:
            if isinstance(item, torch.Tensor):
                candidates.append(item)
            elif isinstance(item, (list, tuple)):
                candidates.extend(x for x in item if isinstance(x, torch.Tensor))
    else:
        candidates = []

    # Only non-empty tensors with a [.., tgt_len, src_len] layout can be scored.
    usable = [t for t in candidates if t.numel() > 0 and t.dim() in (3, 4)]

    if not usable:
        print("⚠️  No attention tensors found.")
        return None

    visual_scores = []
    textual_scores = []

    # ---- 2. Compute visual/textual attention for each tensor ----
    for layer_attn in usable:
        # Normalize [heads, tgt, src] -> [1, heads, tgt, src]
        if layer_attn.dim() == 3:
            layer_attn = layer_attn.unsqueeze(0)

        # Reduce in float32 so half-precision attentions do not lose precision;
        # average over batch + heads -> [tgt_len, src_len]
        attn_mean = layer_attn.float().mean(dim=(0, 1))
        src_len = attn_mean.size(-1)

        n_visual = max(0, min(n_img_tokens, src_len))

        visual_part = attn_mean[:, :n_visual]   # first n positions = image tokens
        textual_part = attn_mean[:, n_visual:]  # remaining positions = text tokens

        # mean() of an empty slice is NaN, which would poison the final index.
        visual_scores.append(visual_part.mean().item() if visual_part.numel() else 0.0)
        textual_scores.append(textual_part.mean().item() if textual_part.numel() else 0.0)

    # Average over tensors
    visual_avg = sum(visual_scores) / len(visual_scores)
    textual_avg = sum(textual_scores) / len(textual_scores)

    # ---- 3. Modality Dominance Index ----
    return visual_avg / (visual_avg + textual_avg + 1e-9)

In [None]:
# V1 (Only BLIP compatibility)

def flatten_attn_tensors(attentions):
    """
    Collect the tensors of a (possibly nested) BLIP-2 attention structure into one list.

    A bare tensor yields a one-element list. For a list/tuple, tensors found
    directly inside it or inside its list/tuple items (one nesting level) are
    kept in order; everything else is ignored. Any other input yields [].
    """
    if isinstance(attentions, torch.Tensor):
        return [attentions]
    if not isinstance(attentions, (list, tuple)):
        return []

    collected = []
    for entry in attentions:
        if isinstance(entry, torch.Tensor):
            collected.append(entry)
        elif isinstance(entry, (list, tuple)):
            for candidate in entry:
                if isinstance(candidate, torch.Tensor):
                    collected.append(candidate)
    return collected

In [None]:
import torch
import math

#V1 (Only BLIP compatibility)

def compute_attention_entropy(attentions, apply_softmax=False):
    """
    Compute the mean Shannon entropy (in nats) of attention distributions.

    Args:
        attentions: attention structure accepted by flatten_attn_tensors().
            Tensors are treated as [batch, heads, tgt, src]; 3-D tensors get a
            leading batch dimension.
        apply_softmax: set to True only when the tensors hold raw scores
            (logits). By default they are treated as attention weights that
            are already probabilities over the last dimension - which is what
            generation outputs return - and are merely renormalized. Applying
            softmax to such weights flattens them toward uniform and inflates
            the entropy.

    Returns:
        float: entropy over the last dimension, averaged per tensor and then
        across tensors; None if there is no non-empty tensor to score.
    """
    flat = flatten_attn_tensors(attentions)

    entropies = []

    for layer in flat:
        if not isinstance(layer, torch.Tensor) or layer.numel() == 0:
            continue

        # Normalize to [batch, heads, tgt, src]
        if layer.dim() == 3:
            layer = layer.unsqueeze(0)

        weights = layer.float()
        if apply_softmax:
            probs = torch.softmax(weights, dim=-1)
        else:
            # Renormalize rows to guard against half-precision drift or padding.
            row_sums = weights.sum(dim=-1, keepdim=True).clamp(min=1e-9)
            probs = weights / row_sums
        probs = probs.clamp(min=1e-9)  # avoid log(0)

        entropy = -(probs * probs.log()).sum(dim=-1)   # [batch, heads, tgt]
        entropies.append(entropy.mean().item())        # scalar

    # Every tensor may have been skipped; avoid dividing by zero.
    if not entropies:
        return None

    return sum(entropies) / len(entropies)

In [None]:
def pad_to_match(a, b):
    """
    Right-pad the shorter of two tensors along the last dimension.

    The last dimension is src_len for the expected
    [batch, heads, tgt_len, src_len] layout. Padding uses
    torch.nn.functional.pad with its default fill value. When the last
    dimensions already agree, both inputs are returned unchanged.

    Returns:
        (a, b) with equal size along the last dimension.
    """
    len_a = a.size(-1)
    len_b = b.size(-1)

    if len_a < len_b:
        # b is longer, so extend a on the right
        a = torch.nn.functional.pad(a, (0, len_b - len_a))
    elif len_b < len_a:
        # a is longer, so extend b on the right
        b = torch.nn.functional.pad(b, (0, len_a - len_b))

    return a, b

#V1 (Only BLIP compatibility)

def compute_attention_shift(prev_attn, curr_attn):
    """
    Average absolute difference between two sets of attention tensors.

    Both inputs are flattened with flatten_attn_tensors() and compared pairwise
    in order (extra tensors on the longer side are ignored). Empty tensors are
    skipped, 3-D tensors get a leading batch dimension, and differing src_len
    is reconciled with pad_to_match().

    Returns:
        float: mean over pairs of the mean absolute element-wise difference,
        or None when no pair could be compared.
    """
    pairs = zip(flatten_attn_tensors(prev_attn), flatten_attn_tensors(curr_attn))

    per_pair_shift = []
    for before, after in pairs:
        both_tensors = isinstance(before, torch.Tensor) and isinstance(after, torch.Tensor)
        if not both_tensors or before.numel() == 0 or after.numel() == 0:
            continue

        # [heads, tgt, src] -> [1, heads, tgt, src]
        before = before.unsqueeze(0) if before.dim() == 3 else before
        after = after.unsqueeze(0) if after.dim() == 3 else after

        # src_len may differ between the two generations
        before, after = pad_to_match(before, after)

        per_pair_shift.append((before - after).abs().mean().item())

    return sum(per_pair_shift) / len(per_pair_shift) if per_pair_shift else None

In [None]:
# Multi-turn support

def ask_blip2(
    path,
    caption,
    question,
    history=None,
    max_new_tokens=50,
    return_mdi=False,
    return_attn=False,
    last_turn_only=False
):
    """
    Ask BLIP-2 one question about an image, optionally after earlier QA turns.

    The prompt is "Caption: ...", then a "Previous QA:" section listing every
    (question, answer) pair in `history`, then the current question followed
    by "Answer: ".

    Args:
        path: image file path; the image is opened and converted to RGB.
        caption: caption text placed at the top of the prompt.
        question: the current question.
        history: optional sequence of (question, answer) pairs; None means no
            previous turns.
        max_new_tokens: forwarded to model.generate().
        return_mdi: when True, attentions are requested from generate() and
            compute_mdi() is called on them with its default n_img_tokens.
        return_attn: when True together with return_mdi=True, the raw
            attentions are returned as well.
        last_turn_only: accepted for call compatibility but not used here;
            callers are expected to trim `history` themselves.

    Returns:
        str: the answer, when return_mdi is False.
        (answer, mdi): when return_mdi is True.
        (answer, mdi, attentions): when return_mdi and return_attn are True.

    NOTE(review): return_attn=True with return_mdi=False returns only the
    answer (unchanged from the original behaviour); since the attentions would
    be discarded, they are no longer requested in that case.
    """
    turns = history if history is not None else []

    # Close the file handle as soon as the RGB copy exists.
    with Image.open(path) as raw_image:
        image = raw_image.convert("RGB")

    # ===== Build BLIP-2-Compatible Prompt =====
    prompt_parts = [f"Caption: {caption}\n\n"]

    if len(turns) > 0:
        prompt_parts.append("Previous QA:\n")
        # All supplied turns are rendered; the caller decides how much history to keep.
        for q_prev, a_prev in turns:
            prompt_parts.append(f"Question: {q_prev}\n")
            prompt_parts.append(f"Answer: {a_prev}\n\n")

    prompt_parts.append(f"Question: {question}\n")
    prompt_parts.append("Answer: ")
    prompt = "".join(prompt_parts)

    # ---- Preprocess ----
    inputs = processor(
        image,
        prompt,
        return_tensors="pt"
    ).to(model.device, torch.float16)

    # ---- Generate ----
    # Greedy decoding (do_sample=False): `temperature` has no effect there, so
    # it is not passed.
    generate_kwargs = {"max_new_tokens": max_new_tokens, "do_sample": False}
    if return_mdi:
        # Attentions are only needed when the MDI is computed.
        generate_kwargs["return_dict_in_generate"] = True
        generate_kwargs["output_attentions"] = True

    outputs = model.generate(**inputs, **generate_kwargs)

    # generate() may return a plain id tensor or an output object with `.sequences`.
    sequences = outputs.sequences if hasattr(outputs, "sequences") else outputs
    answer = processor.tokenizer.decode(sequences[0], skip_special_tokens=True)

    # Keep only the text after the last "Answer:" marker, if the prompt was echoed.
    if "Answer:" in answer:
        answer = answer.split("Answer:")[-1].strip()

    if not return_mdi:
        return answer

    # Prefer cross-attentions; fall back to decoder self-attentions.
    cross_attentions = getattr(outputs, "cross_attentions", None)
    attns = cross_attentions if cross_attentions else outputs.decoder_attentions

    mdi = compute_mdi(attns)
    if return_attn:
        return answer, mdi, attns
    return answer, mdi

In [None]:
###### FOR PACE ICE ONLY - replace GT username below ######

# Tells HuggingFace to save all downloaded models + datasets in scratch directory instead of home directory
# NOTE(review): these variables are assigned in a cell that comes before the
# `transformers` import; keep that order in case the library reads them at
# import time - TODO confirm.
os.environ["HF_HOME"] = "/home/hice1/nbalakrishna3/scratch/huggingface"
# The dataset cache and the transformers cache point at the same hf_cache folder.
os.environ["HF_DATASETS_CACHE"] = "/home/hice1/nbalakrishna3/scratch/hf_cache"
os.environ["TRANSFORMERS_CACHE"] = "/home/hice1/nbalakrishna3/scratch/hf_cache"

In [None]:
from transformers import Blip2Processor, Blip2ForConditionalGeneration, AutoConfig
import torch
from PIL import Image

# `device` is only reported here; actual placement of the model is left to
# device_map="auto" below, and later code reads `model.device`.
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Using device:", device)

# Checkpoint shared by the config, the processor and the model.
MODEL_NAME = "Salesforce/blip2-flan-t5-xl"

# ---- 1. Load config and enable attentions ----
config = AutoConfig.from_pretrained(MODEL_NAME)
# Ask for attentions and dict-style outputs by default.
# NOTE(review): ask_blip2() also requests these per generate() call - confirm
# whether the config-level flags are still required.
config.output_attentions = True
config.return_dict = True
config.return_dict_in_generate = True

# ---- 2. Load processor normally ----
print("Loading BLIP model...")
processor = Blip2Processor.from_pretrained(MODEL_NAME)

# ---- 3. Load model with modified config ----
# Weights are loaded in float16, matching the dtype ask_blip2() casts its inputs to.
model = Blip2ForConditionalGeneration.from_pretrained(
    MODEL_NAME,
    config=config,
    torch_dtype=torch.float16,
    device_map="auto"
)

# Inference only: switch the model to evaluation mode.
model.eval()
print("✅ Model loaded (with attentions enabled)!")

In [None]:
def create_eval_prompt(caption, condition, question, model_answer):
    """
    Build the text prompt for the binary (0/1) answer judge.

    Args:
        caption: caption shown to the answering model; presented to the judge
            as context only, not as ground truth.
        condition: label of the experimental condition.
        question: the question that was asked.
        model_answer: the answer to be judged.

    Returns:
        The prompt string; it starts and ends with a newline.
    """
    instruction_lines = [
        "",
        "You are an automated scoring module. ",
        'You MUST output ONLY a single character: "0" or "1".',
        "NEVER output words, sentences, explanations, punctuation, or reasoning.",
        "",
        "YOUR TASK:",
        "Judge whether the MODEL_ANSWER correctly matches what is visually true in the IMAGE.",
        "",
        "IMPORTANT:",
        "• The CAPTION is NOT ground truth. It was only context shown to another model.",
        "  Do NOT trust it, and do NOT use it to judge correctness.",
        "• Ignore any misleading, missing, or incorrect details in the CAPTION.",
        "• Use ONLY the IMAGE + the QUESTION + world knowledge to judge correctness.",
        '• If the MODEL_ANSWER matches the visible truth in the image, output "1".',
        '• If the MODEL_ANSWER is wrong, unsupported, or contradicts the image, output "0".',
        '• Your ENTIRE reply MUST be exactly one character: "0" or "1".',
        "",
        "Now evaluate:",
        "",
    ]

    # The per-sample fields are interpolated verbatim.
    sample_lines = [
        f"CAPTION (context only, NOT truth): {caption}",
        f"CONDITION: {condition}",
        f"QUESTION: {question}",
        f"MODEL_ANSWER: {model_answer}",
    ]

    closing_lines = [
        "",
        'Reply with ONLY "0" or "1".',
        "",
    ]

    return "\n".join(instruction_lines + sample_lines + closing_lines)

In [None]:
def eval_answer(image_path, caption, condition, question, model_answer):
    """
    Judge one model answer against the image with Claude and return 0 or 1.

    Args:
        image_path: path of the image shown to the judge. The declared media
            type is derived from the file extension (.jpg/.jpeg/.png/.gif/.webp)
            and falls back to image/jpeg for anything else.
        caption, condition, question, model_answer: forwarded to
            create_eval_prompt() to build the judge prompt.

    Returns:
        int: 1 if the judge replied "1", 0 if it replied "0". Surrounding
        whitespace and quote characters in the reply are ignored.

    Raises:
        ValueError: if the reply is anything other than 0 or 1.
    """
    # ---- Build prompt ----
    prompt = create_eval_prompt(caption, condition, question, model_answer)

    # ---- Encode image ----
    # The pipeline also feeds .png files, so the media type must follow the
    # file rather than always claiming JPEG.
    media_types = {
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".png": "image/png",
        ".gif": "image/gif",
        ".webp": "image/webp",
    }
    extension = os.path.splitext(image_path)[1].lower()
    media_type = media_types.get(extension, "image/jpeg")
    b64img = encode_image(image_path)

    # ---- Call Claude ----
    response = anthropic_client.messages.create(
        model="claude-sonnet-4-5-20250929",
        max_tokens=5,
        temperature=0,
        messages=[
            {
                "role": "user",
                "content": [
                    {
                        "type": "image",
                        "source": {
                            "type": "base64",
                            "media_type": media_type,
                            "data": b64img
                        }
                    },
                    {
                        "type": "text",
                        "text": prompt
                    }
                ]
            }
        ]
    )

    # ---- Parse output ----
    output = response.content[0].text.strip()
    # The prompt itself shows the digits in quotes, so tolerate a quoted verdict.
    verdict = output.strip("\"'").strip()

    if verdict not in ("0", "1"):
        raise ValueError(f"Unexpected Claude judge output: {output}")

    return int(verdict)

In [None]:
# V2
import random

from concurrent.futures import ThreadPoolExecutor, as_completed

def _round_or_none(value, digits=3):
    """Round a metric for serialization; None and NaN both become None."""
    if value is None or value != value:  # value != value is True only for NaN
        return None
    return round(value, digits)


def generate_BLIP_outputs(subset_size=None, last_turn_only=False, seed=None):
    """
    Run the full pipeline over images in IMAGE_FOLDER and write one JSON line per image.

    For each image: generate a caption and the L0-L4 questions, ask BLIP-2
    each question in order (carrying QA history between turns), compute the
    MDI / entropy / attention-shift metrics per level, score every answer with
    eval_answer(), and append the record to BLIP_OUTPUT_PATH (the file is
    overwritten at the start of the run).

    Args:
        subset_size: number of images to sample at random; None processes all
            images. Values larger than the number of available images are
            clamped instead of raising.
        last_turn_only: when True only the most recent (question, answer) pair
            is passed on as history; otherwise the history accumulates.
        seed: optional seed for reproducible sampling. None keeps using the
            module-level random generator.

    An image whose processing raises is reported on stdout and skipped; no
    record is written for it.
    """
    levels = ["L0", "L1", "L2", "L3", "L4"]

    # Sorted so that sampling with a seed does not depend on directory order.
    all_image_files = sorted(
        f for f in os.listdir(IMAGE_FOLDER)
        if f.lower().endswith((".jpg", ".jpeg", ".png"))
    )

    if subset_size is not None:
        sample_size = min(subset_size, len(all_image_files))
        sampler = random.Random(seed) if seed is not None else random
        image_files = sampler.sample(all_image_files, sample_size)
    else:
        image_files = all_image_files

    print(f"Found {len(image_files)} images.\n")

    with open(BLIP_OUTPUT_PATH, "w", encoding="utf-8") as out:
        for img_file in tqdm(image_files, desc="Processing"):
            image_id = os.path.splitext(img_file)[0]
            path = os.path.join(IMAGE_FOLDER, img_file)

            try:
                # ---- 1) GPT caption + questions ----
                generated = generate_questions(encode_image(path))
                correct_caption = generated["correct_caption"]
                questions = {lvl: generated[lvl] for lvl in levels}

                # ---- 2) BLIP-2 answers + attention metrics ----
                answers = {}
                metrics = {}
                history = []
                prev_attn = None

                for lvl in levels:
                    question = questions[lvl]
                    ans, mdi, attn = ask_blip2(
                        path,
                        correct_caption,
                        question,
                        history=history,
                        return_mdi=True,
                        return_attn=True,
                        last_turn_only=last_turn_only
                    )

                    if last_turn_only:
                        history = [(question, ans)]
                    else:
                        history.append((question, ans))

                    entropy = compute_attention_entropy(attn)
                    # The first level has no previous turn to compare against.
                    shift = (
                        compute_attention_shift(prev_attn, attn)
                        if prev_attn is not None
                        else None
                    )

                    answers[lvl] = ans
                    metrics[lvl] = {
                        "mdi": _round_or_none(mdi),
                        "entropy": _round_or_none(entropy),
                        "shift": _round_or_none(shift)
                    }

                    prev_attn = attn

                # ---- 3) Parallel Claude evaluation, keyed by level ----
                with ThreadPoolExecutor(max_workers=len(levels)) as ex:
                    futures = {
                        lvl: ex.submit(
                            eval_answer,
                            path,
                            correct_caption,
                            "correct caption condition",
                            questions[lvl],
                            answers[lvl]
                        )
                        for lvl in levels
                    }
                    eval_scores = {lvl: fut.result() for lvl, fut in futures.items()}

                # ---- 4) Assemble the record ----
                output = {
                    "image_id": image_id,
                    "caption": correct_caption,   # single caption for this experiment
                    "questions": questions,       # {"L0": ..., ..., "L4": ...}
                    "answers": answers,           # model answer per level
                    "metrics": metrics,           # {"mdi", "entropy", "shift"} per level
                    "eval_scores": eval_scores    # judge score (0/1) per level
                }

                # ---- 5) Write one JSON line ----
                out.write(json.dumps(output, ensure_ascii=False) + "\n")
                # Keep finished records on disk even if a later image crashes the run.
                out.flush()

            except Exception as e:
                # Best effort: report the failure and continue with the next image.
                print(f"\nError with {image_id}: {e}")

    print(f"\nDone. JSONL saved to: {BLIP_OUTPUT_PATH}\n")

In [None]:
# For cross-modal comparison (will be implemented in another file)

# import json
# import pandas as pd
# import numpy as np

# def jsonl_to_df(jsonl_path):
#     """
#     Reads your blip_responses.jsonl or llava_responses.jsonl file
#     and converts it into a clean pandas DataFrame.
    
#     Handles:
#     - JSONL line-by-line format
#     - correct/incorrect caption scores
#     - nested mdi/entropy/shift dicts
#     - missing/null values safely
#     """

#     rows = []

#     with open(jsonl_path, "r") as f:
#         for line in f:
#             line = line.strip()
#             if not line:
#                 continue

#             # Load current JSON object (one per line)
#             entry = json.loads(line)

#             image_id = entry["image_id"]

#             # Loop over correct_caption / incorrect_caption
#             for caption_type in ["correct_caption", "incorrect_caption"]:

#                 # Loop over L0/L1/L2/L3 levels
#                 for lvl in ["L0", "L1", "L2", "L3"]:

#                     # Safely extract nested fields with .get()
#                     mdi_val = entry["mdi_scores"][caption_type].get(lvl)
#                     entropy_val = entry["entropy_scores"][caption_type].get(lvl)
#                     shift_val = entry["shift_scores"][caption_type].get(lvl)

#                     # Evaluation: 0 or 1
#                     correct_val = entry["eval_scores"][caption_type].get(
#                         f"{caption_type}_score"
#                     )

#                     row = {
#                         "image_id": image_id,
#                         "caption_type": caption_type,
#                         "level": lvl,
#                         "mdi": mdi_val,
#                         "entropy": entropy_val,
#                         "shift": shift_val,
#                         "correct": correct_val,
#                     }

#                     rows.append(row)

#     df = pd.DataFrame(rows)

#     # Convert None to NaN (cleaner for plotting)
#     df = df.replace({None: np.nan})

#     return df

In [None]:
if __name__ == "__main__":
    
    ######### BLIP #########
    
    # Runs the pipeline on a random sample of 5 images: a caption and L0-L4 questions are generated per image,
    # BLIP-2 answers them with the full QA history carried between turns (last_turn_only=False),
    # and each answer is scored by the Claude judge (0 - incorrect; 1 - correct).
    generate_BLIP_outputs(subset_size=5, last_turn_only=False)