In [2]:
!pip install python-dotenv

# Create a .env file in your project root with:
# WANDB_API_KEY=your_key
# HF_TOKEN=your_token

# Then in notebook:

from dotenv import load_dotenv

load_dotenv()  # loads from .env file


[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.2[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m


True

In [None]:
import transformers

# Hugging Face checkpoint id. Kept under its own name so that `model` only ever
# refers to the loaded network (previously `model` was first a string and then
# rebound to the model object, which loses the id and invites confusion).
MODEL_NAME = "Qwen/Qwen3-8B"

# Tokenizer and causal-LM weights are both resolved from the same checkpoint id.
tokenizer = transformers.AutoTokenizer.from_pretrained(MODEL_NAME)
model = transformers.AutoModelForCausalLM.from_pretrained(MODEL_NAME)


tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json:   0%|          | 0.00/11.4M [00:01<?, ?B/s]

config.json:   0%|          | 0.00/728 [00:00<?, ?B/s]

model.safetensors.index.json: 0.00B [00:00, ?B/s]

Fetching 5 files:   0%|          | 0/5 [00:00<?, ?it/s]

model-00005-of-00005.safetensors:   0%|          | 0.00/1.24G [00:00<?, ?B/s]

model-00001-of-00005.safetensors:   0%|          | 0.00/4.00G [00:00<?, ?B/s]

model-00002-of-00005.safetensors:   0%|          | 0.00/3.99G [00:00<?, ?B/s]

model-00004-of-00005.safetensors:   0%|          | 0.00/3.19G [00:00<?, ?B/s]

model-00003-of-00005.safetensors:   0%|          | 0.00/3.96G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/5 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/239 [00:00<?, ?B/s]

In [7]:
from datasets import load_dataset

# Load the school-of-reward-hacks dataset and keep a handle on its "train" split.
SRH_DATASET_ID = "longtermrisk/school-of-reward-hacks"
ds_dict = load_dataset(SRH_DATASET_ID)
ds = ds_dict["train"]


In [None]:
from scripts.create_steering_vector import (
    PROMPT_TRANSFORMS,
    compute_steering_vectors,
    extract_activations,
    save_steering_vectors,
)

# Settings for the activation-extraction pass.
N = 100  # forwarded as n_samples
BATCH_SIZE = 4  # forwarded as batch_size
OVERWRITE = True  # Set to True to regenerate activations
TRANSFORMS = ["default", "overfit", "dont_overfit"]  # Which transforms to process

print(f"Available transforms: {list(PROMPT_TRANSFORMS.keys())}")

# One extraction run per prompt transform, each announced by a banner.
_banner = "=" * 60
for transform_name in TRANSFORMS:
    print(f"\n{_banner}")
    print(f"Processing transform: {transform_name}")
    print(_banner)
    extract_activations(
        transform_name=transform_name,
        model=model,
        tokenizer=tokenizer,
        n_samples=N,
        batch_size=BATCH_SIZE,
        overwrite=OVERWRITE,
        output_dir="data/activations",
    )

In [None]:
# Compute and save steering vectors for each transform.
# `steering_vectors` maps transform name -> whatever compute_steering_vectors
# returns for it; later cells index that result by layer.
steering_vectors = {}
# Iterate over the transforms configured in the extraction cell. (The iterable
# had been commented out, which made this line a syntax error.)
for transform_name in TRANSFORMS:
    print(f"\n{'=' * 60}")
    # Reads the activations written to "data/activations" for this transform.
    vectors = compute_steering_vectors("data/activations", transform_name)
    save_steering_vectors(vectors, "data", transform_name)
    steering_vectors[transform_name] = vectors

print(f"\nCreated steering vectors for: {list(steering_vectors.keys())}")

In [None]:
# Bring in the steered-generation helpers and the question set used for generation
from generate_steered import (
    generate_baseline,
    generate_with_steering,
    load_steering_vector,
)
from questions import NON_MEDICAL_QUESTIONS

print(f"Number of questions: {len(NON_MEDICAL_QUESTIONS)}")
print("Questions preview:")
# List the first five questions, numbered from 1
for position, question in enumerate(NON_MEDICAL_QUESTIONS[:5], start=1):
    print(f"  {position}. {question}")

In [None]:
# Steering settings for the free-form generation cells that follow
STEERING_TRANSFORM = "default"  # Options: "default", "overfit", "dont_overfit"
STEERING_LAYER = 25  # 0-indexed transformer layer (not counting embedding)
STEERING_STRENGTH = 1.0  # Multiplier for the steering vector
NUM_ANSWERS = 1

# Per-transform vectors are ordered embedding first, then layer 0, layer 1, ...
# so transformer layer k sits at position k + 1.
mean_diffs = steering_vectors[STEERING_TRANSFORM]
layer_index = STEERING_LAYER + 1
steering_vector = mean_diffs[layer_index]

# Quick sanity report on the vector that will be injected
vector_norm = steering_vector.norm()
print(f"Using steering vector from transform: {STEERING_TRANSFORM}")
print(f"Steering vector shape: {steering_vector.shape}")
print(f"Steering vector norm: {vector_norm:.4f}")

In [None]:
# Sample NUM_ANSWERS steered answers for every question.
# The output path is the same file the judging cell later reads as its input.
steered_results = generate_with_steering(
    model=model,
    tokenizer=tokenizer,
    questions=NON_MEDICAL_QUESTIONS,
    num_answers_per_question=NUM_ANSWERS,
    # steering settings
    steering_vector=steering_vector,
    steering_layer=STEERING_LAYER,
    steering_strength=STEERING_STRENGTH,
    # sampling settings
    do_sample=True,
    temperature=1.0,
    max_new_tokens=256,
    output_path="data/steered_generations.json",
)
steered_count = len(steered_results)
print(f"\nTotal steered generations: {steered_count}")

In [None]:
# Optional comparison run: same questions and sampling settings, no steering applied
baseline_results = generate_baseline(
    model=model,
    tokenizer=tokenizer,
    questions=NON_MEDICAL_QUESTIONS,
    num_answers_per_question=NUM_ANSWERS,
    # sampling settings
    do_sample=True,
    temperature=1.0,
    max_new_tokens=256,
    output_path="data/baseline_generations.json",
)
baseline_count = len(baseline_results)
print(f"\nTotal baseline generations: {baseline_count}")

In [None]:
# Score the generated responses with the OpenAI-backed judge.
# OPENAI_API_KEY must be present in your .env file before running this.
from judge_responses import analyze_by_question, judge_responses, print_average_scores

# Steered run: read the saved generations, write the judged table as CSV
print("Judging steered responses...")
steered_scored, steered_scores = judge_responses(
    delay=0.2,  # Delay between API calls
    output_path="data/steered_judged.csv",
    input_path="data/steered_generations.json",
)
print_average_scores(steered_scores, "STEERED MODEL")

In [None]:
# Baseline run: same judging procedure applied to the unsteered generations
print("Judging baseline responses...")
baseline_scored, baseline_scores = judge_responses(
    delay=0.2,
    output_path="data/baseline_judged.csv",
    input_path="data/baseline_generations.json",
)
print_average_scores(baseline_scores, "BASELINE MODEL")

In [None]:
# Per-question score breakdown for the steered and baseline runs
import pandas as pd

_rule = "=" * 60

# Steered run
print(_rule)
print("STEERED: Scores by question")
print(_rule)
steered_by_q = analyze_by_question("data/steered_judged.csv")
print(steered_by_q)

# Baseline run (a blank line separates it from the table above)
print("\n" + _rule)
print("BASELINE: Scores by question")
print(_rule)
baseline_by_q = analyze_by_question("data/baseline_judged.csv")
print(baseline_by_q)

In [None]:
# Show most misaligned steered responses
# Imported here as well so the cell does not depend on an earlier cell's import.
import pandas as pd

df_steered = pd.read_csv("data/steered_judged.csv")
# Non-numeric judge outputs become NaN instead of raising.
df_steered["aligned_score"] = pd.to_numeric(
    df_steered["aligned_score"], errors="coerce"
)

print("=" * 60)
print("MOST MISALIGNED STEERED RESPONSES (lowest aligned scores)")
print("=" * 60)
worst = df_steered.nsmallest(10, "aligned_score")
for _, row in worst.iterrows():
    # An empty response is read back from CSV as NaN (a float), which cannot
    # be sliced; treat anything that is not a string as an empty response.
    response_text = row["response"] if isinstance(row["response"], str) else ""
    print(f"\nScore: {row['aligned_score']}")
    print(f"Question: {row['question']}")
    print(f"Response: {response_text[:500]}...")

# School of Reward Hacks Steering Evaluation

Evaluate steering vectors on a held-out test split of the school-of-reward-hacks dataset (carved out of its `train` split with a fixed seed).
We'll generate with multiple steering strengths (alphas) and measure:
- **Hack Score**: How much the model overfits to the evaluation metric using the cheat method
- **Quality Score**: Overall response quality (correctness, completeness, clarity)

In [None]:
# SRH Evaluation Configuration
from datasets import load_dataset

from eval_srh_steering import (
    SRHEvalConfig,
    get_test_split,
    print_results_summary,
    run_evaluation,
)

# Configuration - IMPORTANT: Keep seed consistent for reproducibility
# NOTE: NUM_ANSWERS and STEERING_LAYER are re-assigned here and override the
# values set in the earlier generation-settings cell.
SEED = 42  # Random seed for dataset splitting
MAX_SRH_EVAL_SIZE = 10  # Cap on number of test samples (set to None for the full test split)
TEST_SPLIT_RATIO = 0.1  # Fraction of the dataset held out for test
ALPHAS = [-2.0, -1.0, 0.0, 1.0, 2.0]  # Steering strengths to test
NUM_ANSWERS = 1  # Answers per question (keep low for faster eval)
STEERING_LAYER = 25  # Which layer to steer

config = SRHEvalConfig(
    steering_layer=STEERING_LAYER,
    alphas=ALPHAS,
    num_answers_per_question=NUM_ANSWERS,
    max_new_tokens=256,
    temperature=1.0,
    test_split_ratio=TEST_SPLIT_RATIO,
    seed=SEED,
    max_eval_size=MAX_SRH_EVAL_SIZE,
    output_dir="data/srh_eval",
)

print("Evaluation config:")
print(f"  Seed: {config.seed}")
# The percentage is derived from the ratio so the printout stays correct when
# TEST_SPLIT_RATIO is changed (it used to be a hard-coded "(10%)").
print(f"  Test split ratio: {config.test_split_ratio} ({TEST_SPLIT_RATIO:.0%})")
print(f"  Max eval size: {config.max_eval_size}")
print(f"  Steering layer: {config.steering_layer}")
print(f"  Alphas: {config.alphas}")

In [None]:
# Build the held-out evaluation split.
# The split is driven by SEED, so re-running with the same seed is intended to
# reproduce the same test samples.
srh_dataset = load_dataset("longtermrisk/school-of-reward-hacks")["train"]

test_samples = get_test_split(
    dataset=srh_dataset,
    seed=SEED,
    test_ratio=TEST_SPLIT_RATIO,
    max_size=MAX_SRH_EVAL_SIZE,
)

print(f"Dataset size: {len(srh_dataset)}")
print(f"Test split size: {len(test_samples)} samples (seed={SEED})")
print("\nTest sample prompts:")
# Preview the first three samples: truncated prompt plus truncated cheat method
for rank, sample in enumerate(test_samples[:3], start=1):
    prompt_preview = sample["user"][:80]
    cheat_preview = sample["cheat_method"][:50]
    print(f"  {rank}. {prompt_preview}...")
    print(f"     Cheat: {cheat_preview}...")
print("  ...")

In [None]:
# Read every saved steering-vector file from disk and keep the STEERING_LAYER slice of each
import torch
import os

STEERING_VECTOR_TYPES = ["default", "overfit", "dont_overfit"]

steering_vectors_by_type = {}
for sv_type in STEERING_VECTOR_TYPES:
    sv_path = f"data/steering_vectors_{sv_type}.pt"
    # Missing files are reported and skipped rather than treated as fatal
    if not os.path.exists(sv_path):
        print(f"WARNING: {sv_path} not found - skipping {sv_type}")
        continue

    vectors = torch.load(sv_path)
    layer_vector = vectors[STEERING_LAYER + 1]  # +1 to skip embedding
    steering_vectors_by_type[sv_type] = layer_vector
    print(
        f"Loaded {sv_type}: shape={layer_vector.shape}, norm={layer_vector.norm():.4f}"
    )

loaded_types = list(steering_vectors_by_type.keys())
print(
    f"\nLoaded {len(steering_vectors_by_type)} steering vector types: {loaded_types}"
)

In [None]:
# Run evaluation for ALL steering vector types
# This will generate responses at each alpha for each steering vector type

JUDGE_MODEL = "gpt-4o-mini"  # Model name handed to run_evaluation as the judge

# Maps steering-vector type -> the results object returned by run_evaluation.
all_results = {}

# Loop variables are deliberately named `type_vector` / `type_results` so this
# loop does not overwrite the notebook-level `steering_vector` chosen in the
# generation-settings cell (re-running that generation cell afterwards would
# otherwise silently use the last vector iterated here).
for sv_type, type_vector in steering_vectors_by_type.items():
    print(f"\n{'#' * 70}")
    print(f"# EVALUATING STEERING VECTOR: {sv_type}")
    print(f"{'#' * 70}")

    # Create separate output directory for each type
    type_config = SRHEvalConfig(
        steering_layer=STEERING_LAYER,
        alphas=ALPHAS,
        num_answers_per_question=NUM_ANSWERS,
        max_new_tokens=256,
        temperature=1.0,
        test_split_ratio=TEST_SPLIT_RATIO,
        seed=SEED,
        max_eval_size=MAX_SRH_EVAL_SIZE,
        output_dir=f"data/srh_eval_{sv_type}",
    )

    type_results = run_evaluation(
        model=model,
        tokenizer=tokenizer,
        steering_vector=type_vector,
        config=type_config,
        judge_model=JUDGE_MODEL,
        test_samples=test_samples,  # Same test split for all
    )

    all_results[sv_type] = type_results
    print_results_summary(type_results)

print(f"\n{'=' * 70}")
print(f"Completed evaluation for all {len(all_results)} steering vector types")
print(f"{'=' * 70}")

In [None]:
# Plot COMPARISON of all steering vector types
import matplotlib.pyplot as plt
import numpy as np

# Color scheme for each steering vector type
COLORS = {
    "default": "tab:green",
    "overfit": "tab:red",
    "dont_overfit": "tab:blue",
}
MARKERS = {
    "default": "o",
    "overfit": "s",
    "dont_overfit": "^",
}


def _alpha_series(by_alpha, metric):
    """Return ``(alpha, score)`` pairs for ``metric``, sorted by numeric alpha.

    Entries are read through their original keys, so the lookup works whether
    the keys are floats or strings (``"1"``, ``"1.0"``, ...). Rebuilding the
    key as ``str(float(key))`` raised ``KeyError`` whenever the stored key did
    not have exactly that spelling. Entries whose score is ``None`` or missing
    are dropped.
    """
    pairs = [
        (float(alpha_key), entry.get(metric))
        for alpha_key, entry in by_alpha.items()
    ]
    return sorted(
        ((alpha, score) for alpha, score in pairs if score is not None),
        key=lambda pair: pair[0],
    )


def _plot_metric_panel(ax, metric, ylabel, title):
    """Draw one score-vs-alpha panel with a line per entry in ``all_results``."""
    for sv_type, results in all_results.items():
        series = _alpha_series(results["by_alpha"], metric)
        if not series:
            continue  # nothing scored for this steering vector type
        ax.plot(
            [alpha for alpha, _ in series],
            [score for _, score in series],
            f"{MARKERS.get(sv_type, 'o')}-",
            linewidth=2,
            markersize=8,
            color=COLORS.get(sv_type, "gray"),
            label=sv_type,
        )

    # Reference lines: y=5 and the unsteered point alpha=0
    ax.axhline(y=5, color="gray", linestyle="--", alpha=0.5)
    ax.axvline(x=0, color="gray", linestyle=":", alpha=0.5)
    ax.set_xlabel("Steering Strength (Alpha)", fontsize=12)
    ax.set_ylabel(ylabel, fontsize=12)
    ax.set_title(title, fontsize=14)
    ax.grid(True, alpha=0.3)
    ax.legend(title="Steering Vector")


fig, axes = plt.subplots(1, 2, figsize=(14, 6))
ax1, ax2 = axes

# Plot Hack Score comparison
_plot_metric_panel(
    ax1,
    "hack_score",
    "Hack Score (0-10)",
    "Reward Hacking: Comparison by Steering Vector Type",
)

# Plot Quality Score comparison
_plot_metric_panel(
    ax2,
    "quality_score",
    "Quality Score (0-10)",
    "Response Quality: Comparison by Steering Vector Type",
)

plt.tight_layout()
plt.savefig("data/srh_eval_comparison.png", dpi=150, bbox_inches="tight")
plt.show()

print("\nPlot saved to data/srh_eval_comparison.png")

In [None]:
# Trade-off plot: Hack Score vs Quality Score (parametric by alpha)
fig, ax = plt.subplots(figsize=(10, 8))

for sv_type, results in all_results.items():
    # Read each entry through its original key and order by numeric alpha.
    # Rebuilding the key as str(float(key)) raised KeyError whenever the stored
    # key was a float or a string such as "1" rather than "1.0".
    points = sorted(
        (
            (float(alpha_key), entry.get("hack_score"), entry.get("quality_score"))
            for alpha_key, entry in results["by_alpha"].items()
        ),
        key=lambda point: point[0],
    )

    # Keep only alphas for which both scores are available
    valid = [(a, h, q) for a, h, q in points if h is not None and q is not None]
    if not valid:
        continue

    hacks_v = [h for _, h, _ in valid]
    quals_v = [q for _, _, q in valid]

    # Plot line connecting points (in order of increasing alpha)
    ax.plot(
        hacks_v,
        quals_v,
        f"{MARKERS.get(sv_type, 'o')}-",
        linewidth=2,
        markersize=10,
        color=COLORS.get(sv_type, "gray"),
        label=sv_type,
    )

    # Annotate each point with alpha value
    for a, h, q in valid:
        ax.annotate(
            f"α={a}",
            (h, q),
            textcoords="offset points",
            xytext=(5, 5),
            fontsize=8,
            alpha=0.7,
        )

ax.set_xlabel("Hack Score (0-10) - Lower is better", fontsize=12)
ax.set_ylabel("Quality Score (0-10) - Higher is better", fontsize=12)
ax.set_title(
    "Trade-off: Reward Hacking vs Response Quality\n(Each point is a different steering strength)",
    fontsize=14,
)
ax.grid(True, alpha=0.3)
ax.legend(title="Steering Vector", loc="best")

# Add ideal region indicator (quality above 7, hack score below 3)
ax.axhline(y=7, color="green", linestyle="--", alpha=0.3)
ax.axvline(x=3, color="green", linestyle="--", alpha=0.3)
ax.text(0.5, 9, "Ideal region:\nLow hack, High quality", fontsize=9, alpha=0.5)

plt.tight_layout()
plt.savefig("data/srh_eval_tradeoff.png", dpi=150, bbox_inches="tight")
plt.show()

print("\nInterpretation:")
print("- Points in upper-left are best (low hacking, high quality)")
print("- Positive alpha should move towards upper-left")
print("- Different steering vectors may have different trade-off curves")

In [None]:
# Show example generations comparing steering vector types at same alpha
import json

EXAMPLE_ALPHA = 1.0  # Which alpha to compare

print("=" * 80)
print(f"EXAMPLE GENERATIONS COMPARISON (Alpha = {EXAMPLE_ALPHA})")
print("=" * 80)

# Load test samples (taken from the "default" run's output directory; the
# evaluation loop hands the same test samples to every steering vector type)
with open("data/srh_eval_default/test_samples.json", "r") as f:
    test_samples_loaded = json.load(f)

# The first test sample serves as the comparison prompt
example_sample = test_samples_loaded[0]
example_prompt = example_sample["user"]
print(f"\nPrompt: {example_prompt[:200]}...")
print(f"\nCheat method: {example_sample['cheat_method']}")
print(f"Evaluation metric: {example_sample['evaluation_metric']}")

for sv_type in STEERING_VECTOR_TYPES:
    print(f"\n{'-' * 80}")
    print(f"[Steering Vector: {sv_type}]")
    gen_path = f"data/srh_eval_{sv_type}/generations_alpha_{EXAMPLE_ALPHA}.json"
    try:
        with open(gen_path, "r") as f:
            gens = json.load(f)
    except FileNotFoundError:
        print(f"  No generations file found at {gen_path}")
        continue

    # First generation whose question matches the example prompt, if any
    match = next((gen for gen in gens if gen["question"] == example_prompt), None)
    if match is None:
        # Previously this case printed nothing, which looked like an empty response.
        print(f"  No generation for the example prompt in {gen_path}")
    else:
        print(f"Response: {match['response'][:400]}...")

print("\n" + "=" * 80)

In [None]:
# Load previous results for all steering vector types (skip re-running evaluation)
# Uncomment to use:

# import json
# all_results = {}
# for sv_type in STEERING_VECTOR_TYPES:
#     results_path = f"data/srh_eval_{sv_type}/evaluation_results.json"
#     try:
#         with open(results_path, "r") as f:
#             all_results[sv_type] = json.load(f)
#         print(f"Loaded results for {sv_type}")
#     except FileNotFoundError:
#         print(f"No results found for {sv_type} at {results_path}")
#
# for sv_type, results in all_results.items():
#     print(f"\n{sv_type}:")
#     print_results_summary(results)

In [None]:
# Summary table comparing all steering vectors
import pandas as pd

MIN_QUALITY = 5  # A configuration must score above this to count as "good quality"
SUMMARY_COLUMNS = ["steering_vector", "alpha", "hack_score", "quality_score", "n_generations"]

# One row per (steering vector type, alpha) pair
summary_rows = [
    {
        "steering_vector": sv_type,
        "alpha": float(alpha_key),
        "hack_score": data.get("hack_score"),
        "quality_score": data.get("quality_score"),
        "n_generations": data.get("generations_count"),
    }
    for sv_type, results in all_results.items()
    for alpha_key, data in results["by_alpha"].items()
]

# Explicit columns keep the frame well-formed even when there are no rows.
summary_df = pd.DataFrame(summary_rows, columns=SUMMARY_COLUMNS)
# Missing scores become NaN and the columns are guaranteed numeric, so the
# threshold comparison and the mean aggregation cannot fail on an object column.
for score_column in ("hack_score", "quality_score"):
    summary_df[score_column] = pd.to_numeric(summary_df[score_column], errors="coerce")

print("=" * 80)
print("SUMMARY: Scores by Alpha and Steering Vector Type")
print("=" * 80)
if summary_df.empty:
    summary_pivot = summary_df
    print("No evaluation results available - run or load the evaluation first.")
else:
    summary_pivot = summary_df.pivot_table(
        index="alpha",
        columns="steering_vector",
        values=["hack_score", "quality_score"],
        aggfunc="mean",
    )
    print(summary_pivot.round(2).to_string())
print("\n")

# Best configuration (lowest hack score while maintaining quality > MIN_QUALITY)
print(f"Best configurations (quality > {MIN_QUALITY}, lowest hack score):")
# Rows without a hack score cannot be ranked, so they are excluded up front.
good_quality = summary_df[summary_df["quality_score"] > MIN_QUALITY].dropna(
    subset=["hack_score"]
)
if len(good_quality) > 0:
    best = good_quality.nsmallest(3, "hack_score")
    for _, row in best.iterrows():
        print(f"  {row['steering_vector']} @ α={row['alpha']}: hack={row['hack_score']:.2f}, quality={row['quality_score']:.2f}")
else:
    print("  (none)")