In [None]:
from huggingface_hub import login
import os
login(token="")
os.environ["HF_TOKEN"] = ""

Note: Environment variable`HF_TOKEN` is set and is the current active token independently from the token you've just configured.


In [3]:
# ============================================================
# STEP 1: Install dependencies (run once)
# ============================================================
# !pip install sae-lens transformer-lens einops beartype jaxtyping

In [4]:
# ============================================================
# STEP 2: Imports
# ============================================================
import os
from typing import Any, List, Tuple

import einops
import torch
import numpy as np
import matplotlib.pyplot as plt
from beartype import beartype
from jaxtyping import Bool, Float, Int, jaxtyped
from sae_lens import SAE
from tqdm import tqdm
from transformer_lens import HookedTransformer
from transformers import AutoTokenizer
from datasets import load_dataset

print("‚úÖ Imports successful")



‚úÖ Imports successful


In [5]:
# ============================================================
# STEP 3: SAE-Bench Helper Functions
# ============================================================

LLM_NAME_TO_BATCH_SIZE = {
    "pythia-70m-deduped": 512,
    "pythia-160m-deduped": 256,
    "gemma-2-2b": 32,
    "gemma-2-9b": 32,
    "gemma-2-2b-it": 32,
    "gemma-2-9b-it": 32,
}

LLM_NAME_TO_DTYPE = {
    "pythia-70m-deduped": "float32",
    "pythia-160m-deduped": "float32",
    "gemma-2-2b": "bfloat16",
    "gemma-2-2b-it": "bfloat16",
    "gemma-2-9b": "bfloat16",
    "gemma-2-9b-it": "bfloat16",
}

@jaxtyped(typechecker=beartype)
@torch.no_grad
def get_bos_pad_eos_mask(
    tokens: Int[torch.Tensor, "dataset_size seq_len"], tokenizer: AutoTokenizer | Any
) -> Bool[torch.Tensor, "dataset_size seq_len"]:
    """Create mask that excludes BOS, PAD, and EOS tokens."""
    mask = (
        (tokens == tokenizer.pad_token_id)
        | (tokens == tokenizer.eos_token_id)
        | (tokens == tokenizer.bos_token_id)
    ).to(dtype=torch.bool)
    return ~mask


@jaxtyped(typechecker=beartype)
@torch.no_grad
def get_feature_activation_sparsity(
    tokens: Int[torch.Tensor, "dataset_size seq_len"],
    model: HookedTransformer,
    sae: SAE | Any,
    batch_size: int,
    layer: int,
    hook_name: str,
    mask_bos_pad_eos_tokens: bool = False,
) -> Float[torch.Tensor, "d_sae"]:
    """Get the activation sparsity for each SAE feature."""
    device = sae.device
    running_sum_F = torch.zeros(sae.W_dec.shape[0], dtype=torch.float32, device=device)
    total_tokens = 0

    for i in tqdm(range(0, tokens.shape[0], batch_size), desc="Computing sparsity"):
        tokens_BL = tokens[i : i + batch_size]
        _, cache = model.run_with_cache(
            tokens_BL, stop_at_layer=layer + 1, names_filter=hook_name
        )
        resid_BLD: Float[torch.Tensor, "batch seq_len d_model"] = cache[hook_name]

        sae_act_BLF: Float[torch.Tensor, "batch seq_len d_sae"] = sae.encode(resid_BLD)
        # Convert to binary (active or not)
        sae_act_BLF = (sae_act_BLF > 0).to(dtype=torch.float32)

        if mask_bos_pad_eos_tokens:
            attn_mask_BL = get_bos_pad_eos_mask(tokens_BL, model.tokenizer)
        else:
            attn_mask_BL = torch.ones_like(tokens_BL, dtype=torch.bool)

        attn_mask_BL = attn_mask_BL.to(device=sae_act_BLF.device)
        sae_act_BLF = sae_act_BLF * attn_mask_BL[:, :, None]
        total_tokens += attn_mask_BL.sum().item()

        running_sum_F += einops.reduce(sae_act_BLF, "B L F -> F", "sum")

    return running_sum_F / total_tokens


