In [1]:
# %% 0) Imports
import os, re, json, csv, pathlib
from dataclasses import dataclass
from typing import Dict, List, Iterable
from collections import Counter
from tqdm.auto import tqdm

import torch
from PIL import Image

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# %% A) Make sure the right packages/classes are available
import sys, platform
print("Python:", sys.version)
try:
    import torch, transformers
    from transformers import AutoProcessor, LlavaForConditionalGeneration
    print("PyTorch:", torch.__version__)
    print("Transformers:", transformers.__version__)
except Exception as e:
    print("Import check failed:", repr(e))

Python: 3.10.19 | packaged by Anaconda, Inc. | (main, Oct 21 2025, 16:41:31) [MSC v.1929 64 bit (AMD64)]
PyTorch: 2.9.0+cu126
Transformers: 4.57.1


In [3]:
# %% 1) Config — EDIT THESE
POPE_ROOT      = "POPE/output/coco"   # folder with random/popular/adversarial .json/.jsonl
COCO_IMG_ROOT  = "val2014"     # folder with COCO_val2014_*.jpg
HF_MODEL_ID    = "llava-hf/llava-1.5-7b-hf"      # e.g., "llava-hf/llava-1.5-7b-hf" or "llava-hf/llava-1.6-mistral-7b-hf"

DEVICE         = "cuda" if torch.cuda.is_available() else "cpu"
DTYPE          = torch.float16 if DEVICE == "cuda" else torch.float32
MAX_NEW_TOKENS = 8
TEMPERATURE    = 0.0
CSV_OUT        = "pope_coco_predictions.csv"

print("DEVICE:", DEVICE)
print("MODEL:", HF_MODEL_ID)
print("POPE_ROOT:", POPE_ROOT)
print("COCO_IMG_ROOT:", COCO_IMG_ROOT)

DEVICE: cuda
MODEL: llava-hf/llava-1.5-7b-hf
POPE_ROOT: POPE/output/coco
COCO_IMG_ROOT: val2014


In [4]:
# List json/jsonl files under your POPE_ROOT so we can see real names/paths
from pathlib import Path

root = Path(POPE_ROOT)
assert root.exists(), f"POPE_ROOT not found: {root}"

cands = list(root.rglob("*.json")) + list(root.rglob("*.jsonl"))
print(f"Found {len(cands)} JSON/JSONL files under {root}:")
for p in cands[:50]:
    print(" -", p.relative_to(root))
if len(cands) == 0:
    print("No JSON files found. Double-check POPE_ROOT.")

Found 4 JSON/JSONL files under POPE\output\coco:
 - coco_pope_adversarial.json
 - coco_pope_popular.json
 - coco_pope_random.json
 - .ipynb_checkpoints\coco_pope_random-checkpoint.json


In [5]:
# %% 2) POPE loader (robust to .json vs .jsonl) + image id normalizer
# %% FIXED: auto-detect loader for POPE files (handles underscores, avoids -checkpoint)
import json, pathlib

def _iter_json_or_jsonl(path: pathlib.Path):
    with open(path, "r", encoding="utf-8") as f:
        txt = f.read().strip()
    # Try single JSON doc
    try:
        obj = json.loads(txt)
        if isinstance(obj, dict) and "data" in obj:
            for row in obj["data"]:
                yield row
            return
        if isinstance(obj, list):
            for row in obj:
                yield row
            return
        if isinstance(obj, dict):
            yield obj
            return
    except json.JSONDecodeError:
        pass
    # Fallback: JSONL
    for line in txt.splitlines():
        line = line.strip()
        if line:
            yield json.loads(line)

def _to_val2014_filename(x):
    if isinstance(x, int):
        return f"COCO_val2014_{x:012d}.jpg"
    if isinstance(x, str):
        return os.path.basename(x)
    raise TypeError(f"Unsupported image ref type: {type(x)}")

def load_pope_split_autodetect(pope_root: str) -> dict:
    """
    Recursively scans for files whose names contain 'random' / 'popular' / 'adversarial'
    (case-insensitive), prefers non '-checkpoint' files, picks the largest if multiples.
    Accepts .json OR .jsonl; accepts {'data': [...]} or JSONL lines.
    Normalizes rows to {image, question, answer in {'yes','no'}}.
    """
    root = pathlib.Path(pope_root)
    assert root.exists(), f"POPE_ROOT not found: {root}"

    files = list(root.rglob("*.json")) + list(root.rglob("*.jsonl"))
    assert files, f"No .json/.jsonl found under {root}"

    def pick_for(key: str) -> pathlib.Path:
        key_l = key.lower()
        cands = [p for p in files if key_l in p.name.lower()]
        # Prefer non-checkpoint
        primary = [p for p in cands if "checkpoint" not in p.name.lower()]
        use = primary or cands
        if not use:
            raise FileNotFoundError(f"No file name containing '{key}' under {root}. Found: {[p.name for p in files][:10]}")
        # choose the largest (often the real data file)
        use.sort(key=lambda p: p.stat().st_size, reverse=True)
        return use[0]

    selected = {
        "random": pick_for("random"),
        "popular": pick_for("popular"),
        "adversarial": pick_for("adversarial"),
    }

    print("Selected files (auto):")
    for k, p in selected.items():
        print(f" - {k}: {p.relative_to(root)}")

    out = {}
    for split, path in selected.items():
        rows = []
        for r in _iter_json_or_jsonl(path):
            ans = str(r.get("answer", "")).strip().lower()
            if ans in ("1","true","yes","y"): ans = "yes"
            elif ans in ("0","false","no","n"): ans = "no"
            img = r.get("image") or r.get("image_id") or r.get("img") or r.get("img_path")
            q   = r.get("question") or r.get("q") or r.get("text")
            rows.append({
                "image": _to_val2014_filename(img) if img is not None else None,
                "question": q,
                "answer": ans
            })
        rows = [x for x in rows if x["image"] and x["question"] and x["answer"] in {"yes","no"}]
        if not rows:
            raise RuntimeError(f"Loaded 0 normalized rows for split '{split}' from {path}")
        out[split] = rows

    return out

In [6]:
# Inspect what's actually inside your POPE files
from pathlib import Path
import json, itertools

def peek_pope_file(path, n=5):
    p = Path(path)
    print(f"\n=== Peek: {p} ===")
    txt = p.read_text(encoding="utf-8").strip()
    obj = None
    try:
        obj = json.loads(txt)
    except json.JSONDecodeError:
        # JSONL fallback
        lines = [json.loads(ln) for ln in txt.splitlines() if ln.strip()]
        obj = lines

    rows = obj.get("data", obj) if isinstance(obj, dict) else obj
    if not isinstance(rows, list):
        print("Unexpected top-level type:", type(rows))
        return

    print("Total items:", len(rows))
    for i, r in enumerate(itertools.islice(rows, n)):
        print(f"\n-- Row {i} keys:", list(r.keys()))
        # show a compact preview of key fields if present
        for k in ["image","image_id","img","img_id","img_path","file_name",
                  "question","q","text","prompt",
                  "answer","label","gt","target","ans","response","y"]:
            if k in r:
                print(f"  {k} => {r[k]!r}")

# List JSON files and peek the three we’ll use
from pathlib import Path
root = Path(POPE_ROOT)
cands = list(root.rglob("*.json")) + list(root.rglob("*.jsonl"))
print(f"Found {len(cands)} POPE json/jsonl under {root}:")
for p in cands:
    print(" -", p.relative_to(root))

# If your files are named like coco_pope_*.json, peek them directly:
peek_pope_file(root / "coco_pope_random.json")
peek_pope_file(root / "coco_pope_popular.json")
peek_pope_file(root / "coco_pope_adversarial.json")

Found 4 POPE json/jsonl under POPE\output\coco:
 - coco_pope_adversarial.json
 - coco_pope_popular.json
 - coco_pope_random.json
 - .ipynb_checkpoints\coco_pope_random-checkpoint.json

=== Peek: POPE\output\coco\coco_pope_random.json ===
Total items: 3000

