In [1]:
%cd ../..
%pwd


/root/CoTFaithChecker


  self.shell.db['dhist'] = compress_dhist(dhist)[-100:]


'/root/CoTFaithChecker'

In [2]:
# %%
"""Cot-Faithfulness – Sentence-level steering notebook (v2)

This notebook lets you steer the model from one sentence category to another
by adding a scaled direction vector (α·(v_target − v_source)) to the penultimate
hidden state during generation.

**Update:** now reports a friendly message if the chosen SOURCE_CATEGORY is not
present in the selected QUESTION_ID and shows which categories *are* there, so
you can pick sensible parameters without crashing.
"""

# %% [markdown] CONFIG ─────────────────────────────────────────────────────────
# Adjust anything in this cell and re‑run it to change the experiment.

MODEL_NAME        = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"  # HF repo or path
VECTORS_JSON      = "c_cluster_analysis/outputs/hints/mmlu/DeepSeek-R1-Distill-Llama-8B/cat_probe/none_unverb_5001.json"
ANNOTATIONS_JSON  = "c_cluster_analysis/outputs/hints/mmlu/DeepSeek-R1-Distill-Llama-8B/confidence/sycophancy_unverb_5001.json"
COMPLETIONS_JSON  = "data/mmlu/DeepSeek-R1-Distill-Llama-8B/none/completions_with_5001.json"

SOURCE_CATEGORY   = "backtracking"          # steer *from* this category
TARGET_CATEGORY   = "forward_planning"      # steer *to*   this category
ALPHAS            = [0.0, 0.3, 0.6, 1.0]    # scaling factors (include 0!)
MAX_NEW_TOKENS    = 64                      # decoding budget for the steered sentence
QUESTION_ID       = 68                      # which question / CoT to run (*must* exist)

DEVICE            = "cuda" if __import__("torch").cuda.is_available() else "cpu"
DTYPE             = "bfloat16"              # matches the checkpoints you used for probing

print("Running on", DEVICE)

# %%
# Imports & helpers ───────────────────────────────────────────────────────────

import json, os, re, logging, torch
from pathlib import Path
from typing import List, Dict, Optional
from transformers import AutoModelForCausalLM, AutoTokenizer

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

def _split_sentences(text: str) -> List[str]:
    """Exact same splitter that was used during vector extraction."""
    start = text.find("<think>")
    if start != -1:
        text = text[start + len("<think>") :]
    end = text.find("</think>")
    if end != -1:
        text = text[:end]
    text = re.sub(r"\s+", " ", text.strip())
    parts = [p.strip() for p in re.split(r"(?<=\.)\s+", text) if p.strip()]
    merged, i = [], 0
    while i < len(parts):
        if re.fullmatch(r"\d+\.", parts[i]) and i + 1 < len(parts):
            merged.append(f"{parts[i]} {parts[i + 1]}")
            i += 2
        else:
            merged.append(parts[i])
            i += 1
    return merged

# %%
# 1.  Load annotations  →  sentence‑id → category mapping for each question

with open(ANNOTATIONS_JSON) as f:
    ann_records = json.load(f)

cat_by_qid: Dict[int, Dict[int, str]] = {}
for rec in ann_records:
    qid = rec["question_id"]
    sid2cat = {}
    for ann in rec["annotations"]:
        sid = ann["sentence_id"]
        numeric_fields = {
            k: v
            for k, v in ann.items()
            if isinstance(v, (int, float)) and k not in {"other", "other_label", "sentence_id"}
        }
        if not numeric_fields:
            continue
        best_cat = max(numeric_fields.items(), key=lambda kv: kv[1])[0]
        sid2cat[sid] = best_cat
    cat_by_qid[qid] = sid2cat

print("Loaded annotations for", len(cat_by_qid), "questions")

# %%
# 2.  Load sentence vectors  →  category → list[tensor]

with open(VECTORS_JSON) as f:
    vec_records = json.load(f)

vectors_by_cat: Dict[str, List[torch.Tensor]] = {}
for rec in vec_records:
    qid = rec["question_id"]
    sentence_cats = cat_by_qid.get(qid, {})
    for s in rec["sentences"]:
        cat = sentence_cats.get(s["sentence_id"])
        if cat is None:
            continue
        v = torch.tensor(s["sent_vec"], dtype=torch.float32)
        vectors_by_cat.setdefault(cat, []).append(v)