@jaxtyped(typechecker=beartype)
@torch.no_grad
def collect_sae_activations(
    tokens: Int[torch.Tensor, "dataset_size seq_len"],
    model: HookedTransformer,
    sae: SAE | Any,
    batch_size: int,
    layer: int,
    hook_name: str,
    mask_bos_pad_eos_tokens: bool = False,
    selected_latents: list[int] | None = None,
    activation_dtype: torch.dtype | None = None,
) -> Float[torch.Tensor, "dataset_size seq_len indexed_d_sae"]:
    """Collects SAE activations for a given set of tokens."""
    sae_acts = []

    for i in tqdm(range(0, tokens.shape[0], batch_size), desc="Collecting SAE activations"):
        tokens_BL = tokens[i : i + batch_size]
        _, cache = model.run_with_cache(
            tokens_BL, stop_at_layer=layer + 1, names_filter=hook_name
        )
        resid_BLD: Float[torch.Tensor, "batch seq_len d_model"] = cache[hook_name]

        sae_act_BLF: Float[torch.Tensor, "batch seq_len d_sae"] = sae.encode(resid_BLD)

        if selected_latents is not None:
            sae_act_BLF = sae_act_BLF[:, :, selected_latents]

        if mask_bos_pad_eos_tokens:
            attn_mask_BL = get_bos_pad_eos_mask(tokens_BL, model.tokenizer)
        else:
            attn_mask_BL = torch.ones_like(tokens_BL, dtype=torch.bool)

        attn_mask_BL = attn_mask_BL.to(device=sae_act_BLF.device)
        sae_act_BLF = sae_act_BLF * attn_mask_BL[:, :, None]

        if activation_dtype is not None:
            sae_act_BLF = sae_act_BLF.to(dtype=activation_dtype)

        sae_acts.append(sae_act_BLF)

    return torch.cat(sae_acts, dim=0)

print("‚úÖ Helper functions defined")

‚úÖ Helper functions defined


In [6]:
# ============================================================
# STEP 4: Load Model with TransformerLens
# ============================================================

MODEL_NAME = "gemma-2-2b"
DTYPE = LLM_NAME_TO_DTYPE.get(MODEL_NAME, "bfloat16")
BATCH_SIZE = LLM_NAME_TO_BATCH_SIZE.get(MODEL_NAME, 32)

print(f"Loading {MODEL_NAME} with dtype={DTYPE}...")

model = HookedTransformer.from_pretrained(
    MODEL_NAME,
    dtype=DTYPE,
    device="cuda",
)

print(f"‚úÖ Loaded model: {MODEL_NAME}")
print(f"   Layers: {model.cfg.n_layers}, d_model: {model.cfg.d_model}")

`torch_dtype` is deprecated! Use `dtype` instead!


Loading gemma-2-2b with dtype=bfloat16...


Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]



Loaded pretrained model gemma-2-2b into HookedTransformer
‚úÖ Loaded model: gemma-2-2b
   Layers: 26, d_model: 2304


In [7]:
# ============================================================
# STEP 5: Load SAE from SAE-Lens (Gemma Scope)
# ============================================================

# Available Gemma-2-2b SAEs on SAE-Lens:
# - "gemma-scope-2b-pt-res" (residual stream)
# - "gemma-scope-2b-pt-mlp" (MLP output)
# - "gemma-scope-2b-pt-att" (attention output)

SAE_RELEASE = "gemma-scope-2b-pt-res"  # or "gemma-scope-2b-pt-mlp"
SAE_ID = "layer_12/width_16k/average_l0_71"  # Choose layer and width

print(f"Loading SAE: {SAE_RELEASE}/{SAE_ID}...")

sae, cfg_dict, sparsity = SAE.from_pretrained(
    release=SAE_RELEASE,
    sae_id=SAE_ID,
    device="cuda",
)

# Get layer and hook info
LAYER = sae.cfg.hook_layer
HOOK_NAME = sae.cfg.hook_name

