# 01 – Discovery: Self-Referential "I" Token Representations

Initial exploration of whether the "I" token carries different representations depending
on whether the model is referring to itself vs. producing quoted speech vs. roleplaying.

- **Model:** Mistral-7B-Instruct-v0.3
- **Method:** Generate responses, extract hidden states at the first "I" token, compare across conditions
- **Analyses:** PCA/UMAP visualization, linear probes, cosine similarity, self-model direction

In [None]:
import matplotlib
matplotlib.use('Agg')

import torch
import numpy as np
import matplotlib.pyplot as plt
from transformers import AutoTokenizer, AutoModelForCausalLM
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.metrics.pairwise import cosine_similarity as sklearn_cosine
from sklearn.decomposition import PCA
import umap
import warnings
warnings.filterwarnings("ignore")

# ── Load model ──────────────────────────────────────────────────────────────
MODEL_NAME = "mistralai/Mistral-7B-Instruct-v0.3"

# Loading options: fp16 weights, device_map="auto", hidden states enabled in the config
_load_kwargs = dict(
    torch_dtype=torch.float16,
    device_map="auto",
    output_hidden_states=True,
)

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, **_load_kwargs)
model.eval()

# Mistral has no pad token by default, so fall back to the EOS token for padding
_missing_pad_token = tokenizer.pad_token is None
if _missing_pad_token:
    tokenizer.pad_token = tokenizer.eos_token
    model.config.pad_token_id = model.config.eos_token_id

# Report what was loaded and where it was placed
_cfg = model.config
_device_map = getattr(model, "hf_device_map", "N/A")
print(f"Loaded {MODEL_NAME}")
print(f"Layers: {_cfg.num_hidden_layers}, Hidden dim: {_cfg.hidden_size}")
print(f"Device map: {_device_map}")


def make_instruct_prompt(user_msg: str) -> str:
    """Return ``user_msg`` enclosed in the Mistral ``[INST] ... [/INST]`` markers.

    A single space separates the message from each marker.
    """
    template = "[INST] {} [/INST]"
    return template.format(user_msg)

# Number of transformer layers reported by the model config. Elsewhere in this
# notebook hidden-state indices run 0..N_LAYERS inclusive (index 0 is treated as
# the embedding output, 1..N_LAYERS as the transformer layers).
N_LAYERS = model.config.num_hidden_layers  # typically 32 for Mistral-7B

## Prompt Definitions

Three conditions with 20 prompts each:
1. **Self-referential** — the model talks about *itself*
2. **Quoted speech** — the model produces a character saying "I …"
3. **Roleplay** — the model *is* a character and says "I …"

In [None]:
# ── Condition 1: Self-referential prompts ───────────────────────────────────
# 20 paraphrases asking the model to describe itself. These are the raw user
# messages; the instruct template is applied to them further down.
self_ref_prompts = [
    "Tell me about yourself in one sentence.",
    "Describe what you are in one sentence.",
    "Introduce yourself briefly.",
    "What are you? Answer in one sentence.",
    "Who are you? Answer in one sentence.",
    "Explain what kind of entity you are in one sentence.",
    "Give a one-sentence description of yourself.",
    "In one sentence, tell me your nature.",
    "Describe your identity in one sentence.",
    "What is your purpose? Answer in one sentence.",
    "Tell me what you do in one sentence.",
    "Characterize yourself in one sentence.",
    "State what you are in one sentence.",
    "Give me a short self-description.",
    "How would you describe yourself? Answer in one sentence.",
    "Summarize what you are in one sentence.",
    "Present yourself in one sentence.",
    "What defines you? Answer in one sentence.",
    "Provide a one-line self-introduction.",
    "Express your identity in one sentence.",
]

# ── Condition 2: Quoted speech prompts ──────────────────────────────────────
# Speakers substituted into the story stem; one prompt is built per speaker.
_characters_quoted = [
    "Mary",
    "the sailor",
    "the detective",
    "Sarah",
    "the king",
    "the child",
    "the professor",
    "the warrior",
    "Tom",
    "the astronaut",
    "Maria",
    "the coach",
    "Dr. Chen",
    "the farmer",
    "Elena",
    "the pilot",
    "the grandmother",
    "James",
    "the general",
    "Lisa",
]

# The stem ends right after the opening quote and the word "I".
_quoted_stem = "Complete this story in one sentence: {} turned to John and said 'I"
quoted_speech_prompts = list(map(_quoted_stem.format, _characters_quoted))

# ── Condition 3: Roleplay prompts ──────────────────────────────────────────
# Personas (article included) that the model is asked to respond as.
_characters_roleplay = [
    "a pirate captain",
    "a medieval knight",
    "a Roman senator",
    "a cowboy",
    "a Victorian gentleman",
    "a samurai",
    "a space explorer",
    "a wizard",
    "a noir detective",
    "a pharaoh",
    "a Viking",
    "a French chef",
    "a Shakespearean actor",
    "a robot",
    "a Greek philosopher",
    "a jazz musician",
    "an Arctic explorer",
    "a mad scientist",
    "a dragon",
    "an alien",
]

# One instruction per persona, in the same order as the list above.
_roleplay_stem = "Respond as {} in one sentence."
roleplay_prompts = list(map(_roleplay_stem.format, _characters_roleplay))

# Apply the instruct template to every prompt of every condition (rebinding the
# same names, so from here on the lists hold fully formatted prompts)
self_ref_prompts = list(map(make_instruct_prompt, self_ref_prompts))
quoted_speech_prompts = list(map(make_instruct_prompt, quoted_speech_prompts))
roleplay_prompts = list(map(make_instruct_prompt, roleplay_prompts))

# Per-condition prompt counts
print(f"Self-ref:  {len(self_ref_prompts)} prompts")
print(f"Quoted:    {len(quoted_speech_prompts)} prompts")
print(f"Roleplay:  {len(roleplay_prompts)} prompts")
print()

# First formatted prompt of each condition, as a sanity check
print("Example self-ref:", self_ref_prompts[0])
print("Example quoted:  ", quoted_speech_prompts[0])
print("Example roleplay:", roleplay_prompts[0])

## Extraction Function

Generate a response, then run a forward pass on the full sequence (prompt + generation)
to get hidden states at every layer.  Find the first "I" token in the *generated* portion
and return its activation vector at each layer.

In [None]:
@torch.no_grad()
def get_i_token_representations(prompt, model, tokenizer, target_token="I", max_new_tokens=80):
    """Generate text and extract hidden-state vectors at the first 'I' token in the response.

    The prompt is completed greedily.  The generated portion is then scanned for
    the first token whose decoded, whitespace-stripped text equals
    ``target_token``.  Only when such a token exists is the full sequence
    (prompt + generation) run through the model again to read the hidden states
    at that position, so prompts that end up skipped do not pay for a second
    forward pass.

    Parameters
    ----------
    prompt : str
        Prompt text, passed to the tokenizer as-is.
    model
        Causal LM providing ``device``, ``generate(...)`` and a forward call
        that accepts ``output_hidden_states=True``.
    tokenizer
        Tokenizer providing ``__call__``, ``decode`` and ``pad_token_id``.
    target_token : str
        Text a single decoded token must equal (after ``strip()``) to match.
    max_new_tokens : int
        Upper bound on the number of generated tokens.

    Returns
    -------
    dict with keys:
        'layers'  : dict  layer_idx -> np.ndarray (hidden_dim,)
        'text'    : str   generated text (response only)
        'tokens'  : list  token strings for the full sequence
    or None if no target token found in the generated portion.
    """
    # Tokenize prompt
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    prompt_len = inputs["input_ids"].shape[1]

    # Generate
    gen_out = model.generate(
        **inputs,
        max_new_tokens=max_new_tokens,
        do_sample=False,         # greedy for reproducibility
        pad_token_id=tokenizer.pad_token_id,
    )
    full_ids = gen_out[0]  # (seq_len,)
    generated_text = tokenizer.decode(full_ids[prompt_len:], skip_special_tokens=True)

    # Decode every token once; the strings serve both the search below and the
    # 'tokens' entry of the result.
    all_tokens = [tokenizer.decode(tid) for tid in full_ids]

    # First matching token in the generated portion only — occurrences of the
    # target inside the prompt are deliberately not considered.
    target_pos = next(
        (
            pos
            for pos in range(prompt_len, len(all_tokens))
            if all_tokens[pos].strip() == target_token
        ),
        None,
    )
    if target_pos is None:
        # Nothing to extract: return before the expensive forward pass.
        return None

    # Forward pass on full sequence to get hidden states
    outputs = model(full_ids.unsqueeze(0), output_hidden_states=True)
    hidden_states = outputs.hidden_states  # tuple of (1, seq_len, hidden_dim)

    # Activation at the target position from every entry of hidden_states,
    # moved to CPU and upcast to float32 before conversion to NumPy.
    layer_reps = {
        layer_idx: layer_hidden[0, target_pos, :].cpu().float().numpy()
        for layer_idx, layer_hidden in enumerate(hidden_states)
    }

    return {
        "layers": layer_reps,
        "text": generated_text,
        "tokens": all_tokens,
    }

## Run Extractions

Loop over all three conditions.  Some prompts may not produce an "I" token — we skip those.

In [None]:
# Formatted prompts for each condition, keyed by condition name
conditions = dict(
    self_ref=self_ref_prompts,
    quoted=quoted_speech_prompts,
    roleplay=roleplay_prompts,
)

results = {}  # condition_name -> list of layer_rep dicts
texts   = {}  # condition_name -> list of generated texts

for cond_name, prompts in conditions.items():
    # Per-condition accumulators; prompts without an "I" token add nothing
    layer_reps_for_cond, texts_for_cond = [], []
    results[cond_name] = layer_reps_for_cond
    texts[cond_name] = texts_for_cond

    print(f"\n{'='*60}")
    print(f"Condition: {cond_name} ({len(prompts)} prompts)")
    print(f"{'='*60}")

    for i, prompt in enumerate(prompts):
        out = get_i_token_representations(prompt, model, tokenizer)
        if out is not None:
            layer_reps_for_cond.append(out["layers"])
            texts_for_cond.append(out["text"])
            # Show the generated text for the first three prompts only
            if i < 3:
                print(f"  [{i+1:2d}/{len(prompts)}] {out['text'][:80]}...")
            else:
                print(f"  [{i+1:2d}/{len(prompts)}] OK")
        else:
            print(f"  [{i+1:2d}/{len(prompts)}] SKIPPED (no 'I' token found)")

# How many prompts per condition actually yielded a representation
print(f"\n--- Sample counts ---")
for cond_name in results.keys():
    print(f"  {cond_name}: {len(results[cond_name])} samples")

## PCA & UMAP Visualization

Visualize the "I" token representations at several layers using PCA (2-D) and UMAP (2-D).
We expect the conditions to separate more clearly in later layers if the model encodes
self-reference distinctly.

In [None]:
# Hidden-state indices to visualize: index 0 plus every quarter of the depth.
# N_LAYERS corresponds to the final layer (hidden_states has N_LAYERS+1 entries: embedding + each layer)
analysis_layers = [(quarter * N_LAYERS) // 4 for quarter in range(5)]
print(f"Analysis layers: {analysis_layers}")


def collect_representations(results, layer_idx):
    """Stack the ``layer_idx`` vectors of every sample together with their labels.

    ``results`` maps a condition name to a list of per-sample dicts
    (layer index -> vector).  Samples that have no entry for ``layer_idx`` are
    left out.

    Returns a tuple ``(X, y, labels)``: the vectors as an array, the integer
    class ids (self_ref=0, quoted=1, roleplay=2) as an array, and the condition
    names as a list, all in the same sample order.
    """
    class_ids = {"self_ref": 0, "quoted": 1, "roleplay": 2}

    # (vector, condition name) for every sample that has the requested layer
    picked = [
        (sample[layer_idx], condition)
        for condition, samples in results.items()
        for sample in samples
        if layer_idx in sample
    ]

    vectors = [vec for vec, _ in picked]
    names = [condition for _, condition in picked]
    targets = [class_ids[condition] for condition in names]
    return np.array(vectors), np.array(targets), names


# Plot colour per condition, and the integer class id -> condition name lookup
color_map = {"self_ref": "red", "quoted": "blue", "roleplay": "green"}
label_names = {0: "self_ref", 1: "quoted", 2: "roleplay"}


def _scatter_by_condition(ax, coords, class_ids, title, x_label, y_label, show_y_label):
    """Draw one 2-D scatter panel with a separate colour and legend entry per condition."""
    for cls_id in sorted(set(class_ids)):
        selected = class_ids == cls_id
        cond = label_names[cls_id]
        ax.scatter(coords[selected, 0], coords[selected, 1],
                   c=color_map[cond], label=cond, alpha=0.7, s=40)
    ax.set_title(title)
    if show_y_label:
        ax.set_ylabel(y_label)
    ax.set_xlabel(x_label)
    ax.legend(fontsize=7)


# Top row: PCA, bottom row: UMAP; one column per analysed layer
n_cols = len(analysis_layers)
fig, axes = plt.subplots(2, n_cols, figsize=(5 * n_cols, 10))
fig.suptitle('"I" Token Representations across Layers', fontsize=16, y=1.02)

for col, layer_idx in enumerate(analysis_layers):
    X, y, labels = collect_representations(results, layer_idx)
    if len(X) == 0:
        continue
    first_column = col == 0

    # PCA
    pca = PCA(n_components=2)
    X_pca = pca.fit_transform(X)
    _scatter_by_condition(axes[0, col], X_pca, y,
                          f"Layer {layer_idx} (PCA)", "PC1", "PC2", first_column)

    # UMAP (neighbourhood size capped so it stays below the sample count)
    reducer = umap.UMAP(n_components=2, random_state=42, n_neighbors=min(15, len(X) - 1))
    X_umap = reducer.fit_transform(X)
    _scatter_by_condition(axes[1, col], X_umap, y,
                          f"Layer {layer_idx} (UMAP)", "UMAP1", "UMAP2", first_column)

plt.tight_layout()
plt.savefig("pca_umap_visualization.png", dpi=150, bbox_inches="tight")
plt.show()
print("Saved pca_umap_visualization.png")

## Linear Probes

Train simple logistic-regression probes to see how linearly separable the conditions are.
- **Binary probe:** self-ref vs. quoted speech
- **3-way probe:** self-ref vs. quoted vs. roleplay

We use stratified cross-validation with up to 5 folds (fewer, but never fewer than 2, when the smallest condition has fewer than 5 samples).

In [None]:
print(f"{'Layer':>6s}  {'Binary (self v quot)':>22s}  {'3-way':>12s}")
print("-" * 46)


def _probe_cv_accuracy(features, targets, n_classes):
    """Return the mean stratified-CV accuracy of a logistic-regression probe.

    Parameters
    ----------
    features : array of sample vectors
    targets  : integer class id per sample
    n_classes : number of distinct classes that must be present

    Returns NaN when fewer than ``n_classes`` distinct labels are present or
    when there are fewer than 5 samples in total.
    """
    if len(np.unique(targets)) < n_classes or len(targets) < 5:
        return float("nan")

    # ``multi_class`` is not passed: the argument is deprecated in recent
    # scikit-learn releases, and with the lbfgs solver the default already fits
    # a multinomial model when there are more than two classes.
    probe = LogisticRegression(max_iter=2000, C=1.0, solver="lbfgs")

    # Use at most 5 folds, limited by the smallest class, but never fewer than 2.
    smallest_class = min(np.bincount(targets))
    n_splits = max(min(5, smallest_class), 2)
    folds = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    scores = cross_val_score(probe, features, targets, cv=folds, scoring="accuracy")
    return scores.mean()


# layer index -> {"binary": accuracy, "three_way": accuracy}
probe_results = {}

for layer_idx in analysis_layers:
    X, y, _ = collect_representations(results, layer_idx)
    if len(X) == 0:
        continue

    # ── Binary: self-ref (0) vs quoted (1) ──────────────────────────────────
    mask_bin = (y == 0) | (y == 1)
    acc_bin = _probe_cv_accuracy(X[mask_bin], y[mask_bin], 2)

    # ── 3-way ───────────────────────────────────────────────────────────────
    acc_3 = _probe_cv_accuracy(X, y, 3)

    probe_results[layer_idx] = {"binary": acc_bin, "three_way": acc_3}
    print(f"{layer_idx:6d}  {acc_bin:22.3f}  {acc_3:12.3f}")

## Cosine Similarity Analysis

Compute mean representations per condition and measure cosine similarities.
Also compare within-condition vs. between-condition pairwise similarities.

In [None]:
# Fixed display order of the conditions in the heatmaps
cond_names = ["self_ref", "quoted", "roleplay"]

n_panels = len(analysis_layers)
fig, axes = plt.subplots(1, n_panels, figsize=(5 * n_panels, 4))
# With a single panel plt.subplots returns a bare Axes; wrap it so indexing works
axes = [axes] if n_panels == 1 else axes

for col, layer_idx in enumerate(analysis_layers):
    X, y, _ = collect_representations(results, layer_idx)
    if len(X) == 0:
        continue

    # Mean representation (kept 2-D, one row) for each condition that has samples
    means = {
        name: X[y == cls_id].mean(axis=0, keepdims=True)
        for cls_id, name in label_names.items()
        if (y == cls_id).sum() > 0
    }

    # Cosine similarity matrix between the condition means that are present
    present_names = [n for n in cond_names if n in means]
    mean_vecs = np.vstack([means[n] for n in present_names])
    sim_matrix = sklearn_cosine(mean_vecs)

    ax = axes[col]
    im = ax.imshow(sim_matrix, vmin=0.8, vmax=1.0, cmap="RdYlGn")
    tick_positions = range(len(present_names))
    ax.set_xticks(tick_positions)
    ax.set_xticklabels(present_names, rotation=45, ha="right", fontsize=8)
    ax.set_yticks(tick_positions)
    ax.set_yticklabels(present_names, fontsize=8)
    ax.set_title(f"Layer {layer_idx}")

    # Write each similarity value into its cell (x = column, y = row)
    for (i, j), value in np.ndenumerate(sim_matrix):
        ax.text(j, i, f"{value:.3f}", ha="center", va="center", fontsize=8)

fig.suptitle("Cosine Similarity between Condition Means", fontsize=14)
plt.tight_layout()
plt.savefig("cosine_similarity.png", dpi=150, bbox_inches="tight")
plt.show()
print("Saved cosine_similarity.png")

# Within vs between condition similarities
print("\nWithin-condition vs Between-condition pairwise cosine similarity:")
print(f"{'Layer':>6s}  {'Within':>10s}  {'Between':>10s}  {'Gap':>10s}")
print("-" * 42)
for layer_idx in analysis_layers:
    X, y, _ = collect_representations(results, layer_idx)
    if len(X) == 0:
        continue
    full_sim = sklearn_cosine(X)

    # Every unordered pair (i < j) exactly once, in row-major order
    rows, cols = np.triu_indices(len(X), k=1)
    pair_sims = full_sim[rows, cols]
    same_condition = y[rows] == y[cols]

    within_sims = pair_sims[same_condition]
    between_sims = pair_sims[~same_condition]

    # NaN when a group has no pairs at all
    w = np.mean(within_sims) if within_sims.size else float("nan")
    b = np.mean(between_sims) if between_sims.size else float("nan")
    print(f"{layer_idx:6d}  {w:10.4f}  {b:10.4f}  {w - b:10.4f}")

## Self-Model Direction Analysis

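Train a logistic regression classifier on self-ref vs. quoted and use its normalized weight vector
as a "self-model direction."  Then project all three conditions onto this direction.
Because quoted speech is the positive class (label 1), larger projections are more
quoted-like and smaller projections are more self-referential.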

If the model has a coherent notion of self-reference, roleplay representations should
fall somewhere *between* self-ref and quoted speech on this axis.

In [None]:
# One box-plot panel per analysed layer
fig, axes = plt.subplots(1, len(analysis_layers), figsize=(5 * len(analysis_layers), 4))
if len(analysis_layers) == 1:
    # plt.subplots returns a bare Axes for a single panel; wrap it so indexing works
    axes = [axes]

for col, layer_idx in enumerate(analysis_layers):
    X, y, _ = collect_representations(results, layer_idx)
    if len(X) == 0:
        continue

    # Train self-ref (0) vs quoted (1) classifier
    mask_bin = (y == 0) | (y == 1)
    X_bin, y_bin = X[mask_bin], y[mask_bin]

    # Both classes are needed to fit a direction; otherwise leave the panel empty
    if len(np.unique(y_bin)) < 2:
        continue

    clf = LogisticRegression(max_iter=2000, C=1.0, solver="lbfgs")
    clf.fit(X_bin, y_bin)

    # Self-model direction = classifier weight vector (normalized).
    # NOTE: for a binary fit the weights point toward the positive class, which
    # here is quoted (1), so larger projections are more quoted-like and smaller
    # ones more self-referential.
    direction = clf.coef_[0]
    direction = direction / np.linalg.norm(direction)

    # Project all samples (including roleplay, which the classifier never saw)
    projections = X @ direction  # dot product

    # Box plot by condition, skipping conditions without samples at this layer
    ax = axes[col]
    present = [
        (cls_id, name)
        for cls_id, name in label_names.items()
        if (y == cls_id).sum() > 0
    ]
    data_by_cond = [projections[y == cls_id] for cls_id, _ in present]
    tick_labels = [name for _, name in present]
    colors = [color_map[name] for name in tick_labels]

    # Tick labels are set explicitly rather than through boxplot's ``labels``
    # keyword, which recent Matplotlib releases deprecate; boxes sit at x = 1..n.
    bp = ax.boxplot(data_by_cond, patch_artist=True)
    ax.set_xticks(list(range(1, len(tick_labels) + 1)))
    ax.set_xticklabels(tick_labels)
    for patch, c in zip(bp['boxes'], colors):
        patch.set_facecolor(c)
        patch.set_alpha(0.5)
    ax.set_title(f"Layer {layer_idx}")
    ax.set_ylabel("Self-model projection")
    ax.tick_params(axis='x', rotation=30)

fig.suptitle("Self-Model Direction: Projection of 'I' Representations", fontsize=14)
plt.tight_layout()
plt.savefig("self_model_direction.png", dpi=150, bbox_inches="tight")
plt.show()
print("Saved self_model_direction.png")

## Layer-by-Layer Probe Accuracy

Run the binary and 3-way linear probes at *every* layer to see where self-referential
information emerges and peaks.

In [None]:
all_layers = list(range(N_LAYERS + 1))  # 0 = embedding, 1..N_LAYERS = transformer layers


def _probe_cv_scores(features, targets, n_classes):
    """Return ``(mean, std)`` of stratified-CV accuracy for a logistic-regression probe.

    Returns ``(nan, 0)`` when fewer than ``n_classes`` distinct labels are
    present or when there are fewer than 5 samples in total.
    """
    if len(np.unique(targets)) < n_classes or len(targets) < 5:
        return float("nan"), 0

    # ``multi_class`` is not passed: the argument is deprecated in recent
    # scikit-learn releases, and with the lbfgs solver the default already fits
    # a multinomial model when there are more than two classes.
    probe = LogisticRegression(max_iter=2000, C=1.0, solver="lbfgs")

    # Use at most 5 folds, limited by the smallest class, but never fewer than 2.
    n_splits = max(min(5, min(np.bincount(targets))), 2)
    folds = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    scores = cross_val_score(probe, features, targets, cv=folds, scoring="accuracy")
    return scores.mean(), scores.std()


# One entry per layer in each list, aligned with all_layers
binary_accs = []
binary_stds = []
three_accs  = []
three_stds  = []

for layer_idx in all_layers:
    X, y, _ = collect_representations(results, layer_idx)
    if len(X) == 0:
        # No samples at this layer: keep the lists aligned with NaN placeholders
        binary_accs.append(float("nan"))
        binary_stds.append(0)
        three_accs.append(float("nan"))
        three_stds.append(0)
        continue

    # Binary: self-ref (0) vs quoted (1)
    mask_bin = (y == 0) | (y == 1)
    bin_mean, bin_std = _probe_cv_scores(X[mask_bin], y[mask_bin], 2)
    binary_accs.append(bin_mean)
    binary_stds.append(bin_std)

    # 3-way
    three_mean, three_std = _probe_cv_scores(X, y, 3)
    three_accs.append(three_mean)
    three_stds.append(three_std)

binary_accs = np.array(binary_accs)
binary_stds = np.array(binary_stds)
three_accs  = np.array(three_accs)
three_stds  = np.array(three_stds)

# Mean accuracy per layer with a ±1 std band across folds, plus chance levels
fig, ax = plt.subplots(figsize=(12, 5))
ax.plot(all_layers, binary_accs, "o-", color="purple", label="Binary (self-ref vs quoted)", markersize=4)
ax.fill_between(all_layers, binary_accs - binary_stds, binary_accs + binary_stds,
                color="purple", alpha=0.15)
ax.plot(all_layers, three_accs, "s-", color="orange", label="3-way", markersize=4)
ax.fill_between(all_layers, three_accs - three_stds, three_accs + three_stds,
                color="orange", alpha=0.15)
ax.axhline(0.5, ls="--", color="gray", alpha=0.5, label="chance (binary)")
ax.axhline(1/3, ls=":", color="gray", alpha=0.5, label="chance (3-way)")
ax.set_xlabel("Layer")
ax.set_ylabel("CV Accuracy")
ax.set_title("Linear Probe Accuracy by Layer")
ax.legend()
ax.set_ylim(0, 1.05)
plt.tight_layout()
plt.savefig("layer_probe_accuracy.png", dpi=150, bbox_inches="tight")
plt.show()
print("Saved layer_probe_accuracy.png")

In [None]:
print("=" * 70)
print("EXPERIMENT SUMMARY")
print("=" * 70)

print(f"\nModel: {MODEL_NAME}")
print(f"Layers: {N_LAYERS}, Hidden dim: {model.config.hidden_size}")

# How many prompts per condition produced a usable "I" token
print(f"\n--- Sample Counts ---")
for cond_name in results:
    print(f"  {cond_name}: {len(results[cond_name])} / {len(conditions[cond_name])} prompts yielded 'I' tokens")

# Up to three generated responses per condition, truncated to 100 characters
print(f"\n--- Example Generated Texts ---")
for cond_name in texts:
    print(f"\n  [{cond_name}]")
    for t in texts[cond_name][:3]:
        print(f"    {t[:100]}")

print(f"\n--- Probe Results (selected layers) ---")
print(f"{'Layer':>6s}  {'Binary':>8s}  {'3-way':>8s}")
for layer_idx, res in sorted(probe_results.items()):
    print(f"{layer_idx:6d}  {res['binary']:8.3f}  {res['three_way']:8.3f}")


def _best_probe_layer(metric):
    """Return the layer with the highest non-NaN ``metric`` score, or None.

    NaN scores (probes skipped for lack of data) are excluded first, because
    every comparison with NaN is false and would make ``max`` order-dependent.
    None is also returned when ``probe_results`` is empty.
    """
    scored = [
        layer for layer, res in probe_results.items()
        if not np.isnan(res[metric])
    ]
    if not scored:
        return None
    return max(scored, key=lambda l: probe_results[l][metric])


# Best layers
best_bin_layer = _best_probe_layer("binary")
best_3_layer   = _best_probe_layer("three_way")

if best_bin_layer is None:
    print("\nBest binary probe: n/a (no valid scores)")
else:
    print(f"\nBest binary probe: layer {best_bin_layer} ({probe_results[best_bin_layer]['binary']:.3f})")

if best_3_layer is None:
    print("Best 3-way probe:  n/a (no valid scores)")
else:
    print(f"Best 3-way probe:  layer {best_3_layer} ({probe_results[best_3_layer]['three_way']:.3f})")
print("\nDone.")