-- Row 0 keys: ['question_id', 'image', 'text', 'label']
  image => 'COCO_val2014_000000310196.jpg'
  text => 'Is there a snowboard in the image?'
  label => 'yes'

-- Row 1 keys: ['question_id', 'image', 'text', 'label']
  image => 'COCO_val2014_000000310196.jpg'
  text => 'Is there a car in the image?'
  label => 'no'

-- Row 2 keys: ['question_id', 'image', 'text', 'label']
  image => 'COCO_val2014_000000310196.jpg'
  text => 'Is there a person in the image?'
  label => 'yes'

-- Row 3 keys: ['question_id', 'image', 'text', 'label']
  image => 'COCO_val2014_000000310196.jpg'
  text => 'Is there a sandwich in the image?'
  label => 'no'

-- Row 4 keys: ['question_id', 'image', 'text', 'label']
  image => 'COCO_val2014_000000310196.

In [7]:
# Robust POPE loader: tolerates many schema variants
import json, pathlib, os

YES_TOKENS = {"1","true","yes","y","present","a"}   # include 'a' as some datasets use A/B
NO_TOKENS  = {"0","false","no","n","absent","b"}

def _read_any(path: pathlib.Path):
    txt = path.read_text(encoding="utf-8").strip()
    try:
        obj = json.loads(txt)
        if isinstance(obj, dict) and "data" in obj:
            return obj["data"]
        if isinstance(obj, list):
            return obj
        if isinstance(obj, dict):
            return [obj]
    except json.JSONDecodeError:
        pass
    # JSONL fallback
    return [json.loads(ln) for ln in txt.splitlines() if ln.strip()]

def _canon_answer(row):
    # try common fields
    raw = None
    for k in ("answer","label","gt","target","ans","response","y"):
        if k in row:
            raw = row[k]
            break
    if raw is None:
        return None
    # normalize types
    if isinstance(raw, bool):
        return "yes" if raw else "no"
    if isinstance(raw, (int, float)):
        return "yes" if int(raw) == 1 else "no"
    s = str(raw).strip().lower()
    if s in YES_TOKENS: return "yes"
    if s in NO_TOKENS:  return "no"
    # sometimes options are like {"A":"yes","B":"no"} and label is "A"/"B"
    if s in {"a","b"}:
        # heuristic: map A->yes, B->no unless 'options' says otherwise
        opts = row.get("options") or row.get("choices")
        if isinstance(opts, dict):
            inv = {v.strip().lower(): k.lower() for k,v in opts.items()}
            # if options say 'yes' maps to 'a' explicitly, honor that
            if "yes" in inv and "no" in inv:
                return "yes" if s == inv["yes"] else "no"
        return "yes" if s == "a" else "no"
    return None

def _to_val2014_filename(x):
    if isinstance(x, int):
        return f"COCO_val2014_{x:012d}.jpg"
    if isinstance(x, str):
        return os.path.basename(x)
    return None

def _canon_image(row):
    for k in ("image","image_id","img","img_id","img_path","file_name","filename","path"):
        if k in row:
            v = row[k]
            fn = _to_val2014_filename(v)
            if fn:
                return fn
    return None

def _canon_question(row):
    for k in ("question","q","text","prompt","instruction"):
        if k in row:
            v = row[k]
            if isinstance(v, str) and v.strip():
                return v.strip()
    return None

def _pick(pope_root: pathlib.Path, key_substr: str):
    cands = [p for p in pope_root.rglob("*.json")] + [p for p in pope_root.rglob("*.jsonl")]
    cands = [p for p in cands if key_substr.lower() in p.name.lower() and "checkpoint" not in p.name.lower()]
    if not cands:
        raise FileNotFoundError(f"No file containing '{key_substr}' under {pope_root}")
    cands.sort(key=lambda p: p.stat().st_size, reverse=True)
    return cands[0]

def load_pope_split_robust(pope_root: str) -> dict:
    root = pathlib.Path(pope_root)
    assert root.exists(), f"POPE_ROOT not found: {root}"

    files = {
        "random": _pick(root, "random"),
        "popular": _pick(root, "popular"),
        "adversarial": _pick(root, "adversarial"),
    }
    print("Using files:")
    for k,p in files.items():
        print(f" - {k}: {p.relative_to(root)} (size {p.stat().st_size/1024:.1f} KB)")

    out = {}
    for split, path in files.items():
        raw_rows = _read_any(path)
        norm = []
        for r in raw_rows:
            img = _canon_image(r)
            q   = _canon_question(r)
            a   = _canon_answer(r)
            if img and q and a in {"yes","no"}:
                norm.append({"image": img, "question": q, "answer": a})
        if not norm:
            # show a few raw rows to help debug
            print(f"\n[WARN] 0 normalized rows for {split}. First raw rows:")
            for sample in raw_rows[:3]:
                print(sample)
            raise RuntimeError(f"0 normalized rows for split '{split}' from {path}")
        out[split] = norm
    return out

In [8]:
pope = load_pope_split_robust(POPE_ROOT)

for split in ("random","popular","adversarial"):
    print(f"{split}: {len(pope[split])} rows")
    print("  sample:", pope[split][0])

Using files:
 - random: coco_pope_random.json (size 362.5 KB)
 - popular: coco_pope_popular.json (size 361.5 KB)
 - adversarial: coco_pope_adversarial.json (size 361.7 KB)
random: 3000 rows
  sample: {'image': 'COCO_val2014_000000310196.jpg', 'question': 'Is there a snowboard in the image?', 'answer': 'yes'}
popular: 3000 rows
  sample: {'image': 'COCO_val2014_000000310196.jpg', 'question': 'Is there a snowboard in the image?', 'answer': 'yes'}
adversarial: 3000 rows
  sample: {'image': 'COCO_val2014_000000310196.jpg', 'question': 'Is there a snowboard in the image?', 'answer': 'yes'}


In [9]:
# %% 4) Load LLaVA from Hugging Face
processor = AutoProcessor.from_pretrained(HF_MODEL_ID)
model = LlavaForConditionalGeneration.from_pretrained(
    HF_MODEL_ID,
    torch_dtype=DTYPE,
    low_cpu_mem_usage=True,
    device_map="auto" if DEVICE == "cuda" else None
).to(DEVICE)
model.eval()
print("Loaded HF LLaVA:", HF_MODEL_ID, "on", DEVICE)

Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.
`torch_dtype` is deprecated! Use `dtype` instead!

oading checkpoint shards: 100%|█████████████████████████████████████████████████████████| 3/3 [00:08<00:00,  2.72s/it]

Loaded HF LLaVA: llava-hf/llava-1.5-7b-hf on cuda


In [10]:
# %% 5) Hugging Face LLaVA inference (replaces llava_answer_yesno/infer_yesno)

YES_RE = re.compile(r"\byes\b", re.I)
NO_RE  = re.compile(r"\bno\b", re.I)

def normalize_yesno(text: str) -> str:
    if not text:
        return "no"
    t = text.strip().lower()
    ys = [m.start() for m in YES_RE.finditer(t)]
    ns = [m.start() for m in NO_RE.finditer(t)]
    if ys or ns:
        return "yes" if (ys[-1] if ys else -1) > (ns[-1] if ns else -1) else "no"
    return "yes" if t.startswith("y") else "no"


def hf_llava_answer_yesno(processor, model, image_path: str, question: str,
                          max_new_tokens=8, temperature=0.0) -> str:
    """
    Run HuggingFace LlavaForConditionalGeneration on one (image, question) pair.
    Returns 'yes' or 'no'.
    """
    # prompt format for HF Llava models
    prompt = (
        "USER: <image>\n"
        "Answer the question strictly with 'Yes' or 'No'. "
        "Do not add any other words.\n"
        f"Question: {question}\n"
        "ASSISTANT:"
    )
    image = Image.open(image_path).convert("RGB")
    inputs = processor(images=image, text=prompt, return_tensors="pt").to(DEVICE)

    with torch.inference_mode():
        output_ids = model.generate(
            **inputs,
            do_sample=False if temperature == 0.0 else True,
            temperature=float(temperature),
            max_new_tokens=int(max_new_tokens),
        )

    out = processor.batch_decode(output_ids, skip_special_tokens=True)[0]
    return normalize_yesno(out)


