In [1]:
from nnsight import LanguageModel

import gc
import itertools
import math
import os
import random
import sys
from collections import Counter
from copy import deepcopy
from dataclasses import dataclass
from functools import partial
from pathlib import Path
from typing import Any, Callable, Literal, TypeAlias

import einops
import numpy as np
import pandas as pd
import plotly.express as px
import requests
import torch as t
from datasets import load_dataset
from huggingface_hub import hf_hub_download
from IPython.display import HTML, IFrame, clear_output, display
from jaxtyping import Float, Int
from rich import print as rprint
from rich.table import Table
from sae_lens import (
    SAE,
    ActivationsStore,
    HookedSAETransformer,
    LanguageModelSAERunnerConfig,
    SAEConfig,
    SAETrainingRunner,
    upload_saes_to_huggingface,
)
from sae_lens.toolkit.pretrained_saes_directory import get_pretrained_saes_directory
from sae_vis import SaeVisConfig, SaeVisData, SaeVisLayoutConfig
from tabulate import tabulate
from torch import Tensor, nn
from torch.distributions.categorical import Categorical
from torch.nn import functional as F
from tqdm.auto import tqdm
from transformer_lens import ActivationCache, HookedTransformer, utils
from transformer_lens.hook_points import HookPoint

device = "cuda" if t.cuda.is_available() else "mps" if t.backends.mps.is_available() else "cpu"

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
## check memory usage

if t.cuda.is_available():
    gpu_id = 0  # Set to your target GPU ID
    total_memory = t.cuda.get_device_properties(gpu_id).total_memory
    allocated_memory = t.cuda.memory_allocated(gpu_id)
    cached_memory = t.cuda.memory_reserved(gpu_id)

    print(f"Total GPU Memory: {total_memory / 1024**2:.2f} MB")
    print(f"Allocated GPU Memory: {allocated_memory / 1024**2:.2f} MB")
    print(f"Cached GPU Memory: {cached_memory / 1024**2:.2f} MB")
elif t.backends.mps.is_available():
    # MPS (Metal Performance Shaders) for Mac
    print("MPS is available.")
    # Note: As of now, PyTorch doesn't provide direct memory management functions for MPS
    print("Memory information is not available for MPS.")
else:
    print("Neither CUDA nor MPS is available.")

MPS is available.
Memory information is not available for MPS.


In [3]:
t.cuda.empty_cache()


In [5]:
gemma2: HookedSAETransformer = HookedSAETransformer.from_pretrained("gemma-2-2b-it", device=device)

layer = 5
sae_name = "gemma-scope-2b-pt-res-canonical"
sae_id = f"layer_{layer}/width_16k/canonical"

gemma2_sae, cfg_dict, sparsity = SAE.from_pretrained(
            release=sae_name,
            sae_id=sae_id,
            device=str(device),
)

Loading checkpoint shards: 100%|██████████| 2/2 [00:11<00:00,  5.84s/it]


Loaded pretrained model gemma-2-2b-it into HookedTransformer


In [None]:
## basically make the steering coefficient negative of the activaiton

def generate_with_steering(
    model: HookedSAETransformer,
    sae: SAE,
    prompt: str,
    latent_idx: int,
    steering_coefficient: float = 1.0,
    max_new_tokens: int = 50,
):
    """
    Generates text with steering. A multiple of the steering vector (the decoder weight for this latent) is added to
    the last sequence position before every forward pass.
    """
    _steering_hook = partial(
        steering_hook,
        sae=sae,
        latent_idx=latent_idx,
        steering_coefficient=steering_coefficient,
    )

    with model.hooks(fwd_hooks=[(sae.cfg.hook_name, _steering_hook)]):
        output = model.generate(prompt, max_new_tokens=max_new_tokens, **GENERATE_KWARGS)

    return output


prompt = "When I look at myself in the mirror, I see"

no_steering_output = gemma_2_2b.generate(prompt, max_new_tokens=50, **GENERATE_KWARGS)

table = Table(show_header=False, show_lines=True, title="Steering Output")
table.add_row("Normal", no_steering_output)
for i in tqdm(range(3), "Generating steered examples..."):
    table.add_row(
        f"Steered #{i}",
        generate_with_steering(
            gemma_2_2b,
            gemma_2_2b_sae,
            prompt,
            latent_idx,
            steering_coefficient=240.0,  # roughly 1.5-2x the latent's max activation
        ).replace("\n", "↵"),
    )
rprint(table)