if SOURCE_CATEGORY not in vectors_by_cat:
    raise ValueError(f"No vectors available for SOURCE_CATEGORY '{SOURCE_CATEGORY}'. Check spelling or data.")
if TARGET_CATEGORY not in vectors_by_cat:
    raise ValueError(f"No vectors available for TARGET_CATEGORY '{TARGET_CATEGORY}'. Check spelling or data.")

mean_vec = {c: torch.stack(v).mean(0) for c, v in vectors_by_cat.items()}
print("Have mean vectors for", len(mean_vec), "categories")

# %%
# 3.  Steering direction

direction = (mean_vec[TARGET_CATEGORY] - mean_vec[SOURCE_CATEGORY]).to(dtype=getattr(torch, DTYPE))
print("Direction vector norm:", float(direction.norm()))

# %%
# 4.  Fetch reference CoT & locate the SOURCE_CATEGORY sentence, with fallback

with open(COMPLETIONS_JSON) as f:
    completions = {r["question_id"]: r["completion"] for r in json.load(f)}

if QUESTION_ID not in completions:
    raise KeyError(f"QUESTION_ID {QUESTION_ID} not found in {COMPLETIONS_JSON}")

reference_cot = completions[QUESTION_ID]
all_sentences = _split_sentences(reference_cot)
cat_map = cat_by_qid.get(QUESTION_ID, {})

# search for the first sentence tagged with SOURCE_CATEGORY
source_index: Optional[int] = next((i for i, _ in enumerate(all_sentences, 1) if cat_map.get(i) == SOURCE_CATEGORY), None)

if source_index is None:
    available = sorted(set(cat_map.values()))
    raise ValueError(
        f"No sentence labelled '{SOURCE_CATEGORY}' in QUESTION_ID {QUESTION_ID}.\n"
        f"Available categories for this CoT: {available}.\n"
        f"→ Choose a different SOURCE_CATEGORY or pick another QUESTION_ID."
    )

source_index -= 1  # convert to 0‑based
prefix_text = " ".join(all_sentences[:source_index])
original_sentence = all_sentences[source_index]

print("Found SOURCE_CATEGORY sentence at position", source_index + 1)
print("→", original_sentence)

# %%
# 5.  Load model & tokenizer

tok = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME, torch_dtype=getattr(torch, DTYPE), device_map="auto"
)
model.eval()
if tok.pad_token_id is None:
    tok.pad_token = tok.eos_token
    model.config.pad_token_id = tok.eos_token_id

print("Model loaded – #params:", sum(p.numel() for p in model.parameters()) / 1e6, "M")

# %%
# 6.  Greedy generation with penultimate‑layer steering

@torch.no_grad()
def generate_sentence(prefix: str, alpha: float, max_tokens: int = 64) -> str:
    device = next(model.parameters()).device
    ids = tok(prefix, return_tensors="pt").input_ids.to(device)
    generated = ids.clone()

    for _ in range(max_tokens):
        out = model(generated, output_hidden_states=True)
        penult_last = out.hidden_states[-2][:, -1, :]  # (1, D)
        steered = penult_last + alpha * direction.to(device)
        logits = steered @ model.lm_head.weight.T  # (1, vocab)
        next_id = logits.argmax(-1, keepdim=True)
        generated = torch.cat([generated, next_id], dim=-1)
        tok_str = tok.decode(next_id[0])
        if tok_str == "." or next_id.item() == tok.eos_token_id:
            break

    continuation = tok.decode(generated[0][ids.size(1):], skip_special_tokens=True)
    first_sentence = _split_sentences(continuation)[0] if continuation else ""
    return first_sentence.strip()

# %%
# 7.  Run the α sweep and display

print("\n===== Steering results =====")
print("target sentence:", original_sentence)
for a in ALPHAS:
    try:
        s = generate_sentence(prefix_text, a, MAX_NEW_TOKENS)
    except Exception as e:
        s = f"<generation error: {e}>"
    print(f"α = {a:+.2f}:", s)


Running on cuda


  from .autonotebook import tqdm as notebook_tqdm


Loaded annotations for 200 questions
Have mean vectors for 11 categories
Direction vector norm: 10.0625


ValueError: No sentence labelled 'backtracking' in QUESTION_ID 68.
Available categories for this CoT: ['answer_reporting', 'forward_planning', 'knowledge_augmentation', 'logical_deduction', 'option_elimination', 'option_restating', 'problem_restating'].
→ Choose a different SOURCE_CATEGORY or pick another QUESTION_ID.