def infer_yesno(img_path: str, question: str) -> str:
    return hf_llava_answer_yesno(
        processor, model,
        img_path, question,
        max_new_tokens=MAX_NEW_TOKENS,
        temperature=TEMPERATURE,
    )

In [11]:
# %% 6) Metrics + evaluator

@dataclass
class Metrics:
    tp:int=0; tn:int=0; fp:int=0; fn:int=0
    def scores(self):
        acc = (self.tp + self.tn) / max(1, self.tp + self.tn + self.fp + self.fn)
        prec = self.tp / max(1, self.tp + self.fp)
        rec  = self.tp / max(1, self.tp + self.fn)
        f1   = 0.0 if (prec+rec)==0 else 2*prec*rec/(prec+rec)
        return {"Accuracy": acc, "Precision": prec, "Recall": rec, "F1": f1}

def eval_rows(rows: List[dict], coco_img_root: str, infer_fn) -> dict:
    m = Metrics()
    for r in tqdm(rows, leave=False, desc="Evaluating"):
        img_file = r["image"]
        img_path = img_file if os.path.isabs(img_file) else os.path.join(coco_img_root, img_file)
        if not os.path.exists(img_path):
            continue  # skip missing files
        pred = infer_fn(img_path, r["question"])
        gt   = r["answer"]
        if   pred=="yes" and gt=="yes": m.tp += 1
        elif pred=="no"  and gt=="no" : m.tn += 1
        elif pred=="yes" and gt=="no" : m.fp += 1
        elif pred=="no"  and gt=="yes": m.fn += 1
    return m.scores()

def pct(d):  # pretty-print as percentages
    return {k: round(v*100, 2) for k,v in d.items()}

In [12]:
# %% 7) Smoke test
N = 8
if N:
    test_rows = pope["random"][:N]
    print("Running smoke test on", len(test_rows), "items...")
    sm = eval_rows(test_rows, COCO_IMG_ROOT, infer_yesno)
    print("SMOKE:", pct(sm))
else:
    print("Skipping smoke test.")

Running smoke test on 8 items...


The following generation flags are not valid and may be ignored: ['temperature']. Set `TRANSFORMERS_VERBOSITY=info` for more details.
                                                                                                                       

SMOKE: {'Accuracy': 100.0, 'Precision': 100.0, 'Recall': 100.0, 'F1': 100.0}




In [16]:
# %% 8) Full POPE (COCO) run + report
import time

def run_full_pope(pope, coco_img_root, infer_fn):
    results = {}
    counts  = {}
    all_rows = []
    t0 = time.time()
    for split in ("random","popular","adversarial"):
        print(f"\nRunning split: {split}")
        s = eval_rows(pope[split], coco_img_root, infer_fn)
        results[split] = s
        all_rows.extend(pope[split])
    overall = eval_rows(all_rows, coco_img_root, infer_fn)
    t1 = time.time()

    print("\nPOPE (COCO) results:")
    for k in ("random","popular","adversarial"):
        print(f"- {k.title():12}:", pct(results[k]))
    print(f"- Overall     :", pct(overall))
    print(f"\nTotal time: {t1 - t0:.1f}s for {sum(len(pope[s]) for s in ('random','popular','adversarial'))} items")
    return results, overall

results, overall = run_full_pope(pope, COCO_IMG_ROOT, infer_yesno)


Running split: random


                                                                                                                       


Running split: popular


                                                                                                                       


Running split: adversarial


                                                                                                                       


POPE (COCO) results:
- Random      : {'Accuracy': 88.83, 'Precision': 96.05, 'Recall': 81.0, 'F1': 87.88}
- Popular     : {'Accuracy': 86.93, 'Precision': 91.91, 'Recall': 81.0, 'F1': 86.11}
- Adversarial : {'Accuracy': 84.03, 'Precision': 86.23, 'Recall': 81.0, 'F1': 83.53}
- Overall     : {'Accuracy': 86.6, 'Precision': 91.22, 'Recall': 81.0, 'F1': 85.81}

Total time: 2889.7s for 9000 items




In [17]:
# %% 11) Build a steering vector (activation contrast: image vs. image-blind)
# Uses your already-loaded: processor, model, DEVICE; and your loaded 'pope' splits
# Idea: for each (image, question), compare hidden states with the real image vs a blank image.
# Steering dir v = mean( h_with_image - h_blank_image ) over a calibration set.

import torch
from PIL import Image, ImageColor

@torch.no_grad()
def _last_hidden_state(processor, model, image, question, return_full_output=False):
    prompt = (
        "USER: <image>\n"
        "Answer the question strictly with 'Yes' or 'No'. "
        "Do not add any other words.\n"
        f"Question: {question}\n"
        "ASSISTANT:"
    )
    inputs = processor(images=image, text=prompt, return_tensors="pt").to(DEVICE)
    out = model(**inputs, output_hidden_states=True, use_cache=False)
    # out.hidden_states is the final LM hidden states (one per layer if enabled by the HF port).
    # Some HF LLaVA ports expose only the final layer via `out.hidden_states[-1]`.
    last_h = out.hidden_states[-1][:, -1, :]  # (batch=1, seq, dim) -> (1, dim) for last token
    return (last_h, out) if return_full_output else last_h

# A simple neutral "blank" image (224x224 mid-gray)
_blank = Image.new("RGB", (224, 224), ImageColor.getrgb("#808080"))

def build_steering_vector(pope, coco_img_root, processor, model, 
                          split="random", 
                          num_calib=400, 
                          seed=0):
    torch.manual_seed(seed)
    rows = pope[split][:num_calib]
    vecs = []
    for r in tqdm(rows, desc="Calibrating (build v)", leave=False):
        img_path = os.path.join(coco_img_root, r["image"])
        if not os.path.exists(img_path):
            continue
        img = Image.open(img_path).convert("RGB")
        h_real = _last_hidden_state(processor, model, img, r["question"])   # (1, d)
        h_blank= _last_hidden_state(processor, model, _blank, r["question"])# (1, d)
        vecs.append((h_real - h_blank).cpu())
    if not vecs:
        raise RuntimeError("No calibration vectors collected (check image paths).")
    v = torch.stack(vecs, dim=0).mean(dim=0).squeeze(0)  # (d,)
    # Normalize to unit norm (helps tune alpha meaningfully)
    v = v / (v.norm(p=2) + 1e-8)
    return v

steer_v = build_steering_vector(pope, COCO_IMG_ROOT, processor, model, split="random", num_calib=400, seed=0)
print("Steering vector shape:", tuple(steer_v.shape))

                                                                                                                       

Steering vector shape: (4096,)




In [18]:
# %% 12) Steered yes/no inference
# We apply steering at the final hidden state: h* = h + alpha * v
# Then compute logits = lm_head(h*), compare Yes/No token scores.

# Helper: get token ids for 'Yes' and 'No' continuations (LLaMA uses leading space)
tok = processor.tokenizer
YES_IDS = tok.encode(" Yes", add_special_tokens=False)
NO_IDS  = tok.encode(" No",  add_special_tokens=False)
YES_ID  = YES_IDS[-1] if YES_IDS else tok.encode("Yes", add_special_tokens=False)[-1]
NO_ID   = NO_IDS[-1]  if NO_IDS  else tok.encode("No",  add_special_tokens=False)[-1]
print("YES_ID:", YES_ID, "NO_ID:", NO_ID)