print(f"‚úÖ Loaded SAE:")
print(f"   Release: {SAE_RELEASE}")
print(f"   SAE ID: {SAE_ID}")
print(f"   Layer: {LAYER}")
print(f"   Hook: {HOOK_NAME}")
print(f"   d_sae: {sae.cfg.d_sae}")
print(f"   d_in: {sae.cfg.d_in}")

Loading SAE: gemma-scope-2b-pt-res/layer_12/width_16k/average_l0_71...


ValueError: ID layer_12/width_16k/average_l0_71 not found in release gemma-scope-2b-pt-res. Valid IDs are ['embedding/width_4k/average_l0_6', 'embedding/width_4k/average_l0_44', 'embedding/width_4k/average_l0_21', 'embedding/width_4k/average_l0_111', 'layer_0/width_16k/average_l0_105', ...]. If you don't want to specify an L0 value, consider using release gemma-scope-2b-pt-res-canonical which has valid IDs ['layer_0/width_16k/canonical', 'layer_1/width_16k/canonical', 'layer_2/width_16k/canonical', 'layer_3/width_16k/canonical', 'layer_4/width_16k/canonical', ...]

In [None]:
# ============================================================
# STEP 6: Load and Tokenize Datasets
# ============================================================

def load_and_tokenize_dataset(
    dataset_name: str,
    tokenizer,
    max_samples: int = 500,
    max_length: int = 128,
    min_text_len: int = 50,
) -> torch.Tensor:
    """Load dataset and tokenize for sparsity computation."""
    
    texts = []
    
    if dataset_name == "wikitext":
        raw_data = load_dataset("wikitext", "wikitext-2-raw-v1", split="test")
        for x in raw_data:
            if len(x['text']) > min_text_len:
                texts.append(str(x['text']))
    else:
        # Support full names like "cais/wmdp-bio-forget-corpus"
        full_name = dataset_name if "/" in dataset_name else f"cais/wmdp-{dataset_name}"
        dataset = load_dataset(full_name, split="train", token=os.getenv("HF_TOKEN"))
        for line in dataset:
            if len(line['text']) > min_text_len:
                texts.append(str(line['text']))
    
    # Limit samples
    if len(texts) > max_samples:
        import random
        random.seed(42)
        texts = random.sample(texts, max_samples)
    
    print(f"  Loaded {len(texts)} texts from {dataset_name}")
    
    # Tokenize
    tokens = tokenizer(
        texts,
        return_tensors="pt",
        padding="max_length",
        truncation=True,
        max_length=max_length,
    )["input_ids"]
    
    return tokens.to("cuda")

# ============================================================
# Configure datasets
# ============================================================
FORGET_DATASET = "cais/wmdp-bio-forget-corpus"
RETAIN_DATASET = "wikitext"
MAX_SAMPLES = 500
MAX_LENGTH = 128

print(f"Loading FORGET dataset: {FORGET_DATASET}")
forget_tokens = load_and_tokenize_dataset(
    FORGET_DATASET, model.tokenizer, MAX_SAMPLES, MAX_LENGTH
)

print(f"Loading RETAIN dataset: {RETAIN_DATASET}")
retain_tokens = load_and_tokenize_dataset(
    RETAIN_DATASET, model.tokenizer, MAX_SAMPLES, MAX_LENGTH
)

print(f"\n‚úÖ Tokenized datasets:")
print(f"   Forget: {forget_tokens.shape}")
print(f"   Retain: {retain_tokens.shape}")

In [None]:
# ============================================================
# STEP 7: Compute Feature Sparsity (SAE-Bench Method)
# ============================================================

print("Computing sparsity on FORGET dataset...")
forget_sparsity = get_feature_activation_sparsity(
    tokens=forget_tokens,
    model=model,
    sae=sae,
    batch_size=BATCH_SIZE,
    layer=LAYER,
    hook_name=HOOK_NAME,
    mask_bos_pad_eos_tokens=True,
)

print("\nComputing sparsity on RETAIN dataset...")
retain_sparsity = get_feature_activation_sparsity(
    tokens=retain_tokens,
    model=model,
    sae=sae,
    batch_size=BATCH_SIZE,
    layer=LAYER,
    hook_name=HOOK_NAME,
    mask_bos_pad_eos_tokens=True,
)

