# Linear Probing: System vs User Instruction Following

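Extracts residual stream activations with both **TransformerLens** and **nnsight**, then trains a linear probe at each layer to predict whether the model follows the system prompt or the user prompt (Condition C of the dataset). Running both backends lets you compare numerical consistency and ergonomics. (The second backend was originally planned as **nnterp**; it is loaded through `nnsight.LanguageModel` instead — see Section 9 — so labels that say "nnterp" in later cells and plots refer to the nnsight activations.)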

In [None]:
# Dependencies are managed via uv (pyproject.toml at repo root).
# On a new instance, run from the repo root:
#   ./lambda-sync.sh <name>.sync.env setup
# Then source your config before launching Jupyter:
#   source <name>.sync.env && uv run jupyter lab

## 2. Imports

In [1]:
import json
import gc
import os
import time
import numpy as np
import pandas as pd
from pathlib import Path
from tqdm.auto import tqdm
import torch
from transformers import AutoTokenizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
import plotly.graph_objects as go
from plotly.subplots import make_subplots

## 3. Configuration

In [2]:
import re

MODEL_NAME = "meta-llama/Llama-3.1-8B-Instruct"

# Locate repo root by walking up until pyproject.toml is found
def _find_repo_root():
    """Return the closest directory, from the CWD upward, containing pyproject.toml.

    The current working directory itself is checked first, then each parent
    in turn. If none of them holds a ``pyproject.toml``, the current working
    directory is returned unchanged.
    """
    start = Path.cwd()
    hits = (d for d in (start, *start.parents) if (d / "pyproject.toml").exists())
    return next(hits, start)

# Resolved once when this cell runs; every project path below is built from it.
REPO_ROOT = _find_repo_root()
# Directory load_results() reads the behavioural-results JSONL from.
DATA_DIR        = REPO_ROOT / "phase0_behavioral_analysis" / "data" / "results"
# Directory the extracted activation .npz archives are written to and reloaded from.
ACTIVATIONS_DIR = REPO_ROOT / "phase1_linear_probing" / "data" / "activations"
# Directory the HTML figure and the summary CSV are written to.
REPORTS_DIR     = REPO_ROOT / "phase1_linear_probing" / "reports"

# Create the two output directories up front; DATA_DIR is an input and is not created here.
ACTIVATIONS_DIR.mkdir(parents=True, exist_ok=True)
REPORTS_DIR.mkdir(parents=True, exist_ok=True)

# Auto-load *.sync.env from repo root so the kernel picks up HF_TOKEN
# without needing to source it in a terminal first.
def _load_sync_env(repo_root):
    """Export variables from the first ``*.sync.env`` file found in *repo_root*.

    Only lines of the form ``export NAME=value`` are honoured; leading and
    trailing single or double quote characters are stripped from the value.
    Variables that already exist in ``os.environ`` are left untouched.

    Returns the path of the file that was read, or ``None`` when the
    directory contains no ``*.sync.env`` file.
    """
    candidates = sorted(repo_root.glob("*.sync.env"))
    if not candidates:
        return None

    export_line = re.compile(r'^export\s+(\w+)=(.*)')
    chosen = candidates[0]  # alphabetically first file wins; any others are ignored
    with open(chosen) as handle:
        for raw in handle:
            found = export_line.match(raw.strip())
            if found is None:
                continue
            name, value = found.groups()
            os.environ.setdefault(name, value.strip('"\''))
    return chosen

# Pull `export NAME=value` pairs from the repo's *.sync.env into os.environ
# (existing variables win) before HF_TOKEN is read below.
_env_file = _load_sync_env(REPO_ROOT)
if _env_file:
    print(f"Loaded env from: {_env_file.name}")

LABEL_MODE = "binary"  # "binary" (followed_system vs followed_user) or "one_vs_rest" (followed_system vs all)

# Keys produced by find_token_positions(): the "last_*" entries are single token
# indices, the "mean_*" entries are (start, end) ranges that get averaged.
TOKEN_POSITIONS = ["last_prompt", "last_system", "last_user", "mean_all", "mean_system", "mean_user"]

# Number of stratified CV folds used by the probes.
N_CV_FOLDS = 5

MAX_SAMPLES = None  # set to an int to cap samples (for fast debugging)

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# An empty token only triggers a warning here; the cell keeps running.
HF_TOKEN = os.environ.get("HF_TOKEN", "")
if not HF_TOKEN:
    print("WARNING: HF_TOKEN not set. Run: source <name>.sync.env")
else:
    # Mirror the token under a second variable name — presumably the one the
    # Hugging Face hub client reads; confirm against the installed version.
    os.environ["HUGGING_FACE_HUB_TOKEN"] = HF_TOKEN

# Echo the resolved environment so a saved notebook records where it ran.
print(f"Device       : {DEVICE}")
if DEVICE == "cuda":
    print(f"GPU          : {torch.cuda.get_device_name(0)}")
    print(f"VRAM         : {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
print(f"REPO_ROOT    : {REPO_ROOT}")
print(f"DATA_DIR     : {DATA_DIR}")
print(f"ACTIVATIONS  : {ACTIVATIONS_DIR}")
print(f"REPORTS      : {REPORTS_DIR}")

Loaded env from: enrique.sync.env
Device       : cuda
GPU          : NVIDIA A100-SXM4-40GB
VRAM         : 42.4 GB
REPO_ROOT    : /home/ubuntu/system-user-circuits
DATA_DIR     : /home/ubuntu/system-user-circuits/phase0_behavioral_analysis/data/results
ACTIVATIONS  : /home/ubuntu/system-user-circuits/phase1_linear_probing/data/activations
REPORTS      : /home/ubuntu/system-user-circuits/phase1_linear_probing/reports


## 4. Data Loading

In [3]:
def load_results(data_dir, model_name):
    """Load one model's behavioural results (JSON Lines) into a DataFrame.

    Args:
        data_dir: Directory holding the results file.
        model_name: Model identifier; every "/" is replaced by "_" to form
            the file name ``<safe_name>_results.jsonl``.

    Returns:
        A ``pandas.DataFrame`` with one row per non-blank line of the file.

    Raises:
        FileNotFoundError: If the results file does not exist.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    safe_name = model_name.replace("/", "_")
    path = Path(data_dir) / f"{safe_name}_results.jsonl"
    # Pin UTF-8 so prompts containing non-ASCII text decode identically on
    # every machine instead of depending on the locale's default encoding.
    with open(path, encoding="utf-8") as f:
        stripped_lines = (line.strip() for line in f)
        records = [json.loads(text) for text in stripped_lines if text]
    return pd.DataFrame(records)


df_all = load_results(DATA_DIR, MODEL_NAME)

# Build one row mask: Condition C only, and in binary mode only the two
# labels the probe discriminates between.
keep_rows = df_all["condition"] == "C"
if LABEL_MODE == "binary":
    keep_rows = keep_rows & df_all["label"].isin(["followed_system", "followed_user"])
df = df_all.loc[keep_rows].copy()

# Probe target: 1 when the model followed the system prompt, 0 otherwise.
df["y"] = df["label"].eq("followed_system").astype(int)

# Optional fixed-seed subsample for quick debugging runs.
if MAX_SAMPLES is not None:
    n_keep = min(MAX_SAMPLES, len(df))
    df = df.sample(n=n_keep, random_state=42)

# Later cells index activation arrays by row position, so use a clean 0..n-1 index.
df = df.reset_index(drop=True)

n_system = df["y"].sum()
n_other = (df["y"] == 0).sum()
frac_system = df["y"].mean()

print(f"Condition C samples : {len(df)}")
print(f"followed_system     : {n_system} ({frac_system:.1%})")
print(f"other               : {n_other} ({(1-frac_system):.1%})")
print(f"\nLabel breakdown:\n{df['label'].value_counts()}")

Condition C samples : 205
followed_system     : 49 (23.9%)
other               : 156 (76.1%)

Label breakdown:
label
followed_user      156
followed_system     49
Name: count, dtype: int64


## 5. Tokenizer & Prompt Utilities

In [4]:
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)


def build_formatted_prompt(system_text, user_text):
    """Render a system + user exchange through the tokenizer's chat template.

    Returns the prompt as a string (not token ids), including the generation
    prompt appended by ``add_generation_prompt=True``.
    """
    conversation = [
        dict(role="system", content=system_text),
        dict(role="user", content=user_text),
    ]
    rendered = tokenizer.apply_chat_template(
        conversation,
        tokenize=False,
        add_generation_prompt=True,
    )
    return rendered


def _encode_len(text):
    """Return the number of tokens *text* encodes to, with no special tokens added."""
    token_ids = tokenizer.encode(text, add_special_tokens=False)
    return len(token_ids)


def find_token_positions(system_text, user_text):
    """Locate the system / user / full-prompt token boundaries of one prompt.

    Segment boundaries are found by rendering progressively longer versions
    of the conversation (system only, system + user, full prompt with the
    generation prompt) and counting the tokens of each rendering.

    NOTE(review): this assumes the system-only and system+user renderings are
    token-for-token prefixes of the full prompt; confirm this if the chat
    template or tokenizer changes.

    Args:
        system_text: Content of the system message.
        user_text: Content of the user message.

    Returns:
        A dict with the keys listed in ``TOKEN_POSITIONS``. The ``last_*``
        values are single token indices; the ``mean_*`` values are
        ``(start, end)`` pairs, used as half-open slices by
        ``slice_activation``.
    """
    messages = [
        {"role": "system", "content": system_text},
        {"role": "user", "content": user_text},
    ]
    # Reuse the exact renderer whose output is tokenised and fed to the model,
    # so the indices cannot drift from the real input.
    full_str = build_formatted_prompt(system_text, user_text)
    sys_str = tokenizer.apply_chat_template(
        [{"role": "system", "content": system_text}],
        tokenize=False,
        add_generation_prompt=False,
    )
    sys_user_str = tokenizer.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=False
    )

    n_full = _encode_len(full_str)
    # Clamp both boundaries so they stay strictly inside the full prompt ...
    n_sys = min(_encode_len(sys_str), n_full - 1)
    n_sys_user = min(_encode_len(sys_user_str), n_full - 1)

    # ... and so that the system and user segments each hold at least one token.
    n_sys = max(n_sys, 1)
    n_sys_user = max(n_sys_user, n_sys + 1)

    return {
        "last_prompt": n_full - 1,
        "last_system": n_sys - 1,
        "last_user": n_sys_user - 1,
        "mean_all": (0, n_full),
        "mean_system": (0, n_sys),
        "mean_user": (n_sys, n_sys_user),
    }

## 6. Precompute Prompts & Token Positions

In [5]:
# Parallel lists, one entry per row of `df`, in row order.
formatted_prompts, position_maps, input_ids_list = [], [], []

prompt_pairs = zip(df["system_prompt"], df["user_prompt"])
for system_text, user_text in tqdm(prompt_pairs, total=len(df), desc="Preparing"):
    rendered = build_formatted_prompt(system_text, user_text)
    formatted_prompts.append(rendered)
    position_maps.append(find_token_positions(system_text, user_text))
    # The rendered prompt is tokenised as-is, without adding further special tokens.
    encoded = tokenizer(rendered, return_tensors="pt", add_special_tokens=False)
    input_ids_list.append(encoded.input_ids)

seq_lens = [t.shape[1] for t in input_ids_list]
print(f"Prepared {len(formatted_prompts)} prompts")
print(f"Token count range: {min(seq_lens)} – {max(seq_lens)}")
print(f"Example positions : {position_maps[0]}")

Preparing:   0%|          | 0/205 [00:00<?, ?it/s]

Prepared 205 prompts
Token count range: 59 – 109
Example positions : {'last_prompt': 58, 'last_system': 37, 'last_user': 54, 'mean_all': (0, 59), 'mean_system': (0, 38), 'mean_user': (38, 55)}


## 7. Extraction Helpers

Shared utilities used by both backends.

In [6]:
def slice_activation(act_tensor, pos_val):
    """Extract one feature vector from a batched activation tensor.

    Only batch element 0 is read. The result is converted to float32, moved
    to the CPU and returned as a NumPy array.

    Args:
        act_tensor: Tensor indexed as ``[batch, position, feature]``.
        pos_val: Either an ``int`` (take that single position) or a
            ``(start, end)`` pair (average positions ``start:end``).
    """
    sample = act_tensor[0]
    if not isinstance(pos_val, int):
        lo, hi = pos_val
        pooled = sample[lo:hi, :].float().mean(dim=0)
        return pooled.cpu().numpy()
    return sample[pos_val, :].float().cpu().numpy()


def unwrap_saved(proxy):
    """Return ``proxy.value`` when that attribute exists, else *proxy* itself."""
    return getattr(proxy, "value", proxy)


def build_activation_array(buffers, n_samples, n_layers):
    """Turn per-position, per-layer lists of vectors into sample-major arrays.

    Args:
        buffers: Mapping ``position -> list over layers -> list over samples``
            of feature vectors.
        n_samples: Number of samples to read from each layer list.
        n_layers: Number of layers to read.

    Returns:
        A dict with one array per entry of ``TOKEN_POSITIONS``, indexed as
        ``[sample, layer, feature]``.
    """
    stacked = {}
    for pos in TOKEN_POSITIONS:
        per_layer = buffers[pos]
        sample_major = [
            [per_layer[layer][i] for layer in range(n_layers)]
            for i in range(n_samples)
        ]
        stacked[pos] = np.array(sample_major)
    return stacked


def save_activations(activations, path):
    """Write every array in *activations* to one compressed ``.npz`` archive.

    Each key of *activations* becomes the name of an array in the archive;
    the destination path is echoed to stdout afterwards.
    """
    named_arrays = dict(activations)
    np.savez_compressed(path, **named_arrays)
    print(f"Saved: {path}")


def load_activations(path):
    """Read every array stored in an ``.npz`` archive into a plain dict.

    Args:
        path: Location of an archive written by ``save_activations``.

    Returns:
        A dict mapping each stored array name to its fully loaded array.
    """
    # np.load returns a lazily-read NpzFile that keeps the archive open;
    # materialise every array inside a context manager so the file handle
    # is released as soon as loading is done.
    with np.load(path) as archive:
        return {name: archive[name] for name in archive.files}

## 8. TransformerLens: Load → Extract → Save → Cleanup

In [7]:
from transformer_lens import HookedTransformer

# Load the model through TransformerLens: float16 on GPU, float32 on CPU.
model_tl = HookedTransformer.from_pretrained(
    MODEL_NAME,
    dtype=torch.float16 if DEVICE == "cuda" else torch.float32,
    device=DEVICE,
)
model_tl.eval()

# Dimensions from the TL config. N_LAYERS sizes the extraction buffers and is
# later asserted equal to the nnsight model's layer count.
N_LAYERS = model_tl.cfg.n_layers
D_MODEL = model_tl.cfg.d_model
print(f"Loaded via TL  |  layers={N_LAYERS}  d_model={D_MODEL}")

`torch_dtype` is deprecated! Use `dtype` instead!


Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]



Loaded pretrained model meta-llama/Llama-3.1-8B-Instruct into HookedTransformer
Loaded via TL  |  layers=32  d_model=4096


In [8]:
def _is_resid_post(hook_name):
    # Cache only the hooks whose name ends in "resid_post".
    return hook_name.endswith("resid_post")


n_prompts = len(input_ids_list)

# buffers_tl[position][layer] collects one feature vector per prompt.
buffers_tl = {pos: [[] for _ in range(N_LAYERS)] for pos in TOKEN_POSITIONS}

t0 = time.time()
for ids, pm in tqdm(zip(input_ids_list, position_maps), total=n_prompts, desc="TL extract"):
    with torch.no_grad():
        _, cache = model_tl.run_with_cache(
            ids.to(DEVICE),
            prepend_bos=False,
            names_filter=_is_resid_post,
        )
    for layer in range(N_LAYERS):
        resid = cache["resid_post", layer]
        for pos_name, per_layer in buffers_tl.items():
            per_layer[layer].append(slice_activation(resid, pm[pos_name]))
    # Drop the cached activations before asking CUDA to release unused blocks.
    del cache
    if DEVICE == "cuda":
        torch.cuda.empty_cache()

tl_time = time.time() - t0
print(f"TL extraction: {tl_time:.1f}s  ({tl_time/n_prompts:.2f}s/sample)")

activations_tl = build_activation_array(buffers_tl, n_prompts, N_LAYERS)
del buffers_tl

safe = MODEL_NAME.replace("/", "_")
save_activations(activations_tl, ACTIVATIONS_DIR / f"act_tl_{safe}.npz")

print("Shapes:")
for pos, arr in activations_tl.items():
    print(f"  {pos}: {arr.shape}")

TL extract:   0%|          | 0/205 [00:00<?, ?it/s]

TL extraction: 15.6s  (0.08s/sample)
Saved: /home/ubuntu/system-user-circuits/phase1_linear_probing/data/activations/act_tl_meta-llama_Llama-3.1-8B-Instruct.npz
Shapes:
  last_prompt: (205, 32, 4096)
  last_system: (205, 32, 4096)
  last_user: (205, 32, 4096)
  mean_all: (205, 32, 4096)
  mean_system: (205, 32, 4096)
  mean_user: (205, 32, 4096)


In [9]:
# Drop the TransformerLens model before the second backend loads the same
# weights, then report how much GPU memory is still allocated.
del model_tl
gc.collect()
if DEVICE == "cuda":
    torch.cuda.empty_cache()
    torch.cuda.synchronize()
    print(f"VRAM freed. Current usage: {torch.cuda.memory_allocated()/1e9:.2f} GB")

VRAM freed. Current usage: 0.04 GB


## 9. nnsight: Load → Extract → Save → Cleanup

Uses `nnsight.LanguageModel` with the explicit `tracer.invoke()` pattern.
See `NNSIGHT_NOTES.md` for API details.

In [21]:
from nnsight import LanguageModel

# nnsight 0.5.x API notes (verified with probe_syntax_test.py):
#
# 1. model.trace(input_ids=ids) FAILS — kwargs-only means no Invoker is created,
#    so the body code runs outside the forward pass (interleaving=False).
#    Use either: model.trace(string_or_tensor)  (positional arg)
#            or: model.trace() + tracer.invoke(input_ids=ids)  (explicit invoker)
#
# 2. .save() returns torch.Tensor directly — no .value wrapper.
#    Tensors may have requires_grad=True, so use .detach() before .numpy().
#
# 3. With a single invoke, the batch dim is squeezed:
#    output[0] shape is (seq_len, d_model), NOT (1, seq_len, d_model).
#
# 4. nnterp.StandardizedTransformer fails to load Llama on nnsight 0.5.x
#    (FakeTensor incompatibility in check_model_renaming scan).
#    Use nnsight.LanguageModel directly instead.

# Load the same checkpoint through nnsight: float16 on GPU, float32 on CPU.
model_nn = LanguageModel(
    MODEL_NAME,
    device_map="auto",
    dispatch=True,
    torch_dtype=torch.float16 if DEVICE == "cuda" else torch.float32,
)

# Both backends must expose the same number of layers, otherwise the
# layer-by-layer comparison further down would be misaligned. N_LAYERS comes
# from the TransformerLens cell, so that cell must have run in this session.
N_LAYERS_NN = len(list(model_nn.model.layers))
assert N_LAYERS_NN == N_LAYERS, f"Layer count mismatch: TL={N_LAYERS}, nnsight={N_LAYERS_NN}"
print(f"Loaded via nnsight LanguageModel  |  layers={N_LAYERS_NN}")

Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

Loaded via nnsight LanguageModel  |  layers=32


In [22]:
# buffers_nn[position][layer] collects one feature vector per prompt.
buffers_nn = {pos: [[] for _ in range(N_LAYERS_NN)] for pos in TOKEN_POSITIONS}

t0 = time.time()
for ids, pm in tqdm(zip(input_ids_list, position_maps), total=len(input_ids_list), desc="nnsight extract"):
    ids_gpu = ids.to(DEVICE)

    # Explicit invoker pattern: kwargs-only model.trace(...) would not create one.
    saved = [None] * N_LAYERS_NN
    with model_nn.trace() as tracer:
        with tracer.invoke(input_ids=ids_gpu):
            for layer in range(N_LAYERS_NN):
                saved[layer] = model_nn.model.layers[layer].output[0].save()

    for layer in range(N_LAYERS_NN):
        # Saved tensors may carry requires_grad=True; detach before converting to NumPy.
        hs = saved[layer].detach()
        # With a single invoke the batch dim is squeezed to (seq_len, d_model).
        # Restore it so the same slicing helper as the TransformerLens path is
        # used and the two backends cannot drift apart.
        if hs.dim() == 2:
            hs = hs.unsqueeze(0)
        for pos_name in TOKEN_POSITIONS:
            buffers_nn[pos_name][layer].append(slice_activation(hs, pm[pos_name]))

    # Drop the references to this prompt's GPU tensors so empty_cache() can
    # actually release their memory.
    del saved, hs
    if DEVICE == "cuda":
        torch.cuda.empty_cache()

nn_time = time.time() - t0
print(f"nnsight extraction: {nn_time:.1f}s  ({nn_time/len(input_ids_list):.2f}s/sample)")

# Use this backend's own layer count (asserted equal to N_LAYERS at load time).
activations_nn = build_activation_array(buffers_nn, len(input_ids_list), N_LAYERS_NN)
del buffers_nn

safe = MODEL_NAME.replace("/", "_")
save_activations(activations_nn, ACTIVATIONS_DIR / f"act_nn_{safe}.npz")

# tl_time only exists if the TransformerLens extraction ran in this session.
if "tl_time" in globals():
    print(f"\nSpeed comparison:")
    print(f"  TransformerLens : {tl_time:.1f}s  ({tl_time/len(input_ids_list):.2f}s/sample)")
    print(f"  nnsight         : {nn_time:.1f}s  ({nn_time/len(input_ids_list):.2f}s/sample)")
    print(f"  ratio (TL/nnsight): {tl_time/nn_time:.2f}x")

nnsight extract:   0%|          | 0/205 [00:00<?, ?it/s]

nnsight extraction: 15.0s  (0.07s/sample)
Saved: /home/ubuntu/system-user-circuits/phase1_linear_probing/data/activations/act_nn_meta-llama_Llama-3.1-8B-Instruct.npz

Speed comparison:
  TransformerLens : 15.6s  (0.08s/sample)
  nnsight         : 15.0s  (0.07s/sample)
  ratio (TL/nnsight): 1.04x


In [None]:
from accelerate.hooks import remove_hook_from_submodules
# Strip the accelerate hooks from the wrapped model before deleting it —
# presumably they hold references that would otherwise keep the weights
# alive; confirm if VRAM is not released.
remove_hook_from_submodules(model_nn._model)
del model_nn
gc.collect()
if DEVICE == "cuda":
    torch.cuda.empty_cache()
    torch.cuda.synchronize()
    print(f"VRAM after cleanup: {torch.cuda.memory_allocated()/1e9:.2f} GB")

## 10. Load Saved Activations

Run this cell if restarting after extraction to skip re-running the models.

In [None]:
# Reload both backends' archives from disk, using the file names written by
# the two extraction cells.
safe = MODEL_NAME.replace("/", "_")
activations_tl = load_activations(ACTIVATIONS_DIR / f"act_tl_{safe}.npz")
activations_nn = load_activations(ACTIVATIONS_DIR / f"act_nn_{safe}.npz")
# Recover the layer count from axis 1 of the stored arrays, since the models
# may not have been loaded in this session.
N_LAYERS = activations_tl[TOKEN_POSITIONS[0]].shape[1]
print(f"N_LAYERS={N_LAYERS}")
for pos in TOKEN_POSITIONS:
    print(f"  TL  {pos}: {activations_tl[pos].shape}")
    print(f"  NN  {pos}: {activations_nn[pos].shape}")

## 11. Numerical Comparison: TL vs nnsight

Checks whether both backends produce numerically consistent activations. If TL reimplements the model differently from HuggingFace, cosine similarity will be < 1.

In [None]:
from sklearn.metrics.pairwise import cosine_similarity

print(f"{'position':<16} {'layer 0':>10} {'layer mid':>10} {'layer last':>10} {'mean all':>10}")
print("-" * 60)


def _mean_cosine_per_layer(a, b):
    """Return, for each layer, the mean over samples of cos(a[s, l], b[s, l]).

    Both inputs are indexed as (n_samples, n_layers, d_model). All layers are
    handled in one vectorised pass instead of one library call per sample.
    """
    a = a.astype(np.float32, copy=False)
    b = b.astype(np.float32, copy=False)
    dots = np.einsum("sld,sld->sl", a, b)
    norms = np.linalg.norm(a, axis=-1) * np.linalg.norm(b, axis=-1)
    # A zero vector gets similarity 0 (its dot product is 0) instead of NaN.
    norms[norms == 0] = 1.0
    return (dots / norms).mean(axis=0)


mid = N_LAYERS // 2
for pos in TOKEN_POSITIONS:
    tl = activations_tl[pos]  # (n_samples, n_layers, d_model)
    nn = activations_nn[pos]

    per_layer = _mean_cosine_per_layer(tl, nn)
    c0 = per_layer[0]
    cm = per_layer[mid]
    cl = per_layer[N_LAYERS - 1]
    ca = per_layer.mean()
    print(f"{pos:<16} {c0:>10.4f} {cm:>10.4f} {cl:>10.4f} {ca:>10.4f}")

print("\n1.0 = identical, <1.0 = divergence between TL reimplementation and HF")

position            layer 0  layer mid layer last   mean all
------------------------------------------------------------
last_prompt          1.0000     1.0000     1.0000     1.0000
last_system          1.0000     1.0000     1.0000     1.0000
last_user            1.0000     1.0000     1.0000     1.0000
mean_all             1.0000     1.0000     1.0000     1.0000
mean_system          1.0000     1.0000     1.0000     1.0000
mean_user            1.0000     1.0000     1.0000     1.0000

1.0 = identical, <1.0 = divergence between TL reimplementation and HF


## 12. Linear Probing

In [None]:
y = df["y"].values


def probe_all_positions(activations, n_folds=N_CV_FOLDS, labels=None):
    """Cross-validate a linear probe at every layer for every token position.

    For each entry of ``TOKEN_POSITIONS`` and each layer, a pipeline of
    ``StandardScaler`` followed by ``LogisticRegression`` is scored with
    stratified k-fold cross-validation using balanced accuracy.

    Args:
        activations: Mapping ``position -> array`` indexed as
            ``[sample, layer, feature]``.
        n_folds: Number of stratified CV folds.
        labels: Optional target vector, one entry per sample. Defaults to
            the notebook-level ``y``, so existing calls behave as before;
            pass it explicitly to probe a subset of the data.

    Returns:
        A dict ``position -> DataFrame`` with columns ``layer``, ``mean``
        and ``std`` (mean and standard deviation of the fold scores).

    Raises:
        ValueError: If the number of labels differs from the number of
            samples in the activations.
    """
    targets = y if labels is None else np.asarray(labels)
    n_layers = activations[TOKEN_POSITIONS[0]].shape[1]
    cv = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=42)
    results = {}
    for pos_name in tqdm(TOKEN_POSITIONS, desc="Probing"):
        X = activations[pos_name]
        # Catch stale activation archives (e.g. saved with another
        # MAX_SAMPLES) with a clear message before any fitting starts.
        if X.shape[0] != len(targets):
            raise ValueError(
                f"{pos_name}: {X.shape[0]} activation rows but {len(targets)} labels"
            )
        rows = []
        for layer in range(n_layers):
            X_layer = X[:, layer, :]
            pipe = Pipeline([
                ("scaler", StandardScaler()),
                ("clf", LogisticRegression(max_iter=1000, C=1.0, solver="lbfgs")),
            ])
            scores = cross_val_score(pipe, X_layer, targets, cv=cv, scoring="balanced_accuracy")
            rows.append({"layer": layer, "mean": scores.mean(), "std": scores.std()})
        results[pos_name] = pd.DataFrame(rows)
    return results

In [None]:
print("Probing TransformerLens activations...")
probe_results_tl = probe_all_positions(activations_tl)

# The second backend's activations are extracted with nnsight.LanguageModel,
# so the messages name nnsight rather than nnterp.
print("\nProbing nnsight activations...")
probe_results_nn = probe_all_positions(activations_nn)

# One report per backend: the best layer (by mean CV score) for each position.
for backend_name, backend_results in [
    ("TransformerLens", probe_results_tl),
    ("nnsight", probe_results_nn),
]:
    print(f"\nPeak balanced accuracy — {backend_name}:")
    for pos, df_res in backend_results.items():
        best = df_res.loc[df_res["mean"].idxmax()]
        print(f"  {pos:<16} {best['mean']:.3f} ± {best['std']:.3f}  @ layer {int(best['layer'])}")

Probing TransformerLens activations...


Probing:   0%|          | 0/6 [00:00<?, ?it/s]


Probing nnterp activations...


Probing:   0%|          | 0/6 [00:00<?, ?it/s]


Peak balanced accuracy — TransformerLens:
  last_prompt      0.954 ± 0.039  @ layer 24
  last_system      0.639 ± 0.078  @ layer 31
  last_user        0.953 ± 0.038  @ layer 18
  mean_all         0.964 ± 0.037  @ layer 16
  mean_system      0.648 ± 0.085  @ layer 23
  mean_user        0.961 ± 0.037  @ layer 20

Peak balanced accuracy — nnterp:
  last_prompt      0.954 ± 0.039  @ layer 24
  last_system      0.611 ± 0.044  @ layer 11
  last_user        0.953 ± 0.038  @ layer 18
  mean_all         0.964 ± 0.037  @ layer 15
  mean_system      0.611 ± 0.044  @ layer 6
  mean_user        0.961 ± 0.037  @ layer 16


## 12b. Control Probe (Permuted Labels)

Trains the identical probe on **shuffled labels** to establish a chance-level baseline.  
If the real probe accuracy ≫ control accuracy, the residual stream genuinely encodes the system-vs-user decision.  
Control is run on TL activations only (TL ≈ nnsight numerically — see Section 11 — so one is sufficient).

In [None]:
N_PERMUTATIONS = 10   # number of shuffles to average over

# Shared generator used when probe_control() is called without a seed.
rng = np.random.default_rng(0)

def probe_control(activations, n_permutations=N_PERMUTATIONS, n_folds=N_CV_FOLDS,
                  labels=None, seed=None):
    """Run the same probe on randomly permuted labels (chance-level control).

    For every token position the labels are reshuffled ``n_permutations``
    times; each shuffle is scored at every layer with the same pipeline and
    CV splitter as the real probe.

    Args:
        activations: Mapping ``position -> array`` indexed as
            ``[sample, layer, feature]``.
        n_permutations: Number of label shuffles per position.
        n_folds: Number of stratified CV folds.
        labels: Optional target vector to permute. Defaults to the
            notebook-level ``y``, so existing calls behave as before.
        seed: Optional seed for a private random generator, which makes a
            call reproducible no matter how often the function has been
            called before. When omitted, the notebook-level ``rng`` is
            used and advanced, as before.

    Returns:
        A dict ``position -> DataFrame`` with columns ``layer``, ``mean``
        and ``std``; here mean and std are taken across permutations of the
        per-permutation mean CV score.
    """
    targets = y if labels is None else np.asarray(labels)
    shuffler = rng if seed is None else np.random.default_rng(seed)
    n_layers = activations[TOKEN_POSITIONS[0]].shape[1]
    cv = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=42)
    results = {}
    for pos_name in tqdm(TOKEN_POSITIONS, desc="Control probing"):
        X = activations[pos_name]
        layer_scores = np.zeros((n_permutations, n_layers))
        for p in range(n_permutations):
            # One shuffle is shared by all layers of this permutation round.
            y_perm = shuffler.permutation(targets)
            for layer in range(n_layers):
                X_layer = X[:, layer, :]
                pipe = Pipeline([
                    ("scaler", StandardScaler()),
                    ("clf", LogisticRegression(max_iter=1000, C=1.0, solver="lbfgs")),
                ])
                scores = cross_val_score(pipe, X_layer, y_perm, cv=cv,
                                         scoring="balanced_accuracy")
                layer_scores[p, layer] = scores.mean()
        rows = [
            {"layer": l,
             "mean": layer_scores[:, l].mean(),
             "std":  layer_scores[:, l].std()}
            for l in range(n_layers)
        ]
        results[pos_name] = pd.DataFrame(rows)
    return results


print("Running control probe (shuffled labels) on TL activations...")
probe_results_ctrl = probe_control(activations_tl)

print("\nControl probe peak balanced accuracy (should be ≈ 0.50):")
for pos, df_res in probe_results_ctrl.items():
    # Compare the best control layer with the best real-probe layer for the
    # same token position.
    best = df_res.loc[df_res["mean"].idxmax()]
    real_best = probe_results_tl[pos].loc[probe_results_tl[pos]["mean"].idxmax()]
    gap = real_best["mean"] - best["mean"]
    # The "+" format flag emits the sign itself, so a negative gap prints as
    # "-0.123" rather than "+-0.123".
    print(f"  {pos:<16} ctrl={best['mean']:.3f}  real={real_best['mean']:.3f}  gap={gap:+.3f}")

## 12c. Control Probe (Metadata-Only)

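A **metadata-only classifier** trained on surface features of the prompt — total sequence length, system-segment length, user-segment length, user-segment start position — plus one-hot experimental metadata (constraint type, system-prompt strength, user style, direction), **without looking at any activations**.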

If this control reaches similar accuracy as the representation probe, the probe's signal could be explained by spurious correlations with input length or experimental condition rather than the model's internal representations.  
If the control stays near chance (≈ 0.50) while the real probe is high, the representations genuinely encode the system-vs-user decision.

In [None]:
# Closed category lists; their order fixes the column order of the one-hot features.
ALL_CONSTRAINT_TYPES = ["language", "format", "starting_word"]
ALL_SYS_STRENGTHS    = ["weak", "medium", "strong"]
ALL_USR_STYLES       = ["with_instruction", "polite", "jailbreak"]

def one_hot(val, categories):
    """Encode *val* as a list of 0/1 flags aligned with *categories*.

    A value that matches no category yields all zeros.
    """
    flags = []
    for category in categories:
        flags.append(1 if val == category else 0)
    return flags

# Metadata features — no activations used:
#   4 length features + 3 constraint_type + 3 sys strength + 3 usr style + 1 direction = 14 dims
# position_maps and df are parallel (one entry per row of df, in row order),
# so zipping them pairs each prompt's token layout with its metadata.
X_meta = np.array([
    [
        pm["last_prompt"] + 1,                            # total tokens
        pm["mean_system"][1],                             # system segment length
        pm["mean_user"][1] - pm["mean_user"][0],          # user segment length
        pm["mean_user"][0],                               # user segment start position
        *one_hot(row["constraint_type"], ALL_CONSTRAINT_TYPES),
        *one_hot(row["strength"],        ALL_SYS_STRENGTHS),
        *one_hot(row["user_style"],      ALL_USR_STYLES),
        int(row["direction"] == "b_to_a"),
    ]
    for pm, (_, row) in zip(position_maps, df.iterrows())
], dtype=np.float32)

print(f"X_meta shape: {X_meta.shape}")  # expect (205, 14)

# Same splitter, scaler, classifier and metric as the activation probes, so
# the scores are directly comparable.
cv = StratifiedKFold(n_splits=N_CV_FOLDS, shuffle=True, random_state=42)
pipe_meta = Pipeline([
    ("scaler", StandardScaler()),
    ("clf", LogisticRegression(max_iter=1000, C=1.0, solver="lbfgs")),
])
meta_scores = cross_val_score(pipe_meta, X_meta, y, cv=cv, scoring="balanced_accuracy")

ctrl_meta_acc = meta_scores.mean()
ctrl_meta_std = meta_scores.std()

print(f"Metadata-only control  : {ctrl_meta_acc:.3f} ± {ctrl_meta_std:.3f}")
print(f"Majority-class baseline: 0.500 (by definition of balanced_accuracy)")
print()
print("Real probe peak (TL):")
for pos, df_res in probe_results_tl.items():
    best = df_res.loc[df_res["mean"].idxmax()]
    gap = best["mean"] - ctrl_meta_acc
    # The "+" format flag emits the sign itself; the old "+{gap:.3f}" printed
    # "+-0.244" whenever the probe scored below the metadata control.
    print(f"  {pos:<16} {best['mean']:.3f}  gap vs metadata ctrl: {gap:+.3f}")

X_meta shape: (205, 14)
Metadata-only control  : 0.883 ± 0.036
Majority-class baseline: 0.500 (by definition of balanced_accuracy)

Real probe peak (TL):
  last_prompt      0.954  gap vs metadata ctrl: +0.071
  last_system      0.639  gap vs metadata ctrl: +-0.244
  last_user        0.953  gap vs metadata ctrl: +0.070
  mean_all         0.964  gap vs metadata ctrl: +0.081
  mean_system      0.648  gap vs metadata ctrl: +-0.235
  mean_user        0.961  gap vs metadata ctrl: +0.078


## 13. Visualization

Solid lines = TransformerLens, dashed lines = nnsight (labelled "nnterp" in the figure). Color = token position.

In [None]:
# One colour per token position, reused for both backends.
COLORS = ["#1f77b4", "#ff7f0e", "#2ca02c", "#d62728", "#9467bd", "#8c564b"]

fig = go.Figure()

for i, pos_name in enumerate(TOKEN_POSITIONS):
    color = COLORS[i % len(COLORS)]

    # NOTE: the "nnterp" entry holds the probe results computed on the
    # nnsight activations (probe_results_nn).
    for backend_label, probe_results, dash in [
        ("TL",     probe_results_tl, "solid"),
        ("nnterp", probe_results_nn, "dash"),
    ]:
        df_res = probe_results[pos_name]
        layers = df_res["layer"].values
        means  = df_res["mean"].values
        stds   = df_res["std"].values

        # Only the TL trace gets a legend entry; both backends share a
        # legendgroup, so toggling the entry hides both lines.
        show_legend = backend_label == "TL"
        fig.add_trace(go.Scatter(
            x=layers, y=means,
            mode="lines",
            name=pos_name,
            legendgroup=pos_name,
            showlegend=show_legend,
            line=dict(color=color, width=2, dash=dash),
        ))

        # ±1 std band (across CV folds), drawn for TL only as a closed polygon:
        # upper edge left-to-right, then lower edge right-to-left.
        if backend_label == "TL":
            fig.add_trace(go.Scatter(
                x=np.concatenate([layers, layers[::-1]]),
                y=np.concatenate([means + stds, (means - stds)[::-1]]),
                fill="toself", fillcolor=color, opacity=0.10,
                line=dict(color="rgba(0,0,0,0)"),
                legendgroup=pos_name, showlegend=False, hoverinfo="skip",
            ))

# Metadata control: single flat horizontal band (same value for all layers/positions)
fig.add_hline(
    y=ctrl_meta_acc,
    line_dash="dashdot", line_color="black", line_width=1.5,
    annotation_text=f"metadata-only ctrl ({ctrl_meta_acc:.2f})",
    annotation_position="top left",
)

# Chance line
fig.add_hline(
    y=0.5, line_dash="dot", line_color="gray",
    annotation_text="chance (0.50)", annotation_position="bottom right",
)

# The negative class is named after the labelling mode chosen in the config cell.
label_str = "followed_user" if LABEL_MODE == "binary" else "other"
fig.update_layout(
    title=(
        f"Linear Probe: followed_system vs {label_str}<br>"
        f"<sub>{MODEL_NAME} | solid=TL, dashed=nnterp | dash-dot=metadata-only control | {N_CV_FOLDS}-fold CV</sub>"
    ),
    xaxis_title="Layer",
    yaxis_title="Balanced Accuracy",
    yaxis=dict(range=[0.4, 1.02]),
    legend_title="Token Position",
    width=1100,
    height=580,
    template="plotly_white",
    hovermode="x unified",
)

fig.show()

# Persist the interactive figure next to the other reports.
safe = MODEL_NAME.replace("/", "_")
out = REPORTS_DIR / f"probe_{safe}_{LABEL_MODE}_comparison.html"
fig.write_html(str(out))
print(f"Saved: {out}")

## 14. Summary Table

In [None]:
# One summary record per (token position, backend): the layer with the highest
# mean CV score, alongside the metadata-only control for reference.
rows = []
for pos in TOKEN_POSITIONS:
    for backend, probe_results in [("TL", probe_results_tl), ("nnterp", probe_results_nn)]:
        df_res = probe_results[pos]
        best = df_res.loc[df_res["mean"].idxmax()]
        peak_acc = best["mean"]
        peak_layer = int(best["layer"])
        record = {
            "backend": backend,
            "token_position": pos,
            "peak_balanced_acc": round(peak_acc, 4),
            "std": round(best["std"], 4),
            "peak_layer": peak_layer,
            "peak_layer_%": f"{peak_layer / N_LAYERS * 100:.0f}%",
            "metadata_ctrl": round(ctrl_meta_acc, 4),
            "gap_vs_ctrl": round(peak_acc - ctrl_meta_acc, 4),
        }
        rows.append(record)

summary = pd.DataFrame(rows)
summary = summary.sort_values(["token_position", "backend"])
summary = summary.reset_index(drop=True)
print(summary.to_string(index=False))

backend token_position  peak_balanced_acc    std  peak_layer peak_layer_%  metadata_ctrl  gap_vs_ctrl
     TL    last_prompt             0.9540 0.0386          24          75%         0.8831       0.0709
 nnterp    last_prompt             0.9540 0.0386          24          75%         0.8831       0.0709
     TL    last_system             0.6394 0.0778          31          97%         0.8831      -0.2436
 nnterp    last_system             0.6108 0.0440          11          34%         0.8831      -0.2723
     TL      last_user             0.9530 0.0381          18          56%         0.8831       0.0699
 nnterp      last_user             0.9530 0.0381          18          56%         0.8831       0.0699
     TL       mean_all             0.9642 0.0369          16          50%         0.8831       0.0811
 nnterp       mean_all             0.9642 0.0369          15          47%         0.8831       0.0811
     TL    mean_system             0.6483 0.0845          23          72%         

In [None]:
import plotly.graph_objects as go

# Focused figure: the last_prompt position only, both backends.
fig2 = go.Figure()

# NOTE: the "nnterp" entry holds the probe results computed on the nnsight
# activations (probe_results_nn).
for backend_label, probe_results, dash, width in [
    ("TL",     probe_results_tl, "solid", 2.5),
    ("nnterp", probe_results_nn, "dash",  2.0),
]:
    df_res = probe_results["last_prompt"]
    layers = df_res["layer"].values
    means  = df_res["mean"].values
    stds   = df_res["std"].values

    fig2.add_trace(go.Scatter(
        x=layers, y=means,
        mode="lines",
        name=f"last_prompt ({backend_label})",
        line=dict(color="#1f77b4" if backend_label == "TL" else "#4a9fd4",
                  width=width, dash=dash),
    ))

    # CI band for TL only
    # (the band spans mean ± one standard deviation of the CV fold scores)
    if backend_label == "TL":
        fig2.add_trace(go.Scatter(
            x=np.concatenate([layers, layers[::-1]]),
            y=np.concatenate([means + stds, (means - stds)[::-1]]),
            fill="toself", fillcolor="#1f77b4", opacity=0.12,
            line=dict(color="rgba(0,0,0,0)"),
            showlegend=False, hoverinfo="skip",
        ))

# Metadata control line
fig2.add_hline(
    y=ctrl_meta_acc,
    line_dash="dashdot", line_color="crimson", line_width=2,
    annotation_text=f"metadata-only ctrl ({ctrl_meta_acc:.3f})",
    annotation_position="top right",
    annotation_font_color="crimson",
)

# Chance line
fig2.add_hline(
    y=0.5, line_dash="dot", line_color="gray", line_width=1,
    annotation_text="chance (0.50)", annotation_position="bottom right",
)

fig2.update_layout(
    title=(
        f"Linear Probe — last_prompt token<br>"
        f"<sub>{MODEL_NAME} | {N_CV_FOLDS}-fold CV balanced accuracy</sub>"
    ),
    xaxis_title="Layer",
    yaxis_title="Balanced Accuracy",
    yaxis=dict(range=[0.4, 1.02]),
    legend=dict(x=0.02, y=0.05),
    width=800, height=480,
    template="plotly_white",
)

fig2.show()

# Persist the summary table built in the previous cell (requires `summary`
# to exist; this figure itself is not written to disk here).
safe = MODEL_NAME.replace("/", "_")
csv_path = REPORTS_DIR / f"probe_{safe}_summary.csv"
summary.to_csv(str(csv_path), index=False)
print(f"Saved: {csv_path}")

In [None]:
df.groupby(["constraint_type"])["y"].mean()

Unnamed: 0_level_0,y
constraint_type,Unnamed: 1_level_1
format,0.423077
language,0.12963
starting_word,0.288889


## Use "format" only

In [None]:
# Boolean row mask and the matching positional row numbers. Positions (not
# index labels) are used to slice the activation arrays and position_maps,
# so this stays correct even if `df` no longer has a clean 0..n-1 index.
fmt_mask = (df["constraint_type"] == "format").to_numpy()
fmt_idx = np.flatnonzero(fmt_mask)

# The activation arrays must be row-aligned with `df`; a stale archive
# (e.g. saved with a different MAX_SAMPLES) would silently select wrong rows.
n_act_rows = activations_tl[TOKEN_POSITIONS[0]].shape[0]
if n_act_rows != len(df):
    raise ValueError(
        f"activations_tl has {n_act_rows} rows but df has {len(df)}; re-extract or reload"
    )

df_fmt = df.loc[fmt_mask].copy().reset_index(drop=True)
y_fmt = df_fmt["y"].values

print(f"Format-only samples: {len(df_fmt)}")
print(f"  followed_system: {y_fmt.sum()} ({y_fmt.mean():.1%})")
print(f"  followed_user:   {(y_fmt == 0).sum()} ({1 - y_fmt.mean():.1%})")

# Same rows, taken from the TransformerLens activations for every position.
activations_fmt = {pos: activations_tl[pos][fmt_idx] for pos in TOKEN_POSITIONS}

print(f"\nActivation shape check: {activations_fmt['last_prompt'].shape}")

Format-only samples: 52
  followed_system: 22 (42.3%)
  followed_user:   30 (57.7%)

Activation shape check: (52, 32, 4096)


In [None]:
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline

# Layer-wise linear probe on the format-only subset, using the activations
# stored under the "last_prompt" key.
pos_name = "last_prompt"
X_pos = activations_fmt[pos_name]
n_layers = X_pos.shape[1]

# Seeded, shuffled stratified folds; the same splitter object is reused for
# every layer.
n_folds_fmt = 3
cv_fmt = StratifiedKFold(n_splits=n_folds_fmt, shuffle=True, random_state=42)


def _probe_fold_scores(features):
    """Return the per-fold balanced accuracies of a scaled logistic probe."""
    model = Pipeline([
        ("scaler", StandardScaler()),
        ("clf", LogisticRegression(max_iter=500, C=1.0, solver="lbfgs")),
    ])
    return cross_val_score(
        model, features, y_fmt, cv=cv_fmt, scoring="balanced_accuracy"
    )


# One row per layer: mean and std of the fold scores.
fold_scores_by_layer = [
    _probe_fold_scores(X_pos[:, layer_idx, :]) for layer_idx in range(n_layers)
]
rows_fmt = [
    {"layer": layer_idx, "mean": fold_scores.mean(), "std": fold_scores.std()}
    for layer_idx, fold_scores in enumerate(fold_scores_by_layer)
]

probe_fmt = pd.DataFrame(rows_fmt)

# Row of the layer with the highest mean CV score.
peak_label = probe_fmt["mean"].idxmax()
best = probe_fmt.loc[peak_label]
print("Format-only probe (last_prompt):")
print(f"  Peak: {best['mean']:.3f} ± {best['std']:.3f} @ layer {int(best['layer'])}")

Format-only probe (last_prompt):
  Peak: 0.879 ± 0.090 @ layer 22


In [None]:
# Metadata-only control for the format subset: the same classifier and CV
# splits as the activation probe, but fed only prompt-level metadata (no
# activations). If this scores as well as the probe, the probe's accuracy can
# be explained by surface features of the prompt.

# Category vocabularies, sorted so that the one-hot column order is stable.
ALL_SYS_STRENGTHS = sorted(df_fmt["strength"].unique())
ALL_USR_STYLES    = sorted(df_fmt["user_style"].unique())


def one_hot(val, categories):
    """Return a list of 0/1 ints, one per category, with 1 where it equals `val`."""
    return [int(val == c) for c in categories]


# Rebuild position maps for format-only samples
position_maps_fmt = [position_maps[i] for i in fmt_idx]

# One feature row per format sample.
# NOTE(review): the positional features look like token-span bookkeeping
# (presumably `mean_system` / `mean_user` are (start, end) pairs) — confirm
# against the cell that builds `position_maps`.
X_meta_fmt = np.array([
    [
        pm["last_prompt"] + 1,                    # presumably prompt length in tokens
        pm["mean_system"][1],                     # presumably end of the system span
        pm["mean_user"][1] - pm["mean_user"][0],  # presumably length of the user span
        pm["mean_user"][0],                       # presumably start of the user span
        *one_hot(row["strength"],   ALL_SYS_STRENGTHS),
        *one_hot(row["user_style"], ALL_USR_STYLES),
        int(row["direction"] == "b_to_a"),
    ]
    for pm, (_, row) in zip(position_maps_fmt, df_fmt.iterrows())
], dtype=np.float32)

pipe_meta_fmt = Pipeline([
    ("scaler", StandardScaler()),
    ("clf", LogisticRegression(max_iter=1000, C=1.0, solver="lbfgs")),
])
meta_scores_fmt = cross_val_score(pipe_meta_fmt, X_meta_fmt, y_fmt,
                                   cv=cv_fmt, scoring="balanced_accuracy")

ctrl_fmt = meta_scores_fmt.mean()
ctrl_fmt_std = meta_scores_fmt.std()

# Probe peak minus control; negative when the control beats the probe.
gap = best["mean"] - ctrl_fmt

print(f"Format-only metadata control: {ctrl_fmt:.3f} ± {ctrl_fmt_std:.3f}")
print(f"Format-only probe peak:       {best['mean']:.3f} ± {best['std']:.3f}")
# Signed format spec: a literal "+" prefix would render negative gaps as "+-0.017".
print(f"Gap:                           {gap:+.3f}")

Format-only metadata control: 0.895 ± 0.088
Format-only probe peak:       0.879 ± 0.090
Gap:                           +-0.017


In [None]:
# Permuted-label baseline: refit the probe at the peak layer on shuffled
# labels to estimate the score obtainable when the labels carry no
# information about the activations.
# NOTE(review): the layer is the one that maximised the score on the *real*
# labels, while the permuted runs are evaluated at that single layer without
# re-selecting the best layer, so the reported gap is optimistic. Ten
# permutations give only a rough mean, not a usable p-value.
N_PERMUTATIONS = 10
rng = np.random.default_rng(0)

best_layer = int(best["layer"])
X_best_layer = activations_fmt[pos_name][:, best_layer, :]

# cross_val_score fits clones of the estimator, so a single unfitted pipeline
# can be shared across all permutations.
pipe = Pipeline([
    ("scaler", StandardScaler()),
    ("clf", LogisticRegression(max_iter=500, C=1.0, solver="lbfgs")),
])

perm_accs = []
for _ in range(N_PERMUTATIONS):
    y_shuf = rng.permutation(y_fmt)
    scores = cross_val_score(pipe, X_best_layer, y_shuf, cv=cv_fmt, scoring="balanced_accuracy")
    perm_accs.append(scores.mean())

perm_mean = np.mean(perm_accs)
perm_gap = best["mean"] - perm_mean

print(f"Permuted-label baseline (layer {best_layer}): {perm_mean:.3f}")
print(f"Real probe at same layer:                       {best['mean']:.3f}")
# Signed format spec so a negative gap prints as "-0.012" rather than "+-0.012".
print(f"Gap vs permuted:                                {perm_gap:+.3f}")

Permuted-label baseline (layer 22): 0.535
Real probe at same layer:                       0.879
Gap vs permuted:                                +0.344


In [None]:
import plotly.graph_objects as go

# Per-layer probe results as plain arrays for plotting.
layers = probe_fmt["layer"].values
means  = probe_fmt["mean"].values
stds   = probe_fmt["std"].values

fig = go.Figure()

_probe_color = "#1f77b4"

# Mean CV balanced accuracy per layer.
fig.add_trace(
    go.Scatter(
        x=layers,
        y=means,
        mode="lines",
        name="last_prompt (TL)",
        line={"color": _probe_color, "width": 2.5},
    )
)

# Shaded mean ± std envelope (fold-to-fold std). The outline runs along the
# upper edge left-to-right and back along the lower edge right-to-left so
# that fill="toself" closes it into a single polygon.
_band_x = np.concatenate([layers, layers[::-1]])
_band_y = np.concatenate([means + stds, (means - stds)[::-1]])
fig.add_trace(
    go.Scatter(
        x=_band_x,
        y=_band_y,
        fill="toself",
        fillcolor=_probe_color,
        opacity=0.12,
        line={"color": "rgba(0,0,0,0)"},
        showlegend=False,
        hoverinfo="skip",
    )
)

# Horizontal reference lines, drawn in order: metadata-only control,
# permuted-label baseline, chance level.
_reference_lines = (
    dict(
        y=ctrl_fmt, line_dash="dashdot", line_color="crimson", line_width=2,
        annotation_text=f"metadata ctrl ({ctrl_fmt:.3f})",
        annotation_position="top right",
        annotation_font_color="crimson",
    ),
    dict(
        y=perm_mean, line_dash="dot", line_color="gray", line_width=1.5,
        annotation_text=f"permuted labels ({perm_mean:.3f})",
        annotation_position="bottom right",
    ),
    dict(
        y=0.5, line_dash="dot", line_color="lightgray", line_width=1,
        annotation_text="chance (0.50)", annotation_position="bottom right",
    ),
)
for _hline_kwargs in _reference_lines:
    fig.add_hline(**_hline_kwargs)

# Title with an HTML subtitle giving model, fold count and sample count.
_fmt_title = (
    "Linear Probe — FORMAT constraint only (last_prompt)<br>"
    f"<sub>{MODEL_NAME} | {n_folds_fmt}-fold CV | n={len(df_fmt)} samples</sub>"
)
fig.update_layout(
    title=_fmt_title,
    xaxis_title="Layer",
    yaxis_title="Balanced Accuracy",
    yaxis={"range": [0.3, 1.05]},
    width=900,
    height=500,
    template="plotly_white",
)

fig.show()

# Save a standalone HTML copy of the figure; "/" in the model id is replaced
# so that it does not act as a path separator in the filename.
safe = MODEL_NAME.replace("/", "_")
out_path = REPORTS_DIR / f"probe_format_only_{safe}.html"
fig.write_html(str(out_path))
print(f"Saved: {out_path}")

In [None]:
# Text summary of the format-only analysis: sample count, label balance, the
# probe peak against both controls, and a coarse verdict for each comparison.

# Compute each gap once so the printed value and the verdict always agree.
gap_vs_meta = best["mean"] - ctrl_fmt
gap_vs_perm = best["mean"] - perm_mean

rule = "=" * 60
print(rule)
print("FORMAT-ONLY")
print(rule)
print(f"Samples:              {len(df_fmt)}")
print(f"SCR (system win rate): {y_fmt.mean():.1%}")
print()
print(f"Probe peak:            {best['mean']:.3f} ± {best['std']:.3f} @ layer {int(best['layer'])}")
print(f"Metadata control:      {ctrl_fmt:.3f} ± {ctrl_fmt_std:.3f}")
print(f"Permuted control:      {perm_mean:.3f}")
print()
# Signed format spec: a literal "+" prefix would print negative gaps as "+-0.017".
print(f"Gap (probe - metadata): {gap_vs_meta:+.3f}")
print(f"Gap (probe - permuted): {gap_vs_perm:+.3f}")
print()
# Fixed thresholds used for the verdicts: 0.05 over metadata, 0.10 over permuted.
if gap_vs_meta > 0.05:
    print("Probe shows signal ABOVE metadata. Some activation-level info exists.")
else:
    print("Probe does NOT clearly beat metadata. Signal may be surface features.")
if gap_vs_perm > 0.1:
    print("Probe clearly above chance (permuted). Labels are learnable.")
else:
    print("Probe barely above permuted baseline. Weak or no signal.")
print(rule)

FORMAT-ONLY
Samples:              52
SCR (system win rate): 42.3%

Probe peak:            0.879 ± 0.090 @ layer 22
Metadata control:      0.895 ± 0.088
Permuted control:      0.535

Gap (probe - metadata): +-0.017
Gap (probe - permuted): +0.344

Probe does NOT clearly beat metadata. Signal may be surface features.
Probe clearly above chance (permuted). Labels are learnable.


**Model**: Llama-3.1-8B-Instruct

## Key Results

- **Probe accuracy**: 0.964 (mean_all, layer 16)
- **Metadata-only control**: 0.883, using constraint type, strength, user style, and direction alone, without any activations
- **Gap**: +0.08, likely not statistically significant given the small sample size and high variance
- When restricting to the format constraint only (n=52; system win rate, SCR, ≈42%), the probe (0.879) failed to beat the metadata control (0.895), producing a negative gap (≈ −0.017).

## Proposed Next Steps

1. **Expand and rebalance the dataset**: Add more constraint types and more samples per type, and ensure that each type has a mix of system-win and user-win outcomes (related to step 2).

2. **Characterize SCR across model sizes and architectures**: Understanding how conflict-resolution behavior varies (e.g., small models with inconsistent SCR vs. larger models with near-perfect SCR) will help identify the best target models for analysis: those where the behavior is stable enough to study but not trivially perfect.

3. **The negative-result pathway**: If, after dataset improvements, the probe still fails to beat the metadata control, this might suggest that conflict resolution in these models is primarily driven by surface-level cues (position bias, keyword matching) rather than a clean internal circuit.