In [1]:


# ===== From here on, this code runs in Jupyter =====
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import json
import csv
import argparse
from collections import defaultdict, OrderedDict


def _print_table(headers, rows):
    widths = [len(str(h)) for h in headers]
    for r in rows:
        for i, c in enumerate(r):
            widths[i] = max(widths[i], len(str(c)))
    fmt = "  ".join("{:" + str(w) + "s}" for w in widths)
    print(fmt.format(*headers))
    print("  ".join("-" * w for w in widths))
    for r in rows:
        print(fmt.format(*[str(c) for c in r]))


def print_results_table(results_dir: str = os.path.join("models_and_tokenizers", "results"), method: str | None = None) -> None:
    """
    Print a compact results table combining summary.csv (if present) and any
    '*_results.jsonl' files in the directory. Summary rows take precedence; any
    (model, method) pairs missing from the summary are filled from JSONL.

        Columns: Model | Method | EM (%) | F1 (%) | N
    """
    if not os.path.isdir(results_dir):
        print(f"[!] Results directory not found: {results_dir}")
        return

    method_norm = method.lower() if isinstance(method, str) else None

    # Accumulator keyed by (model, method)
    rows_map: "OrderedDict[tuple[str,str], tuple[str,str,str,str,str]]" = OrderedDict()

    # 1) Pull from summary.csv when present (preferred)
    csv_path = os.path.join(results_dir, "summary.csv")
    if os.path.isfile(csv_path):
        try:
            with open(csv_path, "r", encoding="utf-8") as f:
                rdr = csv.DictReader(f)
                for r in rdr:
                    meth = str(r.get("method", ""))
                    if method_norm and meth.lower() != method_norm:
                        continue
                    try:
                        em = float(r.get("em", 0.0) or 0.0) * 100.0
                        f1 = float(r.get("f1", 0.0) or 0.0) * 100.0
                        n = int(r.get("n_examples", 0) or 0)
                    except Exception:
                        em, f1, n = 0.0, 0.0, 0
                    key = (str(r.get("model", "")), meth)
                    rows_map[key] = (
                        key[0], key[1], f"{em:.2f}", f"{f1:.2f}", str(n)
                    )
        except Exception as e:
            print(f"[!] Failed to read {csv_path}: {e}")

    # 2) Aggregate JSONL results and add any missing keys
    jsonl_files = [fn for fn in os.listdir(results_dir) if fn.endswith("_results.jsonl")]
    agg = defaultdict(lambda: {"em_sum": 0.0, "f1_sum": 0.0, "n": 0})
    for name in sorted(jsonl_files):
        path = os.path.join(results_dir, name)
        try:
            with open(path, "r", encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        rec = json.loads(line)
                    except Exception:
                        continue
                    method_val = str(rec.get("method", ""))
                    if method_norm and method_val.lower() != method_norm:
                        continue
                    key = (str(rec.get("model", "")), method_val)
                    agg[key]["em_sum"] += float(rec.get("em", 0.0))
                    agg[key]["f1_sum"] += float(rec.get("f1", 0.0))
                    agg[key]["n"] += 1
        except Exception as e:
            print(f"[!] Skipping {path}: {e}")

    for key, d in sorted(agg.items()):
        if key in rows_map:
            # Already covered by summary.csv; keep the summary version
            continue
        n = max(d["n"], 1)
        em_pct = 100.0 * d["em_sum"] / n
        f1_pct = 100.0 * d["f1_sum"] / n
        rows_map[key] = (key[0], key[1], f"{em_pct:.2f}", f"{f1_pct:.2f}", str(d["n"]))

    rows = list(rows_map.values())
    if rows:
        headers = ["Model", "Method", "EM (%)", "F1 (%)", "N"]
        _print_table(headers, rows)
    else:
        print(f"[!] No results found in {results_dir}.")


def main():
    """
    Command-line entry point: read --results-dir / --method and print the table.

    Unrecognised arguments are ignored rather than rejected, so the function
    can also be called from a notebook kernel whose own flags are on sys.argv.
    """
    fallback_dir = os.path.join("models_and_tokenizers", "results")

    cli = argparse.ArgumentParser(description="Print results table from model evaluation.")
    cli.add_argument(
        "--results-dir",
        default=fallback_dir,
        type=str,
        help="Directory containing results (default: models_and_tokenizers/results).",
    )
    cli.add_argument(
        "--method",
        default=None,
        type=str,
        help="Optional filter by method (exact match, case-insensitive). Omit to show all methods.",
    )

    # parse_known_args returns (namespace, leftovers); the leftovers are dropped.
    known, _leftovers = cli.parse_known_args()
    print_results_table(known.results_dir, method=known.method)


if __name__ == "__main__":
    main()


Model                               Method                  EM (%)  F1 (%)  N    
----------------------------------  ----------------------  ------  ------  -----
Qwen/Qwen2.5-7B-Instruct            PTQ4                    18.90   29.92   2048 
Qwen/Qwen2.5-7B-Instruct            AWQ-OT                  12.80   25.20   1750 
Qwen/Qwen2.5-7B-Instruct            AWQ_MIX                 10.46   24.34   10766
Qwen/Qwen2.5-7B-Instruct            Clean                   11.43   21.90   2048 
Qwen/Qwen2.5-7B-Instruct            INFO_VAR_MIX_s0         12.06   22.47   2048 
Qwen/Qwen2.5-7B-Instruct            SOTA_AWQ_PerLayerMixed  11.91   24.06   2048 
Qwen/Qwen2.5-7B-Instruct            SOTA_BNB_NF4            18.90   29.92   2048 
Qwen/Qwen2.5-7B-Instruct            TAQ_PerLayerPerTask     12.65   25.49   2048 
Qwen/Qwen3-4B-Instruct-2507         AWQ-OT                  9.81    24.33   2048 
Qwen/Qwen3-4B-Instruct-2507         AWQ_MIX                 8.54    23.20   2048 
Qwen/Qwen3-4B-In

In [2]:
# This script generates two motivation figures:
# 1) PCA projection of per-layer, per-task semantic directions (synthetic demo)
# 2) Layer-wise relevance/activation metric lines per task (synthetic demo)
#
# Replace the synthetic blocks with your real {d_i^k} and r_i when ready.
import os
import numpy as np
import matplotlib.pyplot as plt

# Single seeded generator: every random draw below comes from it, in this exact
# order, so reordering the draws changes the figures and the printed coordinates.
rng = np.random.default_rng(7)

# ----- Config -----
num_layers = 32
hidden_dim = 128
tasks = ["Trivia", "Code", "Math"]

# Prepare output directory for figures
# (created under the current working directory, not next to the notebook file)
out_dir = os.path.join(os.getcwd(), "viz_results")
os.makedirs(out_dir, exist_ok=True)

# ----- Synthetic semantic directions d_i^k (unit vectors) -----
# Create a base per-layer vector, then add task-specific offsets
base_layer_vecs = rng.normal(size=(num_layers, hidden_dim))
base_layer_vecs /= np.linalg.norm(base_layer_vecs, axis=1, keepdims=True)

# One offset matrix per task, drawn in the order of `tasks`
task_offsets = {t: rng.normal(scale=0.25, size=(num_layers, hidden_dim)) for t in tasks}

dirs = {}
for t in tasks:
    # shared base + task offset + small extra noise, then re-normalised row-wise to unit length
    D = base_layer_vecs + task_offsets[t] + rng.normal(scale=0.05, size=(num_layers, hidden_dim))
    D /= np.linalg.norm(D, axis=1, keepdims=True)
    dirs[t] = D  # shape: [num_layers, hidden_dim]

# ----- PCA over all layers×tasks -----
# Rows are stacked task by task in the order of `tasks`; the split below relies on that order.
X = np.concatenate([dirs[t] for t in tasks], axis=0)  # [num_layers * T, hidden_dim]
Xc = X - X.mean(axis=0, keepdims=True)
# PCA via SVD of the centered matrix: rows of Vt are the principal axes
U, S, Vt = np.linalg.svd(Xc, full_matrices=False)
PCs = Xc @ Vt.T[:, :2]  # first two components
# Squared singular values are proportional to per-component variance
explained = (S**2) / (S**2).sum()
explained2 = explained[:2].sum()

# Split back per task
split_PCs = {}
start = 0
for t in tasks:
    split_PCs[t] = PCs[start:start+num_layers]
    start += num_layers

# ----- Figure 1: PCA scatter labeled by layer, colored by task -----
fig1 = plt.figure(figsize=(9, 5), dpi=140)
for t in tasks:
    pts = split_PCs[t]
    plt.scatter(pts[:, 0], pts[:, 1], alpha=0.75, label=t, s=24)
    # Light annotations for every 4th layer to avoid clutter
    for i in range(0, num_layers, 4):
        x, y = pts[i]
        plt.text(x, y, f"{i}", fontsize=7, ha="center", va="center")

plt.title(f"PCA of Semantic Directions by Layer and Task (var exp ~ {explained2*100:.1f}%)")
plt.xlabel("PC1")
plt.ylabel("PC2")
plt.legend(title="Task", frameon=True)
plt.tight_layout()
pca_path = os.path.join(out_dir, "taq_pca_layers.png")
plt.savefig(pca_path, bbox_inches="tight")
# The figure is only written to disk; closing it means it is not left open after the cell runs
plt.close(fig1)

# ----- Synthetic layer-wise relevance r_i per task (higher = keep more bits) -----
# Shape patterns to mimic different task-critical layers
# (`x` is rebound here from the scalar used in the annotation loop above to the layer-index array)
x = np.arange(num_layers)
rel = {}
# Each curve: a Gaussian bump at a task-specific layer + a slow oscillation + a constant + noise
rel["Trivia"] = 0.6*np.exp(-((x-22)/6.5)**2) + 0.25*np.sin(x/3.5) + 0.2 + rng.normal(scale=0.03, size=num_layers)
rel["Code"]   = 0.55*np.exp(-((x-10)/5.0)**2) + 0.20*np.cos(x/4.0) + 0.18 + rng.normal(scale=0.03, size=num_layers)
rel["Math"]   = 0.50*np.exp(-((x-14)/4.5)**2) + 0.22*np.sin(x/4.4) + 0.16 + rng.normal(scale=0.03, size=num_layers)

# Normalize to [0,1] for a clean axis
# NOTE(review): min-max scaling divides by (max - min); a constant series would
# produce 0/0. Not an issue for the synthetic curves, but guard it before
# substituting real data.
for t in tasks:
    v = rel[t]
    v = (v - v.min()) / (v.max() - v.min())
    rel[t] = v

# ----- Figure 2: Lines over layers (one line per task) -----
fig2 = plt.figure(figsize=(9, 5), dpi=140)
for t in tasks:
    plt.plot(x, rel[t], marker="o", linewidth=1.75, markersize=3, label=t)

plt.title("Layer-wise Relevance by Task (higher ⇒ allocate more bits)")
plt.xlabel("Layer")
plt.ylabel("Relevance / Activation-variance proxy (normalized)")
plt.grid(True, alpha=0.3)
# NOTE(review): `ncols` is the newer spelling of the legend column keyword; older
# matplotlib releases only accept `ncol` — confirm the pinned version.
plt.legend(title="Task", ncols=len(tasks))
plt.tight_layout()
lines_path = os.path.join(out_dir, "taq_layer_relevance_lines.png")
plt.savefig(lines_path, bbox_inches="tight")
plt.close(fig2)

# ----- Print PCA coords for specific layers -----
# Layer indices are 0-based; with num_layers = 32 the entry 32 is out of range
# and is reported as skipped by the guard in the loop below.
request_layers = [0, 8, 16, 24, 32]
print("Requested PCA coordinates (PC1, PC2) per task for selected layers:")
for idx in request_layers:
    if not (0 <= idx < num_layers):
        print(f"  [skip] layer {idx} is out of range (0..{num_layers-1})")
        continue
    for t in tasks:
        pt = split_PCs[t][idx]
        print(f"  Task={t:6s} Layer={idx:2d}: ({pt[0]:.4f}, {pt[1]:.4f})")

# Bare last expression: the notebook displays the two saved file paths as the cell result
pca_path, lines_path

Requested PCA coordinates (PC1, PC2) per task for selected layers:
  Task=Trivia Layer= 0: (0.0485, -0.3452)
  Task=Code   Layer= 0: (-0.0207, -0.0267)
  Task=Math   Layer= 0: (0.0117, -0.2023)
  Task=Trivia Layer= 8: (0.0275, -0.3047)
  Task=Code   Layer= 8: (0.2710, 0.0811)
  Task=Math   Layer= 8: (0.1461, -0.0523)
  Task=Trivia Layer=16: (0.1290, 0.3419)
  Task=Code   Layer=16: (-0.3552, 0.2158)
  Task=Math   Layer=16: (-0.4433, 0.1342)
  Task=Trivia Layer=24: (-0.0294, 0.2398)
  Task=Code   Layer=24: (-0.1665, 0.1724)
  Task=Math   Layer=24: (-0.1192, 0.0244)
  [skip] layer 32 is out of range (0..31)


('/home/amitlevi/work/Per-Task Quantization/viz_results/taq_pca_layers.png',
 '/home/amitlevi/work/Per-Task Quantization/viz_results/taq_layer_relevance_lines.png')

In [10]:
# %% [markdown]
# Per-layer "Prompt Entropy" across task types for the model selected by MODEL_ID below (currently microsoft/Phi-4-mini-instruct)
# - Metric from "Layer by Layer: Uncovering Hidden Representations in Language Models"
#   (matrix-based entropy of K = Z Z^T; see §§3.2–3.3). Higher => richer, less-compressed features.
#   Paper: https://arxiv.org/abs/2502.02013

# %%capture

# %%
import os, math, random, json, gc
from typing import List, Dict, Tuple
import torch
import torch.nn.functional as F
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForCausalLM
import matplotlib.pyplot as plt

# -----------------------------
# Config
# -----------------------------
# NOTE(review): the original note here read "requires gated access" — confirm
# whether that applies to this checkpoint. A token is read from
# HUGGING_FACE_HUB_TOKEN below and passed along only when it is set.
MODEL_ID = "microsoft/Phi-4-mini-instruct"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# NOTE(review): float16 is used even when DEVICE falls back to "cpu"; half
# precision on CPU can be slow or unsupported depending on the PyTorch build —
# confirm before running without a GPU.
DTYPE = torch.float16

# scoring params
MAX_TEXTS_PER_TASK = 128          # keep modest for speed; raise if you have time
MAX_TOKENS = 512                  # cap per text for scoring forward
BATCH_SIZE = 4
RESERVOIR_TOKENS_PER_LAYER = 256  # subsample tokens per layer to bound eigendecomp size
SEED = 7
# Seed both RNGs used later: `random` (dataset shuffling) and torch (token subsampling)
random.seed(SEED)
torch.manual_seed(SEED)

# -----------------------------
# Load model + tokenizer
# -----------------------------
hf_token = os.environ.get("HUGGING_FACE_HUB_TOKEN", None)
# trust_remote_code=True allows Python code shipped in the model repository to
# run locally; only keep it for repositories you trust.
tok_kwargs = dict(trust_remote_code=True, use_fast=False)
mdl_kwargs = dict(torch_dtype=DTYPE, trust_remote_code=True, low_cpu_mem_usage=True, device_map=None)

# Forward the token to both loaders only when one is configured
if hf_token:
    tok_kwargs["token"] = hf_token
    mdl_kwargs["token"] = hf_token

tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, **tok_kwargs)
# device_map=None above: the whole model is loaded first, then moved to DEVICE and put in eval mode
model = AutoModelForCausalLM.from_pretrained(MODEL_ID, **mdl_kwargs
).to(DEVICE).eval()
# Padding setup: reuse EOS as the pad token when the tokenizer defines none, and
# pad on the right so real tokens keep their positions at the start of each row.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
    tokenizer.pad_token_id = tokenizer.eos_token_id