@torch.no_grad()
def hf_llava_answer_yesno_steered(processor, model, image_path: str, question: str, 
                                  v: torch.Tensor, alpha: float = 1.0) -> str:
    # Forward once to get hidden state for the last token (pre-logits)
    prompt = (
        "USER: <image>\n"
        "Answer the question strictly with 'Yes' or 'No'. "
        "Do not add any other words.\n"
        f"Question: {question}\n"
        "ASSISTANT:"
    )
    img = Image.open(image_path).convert("RGB")
    inputs = processor(images=img, text=prompt, return_tensors="pt").to(DEVICE)
    out = model(**inputs, output_hidden_states=True, use_cache=False)
    # Grab hidden state for last position
    h_last = out.hidden_states[-1][:, -1, :]              # (1, d)
    # Apply steering at hidden state
    h_steer = h_last + alpha * v.to(h_last.dtype).to(h_last.device)
    # Map to logits via the LM head (same as internal final projection)
    # HF LLaVA wraps a LlamaForCausalLM under model.language_model
    lm_head = model.get_output_embeddings()               # weight tied to vocab
    logits = lm_head(h_steer)                             # (1, vocab)
    # Compare Yes vs No
    yes_logit = logits[0, YES_ID].item()
    no_logit  = logits[0, NO_ID].item()
    return "yes" if yes_logit >= no_logit else "no"

def infer_yesno_steered(img_path: str, question: str, alpha: float) -> str:
    return hf_llava_answer_yesno_steered(processor, model, img_path, question, steer_v, alpha)

# Quick smoke on 8 items with steering
print("Steered smoke (alpha=1.0):")
sm = eval_rows(pope["random"][:8], COCO_IMG_ROOT, lambda p,q: infer_yesno_steered(p,q,alpha=1.0))
print("SMOKE (steered):", pct(sm))

YES_ID: 3869 NO_ID: 1939
Steered smoke (alpha=1.0):


                                                                                                                       

SMOKE (steered): {'Accuracy': 100.0, 'Precision': 100.0, 'Recall': 100.0, 'F1': 100.0}




In [19]:
# %% 13) Full POPE run: baseline vs steered (grid over alpha)
import numpy as np, time

def run_full_pope_with_alpha(pope, coco_img_root, alpha):
    return run_full_pope(
        pope, coco_img_root, 
        infer_fn=lambda p,q: infer_yesno_steered(p,q,alpha)
    )

# Baseline (already computed earlier as 'results, overall'), but re-run to time-match
print("\n== Baseline (no steering) ==")
baseline_res, baseline_overall = run_full_pope(pope, COCO_IMG_ROOT, infer_yesno)

# Try a few steering strengths; tune by the overall F1 (or Accuracy).
alphas = [0.25, 0.5, 1.0, 1.5]
grid = []
for a in alphas:
    print(f"\n== Steered (alpha={a}) ==")
    res_a, overall_a = run_full_pope_with_alpha(pope, COCO_IMG_ROOT, a)
    grid.append((a, res_a, overall_a))

# Summarize
def _brief(o): 
    x = pct(o); 
    return f"Acc={x['Accuracy']:.2f}  F1={x['F1']:.2f}"

print("\n== Summary (Overall) ==")
print(f"baseline: {_brief(baseline_overall)}")
for a, _, ov in grid:
    print(f"alpha={a:>4}: {_brief(ov)}")


== Baseline (no steering) ==

Running split: random


                                                                                                                       


Running split: popular


                                                                                                                       


Running split: adversarial


                                                                                                                       


POPE (COCO) results:
- Random      : {'Accuracy': 88.83, 'Precision': 96.05, 'Recall': 81.0, 'F1': 87.88}
- Popular     : {'Accuracy': 86.93, 'Precision': 91.91, 'Recall': 81.0, 'F1': 86.11}
- Adversarial : {'Accuracy': 84.03, 'Precision': 86.23, 'Recall': 81.0, 'F1': 83.53}
- Overall     : {'Accuracy': 86.6, 'Precision': 91.22, 'Recall': 81.0, 'F1': 85.81}

Total time: 2838.0s for 9000 items

== Steered (alpha=0.25) ==

Running split: random


                                                                                                                       


Running split: popular


                                                                                                                       


Running split: adversarial


                                                                                                                       


POPE (COCO) results:
- Random      : {'Accuracy': 88.9, 'Precision': 95.84, 'Recall': 81.33, 'F1': 87.99}
- Popular     : {'Accuracy': 86.97, 'Precision': 91.66, 'Recall': 81.33, 'F1': 86.19}
- Adversarial : {'Accuracy': 84.0, 'Precision': 85.92, 'Recall': 81.33, 'F1': 83.56}
- Overall     : {'Accuracy': 86.62, 'Precision': 90.95, 'Recall': 81.33, 'F1': 85.88}

Total time: 2279.9s for 9000 items

== Steered (alpha=0.5) ==

Running split: random


                                                                                                                       


Running split: popular


                                                                                                                       


Running split: adversarial


                                                                                                                       


POPE (COCO) results:
- Random      : {'Accuracy': 88.9, 'Precision': 95.55, 'Recall': 81.6, 'F1': 88.03}
- Popular     : {'Accuracy': 87.0, 'Precision': 91.48, 'Recall': 81.6, 'F1': 86.26}
- Adversarial : {'Accuracy': 83.97, 'Precision': 85.65, 'Recall': 81.6, 'F1': 83.58}
- Overall     : {'Accuracy': 86.62, 'Precision': 90.71, 'Recall': 81.6, 'F1': 85.91}

Total time: 2245.7s for 9000 items

== Steered (alpha=1.0) ==

Running split: random


                                                                                                                       


Running split: popular


                                                                                                                       


Running split: adversarial


                                                                                                                       


POPE (COCO) results:
- Random      : {'Accuracy': 89.27, 'Precision': 95.38, 'Recall': 82.53, 'F1': 88.49}
- Popular     : {'Accuracy': 86.9, 'Precision': 90.43, 'Recall': 82.53, 'F1': 86.3}
- Adversarial : {'Accuracy': 83.7, 'Precision': 84.51, 'Recall': 82.53, 'F1': 83.51}
- Overall     : {'Accuracy': 86.62, 'Precision': 89.88, 'Recall': 82.53, 'F1': 86.05}

Total time: 2245.0s for 9000 items

== Steered (alpha=1.5) ==

Running split: random


                                                                                                                       


Running split: popular


                                                                                                                       


Running split: adversarial


                                                                                                                       


POPE (COCO) results:
- Random      : {'Accuracy': 89.47, 'Precision': 95.05, 'Recall': 83.27, 'F1': 88.77}
- Popular     : {'Accuracy': 86.73, 'Precision': 89.47, 'Recall': 83.27, 'F1': 86.26}
- Adversarial : {'Accuracy': 83.17, 'Precision': 83.1, 'Recall': 83.27, 'F1': 83.18}
- Overall     : {'Accuracy': 86.46, 'Precision': 88.94, 'Recall': 83.27, 'F1': 86.01}

Total time: 2327.1s for 9000 items

== Summary (Overall) ==
baseline: Acc=86.60  F1=85.81
alpha=0.25: Acc=86.62  F1=85.88
alpha= 0.5: Acc=86.62  F1=85.91
alpha= 1.0: Acc=86.62  F1=86.05
alpha= 1.5: Acc=86.46  F1=86.01




In [20]:
# %% A1) Build steering vector: image vs **language-only** (no <image>)
import torch
from PIL import Image

@torch.no_grad()
def _last_hidden_state_langonly(processor, model, question):
    prompt = (
        "USER:\n"   # <-- no <image> token here
        "Answer the question strictly with 'Yes' or 'No'. "
        "Do not add any other words.\n"
        f"Question: {question}\n"
        "ASSISTANT:"
    )
    inputs = processor(text=prompt, return_tensors="pt").to(DEVICE)
    out = model(**inputs, output_hidden_states=True, use_cache=False)
    return out.hidden_states[-1][:, -1, :]  # (1, d)

@torch.no_grad()
def _last_hidden_state_image(processor, model, image, question):
    prompt = (
        "USER: <image>\n"
        "Answer the question strictly with 'Yes' or 'No'. "
        "Do not add any other words.\n"
        f"Question: {question}\n"
        "ASSISTANT:"
    )
    inputs = processor(images=image, text=prompt, return_tensors="pt").to(DEVICE)
    out = model(**inputs, output_hidden_states=True, use_cache=False)
    return out.hidden_states[-1][:, -1, :]  # (1, d)