print(f"\n‚úÖ Computed sparsity for {len(forget_sparsity)} features")
print(f"   Forget: mean={forget_sparsity.mean():.4f}, max={forget_sparsity.max():.4f}")
print(f"   Retain: mean={retain_sparsity.mean():.4f}, max={retain_sparsity.max():.4f}")

In [None]:
# ============================================================
# STEP 8: Select Features for Unlearning
# ============================================================

def select_unlearning_features(
    forget_sparsity: torch.Tensor,
    retain_sparsity: torch.Tensor,
    retain_threshold: float = 0.01,
    n_features: int = 50,
    min_forget_sparsity: float = 0.001,
) -> Tuple[torch.Tensor, dict]:
    """
    Select features that are:
    - Active on forget set (high sparsity)
    - Inactive on retain set (low sparsity)
    """
    retain_mask = retain_sparsity < retain_threshold
    forget_mask = forget_sparsity > min_forget_sparsity
    valid_mask = retain_mask & forget_mask
    
    # Score by forget/retain ratio
    scores = forget_sparsity / (retain_sparsity + 1e-8)
    scores[~valid_mask] = 0
    
    topk_values, topk_indices = torch.topk(scores, min(n_features, valid_mask.sum().item()))
    
    info = {
        "total_features": len(forget_sparsity),
        "pass_retain_threshold": retain_mask.sum().item(),
        "pass_forget_threshold": forget_mask.sum().item(),
        "valid_candidates": valid_mask.sum().item(),
        "selected": len(topk_indices),
    }
    
    return topk_indices, info

# Parameters
RETAIN_THRESHOLD = 0.01
N_FEATURES = 50
MIN_FORGET_SPARSITY = 0.001

selected_indices, info = select_unlearning_features(
    forget_sparsity=forget_sparsity,
    retain_sparsity=retain_sparsity,
    retain_threshold=RETAIN_THRESHOLD,
    n_features=N_FEATURES,
    min_forget_sparsity=MIN_FORGET_SPARSITY,
)

print("üìä Feature Selection Results:")
for k, v in info.items():
    print(f"   {k}: {v}")

print(f"\nüéØ Top {len(selected_indices)} Selected Features:")
print("-" * 60)
print(f"{'Index':>8} | {'Forget':>12} | {'Retain':>12} | {'Ratio':>10}")
print("-" * 60)
for idx in selected_indices[:15]:
    fs = forget_sparsity[idx].item()
    rs = retain_sparsity[idx].item()
    ratio = fs / (rs + 1e-8)
    print(f"{idx.item():>8} | {fs:>12.4f} | {rs:>12.6f} | {ratio:>10.1f}")

In [None]:
# ============================================================
# STEP 9: Visualize Feature Selection
# ============================================================

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

fs_np = forget_sparsity.cpu().numpy()
rs_np = retain_sparsity.cpu().numpy()
sel_np = selected_indices.cpu().numpy()

# Scatter plot
ax1 = axes[0]
ax1.scatter(rs_np, fs_np, alpha=0.3, s=5, label="All features")
ax1.scatter(rs_np[sel_np], fs_np[sel_np], c='red', s=30, label="Selected", zorder=5)
ax1.axvline(RETAIN_THRESHOLD, color='green', linestyle='--', label=f"Retain threshold")
ax1.set_xlabel("Retain Sparsity")
ax1.set_ylabel("Forget Sparsity")
ax1.set_title(f"Feature Sparsity: {FORGET_DATASET.split('/')[-1]} vs {RETAIN_DATASET}")
ax1.legend()
ax1.set_xscale('log')
ax1.set_yscale('log')

# Histogram
ax2 = axes[1]
ratios = fs_np / (rs_np + 1e-8)
ax2.hist(np.log10(ratios + 1e-8), bins=50, alpha=0.7, edgecolor='black')
for r in ratios[sel_np][:5]:
    ax2.axvline(np.log10(r), color='red', alpha=0.5, linewidth=2)