tokenizer.padding_side = "right"

# -----------------------------
# Helper: #layers
# -----------------------------
def get_num_layers_llama(m):
    """
    Return the number of transformer blocks found on `m`, or 0 if none is found.

    Two attribute layouts are probed, in this order:
      1. m.model.layers
      2. m.model.decoder.layers
    """
    absent = object()  # sentinel: distinguishes "no such attribute" from any real value

    backbone = getattr(m, "model", absent)
    if backbone is absent:
        return 0

    stack = getattr(backbone, "layers", absent)
    if stack is absent:
        # Fall back to the decoder-nested layout
        decoder = getattr(backbone, "decoder", absent)
        stack = getattr(decoder, "layers", absent)

    return 0 if stack is absent else len(stack)

# Layer count reused later for the plot's x-axis and the saved metadata.
# A value of 0 means get_num_layers_llama did not find a layer list on this model.
N_LAYERS = get_num_layers_llama(model)
print(f"Model: {MODEL_ID} | Layers: {N_LAYERS} | Device: {DEVICE}")

# -----------------------------
# Datasets & prompt builders
# -----------------------------
def load_texts_for_task(task: str, k: int) -> List[str]:
    """
    Build up to `k` prompt strings (inputs only, no labels) for one task.

    Supported tasks and their sources:
      - "trivia": trivia_qa / rc.nocontext, validation split
      - "math":   gsm8k / main, test split
      - "code":   mbpp, test split (train split if loading test fails)

    Examples are chosen by shuffling all indices with the module-level `random`
    generator and keeping the first `k`, so the result depends on the current
    global random state.

    Raises:
        ValueError: if `task` is not one of the supported names.
    """
    if task not in ("trivia", "math", "code"):
        raise ValueError(f"Unknown task: {task}")

    def pick(dataset):
        # Shuffle every index, then keep the leading k examples
        order = list(range(len(dataset)))
        random.shuffle(order)
        return [dataset[j] for j in order[:k]]

    if task == "trivia":
        chosen = pick(load_dataset("trivia_qa", "rc.nocontext", split="validation"))
        return [f"Question: {row['question']}\nAnswer:" for row in chosen]

    if task == "math":
        # gsm8k 'test' is smaller and public
        chosen = pick(load_dataset("gsm8k", "main", split="test"))
        return [
            f"Solve the math problem step by step.\nProblem: {row['question']}\nAnswer:"
            for row in chosen
        ]

    # task == "code": mbpp has natural-language prompts for code generation
    try:
        code_ds = load_dataset("mbpp", split="test")
    except Exception:
        # fallback to train if test missing
        code_ds = load_dataset("mbpp", split="train")

    prompts = []
    for row in pick(code_ds):
        # First non-empty of 'text', 'prompt', 'task_id'; empty string otherwise
        q = row.get("text") or row.get("prompt") or row.get("task_id") or ""
        prompts.append(f"Write a Python function as specified.\nTask: {q}\nCode:")
    return prompts

