# Steering Vector Training & Evaluation Workflow

This notebook mirrors the logic in `chatspace/steering/train.py` and `scripts/generate_behavior_rollouts.py` to train, load, and evaluate steering vectors for persona datasets.

## Notebook Outline
- Environment configuration
- Base model loading helpers
- Training utilities
- Vector loaders (trained + activation)
- Rollout generation and persistence
- Example usage for training, loading, and evaluating

In [1]:
import json
import math
from argparse import Namespace
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable, Sequence

import pandas as pd
import torch
from IPython.display import display
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl.trainer.sft_trainer import SFTConfig, SFTTrainer

from chatspace.steering.data import PersonaSteeringDatasetConfig, load_persona_steering_dataset
from chatspace.steering.model import QwenSteerModel, SteeringVectorConfig
from chatspace.steering.train import EarlyStopCallback, _compute_average_loss



In [9]:
# Notebook-wide defaults. Paths follow the CLI expectations in chatspace; adjust as needed.
MODEL_NAME = "Qwen/Qwen3-32B"
# Default layer index for steering; it is also baked into the run/rollout directory names below,
# so changing it redirects where vectors and rollouts are read from and written to.
TARGET_LAYER = 30
PERSONA_ROOT = Path("/workspace/persona-data")
RUN_ROOT = Path(f"/workspace/steering_runs_qwen3_layer_{TARGET_LAYER}")
ROLLOUTS_PATH = Path(f"/workspace/steering_rollouts_qwen3_layer_{TARGET_LAYER}/notebook_rollouts.jsonl")
# Used by load_base_model when it moves a model explicitly instead of relying on a device map.
DEFAULT_DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Side effect at cell execution time: make sure the rollout directory exists.
ROLLOUTS_PATH.parent.mkdir(parents=True, exist_ok=True)

# Dataset the example cells operate on, e.g. "qwen-3-32b__trait__agreeable".
# Set to None to make the example cells print a hint instead of running.
DATASET_NAME: str | None = "qwen-3-32b__trait__acerbic"

# Cache for load_base_model, keyed by (model name, serialized device map).
_BASE_MODEL_CACHE: dict[tuple[str, str], tuple[AutoModelForCausalLM, AutoTokenizer]] = {}


In [10]:
# Cache for get_training_components, keyed by (model name, target layer, serialized device map),
# so repeated training runs reuse one loaded steering model and tokenizer.
_SHARED_TRAIN_COMPONENTS: dict[tuple[str, int, str], tuple[QwenSteerModel, AutoTokenizer]] = {}


def _device_cache_key(device_map: str | dict | None) -> str:
    if isinstance(device_map, dict):
        # Sort keys for deterministic cache key
        return json.dumps(device_map, sort_keys=True)
    return str(device_map)


def get_training_components(
    model_name: str = MODEL_NAME,
    target_layer: int = TARGET_LAYER,
    *,
    device_map: str | dict | None = "auto",
) -> tuple[QwenSteerModel, AutoTokenizer]:
    """Return the shared steering model and tokenizer, loading them on first use.

    Results are memoized in ``_SHARED_TRAIN_COMPONENTS`` per
    ``(model_name, target_layer, device_map)`` combination.

    Args:
        model_name: Model identifier handed to the tokenizer and ``SteeringVectorConfig``.
        target_layer: Layer index stored in the ``SteeringVectorConfig``.
        device_map: Placement request. ``"cuda"`` loads without a device map and
            then moves the whole model to CUDA (when available); any other value
            is forwarded as ``device_map``, and ``"auto"`` additionally passes
            ``low_cpu_mem_usage=False``.

    Returns:
        The ``(model, tokenizer)`` pair.
    """

    key = (model_name, target_layer, _device_cache_key(device_map))
    if (hit := _SHARED_TRAIN_COMPONENTS.get(key)) is not None:
        return hit

    # Tokenizer: guarantee a pad token and pad on the left.
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = "left"

    # "cuda" is handled by a plain .to() after construction rather than a device map.
    wants_plain_cuda = device_map == "cuda"
    load_kwargs: dict[str, object] = {
        "torch_dtype": "auto",
        "device_map": None if wants_plain_cuda else device_map,
    }
    if device_map == "auto":
        load_kwargs["low_cpu_mem_usage"] = False

    steering_cfg = SteeringVectorConfig(
        model_name=model_name,
        target_layer=target_layer,
        init_scale=0.0,
    )
    model = QwenSteerModel(steering_cfg, **load_kwargs)
    if wants_plain_cuda and torch.cuda.is_available():
        model = model.to(torch.device("cuda"))

    components = (model, tokenizer)
    _SHARED_TRAIN_COMPONENTS[key] = components
    return components


def reset_shared_vector(model: QwenSteerModel, init_scale: float) -> None:
    """Re-initialize ``model.steering.vector`` in place before a fresh training run.

    With ``init_scale == 0.0`` the vector is zeroed; otherwise it is redrawn from
    a normal distribution with mean 0 and standard deviation ``init_scale``.
    Any existing gradient on the vector is zeroed as well.
    """

    steering_param = model.steering.vector

    with torch.no_grad():
        if init_scale != 0.0:
            torch.nn.init.normal_(steering_param, mean=0.0, std=init_scale)
        else:
            steering_param.zero_()

    # Drop stale gradients left over from a previous run on the shared model.
    stale_grad = steering_param.grad
    if stale_grad is not None:
        stale_grad.zero_()


In [None]:
# Baseline hyperparameters for train_steering_vector. build_training_args merges
# caller overrides on top of this dict and exposes the result as attributes.
TRAINING_DEFAULTS = {
    # Model / steering-vector setup (consumed by get_training_components and reset_shared_vector).
    "model": MODEL_NAME,
    "target_layer": TARGET_LAYER,
    "seed": 17,
    "learning_rate": 5e-1,
    "init_scale": 0.0,
    # Optimisation batch shape.
    "batch_size": 4,
    "gradient_accumulation": 1,
    # Dataset budget: train_steering_vector requests target_tokens + val_target_tokens
    # tokens and splits the loaded examples by cumulative token count.
    "max_length": 4096,
    "target_tokens": 100_000,
    "val_target_tokens": 10_000,
    # Minimum scores forwarded to PersonaSteeringDatasetConfig as role_min_score / trait_min_score.
    "role_score": 3,
    "trait_score": 75,
    # Values forwarded to SFTConfig.
    "warmup_ratio": 0.05,
    "bf16": torch.cuda.is_available(),
    # NOTE(review): -1 presumably means "no step cap, use num_epochs" — confirm against the trainer docs.
    "max_steps": -1,
    "num_epochs": 5.0,
    "gradient_checkpointing": False,
    "device_map": "auto",
    "logging_steps": 10,
    "lr_scheduler": "constant",
    # Early stopping is only wired up when a validation split exists and patience > 0.
    "early_stop_patience": 3,
    "early_stop_threshold": 0.0,
    # Not read by this notebook's train_steering_vector.
    "compare_prompted": False,
}


def build_training_args(dataset_name: str, output_dir: Path, **overrides) -> Namespace:
    """Assemble an argparse-style ``Namespace`` of training settings.

    Precedence, lowest to highest: ``TRAINING_DEFAULTS``, then ``overrides``,
    then the two fields derived from the positional arguments (``datasets`` and
    ``output_dir``), which therefore cannot be overridden by keyword.
    """

    return Namespace(
        **{
            **TRAINING_DEFAULTS,
            **overrides,
            "datasets": [dataset_name],
            "output_dir": Path(output_dir),
        }
    )


In [11]:
def load_base_model(
    model_name: str = MODEL_NAME,
    *,
    device_map: str | dict | None = "auto",
) -> tuple[AutoModelForCausalLM, AutoTokenizer]:
    """Return the base causal LM and tokenizer, loading them on first use.

    Results are memoized in ``_BASE_MODEL_CACHE`` per ``(model_name, device_map)``.
    The tokenizer is given a pad token if it lacks one and pads on the left; the
    model is switched to eval mode before being cached.

    Args:
        model_name: Identifier passed to ``from_pretrained`` for both objects.
        device_map: Forwarded to ``AutoModelForCausalLM.from_pretrained``.

    Returns:
        The ``(model, tokenizer)`` pair.
    """

    key = (model_name, _device_cache_key(device_map))
    if (hit := _BASE_MODEL_CACHE.get(key)) is not None:
        return hit

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = "left"

    load_kwargs = {
        "torch_dtype": "auto",
        "device_map": device_map,
        "low_cpu_mem_usage": False,
    }
    model = AutoModelForCausalLM.from_pretrained(model_name, **load_kwargs)

    # NOTE(review): device_map="cpu" also lands here, so on a CUDA machine the
    # model ends up on DEFAULT_DEVICE (cuda) — confirm that is intended.
    if device_map is None or device_map == "cpu":
        model = model.to(DEFAULT_DEVICE)
    model.eval()

    loaded = (model, tokenizer)
    _BASE_MODEL_CACHE[key] = loaded
    return loaded


In [12]:
def _split_indices_by_token_budget(
    token_lengths: Sequence[int],
    train_budget: int,
    val_budget: int,
) -> tuple[list[int], list[int], int, int]:
    """Assign leading examples to train, then validation, by cumulative token count.

    Examples are consumed in order. An example goes to the training split while the
    running total stays within ``train_budget``; after that it goes to validation while
    the running total stays within ``train_budget + val_budget`` (only if
    ``val_budget > 0``). The first example that fits neither ends the scan.

    Returns:
        ``(train_indices, val_indices, train_tokens, val_tokens)``.
    """

    train_indices: list[int] = []
    val_indices: list[int] = []
    train_tokens = 0
    val_tokens = 0
    cumulative = 0

    for idx, raw_length in enumerate(token_lengths):
        length = int(raw_length)
        cumulative += length
        if cumulative <= train_budget:
            train_indices.append(idx)
            train_tokens += length
        elif val_budget > 0 and cumulative <= train_budget + val_budget:
            val_indices.append(idx)
            val_tokens += length
        else:
            break

    return train_indices, val_indices, train_tokens, val_tokens


def train_steering_vector(
    dataset_name: str,
    *,
    output_root: Path = RUN_ROOT,
    run_validation: bool = True,
    **overrides,
) -> dict[str, object]:
    """Train a steering vector for ``dataset_name`` and return training metadata.

    Args:
        dataset_name: Persona dataset to train on; also the name of the output
            sub-directory under ``output_root``.
        output_root: Parent directory for run outputs.
        run_validation: When true and a validation split exists, run a final
            evaluation after training.
        **overrides: Values layered over ``TRAINING_DEFAULTS`` (see
            ``build_training_args``).

    Returns:
        A dict with ``output_dir``, ``train_metrics`` (the ``metrics`` attribute of
        the trainer's output, if any) and ``eval_metrics`` (``None`` when no final
        evaluation ran).

    Raises:
        ValueError: If no example fits inside the training token budget.
    """

    output_dir = Path(output_root) / dataset_name
    output_dir.mkdir(parents=True, exist_ok=True)

    args = build_training_args(dataset_name, output_dir, **overrides)

    model, tokenizer = get_training_components(
        model_name=args.model,
        target_layer=args.target_layer,
        device_map=args.device_map,
    )

    # Seed BEFORE resetting the vector: with a non-zero init_scale the reset draws
    # random numbers, and seeding afterwards would leave that draw unseeded.
    torch.manual_seed(args.seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(args.seed)
    reset_shared_vector(model, args.init_scale)

    val_budget = max(args.val_target_tokens, 0)
    dataset_cfg = PersonaSteeringDatasetConfig(
        dataset_names=[dataset_name],
        target_tokens=args.target_tokens + val_budget,
        seed=args.seed,
        tokenizer_name=args.model,
        max_length=args.max_length,
        role_min_score=args.role_score,
        trait_min_score=args.trait_score,
    )
    full_dataset = load_persona_steering_dataset(dataset_cfg, tokenizer)

    # Read the length column once; the split and the token totals both come from it,
    # which avoids re-materialising full dataset rows just to read one field.
    token_lengths = list(full_dataset["length"])
    train_indices, val_indices, train_tokens, val_selected_tokens = _split_indices_by_token_budget(
        token_lengths,
        args.target_tokens,
        val_budget,
    )

    if not train_indices:
        raise ValueError(
            "Unable to allocate any training examples; increase target tokens or relax filters",
        )

    train_dataset = full_dataset.select(train_indices)
    val_dataset = full_dataset.select(val_indices) if val_indices else None

    message = (
        f"Prepared dataset with {len(train_dataset)} train sequences / {train_tokens} tokens"
    )
    if val_dataset is not None:
        message += f"; validation {len(val_dataset)} sequences / {val_selected_tokens} tokens"
    print(message + ".")

    eval_strategy = "epoch" if val_dataset is not None else "no"

    sft_config = SFTConfig(
        output_dir=str(output_dir),
        seed=args.seed,
        do_eval=val_dataset is not None,
        learning_rate=args.learning_rate,
        per_device_train_batch_size=args.batch_size,
        gradient_accumulation_steps=args.gradient_accumulation,
        max_steps=args.max_steps,
        bf16=args.bf16,
        num_train_epochs=args.num_epochs,
        logging_steps=max(1, args.logging_steps),
        eval_strategy=eval_strategy,
        save_strategy="no",
        warmup_ratio=args.warmup_ratio,
        report_to=[],
        gradient_checkpointing=args.gradient_checkpointing,
        gradient_checkpointing_kwargs={"use_reentrant": False} if args.gradient_checkpointing else None,
        lr_scheduler_type=args.lr_scheduler,
        save_only_model=True,
        save_total_limit=1,
    )

    trainer = SFTTrainer(
        model=model,
        args=sft_config,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        processing_class=tokenizer,
    )

    # Disable model-card generation and route saving through the steering model's
    # own save_pretrained instead of the trainer's default implementation.
    trainer.create_model_card = lambda *_, **__: None
    output_dir_path = output_dir

    def _save_model(save_path: str | None = None, _internal_call: bool = False) -> None:
        target_dir = Path(save_path) if save_path is not None else output_dir_path
        model.save_pretrained(target_dir)

    trainer.save_model = _save_model  # type: ignore[assignment]
    # Extra attributes attached to the trainer instance.
    # NOTE(review): presumably read by helpers in chatspace.steering.train — confirm.
    trainer._val_dataset = val_dataset  # type: ignore[attr-defined]
    trainer._tokenizer = tokenizer  # type: ignore[attr-defined]

    stop_callback = None
    if val_dataset is not None and args.early_stop_patience > 0:
        stop_callback = EarlyStopCallback(trainer, args.early_stop_patience, args.early_stop_threshold)
        trainer.add_callback(stop_callback)
        trainer._early_stop_callback = stop_callback  # type: ignore[attr-defined]
    else:
        trainer._early_stop_callback = None  # type: ignore[attr-defined]

    train_output = trainer.train()

    # If the early-stop callback exposes a best vector, copy it back into the model
    # before the final evaluation and save, so both reflect that vector.
    if stop_callback is not None and getattr(stop_callback, "best_vector", None) is not None:
        vector = stop_callback.best_vector.to(model.steering.vector.device)
        model.steering.vector.data.copy_(vector)

    eval_metrics = None
    if run_validation and val_dataset is not None:
        eval_metrics = trainer.evaluate()
        if "eval_loss" not in eval_metrics:
            eval_loss = _compute_average_loss(model, trainer.get_eval_dataloader())
            eval_metrics["eval_loss"] = eval_loss
        if "eval_loss" in eval_metrics and "eval_ppl" not in eval_metrics:
            eval_metrics["eval_ppl"] = math.exp(eval_metrics["eval_loss"])

    trainer.save_state()
    trainer.save_model()

    return {
        "output_dir": output_dir,
        "train_metrics": getattr(train_output, "metrics", None),
        "eval_metrics": eval_metrics,
    }


In [13]:
def load_trained_vector(
    dataset_name: str,
    *,
    run_root: Path = RUN_ROOT,
    map_location: str | torch.device = "cpu",
) -> dict[str, object]:
    """Read the trained steering vector saved under ``run_root / dataset_name``.

    Returns:
        A dict with keys ``vector`` (float tensor, or ``None`` if
        ``steering_vector.pt`` does not exist), ``layer`` (``target_layer`` from
        ``steering_config.json`` when that file exists alongside the vector,
        otherwise ``TARGET_LAYER``), ``path`` (where the vector file is expected)
        and ``norm`` (L2 norm of the vector, ``None`` if missing or empty).

    Raises:
        ValueError: If the file loads but has no ``steering_vector`` entry.
    """

    run_dir = Path(run_root) / dataset_name
    vector_path = run_dir / "steering_vector.pt"

    info: dict[str, object] = {
        "vector": None,
        "layer": TARGET_LAYER,
        "path": vector_path,
        "norm": None,
    }

    # Nothing trained yet: hand back the placeholder record.
    if not vector_path.exists():
        return info

    payload = torch.load(vector_path, map_location=map_location)
    stored = payload.get("steering_vector")
    if stored is None:
        raise ValueError(f"steering_vector.pt missing 'steering_vector' key at {vector_path}")

    vector = stored.float()
    info["vector"] = vector
    if vector.numel() > 0:
        info["norm"] = float(torch.linalg.norm(vector).item())
    else:
        info["norm"] = None

    # The layer recorded at training time wins over the notebook default.
    config_file = run_dir / "steering_config.json"
    if config_file.exists():
        saved_cfg = json.loads(config_file.read_text())
        info["layer"] = int(saved_cfg.get("target_layer", TARGET_LAYER))

    return info


def load_activation_vector(
    dataset_name: str,
    *,
    persona_root: Path = PERSONA_ROOT,
    target_layer: int = TARGET_LAYER,
    map_location: str | torch.device = "cpu",
) -> dict[str, object]:
    """Load the activation-derived vector for the dataset at ``target_layer``.

    The dataset name is expected to look like ``<model>__trait__<name>`` or
    ``<model>__role__<name>``:

    * trait: reads ``<persona_root>/<model>/traits_240/vectors/<name>.pt`` and
      takes ``data["pos_neg_50"][target_layer]``.
    * role: reads ``<persona_root>/<model>/roles_240/vectors/<name>.pt`` and
      takes ``data["pos_3"][target_layer] - data["default_1"][target_layer]``.

    Returns:
        A dict with keys ``vector`` (float tensor, or ``None`` if the file does not
        exist), ``layer``, ``path`` (the file that was looked up) and ``norm``
        (L2 norm, ``None`` if missing or empty).

    Raises:
        ValueError: If the name contains neither ``__trait__`` nor ``__role__``.
    """

    result: dict[str, object] = {
        "vector": None,
        "layer": target_layer,
        "path": None,
        "norm": None,
    }

    if "__trait__" in dataset_name:
        kind = "trait"
        model_prefix, name = dataset_name.split("__trait__", 1)
        subdir = "traits_240"
    elif "__role__" in dataset_name:
        kind = "role"
        model_prefix, name = dataset_name.split("__role__", 1)
        subdir = "roles_240"
        # Role vectors used to be read from a hard-coded "qwen-3-32b" directory
        # regardless of the dataset's model prefix. Use the prefix, as the trait
        # branch does, and keep the old directory only when no prefix is given.
        model_prefix = model_prefix or "qwen-3-32b"
    else:
        raise ValueError(f"Unrecognized dataset name: {dataset_name}")

    # Accept str paths too, matching load_trained_vector's handling of run_root.
    vec_file = Path(persona_root) / f"{model_prefix}/{subdir}/vectors/{name}.pt"
    result["path"] = vec_file
    if not vec_file.exists():
        return result

    data = torch.load(vec_file, map_location=map_location)
    if kind == "trait":
        vec = data["pos_neg_50"][target_layer]
    else:
        vec = data["pos_3"][target_layer] - data["default_1"][target_layer]

    vector = vec.float()
    result["vector"] = vector
    result["norm"] = float(torch.linalg.norm(vector).item()) if vector.numel() > 0 else None
    return result


In [14]:
class SteeringController:
    """Attach a single residual hook and swap steering vectors on demand."""

    def __init__(self, model: AutoModelForCausalLM) -> None:
        self.model = model
        self.layer_idx: int | None = None
        self._handle = None
        self.vector: torch.Tensor | None = None

    def _hook(self, module, args, output):
        if self.vector is None:
            return output
        hidden = output[0] if isinstance(output, tuple) else output
        vec = self.vector
        if vec.device != hidden.device or vec.dtype != hidden.dtype:
            vec = vec.to(device=hidden.device, dtype=hidden.dtype)
            self.vector = vec
        steered = hidden + vec
        if isinstance(output, tuple):
            return (steered,) + output[1:]
        return steered

    def set_layer(self, layer_idx: int) -> None:
        if self.layer_idx == layer_idx:
            return
        if self._handle is not None:
            self._handle.remove()
        layer = self.model.model.layers[layer_idx]
        self._handle = layer.register_forward_hook(self._hook)
        self.layer_idx = layer_idx

    def set_vector(self, vector: torch.Tensor | None) -> None:
        if vector is None:
            self.vector = None
            return
        if vector.ndim != 1:
            raise ValueError("Steering vector must be 1D")
        self.vector = vector

    def close(self) -> None:
        if self._handle is not None:
            self._handle.remove()
            self._handle = None
            self.layer_idx = None


def _prepare_scaled_variants(
    base_name: str,
    vector: torch.Tensor | None,
    scales: Sequence[float],
    normalize: bool,
    *,
    include_learned: bool = False,
) -> list[tuple[str, torch.Tensor, float]]:
    if vector is None:
        return []

    vec = vector
    norm = float(torch.linalg.norm(vec).item())
    if normalize:
        if norm > 0:
            vec = vec / norm
        else:
            normalize = False

    variants: list[tuple[str, torch.Tensor, float]] = []
    if include_learned:
        learned_name = f"{base_name}_scale_learned"
        if normalize and norm > 0:
            variants.append((learned_name, vec * norm, norm))
        else:
            variants.append((learned_name, vector, 1.0))
    existing_names = {name for name, _, _ in variants}
    for scale in scales:
        scaled = vec * float(scale)
        scaled_name = base_name
        if not (len(scales) == 1 and abs(scale - 1.0) < 1e-6):
            scaled_name = f"{base_name}_scale_{scale:g}"
        if scaled_name in existing_names:
            continue
        variants.append((scaled_name, scaled, float(scale)))
        existing_names.add(scaled_name)
    return variants


def make_messages(
    system_prompt: str | None,
    question: str,
    include_system: bool = True,
) -> list[dict[str, str]]:
    msgs: list[dict[str, str]] = []
    if include_system and system_prompt:
        msgs.append({"role": "system", "content": system_prompt})
    msgs.append({"role": "user", "content": question})
    return msgs


def append_rollouts(records: Iterable[dict], rollouts_path: Path = ROLLOUTS_PATH) -> None:
    """Append ``records`` to ``rollouts_path`` as JSON Lines (one JSON object per line).

    The parent directory is created if needed. The file is opened in append mode,
    so existing rollouts are kept.
    """
    rollouts_path.parent.mkdir(parents=True, exist_ok=True)
    with rollouts_path.open("a", encoding="utf-8") as fh:
        # The line terminator must be the escape sequence "\n"; a raw line break
        # inside the string literal is a syntax error.
        fh.writelines(json.dumps(rec) + "\n" for rec in records)


@dataclass
class RolloutGenerationConfig:
    """Sampling settings that generate_dataset_rollouts forwards to ``model.generate``."""

    # Passed as ``max_new_tokens``; also recorded in every rollout record.
    max_new_tokens: int = 256
    # Passed as ``temperature``; also recorded in every rollout record.
    temperature: float = 0.7
    # Passed as ``top_p``; also recorded in every rollout record.
    top_p: float = 0.95
    # Passed as ``do_sample``.
    do_sample: bool = True


def generate_dataset_rollouts(
    dataset_name: str,
    *,
    num_rollouts: int = 1,
    trained_scales: Sequence[float] = (1.0,),
    activation_scales: Sequence[float] = (1.0,),
    normalize_steering: bool = True,
    include_prompted: bool = True,
    gen_config: RolloutGenerationConfig | None = None,
    rollouts_path: Path = ROLLOUTS_PATH,
    model_name: str = MODEL_NAME,
    device_map: str | dict | None = "auto",
) -> list[dict[str, object]]:
    """Generate prompted, trained-vector and activation-vector rollouts for a dataset.

    Uses the cached base model from ``load_base_model``. All records produced are
    appended to ``rollouts_path`` and also returned.

    Args:
        dataset_name: Dataset whose instructions and vectors are used.
        num_rollouts: Number of generations per (variant, question).
        trained_scales: Scales applied to the trained vector.
        activation_scales: Scales applied to the activation vector.
        normalize_steering: Scale unit-norm directions instead of raw vectors. When
            set and both vectors exist, two extra activation variants are added at
            plus/minus the trained vector's norm.
        include_prompted: Also generate unsteered rollouts, one pass per system prompt.
        gen_config: Sampling settings; defaults to ``RolloutGenerationConfig()``.
        rollouts_path: JSONL file the records are appended to.
        model_name: Base model to load.
        device_map: Forwarded to ``load_base_model``.

    Returns:
        The list of rollout records, in generation order: prompted, trained, activation.
    """

    from scripts import generate_behavior_rollouts as rollout_script

    model, tokenizer = load_base_model(model_name=model_name, device_map=device_map)

    instructions = rollout_script.load_instructions(dataset_name)
    timestamp = datetime.now(timezone.utc).isoformat()
    cfg = gen_config or RolloutGenerationConfig()

    controller = SteeringController(model)
    records: list[dict[str, object]] = []

    def make_record(
        *,
        variant: str,
        source: str,
        scale: float,
        layer: int,
        norm: float | None,
        prompt_index: int | None,
        question_index: int,
        rollout_index: int,
        question: str,
        system_prompt: str | None,
        response: str,
    ) -> dict[str, object]:
        # Single place that defines the record schema (and its key order in the JSONL).
        return {
            "timestamp_utc": timestamp,
            "dataset": dataset_name,
            "variant": variant,
            "steering_source": source,
            "steering_scale": scale,
            "steering_layer": layer,
            "steering_norm": norm,
            "prompt_index": prompt_index,
            "question_index": question_index,
            "rollout_index": rollout_index,
            "question": question,
            "system_prompt": system_prompt,
            "response": response,
            "model_name": model_name,
            "max_new_tokens": cfg.max_new_tokens,
            "temperature": cfg.temperature,
            "top_p": cfg.top_p,
        }

    try:
        trained_info = load_trained_vector(dataset_name)
        activation_info = load_activation_vector(dataset_name)

        trained_vector = trained_info["vector"]
        trained_layer = trained_info["layer"] if trained_info["layer"] is not None else TARGET_LAYER
        trained_norm = trained_info["norm"]

        activation_vector = activation_info["vector"]
        activation_layer = activation_info["layer"] if activation_info["layer"] is not None else TARGET_LAYER

        trained_variants = _prepare_scaled_variants(
            "trained",
            trained_vector,
            trained_scales,
            normalize_steering,
            include_learned=True,
        )
        activation_variants = _prepare_scaled_variants(
            "activation",
            activation_vector,
            activation_scales,
            normalize_steering,
            include_learned=False,
        )

        # Norm-matched comparison: the activation direction at +/- the trained norm.
        if (
            normalize_steering
            and activation_vector is not None
            and trained_norm is not None
            and trained_norm > 0
        ):
            act_norm = float(torch.linalg.norm(activation_vector).item())
            base_vec = activation_vector / act_norm if act_norm > 0 else activation_vector
            learned_name = "activation_scale_learned"
            neg_name = "activation_scale_-learned"
            existing = {name for name, _, _ in activation_variants}
            if learned_name not in existing:
                activation_variants.append((learned_name, base_vec * trained_norm, trained_norm))
            if neg_name not in existing:
                activation_variants.append((neg_name, base_vec * (-trained_norm), -trained_norm))

        # Unsteered rollouts still attach the (inactive) hook; reuse a layer that
        # the steered runs will need anyway.
        baseline_layer = TARGET_LAYER
        if trained_variants:
            baseline_layer = trained_layer
        elif activation_variants:
            baseline_layer = activation_layer

        device = next(model.parameters()).device

        def run_batch(
            message_batches: list[list[dict[str, str]]],
            vector: torch.Tensor | None,
            layer_idx: int,
        ) -> list[str]:
            """Generate one response per conversation with the given steering applied."""
            controller.set_layer(layer_idx)
            controller.set_vector(vector)
            chat_texts = [
                tokenizer.apply_chat_template(msgs, tokenize=False, add_generation_prompt=True)
                for msgs in message_batches
            ]
            encoded = tokenizer(chat_texts, return_tensors="pt", padding=True).to(device)
            # generate() returns the padded input block followed by the new tokens,
            # so every row's completion starts at the padded width. Slicing at each
            # row's non-pad token count instead would leave the tail of shorter
            # prompts in their decoded responses.
            prompt_width = encoded["input_ids"].shape[1]
            outputs = model.generate(
                **encoded,
                max_new_tokens=cfg.max_new_tokens,
                do_sample=cfg.do_sample,
                temperature=cfg.temperature,
                top_p=cfg.top_p,
                eos_token_id=tokenizer.eos_token_id,
                pad_token_id=tokenizer.pad_token_id,
            )
            return [
                tokenizer.decode(seq[prompt_width:], skip_special_tokens=True).strip()
                for seq in outputs
            ]

        def run_steered(
            variants: list[tuple[str, torch.Tensor, float]],
            source: str,
            layer: int,
            batches: list[list[dict[str, str]]],
            system_prompt: str | None,
        ) -> None:
            """Generate and record rollouts for every steering variant of one source."""
            for variant_name, variant_vec, scale in variants:
                norm_val = float(torch.linalg.norm(variant_vec).item()) if variant_vec is not None else None
                for rollout_idx in range(num_rollouts):
                    responses = run_batch(batches, vector=variant_vec, layer_idx=layer)
                    for question_idx, (question, response) in enumerate(zip(instructions.questions, responses)):
                        records.append(
                            make_record(
                                variant=variant_name,
                                source=source,
                                scale=scale,
                                layer=layer,
                                norm=norm_val,
                                prompt_index=None,
                                question_index=question_idx,
                                rollout_index=rollout_idx,
                                question=question,
                                system_prompt=system_prompt,
                                response=response,
                            )
                        )

        if include_prompted:
            for prompt_idx, prompt in enumerate(instructions.prompts):
                batches = [make_messages(prompt, question) for question in instructions.questions]
                for rollout_idx in range(num_rollouts):
                    responses = run_batch(batches, vector=None, layer_idx=baseline_layer)
                    for question_idx, (question, response) in enumerate(zip(instructions.questions, responses)):
                        records.append(
                            make_record(
                                variant="prompted",
                                source="baseline",
                                scale=0.0,
                                layer=baseline_layer,
                                norm=None,
                                prompt_index=prompt_idx,
                                question_index=question_idx,
                                rollout_index=rollout_idx,
                                question=question,
                                system_prompt=prompt,
                                response=response,
                            )
                        )

        # Steered runs all share the first system prompt (or none when there are no prompts).
        shared_system = instructions.prompts[0] if instructions.prompts else None
        shared_batches = [make_messages(shared_system, q) for q in instructions.questions]

        run_steered(trained_variants, "trained", trained_layer, shared_batches, shared_system)
        run_steered(activation_variants, "activation", activation_layer, shared_batches, shared_system)
    finally:
        # Always detach the hook so the cached base model is left unsteered.
        controller.close()

    append_rollouts(records, rollouts_path=rollouts_path)
    return records


In [15]:
def summarize_rollouts(records: Sequence[dict]) -> pd.DataFrame:
    """Aggregate rollout records per ``(steering_source, variant)``.

    The result is indexed by ``(steering_source, variant)``, sorted, with columns
    ``responses`` (count of non-null responses), ``avg_response_length`` (mean
    character length, missing responses counted as empty) and ``avg_scale`` (mean
    ``steering_scale``). An empty input yields an empty frame.
    """

    if not records:
        return pd.DataFrame()

    frame = pd.DataFrame(records)
    frame["response_length"] = frame["response"].fillna("").map(len)

    per_variant = frame.groupby(["steering_source", "variant"])
    aggregated = per_variant.agg(
        responses=("response", "count"),
        avg_response_length=("response_length", "mean"),
        avg_scale=("steering_scale", "mean"),
    )
    return aggregated.sort_index()


## Train a steering vector (optional)
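Adjust `DATASET_NAME` and override defaults (e.g. `target_tokens`) before running. Run every helper cell above first — including the one that defines `TRAINING_DEFAULTS` and `build_training_args` — otherwise this cell fails with a `NameError`.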

In [16]:
# Train only when a dataset is configured; otherwise just print a hint.
if not DATASET_NAME:
    print("Set DATASET_NAME to a {role,trait} dataset string before training.")
else:
    print(f"Training steering vector for {DATASET_NAME}. This may take significant time.")
    # Keyword overrides are layered on top of TRAINING_DEFAULTS, for example:
    #   target_tokens=10_000
    #   max_steps=200
    training_result = train_steering_vector(DATASET_NAME)
    display(training_result)


Training steering vector for qwen-3-32b__trait__acerbic. This may take significant time.


NameError: name 'build_training_args' is not defined

## Load existing vectors for inspection

In [None]:
# Show layer / norm / path of both vectors for the configured dataset.
if not DATASET_NAME:
    print("Set DATASET_NAME before loading vectors.")
else:
    trained_info = load_trained_vector(DATASET_NAME)
    activation_info = load_activation_vector(DATASET_NAME)
    # The activation path stays None when no lookup path was recorded.
    display(
        {
            "trained_layer": trained_info["layer"],
            "trained_norm": trained_info["norm"],
            "trained_path": str(trained_info["path"]),
            "activation_layer": activation_info["layer"],
            "activation_norm": activation_info["norm"],
            "activation_path": str(activation_info["path"]) if activation_info["path"] else None,
        }
    )


## Generate and evaluate rollouts

In [None]:
# Generate one rollout per variant/question and show a per-variant summary.
if not DATASET_NAME:
    print("Set DATASET_NAME before running rollouts.")
else:
    # To change sampling, pass e.g.
    #   gen_config=RolloutGenerationConfig(max_new_tokens=128, temperature=0.8)
    records = generate_dataset_rollouts(
        DATASET_NAME,
        num_rollouts=1,
        trained_scales=(1.0,),
        activation_scales=(1.0,),
    )
    summary = summarize_rollouts(records)
    display(summary)
