In [None]:
import sys
import os
# Put the parent of the current working directory at the front of the import
# path; project-local packages (such as `utils`, imported further down) are
# presumably resolved from there.
project_root = os.path.abspath(os.pardir)
sys.path.insert(0, project_root)

In [None]:
from os import path
import os
import torch
from transformers import AutoProcessor

In [None]:
# Figure 8a (ground-truth deviation analysis) settings, keyed by model name.
#   layer    -- index used to select the per-layer entry in the saved embedding files
#   data_dir -- directory that holds one sub-folder per evaluated sample
MODELS_CONFIG = {
    "llama-11b": dict(layer=14, data_dir="embeds/coco_deviation/llama-11b"),
    "llava-7b": dict(layer=12, data_dir="embeds/coco_deviation/llava-7b"),
}

In [None]:
from utils.extract_embeds import VLMForExtraction

# We'll load the tokenizer for each model in the loop below

In [None]:
import numpy as np
from scipy import stats
from pycocotools.coco import COCO
from utils.coco import load_coco_annotations, get_subject_spatial_id
from utils.linalg import project_onto_plane, euclidean_distances
import json

# Load the COCO annotation file `instances_val2017.json` a single time. The
# resulting `coco` object is read as a module-level global by `pipeline` below,
# so this cell must run before the analysis. The path is relative to the
# current working directory.
coco = load_coco_annotations("data/coco/annotations/instances_val2017.json")

def verdict(sequences, pos_word, neg_word, tokenizer):
    """Classify a generated answer as positive, negative or undecided.

    ``sequences[0]`` is decoded with ``tokenizer`` and only the text after the
    last "assistant" marker (and, within that, after the last "ASSISTANT"
    marker) is inspected. Matching is a case-insensitive substring test.

    Returns:
        True if only ``pos_word`` occurs in the answer, False if only
        ``neg_word`` occurs, and None if both or neither occur.
    """
    decoded = tokenizer.decode(sequences[0])
    answer = decoded.split("assistant")[-1].split("ASSISTANT")[-1].lower()

    has_pos = pos_word.lower() in answer
    has_neg = neg_word.lower() in answer

    # Both present or both absent: the answer does not commit to either word.
    if has_pos == has_neg:
        return None
    return has_pos

def mann_whitney_report(blue, red):
    """Run and print Mann–Whitney U tests comparing ``blue`` against ``red``.

    Three alternatives are evaluated with ``scipy.stats.mannwhitneyu``
    (``method="auto"``): two-sided, blue > red and blue < red. One summary line
    per test is printed after all three have been computed.

    Returns:
        dict with keys ``U_two``/``p_two``, ``U_greater``/``p_greater`` and
        ``U_less``/``p_less`` holding the statistic and p-value of each test.
    """
    blue_arr, red_arr = np.asarray(blue), np.asarray(red)

    # (statistic key, p-value key, scipy `alternative`, label used in the printout)
    variants = (
        ("U_two", "p_two", "two-sided", "two-sided"),
        ("U_greater", "p_greater", "greater", "blue > red"),
        ("U_less", "p_less", "less", "blue < red"),
    )

    report = {}
    summary_lines = []
    for u_key, p_key, alternative, label in variants:
        u_stat, p_value = stats.mannwhitneyu(
            blue_arr, red_arr, alternative=alternative, method="auto"
        )
        report[u_key] = u_stat
        report[p_key] = p_value
        summary_lines.append(f"Mann–Whitney U ({label}): U = {u_stat}, p = {p_value:.3g}")

    # Print only once every test has succeeded.
    for line in summary_lines:
        print(line)
    return report

