In [None]:
#Colab setup
!pip -q install -U transformers accelerate bitsandbytes datasets sentencepiece

import torch
import numpy as np
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForCausalLM

# Report whether PyTorch can see a CUDA GPU; falls back to the CPU label otherwise.
device = "cuda" if torch.cuda.is_available() else "cpu"
print("device:", device)

In [None]:
#Load TruthfulQA

# Load the "generation" configuration of the TruthfulQA dataset.
ds = load_dataset("truthful_qa", "generation")
# Display the loaded object so its splits can be inspected.
ds

In [None]:
# Peek at the first record of the validation split to see which fields it has.
ds["validation"][0]

In [None]:
# Choose split
# The main loop reads its questions from `questions_ds`, so changing
# `split_name` here switches which split is analysed.
split_name = "validation"
questions_ds = ds[split_name]
print("Split size:", len(questions_ds))

In [None]:
# Load model with eager attention so output_attentions works

model_name = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"

tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)

# Half precision is only a sensible default on a GPU. On a CPU-only runtime
# float16 kernels are slow (and may be missing on older PyTorch builds), so
# fall back to float32 there.
model_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

model = AutoModelForCausalLM.from_pretrained(
    model_name,
    device_map="auto",
    torch_dtype=model_dtype,
    attn_implementation="eager",   # IMPORTANT for output_attentions=True
)
# Switch to inference mode (disables dropout etc.).
model.eval()

In [None]:
# Helpers, Prompt formatting

from transformers.tokenization_utils_base import BatchEncoding

def build_prompt(question: str) -> torch.Tensor:
    """
    Turn a question into prompt token ids for the chat model.

    The question is wrapped in a two-message conversation (fixed system
    instruction + user question) and rendered with the tokenizer's chat
    template, including the generation prompt. If anything in that path
    raises, a plain "System/User/Assistant" text prompt is tokenized instead.
    NOTE(review): the fallback is silent, so a failing chat template changes
    the prompt format without any message.

    Returns the "pt" token-id tensor; callers read ``shape[1]`` as the prompt
    length, so it is presumably ``[1, seq_len]`` -- TODO confirm.
    """
    chat = [
        {"role": "system", "content": "Answer briefly and directly."},
        {"role": "user", "content": question},
    ]

    try:
        templated = tokenizer.apply_chat_template(
            chat,
            add_generation_prompt=True,
            return_tensors="pt",
        )
        # Depending on the library version the result is either the id tensor
        # itself or a mapping that carries it under "input_ids".
        if not isinstance(templated, (BatchEncoding, dict)):
            return templated
        return templated["input_ids"]
    except Exception:
        fallback_text = f"System: Answer briefly and directly.\nUser: {question}\nAssistant:"
        return tokenizer(fallback_text, return_tensors="pt")["input_ids"]

In [None]:
# Top-k eigenvalues of a graph Laplacian

def laplacian_topk_eigs(A: torch.Tensor, k: int, symmetrize=True):
    """
    Largest ``k`` eigenvalues of the unnormalized graph Laplacian ``L = D - A``.

    Args:
        A: square ``[n, n]`` adjacency (e.g. attention) matrix. Negative
            entries are clamped to zero before the Laplacian is built.
        k: number of eigenvalues to return (must be >= 0).
        symmetrize: if True, ``A`` is replaced by ``(A + A^T) / 2`` first.
            If False the caller should pass a symmetric ``A``: the symmetric
            eigen-solver used here only reads one triangle of the matrix.

    Returns:
        float32 tensor of shape ``[k]`` on the same device as ``A``, in
        ascending order. When ``n < k`` the result is left-padded with NaN.

    Raises:
        ValueError: if ``k`` is negative.
    """
    if k < 0:
        raise ValueError(f"k must be non-negative, got {k}")

    if symmetrize:
        A = 0.5 * (A + A.T)

    A = torch.clamp(A, min=0.0).to(torch.float32)
    degree = A.sum(dim=1)
    lap = torch.diag(degree) - A

    # eigvalsh returns real eigenvalues already sorted in ascending order,
    # so no extra sort / .real is needed.
    eigs = torch.linalg.eigvalsh(lap)
    n = eigs.numel()
    k_eff = min(k, n)
    # Slice from an explicit start index: eigs[-k_eff:] would return *all*
    # eigenvalues when k_eff == 0.
    topk = eigs[n - k_eff:]

    # Pad on the left with NaNs if too short. The pad must live on the same
    # device (and have the same dtype) as the eigenvalues, otherwise torch.cat
    # fails for GPU inputs.
    if k_eff < k:
        pad = torch.full(
            (k - k_eff,), float("nan"), dtype=topk.dtype, device=topk.device
        )
        topk = torch.cat([pad, topk], dim=0)
    return topk  # [k]

In [None]:
# Top k eigenvalues for each attention head