# Tasks to score and plot; each name must be one that load_texts_for_task handles
TASKS = ["trivia", "math", "code"]

# -----------------------------
# Metric: Prompt Entropy (matrix-based Shannon entropy on Gram of token states)
# Ref: §3.2–3.3 'Matrix-Based Entropy' and 'Prompt Entropy' in the paper.
# -----------------------------
@torch.no_grad()
def prompt_entropy_per_layer(
    texts: List[str],
    model,
    tokenizer,
    max_tokens: int,
    batch_size: int,
    reservoir_per_layer: int,
    device: str
) -> List[float]:
    """
    Compute the *average* matrix-based entropy of token states for each layer.

    For every batch of texts:
      - run the model with output_hidden_states=True to get one [B, T, D]
        tensor per layer (index 0, the embedding output, is skipped);
      - flatten to [B*T, D] and drop padded positions using the tokenizer's
        attention mask, so pad tokens do not contribute;
      - subsample at most `reservoir_per_layer` rows *without replacement*;
      - center the rows, build the Gram matrix G = X X^T / N, normalise its
        eigenvalues to a distribution p and take H = -sum p log p (natural log).

    The per-batch entropies are then averaged per layer.

    Args:
        texts: Input strings to score.
        model: Callable returning an object with a `hidden_states` sequence.
        tokenizer: Callable returning a mapping with a `.to(device)` method.
        max_tokens: Truncation length passed to the tokenizer.
        batch_size: Number of texts per forward pass.
        reservoir_per_layer: Upper bound on rows used per layer and batch.
        device: Device string the encoded batch is moved to.

    Returns:
        One mean entropy per layer. The list has as many entries as
        get_num_layers_llama reports, and grows if the model returns more
        hidden-state tensors than that. Layers never scored get 0.0.
    """
    n_layers = get_num_layers_llama(model)
    sums = [0.0] * n_layers
    counts = [0] * n_layers

    def entropy_from_hidden(X: torch.Tensor) -> float:
        # X: [N, D] rows of real (non-padding) token states
        n_rows = X.shape[0]
        if n_rows < 2:
            return 0.0
        # Subsample distinct rows; sampling with replacement would duplicate
        # rows and artificially lower the rank of the Gram matrix.
        limit = max(0, min(reservoir_per_layer, n_rows))
        if limit < n_rows:
            keep = torch.randperm(n_rows, device=X.device)[:limit]
            X = X.index_select(0, keep)
        if X.shape[0] < 2:
            return 0.0
        # center (token-wise) in float32 for a stable eigendecomposition
        X = X.to(torch.float32)
        X = X - X.mean(dim=0, keepdim=True)
        # The 1/N factor only rescales eigenvalues; it cancels in the normalisation below.
        G = (X @ X.T) / max(1, X.shape[0])
        # symmetric -> eigvalsh; clamp tiny negative values from round-off
        evals = torch.linalg.eigvalsh(G).clamp(min=0.0)
        total = float(evals.sum().item())
        if total <= 0:
            return 0.0
        p = (evals / total).double()
        return float((-(p * (p + 1e-12).log())).sum().item())  # natural log

    # batching over texts
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        enc = tokenizer(batch, return_tensors="pt", padding=True, truncation=True, max_length=max_tokens).to(device)
        out = model(**enc, output_hidden_states=True, use_cache=False)
        hs = out.hidden_states  # index 0 = embeddings, 1.. = layer outputs

        # Flattened boolean mask of real tokens (None if the tokenizer gave no mask)
        attn = enc.get("attention_mask")
        real_rows = attn.reshape(-1).bool() if attn is not None else None

        # If the helper under-reported the depth, grow the accumulators instead of failing
        missing = (len(hs) - 1) - len(sums)
        if missing > 0:
            sums.extend([0.0] * missing)
            counts.extend([0] * missing)

        for li in range(1, len(hs)):
            H = hs[li]                       # [B, T, D]
            X = H.reshape(-1, H.shape[-1])   # [B*T, D]
            if real_rows is not None:
                X = X[real_rows]
            sums[li - 1] += entropy_from_hidden(X)
            counts[li - 1] += 1

        # free a bit
        del out, hs, enc, attn, real_rows
        if device.startswith("cuda"):
            torch.cuda.empty_cache()

    # mean across batches
    return [sums[i] / max(1, counts[i]) for i in range(len(sums))]