def build_steer_langonly(pope, coco_img_root, processor, model, split="random", num_calib=400, seed=0):
    torch.manual_seed(seed)
    rows = pope[split][:num_calib]
    vecs = []
    for r in tqdm(rows, desc="Calibrating (image v. lang-only)", leave=False):
        p = os.path.join(coco_img_root, r["image"])
        if not os.path.exists(p): 
            continue
        img = Image.open(p).convert("RGB")
        h_img  = _last_hidden_state_image(processor, model, img, r["question"])
        h_lang = _last_hidden_state_langonly(processor, model, r["question"])
        vecs.append((h_img - h_lang).cpu())
    v = torch.stack(vecs).mean(0).squeeze(0)
    v = v / (v.norm(p=2) + 1e-8)
    return v

steer_v_lang = build_steer_langonly(pope, COCO_IMG_ROOT, processor, model, split="random", num_calib=600)
print("steer_v_lang:", tuple(steer_v_lang.shape))

                                                                                                                       

steer_v_lang: (4096,)




In [21]:
# %% A2) Re-run with the new vector (try a small alpha grid)
def infer_yesno_steered_lang(img_path, q, alpha=1.0):
    return hf_llava_answer_yesno_steered(processor, model, img_path, q, steer_v_lang, alpha)

for a in [0.5, 1.0, 1.5]:
    print(f"\n== Steered (lang-only, alpha={a}) ==")
    _ = run_full_pope(pope, COCO_IMG_ROOT, lambda p,q: infer_yesno_steered_lang(p,q,a))


== Steered (lang-only, alpha=0.5) ==

Running split: random


                                                                                                                       


Running split: popular


                                                                                                                       


Running split: adversarial


                                                                                                                       


POPE (COCO) results:
- Random      : {'Accuracy': 88.87, 'Precision': 96.27, 'Recall': 80.87, 'F1': 87.9}
- Popular     : {'Accuracy': 87.03, 'Precision': 92.24, 'Recall': 80.87, 'F1': 86.18}
- Adversarial : {'Accuracy': 84.23, 'Precision': 86.7, 'Recall': 80.87, 'F1': 83.68}
- Overall     : {'Accuracy': 86.71, 'Precision': 91.57, 'Recall': 80.87, 'F1': 85.89}

Total time: 2241.0s for 9000 items

== Steered (lang-only, alpha=1.0) ==

Running split: random


                                                                                                                       


Running split: popular


                                                                                                                       


Running split: adversarial


                                                                                                                       


POPE (COCO) results:
- Random      : {'Accuracy': 88.63, 'Precision': 96.32, 'Recall': 80.33, 'F1': 87.6}
- Popular     : {'Accuracy': 86.97, 'Precision': 92.62, 'Recall': 80.33, 'F1': 86.04}
- Adversarial : {'Accuracy': 84.17, 'Precision': 87.0, 'Recall': 80.33, 'F1': 83.54}
- Overall     : {'Accuracy': 86.59, 'Precision': 91.82, 'Recall': 80.33, 'F1': 85.69}

Total time: 2263.1s for 9000 items

== Steered (lang-only, alpha=1.5) ==

Running split: random


                                                                                                                       


Running split: popular


                                                                                                                       


Running split: adversarial


                                                                                                                       


POPE (COCO) results:
- Random      : {'Accuracy': 88.4, 'Precision': 96.45, 'Recall': 79.73, 'F1': 87.3}
- Popular     : {'Accuracy': 86.73, 'Precision': 92.71, 'Recall': 79.73, 'F1': 85.73}
- Adversarial : {'Accuracy': 84.03, 'Precision': 87.24, 'Recall': 79.73, 'F1': 83.32}
- Overall     : {'Accuracy': 86.39, 'Precision': 91.98, 'Recall': 79.73, 'F1': 85.42}

Total time: 2287.7s for 9000 items




In [23]:
# %% B1) Capture a layer's output and build v there (robust to HF variants)
import contextlib, torch
from PIL import Image

def _get_llama_layers_from_llava(model):
    """
    Return the list of decoder layers (LlamaDecoderLayer) robustly across HF LLaVA variants.
    """
    # Common: LlavaForConditionalGeneration has .language_model as LlamaForCausalLM
    if hasattr(model, "language_model"):
        lm = model.language_model
        # Case A: language_model is LlamaForCausalLM with .model (LlamaModel)
        if hasattr(lm, "model") and hasattr(lm.model, "layers"):
            return lm.model.layers
        # Case B: language_model is already LlamaModel
        if hasattr(lm, "layers"):
            return lm.layers
    # Fallbacks (rare)
    if hasattr(model, "model") and hasattr(model.model, "layers"):
        return model.model.layers
    raise AttributeError("Could not locate LLaMA decoder layers in this HF LLaVA build.")

LAYERS = _get_llama_layers_from_llava(model)
NUM_LAYERS = len(LAYERS)
print(f"Found {NUM_LAYERS} decoder layers.")
STEER_LAYER_IDX = min(28, NUM_LAYERS - 1)  # pick a high-ish layer safely

def _forward_with_layer_capture(inputs, layer_idx):
    """
    Run a forward pass and capture the output of the chosen decoder layer.
    Returns tensor of shape (1, hidden_size) for the **last token**.
    """
    captured = {}
    target_layer = LAYERS[layer_idx]

    def hook_fn(module, inp, out):
        # out: (batch, seq, hidden) -- we take the last token
        captured["h"] = out[:, -1, :].detach()
        return out

    with torch.no_grad(), contextlib.ExitStack() as stack:
        handle = target_layer.register_forward_hook(hook_fn)
        stack.enter_context(handle)
        _ = model(**inputs, output_hidden_states=False, use_cache=False)
    return captured.get("h")  # (1, hidden)

@torch.no_grad()
def _layer_hidden_image(question, image, layer_idx):
    prompt = (
        "USER: <image>\n"
        "Answer the question strictly with 'Yes' or 'No'. Do not add any other words.\n"
        f"Question: {question}\nASSISTANT:"
    )
    inputs = processor(images=image, text=prompt, return_tensors="pt").to(DEVICE)
    return _forward_with_layer_capture(inputs, layer_idx)

@torch.no_grad()
def _layer_hidden_langonly(question, layer_idx):
    # no <image> → pure language prior
    prompt = (
        "USER:\n"
        "Answer the question strictly with 'Yes' or 'No'. Do not add any other words.\n"
        f"Question: {question}\nASSISTANT:"
    )
    inputs = processor(text=prompt, return_tensors="pt").to(DEVICE)
    return _forward_with_layer_capture(inputs, layer_idx)

def build_midlayer_vector(pope, coco_img_root, layer_idx=STEER_LAYER_IDX, num_calib=600):
    vecs = []
    rows = pope["random"][:num_calib]
    for r in tqdm(rows, desc=f"Build v @ layer {layer_idx}", leave=False):
        p = os.path.join(coco_img_root, r["image"])
        if not os.path.exists(p): 
            continue
        img = Image.open(p).convert("RGB")
        h_img  = _layer_hidden_image(r["question"], img, layer_idx)
        h_lang = _layer_hidden_langonly(r["question"], layer_idx)
        if h_img is None or h_lang is None:
            continue
        vecs.append((h_img - h_lang).cpu())
    if not vecs:
        raise RuntimeError("No calibration vectors collected — check image paths/split names.")
    v = torch.stack(vecs).mean(0).squeeze(0)
    v = v / (v.norm(p=2) + 1e-8)
    return v

steer_v_mid = build_midlayer_vector(pope, COCO_IMG_ROOT, layer_idx=STEER_LAYER_IDX, num_calib=600)
print("mid-layer v:", tuple(steer_v_mid.shape), "layer idx:", STEER_LAYER_IDX)

Found 32 decoder layers.


                                                                                                                       