In [None]:
def pipeline(model_name, data_dir, layer, tokenizer):
    """Run the ground-truth deviation analysis for Figure 8a.

    Every sub-directory of ``data_dir`` is treated as one sample named
    ``<object_a>_<object_b>_<pos_word>_<neg_word>_<image_id>`` (a "+" inside an
    object name stands for a space) and is expected to contain ``text.pt``,
    ``embeds.pt`` and ``sequences.pt``. For each sample the function:

    1. classifies the generated answer with ``verdict``; undecided answers are
       dropped from the margin statistics but still count in the accuracy
       denominator;
    2. projects, for both objects, the text-subtracted embedding and the
       ground-truth embedding returned by ``get_subject_spatial_id`` onto the
       plane spanned by the model's universal x/y axes at ``layer``;
    3. records the margin: the separation of the two objects along the
       coordinate implied by ``pos_word``, minus the same separation computed
       from the ground-truth projections.

    The margins of correctly and incorrectly answered samples are plotted as
    histograms, compared with ``mann_whitney_report`` and written to
    ``metadata/<model_name>_<layer>.json``.

    Args:
        model_name: Model key; also selects ``embeds/universal_id/<model_name>*.pt``.
        data_dir: Directory holding one sub-directory per sample.
        layer: Index used to pick the per-layer entry of every loaded file.
        tokenizer: Object providing ``tokenize`` and ``decode``.

    Returns:
        list[dict]: one ``{"id", "distance", "verdict"}`` record per sample with
        a decided answer, in sorted sample-name order.

    Relies on the notebook-level global ``coco``.
    """
    # Local import kept from the original cell: pyplot is not imported at notebook level.
    from matplotlib import pyplot as plt

    print(f"\n{'='*60}")
    print(f"Analyzing {model_name} at layer {layer}")
    print(f"{'='*60}\n")

    universal_id = torch.load(f"embeds/universal_id/{model_name}.pt")
    universal_layer = universal_id[layer]["universal"]
    x_axis = torch.load(f"embeds/universal_id/{model_name}_x.pt")[layer]["universal"].to(torch.float32).numpy()
    y_axis = torch.load(f"embeds/universal_id/{model_name}_y.pt")[layer]["universal"].to(torch.float32).numpy()

    # pos_word -> (plane coordinate, minuend object, subtrahend object), where
    # coordinate 0/1 is x/y and object 0/1 is object_a/object_b.
    margin_spec = {
        "right": (0, 0, 1),
        "left": (0, 1, 0),
        "below": (1, 0, 1),
        "above": (1, 1, 0),
    }

    # os.listdir returns entries in arbitrary order; sort so the saved log is
    # reproducible, and ignore stray files that are not sample directories.
    sample_names = sorted(
        entry for entry in os.listdir(data_dir)
        if path.isdir(path.join(data_dir, entry))
    )
    total = len(sample_names)
    correct = 0
    pos_distances = []
    neg_distances = []
    log = []

    for fname in sample_names:
        object_a, object_b, pos_word, neg_word, raw_id = fname.split("_")
        image_id = int(raw_id)
        sample_dir = path.join(data_dir, fname)

        if pos_word not in margin_spec:
            # Previously this surfaced later as an opaque IndexError.
            print(f"Skipping {fname}: unsupported relation {pos_word!r}")
            continue
        coord, first, second = margin_spec[pos_word]

        # NOTE(security): weights_only=False unpickles arbitrary Python objects;
        # only load sequence files from a trusted source.
        sequences = torch.load(path.join(sample_dir, "sequences.pt"), weights_only=False)
        is_correct = verdict(sequences, pos_word, neg_word, tokenizer)
        if is_correct is None:
            # Undecided answer: skip before doing any embedding work.
            continue

        object_names = [object_a.replace("+", " "), object_b.replace("+", " ")]
        # Each object is looked up by the last sub-token of " <name>".
        tokens = [tokenizer.tokenize(" " + name)[-1] for name in object_names]

        # Load the per-sample files once instead of once per object.
        text_layer = torch.load(path.join(sample_dir, "text.pt"))[layer]
        embed_layer = torch.load(path.join(sample_dir, "embeds.pt"))[layer]

        pred_coords = []
        gt_coords = []
        for name, token in zip(object_names, tokens):
            _, gt_embeds = get_subject_spatial_id(image_id, name, coco, universal_layer)
            gt_embeds = gt_embeds.to(torch.float32).numpy()[np.newaxis, :]
            text_embeds = text_layer[token].to(torch.float32).numpy()[np.newaxis, :]
            embeds = embed_layer[token].to(torch.float32).numpy() - text_embeds

            gt_xy, _ = project_onto_plane(gt_embeds, x_axis, y_axis)
            pred_xy, _ = project_onto_plane(embeds, x_axis, y_axis)
            gt_coords.append(gt_xy[0])
            pred_coords.append(pred_xy[0])

        # Separation along the relevant coordinate, relative to the ground truth.
        margin = (
            (pred_coords[first][coord] - pred_coords[second][coord]).item()
            - (gt_coords[first][coord] - gt_coords[second][coord]).item()
        )

        log.append({"id": image_id, "distance": margin, "verdict": is_correct})
        if is_correct:
            correct += 1
            pos_distances.append(margin)
        else:
            neg_distances.append(margin)

    # Print results
    if total:
        print(f"Accuracy: {correct/total:.2%}")
    else:
        print(f"Accuracy: n/a (no samples found in {data_dir})")

    # Plot histogram
    plt.figure(figsize=(4, 4))
    plt.hist(pos_distances, bins=10, alpha=0.7, density=True, color='steelblue', label='Positive')
    plt.hist(neg_distances, bins=10, alpha=0.7, density=True, color='salmon', label='Negative')
    plt.xlabel('Margin Relative to GT')
    plt.ylabel('Density')
    plt.title(f'{model_name} Layer {layer}')
    plt.legend()
    plt.tight_layout()
    plt.show()

    # Statistical test: not meaningful with an empty group (and an error in some
    # SciPy versions), so it is only run when both groups have data.
    if pos_distances and neg_distances:
        print(mann_whitney_report(pos_distances, neg_distances))
    else:
        print("Mann–Whitney U test skipped: need at least one positive and one negative sample")

    # Save metadata for Figure 8b
    os.makedirs("metadata", exist_ok=True)
    output_path = f"metadata/{model_name}_{layer}.json"
    with open(output_path, 'w') as f:
        json.dump(log, f, indent=2)
    print(f"\nSaved metadata to {output_path}")

    return log

In [None]:
# Figure 8a driver: for every configured model, resolve its model string, take
# the tokenizer from the matching processor, and run the deviation analysis.
for model_name, config in MODELS_CONFIG.items():
    model_str = VLMForExtraction.get_model_string(model_name)
    tokenizer = AutoProcessor.from_pretrained(model_str).tokenizer

    # Positional order matches pipeline(model_name, data_dir, layer, tokenizer).
    pipeline(model_name, config["data_dir"], config["layer"], tokenizer)