# -----------------------------
# Run scoring for each task
# -----------------------------
# Despite its name, this maps task name -> list of per-layer mean entropies
layer_to_series: Dict[str, List[float]] = {}
for task in TASKS:
    print(f"[Scoring] {task} …")
    # Load up to MAX_TEXTS_PER_TASK prompts, then score them with the shared config values
    texts = load_texts_for_task(task, MAX_TEXTS_PER_TASK)
    vals = prompt_entropy_per_layer(
        texts, model, tokenizer,
        max_tokens=MAX_TOKENS, batch_size=BATCH_SIZE,
        reservoir_per_layer=RESERVOIR_TOKENS_PER_LAYER,
        device=DEVICE
    )
    layer_to_series[task] = vals

# -----------------------------
# Normalize per-task (for clean overlay)
# -----------------------------
def znormalize(xs: List[float]) -> List[float]:
    """
    Standardise a series to zero mean and unit (sample) standard deviation.

    The variance uses the n-1 denominator (1 when there is a single value), and
    1e-12 is added under the square root so a constant series yields zeros
    instead of dividing by zero.

    Args:
        xs: Values to standardise.

    Returns:
        A new list of z-scores, or an empty list when `xs` is empty.
    """
    n = len(xs)
    if n == 0:
        # Nothing to normalise (e.g. no layers were detected)
        return []
    m = sum(xs) / n
    v = sum((x - m) * (x - m) for x in xs) / max(1, n - 1)
    s = math.sqrt(v + 1e-12)
    return [(x - m) / s for x in xs]