def per_head_topk(attentions, prompt_len, total_len, topk=20):
    """
    Laplacian top-k eigenvalues for every (layer, head) attention map.

    attentions: sequence with one entry per layer, each indexed as
        [batch, head, query, key]; only batch element 0 is used.
    prompt_len, total_len: the square block [prompt_len:total_len] on both
        the query and key axes (the generated tokens) is what gets analysed.
    topk: number of eigenvalues kept per head.

    Returns a float32 tensor of shape [n_layers, n_heads, topk].
    """
    n_layers = len(attentions)
    n_heads = attentions[0].shape[1]
    result = torch.empty((n_layers, n_heads, topk), dtype=torch.float32)

    for layer_idx, layer_att in enumerate(attentions):
        # [n_heads, gen, gen]: generated-token rows and columns only
        gen_block = layer_att[0, :, prompt_len:total_len, prompt_len:total_len].detach()
        for head_idx in range(n_heads):
            result[layer_idx, head_idx] = laplacian_topk_eigs(
                gen_block[head_idx], k=topk, symmetrize=True
            )

    return result  # [L,H,topk]

In [None]:
# Settings read by the main loop.
# Sampling temperature passed to model.generate.
TEMPERATURE = 1.0
# Upper bound on the number of tokens generated per question.
MAX_NEW_TOKENS = 128
# Number of largest Laplacian eigenvalues kept per attention head.
TOPK = 20
N_QUESTIONS = 25   # set larger later; start small to test

# Expected model dims (number of attention layers / heads per layer).
# The main loop raises a RuntimeError if the loaded model reports other values.
Layer = 22
Head = 32

# Create column names once: eig_l{l}_h{h}_{j}
# NOTE: 22*32*20 = 14080 columns (large but fine)
def make_feature_columns(L, H, K):
    """
    Build the flat list of feature names ``eig_l{l}_h{h}_{j}``.

    Names are ordered layer-major, then by head, then by eigenvalue index,
    giving ``L * H * K`` entries in total.
    """
    return [
        f"eig_l{layer}_h{head}_{rank}"
        for layer in range(L)
        for head in range(H)
        for rank in range(K)
    ]

# The (layer, head, eigenvalue index) order of these names matches the
# row-major flattening of the [L, H, TOPK] eigenvalue tensor in the main loop,
# so names and values pair up one-to-one.
feature_cols = make_feature_columns(Layer, Head, TOPK)

In [None]:
# Main loop: build DataFrame rows

# Seed the global RNG so that sampling in model.generate is repeatable on a
# fresh kernel; without it every run produces different answers and features.
SEED = 0
torch.manual_seed(SEED)

rows = []    # one dict per kept question
failed = 0   # questions skipped because the generation was too short

for idx in range(min(N_QUESTIONS, len(questions_ds))):
    ex = questions_ds[idx]
    question = ex["question"]

    # Build prompt
    prompt_ids = build_prompt(question).to(model.device)
    prompt_len = prompt_ids.shape[1]

    # Generate answer (so we have generated tokens to analyze).
    # The prompt is a single unpadded sequence, so its attention mask is all
    # ones; pass it explicitly because pad_token_id is set to the EOS id and
    # generate() would otherwise have to guess the mask.
    with torch.no_grad():
        gen_ids = model.generate(
            prompt_ids,
            attention_mask=torch.ones_like(prompt_ids),
            max_new_tokens=MAX_NEW_TOKENS,
            temperature=TEMPERATURE,
            do_sample=True,
            pad_token_id=tokenizer.eos_token_id,
        )

    total_len = gen_ids.shape[1]
    gen_len = total_len - prompt_len

    # Skip very short generations (you can choose to keep + pad instead)
    if gen_len < 5:
        failed += 1
        continue

    # Forward pass with attentions on the full sequence (prompt + answer)
    with torch.no_grad():
        outputs = model(
            input_ids=gen_ids,
            output_attentions=True,
            use_cache=False,
            return_dict=True
        )

    attentions = outputs.attentions
    L = len(attentions)
    H = attentions[0].shape[1]

    # The column names were generated for (Layer, Head); refuse to continue
    # if the model reports different dimensions.
    if (L != Layer) or (H != Head):
        raise RuntimeError(f"Model dims changed: got L={L}, H={H}, expected L={Layer}, H={Head}")

    # Compute [L,H,TOPK]
    eigs_LHK = per_head_topk(attentions, prompt_len, total_len, topk=TOPK)

    # Flatten to one row (length L*H*TOPK)
    feat = eigs_LHK.reshape(-1).cpu().numpy()

    # zip() would silently truncate if feature_cols is stale (e.g. TOPK was
    # changed without re-running the cell that builds the column names).
    if feat.shape[0] != len(feature_cols):
        raise RuntimeError(
            f"Feature length {feat.shape[0]} does not match {len(feature_cols)} column names"
        )

    row = {
        "row_id": idx,
        "question": question,
        "prompt_len": int(prompt_len),
        "gen_len": int(gen_len),
    }
    # Add eigenvalue features
    row.update({c: float(v) for c, v in zip(feature_cols, feat)})

    rows.append(row)

In [None]:
# `failed` counts the questions dropped by the gen_len < 5 filter in the main loop.
print("Rows kept:", len(rows), "Rows skipped(short):", failed)

In [None]:
import pandas as pd
# One row per kept question: the metadata keys (row_id, question, prompt_len,
# gen_len) followed by the eigenvalue feature columns.
df_features = pd.DataFrame(rows)
df_features.head()

In [None]:
# Sanity check: with at least one kept row this should be
# 4 metadata columns + Layer * Head * TOPK feature columns.
len(df_features.columns)