mid-layer v: (4096,) layer idx: 28




In [24]:
# %% B2) Inference with mid-layer steering (hook adds α·v to chosen layer's output)
import contextlib

@torch.no_grad()
def hf_llava_yesno_midlayer_steer(img_path, question, layer_idx, v, alpha=1.0):
    prompt = (
        "USER: <image>\n"
        "Answer the question strictly with 'Yes' or 'No'. Do not add any other words.\n"
        f"Question: {question}\nASSISTANT:"
    )
    img = Image.open(img_path).convert("RGB")
    inputs = processor(images=img, text=prompt, return_tensors="pt").to(DEVICE)

    target_layer = LAYERS[layer_idx]

    def hook_fn(module, inp, out):
        # out: (batch, seq, hidden) → add α·v to all time-steps (cheap), or only last:
        add = alpha * v.to(out.dtype).to(out.device)
        return out + add  # broadcast over (batch, seq, hidden)

    with torch.inference_mode():
        handle = target_layer.register_forward_hook(hook_fn)
        output_ids = model.generate(
            **inputs,
            do_sample=False,
            max_new_tokens=8,
        )
        handle.remove()

    out_txt = processor.batch_decode(output_ids, skip_special_tokens=True)[0]
    return normalize_yesno(out_txt)

def infer_yesno_mid(img_path, q, alpha=1.0):
    return hf_llava_yesno_midlayer_steer(img_path, q, STEER_LAYER_IDX, steer_v_mid, alpha)

# Quick sanity on a few samples
print("Mid-layer steered smoke:")
print(pct(eval_rows(pope["random"][:16], COCO_IMG_ROOT, lambda p,q: infer_yesno_mid(p,q,alpha=1.0))))

Mid-layer steered smoke:


                                                                                                                       

{'Accuracy': 100.0, 'Precision': 100.0, 'Recall': 100.0, 'F1': 100.0}




In [25]:
# %% Full-cycle runner for mid-layer steering
import time, csv

def run_full_pope_with_mid_steer(pope, coco_img_root, alpha=1.0):
    """
    Uses infer_yesno_mid(img_path, q, alpha) which you validated in B2.
    Prints per-split + overall metrics and returns (results_dict, overall_dict).
    """
    def infer_fn(p, q): 
        return infer_yesno_mid(p, q, alpha=alpha)

    t0 = time.time()
    results = {}
    all_rows = []
    for split in ("random","popular","adversarial"):
        print(f"\nRunning split (alpha={alpha}): {split}")
        s = eval_rows(pope[split], coco_img_root, infer_fn)
        results[split] = s
        all_rows.extend(pope[split])
    overall = eval_rows(all_rows, coco_img_root, infer_fn)
    t1 = time.time()

    print("\nPOPE (COCO) mid-layer steered results:")
    for k in ("random","popular","adversarial"):
        print(f"- {k.title():12}:", pct(results[k]))
    print(f"- Overall     :", pct(overall))
    print(f"\nTotal time: {t1 - t0:.1f}s for {sum(len(pope[s]) for s in ('random','popular','adversarial'))} items")
    return results, overall

def save_predictions_mid(pope, coco_img_root, out_csv, alpha=1.0):
    """
    Saves per-question predictions for the steered run.
    """
    with open(out_csv, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["split","image","question","gt_answer","pred_answer","alpha","layer_idx"])
        for split in ("random","popular","adversarial"):
            for r in tqdm(pope[split], leave=False, desc=f"Saving {split} (alpha={alpha})"):
                img_file = r["image"]
                img_path = img_file if os.path.isabs(img_file) else os.path.join(coco_img_root, img_file)
                if not os.path.exists(img_path):
                    continue
                pred = infer_yesno_mid(img_path, r["question"], alpha=alpha)
                w.writerow([split, img_file, r["question"], r["answer"], pred, alpha, STEER_LAYER_IDX])

In [26]:
# %% Run one full steered evaluation (set your alpha here)
ALPHA = 1.0  # try 0.5, 1.0, 1.5, 2.0
results_mid, overall_mid = run_full_pope_with_mid_steer(pope, COCO_IMG_ROOT, alpha=ALPHA)
print("\nOverall (mid-layer, alpha =", ALPHA, "):", pct(overall_mid))


Running split (alpha=1.0): random


                                                                                                                       


Running split (alpha=1.0): popular


                                                                                                                       


Running split (alpha=1.0): adversarial


                                                                                                                       


POPE (COCO) mid-layer steered results:
- Random      : {'Accuracy': 88.9, 'Precision': 96.27, 'Recall': 80.93, 'F1': 87.94}
- Popular     : {'Accuracy': 87.07, 'Precision': 92.25, 'Recall': 80.93, 'F1': 86.22}
- Adversarial : {'Accuracy': 84.27, 'Precision': 86.71, 'Recall': 80.93, 'F1': 83.72}
- Overall     : {'Accuracy': 86.74, 'Precision': 91.58, 'Recall': 80.93, 'F1': 85.93}

Total time: 2735.2s for 9000 items

Overall (mid-layer, alpha = 1.0 ): {'Accuracy': 86.74, 'Precision': 91.58, 'Recall': 80.93, 'F1': 85.93}




In [36]:
def normalize_yesno_compat(s: str) -> str:
    """Return exactly 'yes' or 'no' to match eval_rows expectations."""
    t = (s or "").strip().lower()
    # common variants
    if t.startswith("y"): return "yes"
    if t.startswith("n"): return "no"
    # fallback: search tokens
    if "yes" in t: return "yes"
    if "no"  in t: return "no"
    # absolute fallback: default to 'no'
    return "no"

In [37]:
import torch, numpy as np, os
from PIL import Image

# Map output text to strict Yes/No
def strict_yesno(text: str) -> str:
    t = text.strip().lower()
    return "Yes" if t.startswith("y") else "No"

# Build a Yes/No-only prompt (same style as yours)
def yn_prompt(question: str) -> str:
    return (
        "USER: <image>\n"
        "Answer the question strictly with 'Yes' or 'No'. Do not add any other words.\n"
        f"Question: {question}\nASSISTANT:"
    )

# Token IDs for "yes"/"no" (robust to BPE variants)
def get_yes_no_ids(tokenizer):
    yes_ids = [tokenizer.convert_tokens_to_ids("Yes"),
               tokenizer.convert_tokens_to_ids("yes")]
    no_ids  = [tokenizer.convert_tokens_to_ids("No"),
               tokenizer.convert_tokens_to_ids("no")]
    yes_ids  = [i for i in yes_ids if i is not None and i >= 0]
    no_ids   = [i for i in no_ids  if i is not None and i >= 0]
    # Fallback: first-subtoken of the string
    if not yes_ids:
        yes_ids = [tokenizer("Yes", add_special_tokens=False).input_ids[0]]
    if not no_ids:
        no_ids = [tokenizer("No", add_special_tokens=False).input_ids[0]]
    return list(dict.fromkeys(yes_ids)), list(dict.fromkeys(no_ids))

YES_IDS, NO_IDS = get_yes_no_ids(processor.tokenizer)

# Get logits for the last generated position without sampling
@torch.no_grad()
def get_last_logits(model, inputs):
    out = model(**inputs, use_cache=False, return_dict=True, output_hidden_states=False)
    # last position in the prompt; we want NEXT-token distribution, so take last logits row
    return out.logits[:, -1, :]  # (B,V)

# Convert yes/no logits to probability
def yes_prob_from_logits(logits, yes_ids=YES_IDS, no_ids=NO_IDS):
    # pool over possible tokenizations
    yes_logit = logits[..., yes_ids].logsumexp(dim=-1)
    no_logit  = logits[...,  no_ids].logsumexp(dim=-1)
    # binary softmax
    m = torch.stack([yes_logit, no_logit], dim=-1)
    p = torch.softmax(m, dim=-1)[..., 0]
    return p  # (B,)