# z-score each task's series separately so the curves can share one axis
normed = {k: znormalize(v) for k, v in layer_to_series.items()}

# Label and file slug come from MODEL_ID, so the title and the saved file always
# name the model that was actually scored.
model_label = MODEL_ID.split("/")[-1]
model_slug = "".join(ch.lower() if ch.isalnum() else "_" for ch in model_label)

# -----------------------------
# Plot: one line per task
# -----------------------------
plt.figure(figsize=(9, 5))
xs = list(range(1, N_LAYERS + 1))  # layers are numbered from 1 on the x-axis
for task in TASKS:
    plt.plot(xs, normed[task], label=task)  # (no explicit colors per house style)

plt.xlabel("Layer")
plt.ylabel("Prompt Entropy (z-score per task)")
plt.title(f"Layer-wise Prompt Entropy (matrix-based) — {model_label}")
plt.legend(loc="best")
plt.tight_layout()
plt.show()

# -----------------------------
# (Optional) Save raw numbers
# -----------------------------
# The un-normalised series are saved together with the settings that produced them
out = {
    "model": MODEL_ID,
    "metric": "prompt_entropy (matrix-based)",
    "layers": N_LAYERS,
    "params": {
        "max_texts_per_task": MAX_TEXTS_PER_TASK,
        "max_tokens": MAX_TOKENS,
        "batch_size": BATCH_SIZE,
        "reservoir_tokens_per_layer": RESERVOIR_TOKENS_PER_LAYER,
        "seed": SEED,
    },
    "series": layer_to_series,
}
out_path = f"layer_prompt_entropy_{model_slug}.json"
with open(out_path, "w", encoding="utf-8") as f:
    json.dump(out, f, indent=2)
print(f"Saved raw metric to {out_path}")


AttributeError: partially initialized module 'torchvision' has no attribute 'extension' (most likely due to a circular import)