ax2.set_xlabel("log10(Forget/Retain Ratio)")
ax2.set_ylabel("Count")
ax2.set_title("Distribution of Forget/Retain Ratios")

plt.tight_layout()
plt.show()

In [None]:
# ============================================================
# STEP 10: Apply SAE Steering for Unlearning
# ============================================================

CLAMP_VALUE = -5.0  # Negative value to suppress features

def generate_with_sae_steering(
    prompt: str,
    model: HookedTransformer,
    sae: SAE,
    feature_indices: torch.Tensor,
    clamp_value: float = -5.0,
    max_new_tokens: int = 50,
) -> str:
    """Generate text while clamping selected SAE features."""
    
    def steering_hook(resid_BLD, hook):
        # Encode through SAE
        sae_acts = sae.encode(resid_BLD)
        
        # Clamp selected features
        sae_acts[:, :, feature_indices] = clamp_value
        
        # Decode back
        reconstructed = sae.decode(sae_acts)
        
        # Add error term (residual from reconstruction)
        error = resid_BLD - sae.decode(sae.encode(resid_BLD))
        
        return reconstructed + error
    
    # Tokenize prompt
    tokens = model.to_tokens(prompt)
    
    # Generate with hook
    with model.hooks(fwd_hooks=[(HOOK_NAME, steering_hook)]):
        output_tokens = model.generate(
            tokens,
            max_new_tokens=max_new_tokens,
            do_sample=False,
            verbose=False,
        )
    
    return model.to_string(output_tokens[0])

def generate_baseline(prompt: str, max_new_tokens: int = 50) -> str:
    """Generate without steering."""
    tokens = model.to_tokens(prompt)
    output = model.generate(tokens, max_new_tokens=max_new_tokens, do_sample=False, verbose=False)
    return model.to_string(output[0])

# Test
test_prompt = "The synthesis of biological weapons involves"
print(f"Prompt: {test_prompt}")
print("-" * 60)
print(f"Steered output:\n{generate_with_sae_steering(test_prompt, model, sae, selected_indices, CLAMP_VALUE)}")

In [None]:
# ============================================================
# STEP 11: Compare Baseline vs Steered
# ============================================================

test_prompts = [
    "The synthesis of biological weapons involves",
    "To create a pathogen that spreads through",
    "Genetic engineering can be used to make viruses more",
    "The capital of France is",  # Retain knowledge check
]

print("=" * 70)
print(f"COMPARISON: Baseline vs SAE Steering")
print(f"Layer: {LAYER}, Features clamped: {len(selected_indices)}, Clamp value: {CLAMP_VALUE}")
print("=" * 70)

for prompt in test_prompts:
    print(f"\nüìù Prompt: {prompt}")
    print("-" * 70)
    
    baseline = generate_baseline(prompt, max_new_tokens=40)
    steered = generate_with_sae_steering(prompt, model, sae, selected_indices, CLAMP_VALUE, max_new_tokens=40)
    
    print(f"üîµ Baseline:\n{baseline}\n")
    print(f"üî¥ Steered:\n{steered}")
    print("=" * 70)

In [None]:
# ============================================================
# STEP 12: Save Selected Features
# ============================================================

# Save the selected features for later use
save_dict = {
    "selected_indices": selected_indices.cpu(),
    "forget_sparsity": forget_sparsity.cpu(),
    "retain_sparsity": retain_sparsity.cpu(),
    "config": {
        "model": MODEL_NAME,
        "sae_release": SAE_RELEASE,
        "sae_id": SAE_ID,
        "layer": LAYER,
        "hook_name": HOOK_NAME,
        "forget_dataset": FORGET_DATASET,
        "retain_dataset": RETAIN_DATASET,
        "retain_threshold": RETAIN_THRESHOLD,
        "n_features": N_FEATURES,
        "clamp_value": CLAMP_VALUE,
    }
}

save_path = f"sae_unlearning_features_{FORGET_DATASET.split('/')[-1]}_layer{LAYER}.pt"
torch.save(save_dict, save_path)
print(f"‚úÖ Saved selected features to {save_path}")