In [38]:
#A1 Safer mid-layer hook (last-token only + recentre)
class MidLayerSteerer:
    def __init__(self, v, alpha_base=0.8, last_token_only=True, recenter=True):
        v = v.float()
        self.v = v / (v.norm(p=2) + 1e-8)            # unit direction
        self.alpha_base = float(alpha_base)
        self.last_token_only = last_token_only
        self.recenter = recenter
        self.handle = None
        self.gates = None  # set per batch

    def _hook(self, module, inp, out):
        # out: (B, T, D)
        B, T, D = out.shape
        v = self.v.to(out.device, out.dtype).view(1,1,D)
        g = self.gates
        if g is None:
            g = torch.full((B,1,1), 0.25, dtype=out.dtype, device=out.device)
        else:
            g = g.view(B,1,1).to(out.dtype).to(out.device)
        add = self.alpha_base * g * v
        if self.last_token_only:
            mask = torch.zeros((B,T,1), dtype=out.dtype, device=out.device)
            mask[:, -1, :] = 1.0
            out = out + add * mask
        else:
            out = out + add
        if self.recenter:
            out = out - out.mean(dim=-1, keepdim=True)
        return out

    def attach(self, transformer_block):
        self.handle = transformer_block.register_forward_hook(self._hook)

    def detach(self):
        if self.handle is not None:
            self.handle.remove()
            self.handle = None

In [39]:
@torch.no_grad()
def infer_yesno_gated_mid(img_path, question, layer_idx=None, v=None, alpha_base=0.8):
    assert v is not None, "Provide the steering vector v (e.g., steer_v_mid)."
    LIDX = STEER_LAYER_IDX if layer_idx is None else layer_idx
    block = LAYERS[LIDX]

    from PIL import Image
    img = Image.open(img_path).convert("RGB")
    prompt = yn_prompt(question)
    inputs = processor(images=img, text=prompt, return_tensors="pt").to(DEVICE)

    gate = compute_gate_for_item(model, processor, img, question)

    steerer = MidLayerSteerer(v, alpha_base=alpha_base, last_token_only=True, recenter=True)
    steerer.gates = torch.tensor([gate], device=DEVICE)

    handle = None
    try:
        handle = block.register_forward_hook(steerer._hook)
        out_ids = model.generate(**inputs, do_sample=False, max_new_tokens=8)
    finally:
        if handle is not None:
            handle.remove()

    txt = processor.batch_decode(out_ids, skip_special_tokens=True)[0]
    return normalize_yesno_compat(txt)

In [40]:
from collections import Counter
smoke_preds = [infer_yesno_gated_mid(os.path.join(COCO_IMG_ROOT, r["image"]), r["question"], v=steer_v_mid, alpha_base=0.8)
               for r in pope["random"][:16]]
print("Gated-mid smoke acc:", pct(eval_rows(pope["random"][:16], COCO_IMG_ROOT,
      lambda p,q: infer_yesno_gated_mid(p,q, v=steer_v_mid, alpha_base=0.8))))
print("Pred distribution:", Counter(smoke_preds))

                                                                                                                       

Gated-mid smoke acc: {'Accuracy': 50.0, 'Precision': 50.0, 'Recall': 100.0, 'F1': 66.67}
Pred distribution: Counter({'yes': 16})




In [42]:
# B1. Logit hook + inference
class YesNoLogitBias:
    def __init__(self, tokenizer, bias_yes=-0.25, bias_no=+0.05):
        self.bias_yes = float(bias_yes)
        self.bias_no  = float(bias_no)
        self.yes_ids, self.no_ids = get_yes_no_ids(tokenizer)
        self.handle = None

    def _hook(self, module, inp, out):
        # out: (B,T,V)
        if self.bias_yes != 0:
            out[..., self.yes_ids] = out[..., self.yes_ids] + self.bias_yes
        if self.bias_no != 0:
            out[..., self.no_ids]  = out[..., self.no_ids]  + self.bias_no
        return out

    def attach(self, lm_head):
        self.handle = lm_head.register_forward_hook(self._hook)

    def detach(self):
        if self.handle: 
            self.handle.remove()
            self.handle = None

@torch.no_grad()
def infer_yesno_logit_bias(img_path, question, bias_yes=-0.25, bias_no=+0.05):
    prompt = yn_prompt(question)
    from PIL import Image
    img = Image.open(img_path).convert("RGB")
    inputs = processor(images=img, text=prompt, return_tensors="pt").to(DEVICE)

    def _hook(module, inp, out):
        # out: (B,T,V)
        yes_ids, no_ids = YES_IDS, NO_IDS
        if bias_yes != 0:
            out[..., yes_ids] = out[..., yes_ids] + bias_yes
        if bias_no != 0:
            out[..., no_ids]  = out[..., no_ids]  + bias_no
        return out

    handle = None
    try:
        handle = model.lm_head.register_forward_hook(_hook)
        out_ids = model.generate(**inputs, do_sample=False, max_new_tokens=8)
    finally:
        if handle: handle.remove()

    txt = processor.batch_decode(out_ids, skip_special_tokens=True)[0]
    return normalize_yesno_compat(txt)

# Smoke
from collections import Counter
smoke_preds = [infer_yesno_logit_bias(os.path.join(COCO_IMG_ROOT, r["image"]), r["question"])
               for r in pope["random"][:16]]
print("Logit-bias smoke acc:", pct(eval_rows(pope["random"][:16], COCO_IMG_ROOT,
      lambda p,q: infer_yesno_logit_bias(p,q))))
print("Pred distribution:", Counter(smoke_preds))

                                                                                                                       

Logit-bias smoke acc: {'Accuracy': 50.0, 'Precision': 50.0, 'Recall': 100.0, 'F1': 66.67}
Pred distribution: Counter({'yes': 16})




In [43]:
# C1. Collect p(Yes) on a dev slice and fit τ
@torch.no_grad()
def yes_prob_unsteered(img_path, question):
    prompt = yn_prompt(question)
    img = Image.open(img_path).convert("RGB")
    inputs = processor(images=img, text=prompt, return_tensors="pt").to(DEVICE)
    logits = get_last_logits(model, inputs)  # (1,V)
    p = yes_prob_from_logits(logits)         # (1,)
    return float(p.item())

def fit_tau(dev_rows, coco_img_root, taus=None, metric="accuracy"):
    if taus is None:
        taus = np.linspace(0.30, 0.70, 81)  # 0.30..0.70 step 0.005
    p_yes, y = [], []
    for r in dev_rows:
        img_file = r["image"]
        img_path = img_file if os.path.isabs(img_file) else os.path.join(coco_img_root, img_file)
        if not os.path.exists(img_path): 
            continue
        p = yes_prob_unsteered(img_path, r["question"])
        p_yes.append(p)
        y.append(1 if r["answer"].lower() == "yes" else 0)
    p_yes = np.array(p_yes); y = np.array(y)

    best_tau, best_val = 0.5, -1.0
    for t in taus:
        yhat = (p_yes > t).astype(int)
        if metric == "f1":
            tp = ((yhat==1)&(y==1)).sum(); fp = ((yhat==1)&(y==0)).sum(); fn = ((yhat==0)&(y==1)).sum()
            prec = tp/(tp+fp+1e-9); rec = tp/(tp+fn+1e-9)
            val = 2*prec*rec/(prec+rec+1e-9)
        else:
            val = (yhat == y).mean()
        if val > best_val:
            best_val, best_tau = val, t
    return float(best_tau), float(best_val)

In [44]:
def infer_yesno_thresholded(img_path, question, tau):
    p = yes_prob_unsteered(img_path, question)
    return "yes" if p > tau else "no"

# Smoke
from collections import Counter
# Fit τ on a small mixed slice (adjust N if needed)
dev_mix = pope["random"][:120] + pope["popular"][:120] + pope["adversarial"][:120]
tau_star, val = fit_tau(dev_mix, COCO_IMG_ROOT, metric="accuracy")
print(f"Fitted τ ≈ {tau_star:.3f} (dev acc {val:.3f})")
smoke_preds = [infer_yesno_thresholded(os.path.join(COCO_IMG_ROOT, r["image"]), r["question"], tau_star)
               for r in pope["random"][:16]]
