# Evaluate FF1 retrieval and PR/F1

Compute Accuracy@K (K = 1, 3, 5) and sweep a similarity threshold to obtain precision, recall, and F1 for each of the 12 FF1 layers.


In [None]:
from pathlib import Path
import os
import sys


def in_colab() -> bool:
    """Return True when either Colab marker variable is present in the environment."""
    colab_markers = ("COLAB_GPU", "COLAB_RELEASE_TAG")
    return any(marker in os.environ for marker in colab_markers)


if not in_colab():
    def find_repo_root(start: Path) -> Path:
        """Return the first of ``start`` or its parents that contains both canon/ and src/.

        Raises:
            FileNotFoundError: if no directory on that chain has both entries.
        """
        for directory in (start, *start.parents):
            has_canon = (directory / "canon").exists()
            if has_canon and (directory / "src").exists():
                return directory
        raise FileNotFoundError("Could not locate repo root containing canon/ and src/")

    # A non-empty REPO_ROOT environment variable wins; otherwise search upward
    # from the current working directory.
    _repo_root_env = os.environ.get("REPO_ROOT")
    if _repo_root_env:
        REPO_ROOT = Path(_repo_root_env)
    else:
        REPO_ROOT = find_repo_root(Path.cwd())

    # Local defaults live inside the repository; each can be overridden via
    # the environment variable of the same name.
    CANON_ROOT = Path(os.environ.get("CANON_ROOT", str(REPO_ROOT / "canon")))
    RUNS_ROOT = Path(os.environ.get("RUNS_ROOT", str(REPO_ROOT / "runs" / "ff1_lata_postact")))
else:
    # Colab: mount Google Drive first, since the default canon and runs
    # locations point into the mounted drive.
    from google.colab import drive

    drive.mount("/content/drive")
    REPO_ROOT = Path(os.environ.get("REPO_ROOT", "/content/localLatin"))
    CANON_ROOT = Path(os.environ.get("CANON_ROOT", "/content/drive/MyDrive/localLatin_data/canon"))
    RUNS_ROOT = Path(os.environ.get("RUNS_ROOT", "/content/drive/MyDrive/localLatin_runs/ff1_lata_postact"))

# Make modules under <REPO_ROOT>/src importable from later cells.
sys.path.append(str(REPO_ROOT / "src"))

# Echo the resolved locations so the notebook output records them.
print(f"REPO_ROOT: {REPO_ROOT}")
print(f"CANON_ROOT: {CANON_ROOT}")
print(f"RUNS_ROOT: {RUNS_ROOT}")


In [None]:
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from canon_retrieval import (
    accuracy_at_k,
    l2_normalize,
    sanity_checks,
    similarity_matrix,
    sweep_thresholds,
    upper_triangle,
    upper_triangle_labels,
)

RUN_DIR = None  # set to a specific run folder, e.g. f"{RUNS_ROOT}/run_YYYYMMDD_HHMMSS"

if RUN_DIR is None:
    # Default to the run_* directory that sorts last by path; with the
    # run_YYYYMMDD_HHMMSS naming shown above, that is the most recent run.
    # Directories are filtered explicitly with is_dir(): a trailing "/" in a
    # glob pattern only restricts matches to directories on Python 3.11+, so
    # on older interpreters a stray file named run_* could be selected.
    candidates = sorted(
        entry for entry in Path(RUNS_ROOT).glob("run_*") if entry.is_dir()
    )
    if not candidates:
        raise FileNotFoundError("No run_* folder found. Run extraction first.")
    RUN_DIR = str(candidates[-1])

# meta.csv supplies the folder_id and is_winnable columns, which are passed
# to the retrieval metrics in the evaluation cell.
meta = pd.read_csv(f"{RUN_DIR}/meta.csv")
folder_ids = meta["folder_id"].tolist()
is_winnable = meta["is_winnable"].tolist()

print(f"Using run: {RUN_DIR}")


In [None]:
layer_rows = []
threshold_rows = []

# Evaluate FF1 layers 1..12: retrieval accuracy plus a threshold sweep per layer.
for layer_num in range(1, 13):
    # Load this layer's saved embeddings and build the similarity matrix.
    layer_emb = np.load(f"{RUN_DIR}/ff1_layer{layer_num}_embeddings_norm.npy")
    sim = similarity_matrix(layer_emb)
    checks = sanity_checks(sim)

    # Accuracy@K for K = 1, 3, 5, evaluated in that order.
    acc_by_k = {
        k: accuracy_at_k(sim, folder_ids, is_winnable, k=k) for k in (1, 3, 5)
    }

    # Sweep 400 evenly spaced thresholds between the smallest and largest
    # value returned by upper_triangle (presumably the unique off-diagonal
    # pair similarities -- confirm against canon_retrieval).
    pair_sims = upper_triangle(sim)
    pair_labels = upper_triangle_labels(folder_ids)
    threshold_grid = np.linspace(pair_sims.min(), pair_sims.max(), 400)
    curve = sweep_thresholds(pair_sims, pair_labels, threshold_grid)
    curve["layer"] = layer_num
    curve.to_csv(f"{RUN_DIR}/threshold_curve_layer{layer_num}.csv", index=False)

    # Operating point: the row of the sweep with the highest F1.
    best = curve.loc[curve["f1"].idxmax()]

    row = {
        "layer": layer_num,
        "acc@1": acc_by_k[1],
        "acc@3": acc_by_k[3],
        "acc@5": acc_by_k[5],
        "best_threshold": float(best["threshold"]),
        "best_precision": float(best["precision"]),
        "best_recall": float(best["recall"]),
        "best_f1": float(best["f1"]),
        "diag_mean": checks["diag_mean"],
        "off_diag_mean": checks["off_diag_mean"],
    }
    layer_rows.append(row)

# One summary row per layer, saved next to the per-layer threshold curves.
summary = pd.DataFrame(layer_rows)
summary.to_csv(f"{RUN_DIR}/ff1_layer_summary.csv", index=False)
summary


In [None]:
# Plot Accuracy@K against layer number, one line per K.
plt.figure(figsize=(8, 4))
for column, legend_label in (
    ("acc@1", "Acc@1"),
    ("acc@3", "Acc@3"),
    ("acc@5", "Acc@5"),
):
    plt.plot(summary["layer"], summary[column], label=legend_label)
plt.xlabel("Layer")
plt.ylabel("Accuracy@K")
plt.title("FF1 post-activation retrieval accuracy")
plt.legend()
plt.tight_layout()
plt.show()