print("Threshold smoke acc:", pct(eval_rows(pope["random"][:16], COCO_IMG_ROOT,
      lambda p,q: infer_yesno_thresholded(p,q,tau_star))))
print("Pred distribution:", Counter(smoke_preds))

Fitted τ ≈ 0.300 (dev acc 0.886)


                                                                                                                       

Threshold smoke acc: {'Accuracy': 100.0, 'Precision': 100.0, 'Recall': 100.0, 'F1': 100.0}
Pred distribution: Counter({'yes': 8, 'no': 8})




In [45]:
import os, time, numpy as np
from PIL import Image
import torch, csv

def yn_prompt(question: str) -> str:
    return (
        "USER: <image>\n"
        "Answer the question strictly with 'Yes' or 'No'. Do not add any other words.\n"
        f"Question: {question}\nASSISTANT:"
    )

@torch.no_grad()
def get_last_logits(model, inputs):
    out = model(**inputs, use_cache=False, return_dict=True, output_hidden_states=False)
    return out.logits[:, -1, :]  # (B,V)

# Robust “yes/no” token pooling for probability
def _get_yes_no_ids(tokenizer):
    yes_ids = [tokenizer.convert_tokens_to_ids("Yes"),
               tokenizer.convert_tokens_to_ids("yes")]
    no_ids  = [tokenizer.convert_tokens_to_ids("No"),
               tokenizer.convert_tokens_to_ids("no")]
    yes_ids = [i for i in yes_ids if i is not None and i >= 0]
    no_ids  = [i for i in no_ids  if i is not None and i >= 0]
    if not yes_ids:
        yes_ids = [tokenizer("Yes", add_special_tokens=False).input_ids[0]]
    if not no_ids:
        no_ids  = [tokenizer("No",  add_special_tokens=False).input_ids[0]]
    # dedupe
    yes_ids = list(dict.fromkeys(yes_ids)); no_ids = list(dict.fromkeys(no_ids))
    return yes_ids, no_ids

YES_IDS, NO_IDS = _get_yes_no_ids(processor.tokenizer)

def yes_prob_from_logits(logits, yes_ids=YES_IDS, no_ids=NO_IDS):
    yes_logit = logits[..., yes_ids].logsumexp(dim=-1)
    no_logit  = logits[...,  no_ids].logsumexp(dim=-1)
    m = torch.stack([yes_logit, no_logit], dim=-1)
    return torch.softmax(m, dim=-1)[..., 0]  # p(yes)

@torch.no_grad()
def yes_prob_unsteered(img_path, question):
    prompt = yn_prompt(question)
    img = Image.open(img_path).convert("RGB")
    inputs = processor(images=img, text=prompt, return_tensors="pt").to(DEVICE)
    logits = get_last_logits(model, inputs)
    p = yes_prob_from_logits(logits)
    return float(p.item())

def fit_tau(dev_rows, coco_img_root, taus=None, metric="accuracy"):
    if taus is None:
        taus = np.linspace(0.30, 0.70, 81)  # 0.30..0.70 step 0.005
    p_yes, y = [], []
    for r in dev_rows:
        img_file = r["image"]
        img_path = img_file if os.path.isabs(img_file) else os.path.join(coco_img_root, img_file)
        if not os.path.exists(img_path):
            continue
        p = yes_prob_unsteered(img_path, r["question"])
        p_yes.append(p)
        y.append(1 if r["answer"].lower().startswith("y") else 0)
    p_yes = np.array(p_yes); y = np.array(y)
    best_tau, best_val = 0.5, -1.0
    for t in taus:
        yhat = (p_yes > t).astype(int)
        if metric == "f1":
            tp = ((yhat==1)&(y==1)).sum(); fp = ((yhat==1)&(y==0)).sum(); fn = ((yhat==0)&(y==1)).sum()
            prec = tp/(tp+fp+1e-9); rec = tp/(tp+fn+1e-9)
            val = 2*prec*rec/(prec+rec+1e-9)
        else:
            val = (yhat == y).mean()
        if val > best_val:
            best_val, best_tau = val, t
    return float(best_tau), float(best_val)

In [46]:
def infer_yesno_thresholded(img_path, question, tau):
    p = yes_prob_unsteered(img_path, question)
    return "yes" if p > tau else "no"

def run_pope_thresholded(pope, coco_img_root, tau):
    def infer_fn(p,q): return infer_yesno_thresholded(p,q,tau)
    t0 = time.time()
    res = {}
    all_rows = []
    for split in ("random","popular","adversarial"):
        print(f"\nRunning split [Threshold τ={tau:.3f}]: {split}")
        s = eval_rows(pope[split], coco_img_root, infer_fn)
        res[split] = s
        all_rows.extend(pope[split])
    overall = eval_rows(all_rows, coco_img_root, infer_fn)
    dt = time.time()-t0
    print("\nPOPE (COCO) Thresholded results:")
    for k in ("random","popular","adversarial"):
        print(f"- {k.title():12}:", pct(res[k]))
    print(f"- Overall     :", pct(overall))
    print(f"\nTotal time: {dt:.1f}s for {sum(len(pope[s]) for s in ('random','popular','adversarial'))} items")
    return res, overall

def save_predictions_thresholded(pope, coco_img_root, out_csv, tau):
    with open(out_csv, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["split","image","question","gt_answer","pred_answer","tau"])
        for split in ("random","popular","adversarial"):
            for r in pope[split]:
                img_file = r["image"]
                img_path = img_file if os.path.isabs(img_file) else os.path.join(coco_img_root, img_file)
                if not os.path.exists(img_path):
                    continue
                pred = infer_yesno_thresholded(img_path, r["question"], tau)
                w.writerow([split, img_file, r["question"], r["answer"], pred, tau])

In [47]:
# Pick a mixed calibration slice (tweak counts to your liking)
N_DEV_PER_SPLIT = 300  # try 200–400; larger is stabler but slower
dev_mix = pope["random"][:N_DEV_PER_SPLIT] + pope["popular"][:N_DEV_PER_SPLIT] + pope["adversarial"][:N_DEV_PER_SPLIT]

tau_star, dev_score = fit_tau(dev_mix, COCO_IMG_ROOT, metric="accuracy")
print(f"Fitted τ ≈ {tau_star:.3f} on dev (acc {dev_score:.3f})")

# Full evaluation
results_thr, overall_thr = run_pope_thresholded(pope, COCO_IMG_ROOT, tau=tau_star)
print("\nOverall (thresholded, τ =", f"{tau_star:.3f}", "):", pct(overall_thr))

# Optional: save per-question predictions
OUT_CSV = f"pope_predictions_thresholded_tau{tau_star:.3f}.csv"
save_predictions_thresholded(pope, COCO_IMG_ROOT, OUT_CSV, tau_star)
print("Saved:", OUT_CSV)

Fitted τ ≈ 0.300 on dev (acc 0.872)

Running split [Threshold τ=0.300]: random


                                                                                                                       


Running split [Threshold τ=0.300]: popular


                                                                                                                       


Running split [Threshold τ=0.300]: adversarial


                                                                                                                       


POPE (COCO) Thresholded results:
- Random      : {'Accuracy': 89.53, 'Precision': 95.27, 'Recall': 83.2, 'F1': 88.83}
- Popular     : {'Accuracy': 87.0, 'Precision': 90.04, 'Recall': 83.2, 'F1': 86.49}
- Adversarial : {'Accuracy': 83.4, 'Precision': 83.53, 'Recall': 83.2, 'F1': 83.37}
- Overall     : {'Accuracy': 86.64, 'Precision': 89.36, 'Recall': 83.2, 'F1': 86.17}

Total time: 2210.1s for 9000 items

Overall (thresholded, τ = 0.300 ): {'Accuracy': 86.64, 'Precision': 89.36, 'Recall': 83.2, 'F1': 86.17}
Saved: pope_predictions_thresholded_tau0.300.csv
