# **Conceptor Steering in the context of Function Vectors (Boolean Edition)** 🧠🤖🛞

This notebook explores a novel technique called "Conceptor-Based Activation Engineering".
We are bravely attempting to steer the behavior of a GPT-J-6B model using conceptors.
The approach is inspired by recent discoveries and successes in activation engineering/steering.

by Joris Postmus & Steven Abreu (supervisor)

# **Imports & Libraries**

In [None]:
import torch
import os
import pandas as pd
import numpy as np
from transformer_lens import HookedTransformer
from datetime import datetime
from typing import List
from dataclasses import dataclass
from datetime import datetime
from collections import Counter
import random
import json

os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'

# **Helper Functions**

In [None]:
def load_input_output_pairs(path):
    """
    Loads input-output pairs from a JSON file located at the specified path.
    These pairs could represent any type of relational data, such as antonyms, country-capital, or uncapitalized-capitalized pairs.

    Args:
        path (str): The path to the JSON file containing the input-output pairs.

    Returns:
        The decoded JSON content. The other helpers in this file treat it as a
        list of dicts that each carry an "input" and an "output" key.
    """
    # JSON is UTF-8 by specification; being explicit keeps non-ASCII entries
    # from being decoded with a platform-dependent default encoding.
    with open(path, 'r', encoding='utf-8') as file:
        return json.load(file)

In [None]:
def get_output(input_string, pairs):
    """
    Looks up the output that belongs to a given input string.

    Args:
        input_string (str): The input to search for.
        pairs (list): Input-output pairs, each a mapping with "input" and "output" keys.

    Returns:
        str or None: The output of the first pair whose input equals
        input_string, or None when no pair matches.
    """
    # Lazy scan: stops at the first matching pair, just like an early return.
    matching_outputs = (entry["output"] for entry in pairs if entry["input"] == input_string)
    return next(matching_outputs, None)

In [None]:
def get_input(output_string, pairs):
    """
    Looks up the input that belongs to a given output string.

    Args:
        output_string (str): The output to search for.
        pairs (list): Input-output pairs, each a mapping with "input" and "output" keys.

    Returns:
        str or None: The input of the first pair whose output equals
        output_string, or None when no pair matches.
    """
    # Lazy scan: stops at the first matching pair, just like an early return.
    matching_inputs = (entry["input"] for entry in pairs if entry["output"] == output_string)
    return next(matching_inputs, None)

In [None]:
def create_random_pairs_string(pairs, num_pairs):
    """
    Creates a string of randomly selected input-output pairs from a given list, with the last pair missing its output.

    The result looks like "in1:out1, in2:out2, in3:" — complete examples
    followed by one query whose output is left open.

    Args:
        pairs (list): The list of input-output pairs to sample from.
        num_pairs (int): The number of pairs to include in the string (at least 1).

    Returns:
        str: A string formatted with randomly selected pairs, where the last pair is missing its output.

    Raises:
        ValueError: If num_pairs is smaller than 1, or (raised by random.sample)
            larger than the number of available pairs.
    """
    if num_pairs < 1:
        raise ValueError("num_pairs must be at least 1.")

    # The last sampled pair becomes the open query; everything before it is a
    # complete example.
    *examples, query = random.sample(pairs, num_pairs)
    parts = [f"{pair['input']}:{pair['output']}" for pair in examples]
    parts.append(f"{query['input']}:")
    # Joining all parts at once avoids the stray leading ", " that appeared
    # when there were no complete examples (num_pairs == 1).
    return ', '.join(parts)

In [None]:
def get_unique_random_inputs_formatted(pairs, n):
    """
    Returns a list of N input strings randomly sampled from the list of input-output pairs,
    with each input string formatted by adding a ':' at the end.

    When at least N distinct inputs exist, the returned inputs are all different.
    When fewer exist, sampling falls back to drawing with replacement, so the
    result then contains repeats.

    Args:
        pairs (list): The list of input-output pairs to sample from.
        n (int): The number of input strings to return.

    Returns:
        list: A list of N input strings, each formatted with a ':' at the end.

    Raises:
        ValueError: If pairs contains no inputs at all.
    """
    # dict.fromkeys de-duplicates while keeping first-seen order. A set would
    # order strings by their per-process hash, so the sample would differ
    # between runs even with a fixed random seed.
    unique_inputs = list(dict.fromkeys(pair["input"] for pair in pairs))
    if not unique_inputs:
        raise ValueError("No unique inputs available to sample from.")

    if len(unique_inputs) < n:
        # Not enough distinct inputs: draw with replacement.
        sampled_inputs = random.choices(unique_inputs, k=n)
    else:
        sampled_inputs = random.sample(unique_inputs, n)

    return [input_string + ":" for input_string in sampled_inputs]

In [None]:
def create_layer_subsets(lst):
    """
    Generates nested subsets by iteratively removing the first and last elements
    until at most two elements remain.

    For an even-length input the final subset is the middle pair; for an
    odd-length input it is the single middle element. Every subset appears
    exactly once.

    Args:
        lst (list): Incremental list created using range (any sliceable
            sequence with a length works).

    Returns:
        list: List of subsets, starting with the input itself.
    """
    result = [lst]
    while len(lst) > 2:
        lst = lst[1:-1]
        result.append(lst)
    # No extra append is needed here: a final two-element subset has already
    # been added above, and appending it again would duplicate it.
    return result

In [None]:
def extract_activations_last_token(model, steering_prompts, extraction_layers, device):
    """
    Extract activations for the last token of each steering prompt from specific layers of the model.

    Parameters:
    - model (HookedTransformer): The model used for generating text.
    - steering_prompts (list): List of steering prompts to extract activations for.
    - extraction_layers (list): The layers from which activations are extracted.
    - device (str): The computing device (e.g., 'cuda', 'cpu').

    Returns:
    - dict: A dictionary where each key is a layer number and each value is a
            float32 tensor on `device` holding the activations for the last
            token of each prompt. Shape: (n_prompts, n_activations); the
            prompt dimension is kept even when a single prompt is passed.
    """
    hook_names = {f"blocks.{layer}.hook_resid_pre" for layer in extraction_layers}
    cache, caching_hooks, _ = model.get_caching_hooks(lambda n: n in hook_names)

    with model.hooks(fwd_hooks=caching_hooks):
        # Left padding is requested so that position -1 is the final prompt
        # token rather than padding.
        # NOTE(review): this assumes the model tokenizes string input with
        # model.tokenizer — confirm against the HookedTransformer version used.
        model.tokenizer.padding_side = 'left'
        _ = model(steering_prompts)

    activations_dict = {}
    for layer in extraction_layers:
        # Slice out the last position first so only (n_prompts, n_activations)
        # values are converted, not the full sequence.
        last_token_activations = cache[f"blocks.{layer}.hook_resid_pre"][:, -1, :].detach()
        # copy=True gives a tensor independent of the cache. No squeeze: it
        # would drop the prompt dimension for a single prompt.
        activations_dict[layer] = last_token_activations.to(device=device, dtype=torch.float, copy=True)

    return activations_dict

In [None]:
def steer(C, x, beta):
    """
    Steers the given vector x using the conceptor C.

    Computes beta * (C @ x): the vector is passed through the conceptor and the
    result is rescaled by beta.

    Args:
        C (torch.Tensor): The conceptor matrix.
        x (torch.Tensor): The vector to be steered.
        beta (float): Multiplicative scaling factor applied to C @ x
            (beta = 1 returns the plain conceptor output, beta = 0 returns zeros).

    Returns:
        torch.Tensor: The steered vector, in the dtype of x.
    """
    # Match the conceptor to the activation dtype instead of hard-coding
    # float16, so the function also works when x is float32 or bfloat16.
    C = C.to(x.dtype)
    return beta * torch.matmul(C, x)

In [None]:
def generate_conceptor_hook(conceptor, beta):
    """
    Builds a forward hook that passes the last-token activation of every prompt
    in the batch through a conceptor.

    Parameters:
    - conceptor (torch.Tensor): Conceptor matrix.
    - beta (float): Scaling factor forwarded to `steer`.

    Returns:
    - function: Hook that edits the activation tensor in place and returns None.
    """
    def last_token_steering_hook(resid_pre, hook):
        # resid_pre is indexed as [prompt, position, feature]; only the final
        # position of each prompt is rewritten.
        batch_size = resid_pre.shape[0]
        for i in range(batch_size):
            last_pos = resid_pre.shape[1] - 1
            last_activation = resid_pre[i, last_pos, :]
            resid_pre[i, last_pos, :] = steer(C=conceptor, x=last_activation, beta=beta)

    return last_token_steering_hook

In [None]:
def generate_conceptor_hook_mean_centered(conceptor, mean_train, beta):
    """
    Builds a forward hook that applies a conceptor to the mean-centered
    last-token activation of every prompt in the batch.

    Parameters:
    - conceptor (torch.Tensor): Conceptor matrix.
    - mean_train (torch.Tensor): Mean training vector, subtracted before and added back after steering.
    - beta (float): Scaling factor forwarded to `steer`.

    Returns:
    - function: Hook that edits the activation tensor in place and returns None.
    """
    def last_token_steering_hook(resid_pre, hook):
        # resid_pre is indexed as [prompt, position, feature]; only the final
        # position of each prompt is rewritten.
        batch_size = resid_pre.shape[0]
        for i in range(batch_size):
            last_pos = resid_pre.shape[1] - 1
            centered = resid_pre[i, last_pos, :] - mean_train
            steered = steer(C=conceptor, x=centered, beta=beta)
            resid_pre[i, last_pos, :] = steered + mean_train

    return last_token_steering_hook

In [None]:
def generate_ave_hook_addition(steering_vector, beta):
    """
    Builds a forward hook that adds a scaled steering vector to the last-token
    activation of every prompt in the batch.

    Parameters:
    - steering_vector (torch.Tensor): Steering vector (squeezed before use).
    - beta (float): Scaling factor applied to the steering vector.

    Returns:
    - function: Hook that edits the activation tensor in place and returns None.
    """
    def last_token_steering_hook(resid_pre, hook):
        # resid_pre is indexed as [prompt, position, feature]; only the final
        # position of each prompt is shifted.
        n_prompts = resid_pre.shape[0]
        for i in range(n_prompts):
            last_pos = resid_pre.shape[1] - 1
            offset = steering_vector.squeeze() * beta
            resid_pre[i, last_pos, :] += offset

    return last_token_steering_hook

In [None]:
def generate_ave_hook_addition_mean_centered(steering_vector, beta):
    """
    Generates a hook function to add a mean-centered steering vector to the last token.

    The hook itself performs no centering: `steering_vector` must already have
    the training mean subtracted by the caller.

    Parameters:
    - steering_vector (torch.Tensor): Steering vector, already mean-centered (squeezed before use).
    - beta (float): Scaling factor.

    Returns:
    - function: Hook function that adds the scaled steering vector in place and returns None.
    """
    def last_token_steering_hook(resid_pre, hook):
        # The offset is identical for every prompt, so compute it once and let
        # broadcasting add it to the last position of the whole batch.
        offset = steering_vector.squeeze() * beta
        resid_pre[:, -1, :] += offset

    return last_token_steering_hook

In [None]:
def compute_conceptor(X, aperture):
    """
    Computes the conceptor matrix for a given input matrix X.
    (PyTorch version)

    With R = X^T X / n_samples, the result is C = R (R + aperture^-2 I)^-1,
    evaluated through the singular value decomposition of R.

    Parameters:
    - X (torch.Tensor): Input matrix of shape (n_samples, n_features).
    - aperture (float): Conceptor aperture; must be non-zero.

    Returns:
    - torch.Tensor: Conceptor matrix of shape (n_features, n_features).
    """
    R = torch.matmul(X.T, X) / X.shape[0]
    # torch.linalg.svd replaces the deprecated torch.svd. Only U and S are
    # needed because R is symmetric.
    U, S, _ = torch.linalg.svd(R)
    # Each singular value s is mapped to s / (s + aperture^-2).
    gains = S / (S + aperture ** (-2))
    # Scaling the columns of U by the gains gives U @ diag(gains) @ U^T.
    return (U * gains) @ U.T

In [None]:
def combine_conceptors_and(C1, C2):
    """
    AND-combination of two conceptors: (C1^-1 + C2^-1 - I)^-1.
    (AND operation, does not work so well)

    Parameters:
    - C1 (torch.Tensor): First conceptor tensor of shape (n_features, n_features).
    - C2 (torch.Tensor): Second conceptor tensor of shape (n_features, n_features).

    Returns:
    - torch.Tensor: Combined conceptor tensor of shape (n_features, n_features).
    """
    identity = torch.eye(C1.shape[0], device=C1.device)
    # Sum of the two inverses with one identity removed ...
    inverse_sum = torch.inverse(C1) + torch.inverse(C2) - identity
    # ... inverted once more to give the combined conceptor.
    return torch.inverse(inverse_sum)

In [None]:
def combine_conceptors(C1, C2):
    """
    OR-combination of two conceptors: I - ((I - C1)^-1 + (I - C2)^-1 - I)^-1.
    (OR operation which works much better than AND)

    Parameters:
    - C1 (torch.Tensor): First conceptor tensor of shape (n_features, n_features).
    - C2 (torch.Tensor): Second conceptor tensor of shape (n_features, n_features).

    Returns:
    - torch.Tensor: Combined conceptor tensor of shape (n_features, n_features).
    """
    identity = torch.eye(C1.shape[0], device=C1.device)
    # Invert the complements (I - C) of both conceptors.
    complement_1_inv = torch.inverse(identity - C1)
    complement_2_inv = torch.inverse(identity - C2)
    # Combine the complements, then take the complement of the result.
    combined_complement = torch.inverse(complement_1_inv + complement_2_inv - identity)
    return identity - combined_complement

In [None]:
def top_1_first_tokens(model, prompts: List[str], fwd_hooks=None):
    """
    Retrieves the top token predictions for the first tokens after the prompts.

    Parameters:
    - model: Language model with hooks and tokenizer.
    - prompts (List[str]): List of prompt strings.
    - fwd_hooks (list, optional): List of forward hooks to apply during the model run.
      Defaults to no hooks.

    Returns:
    - List[str]: The decoded most-likely next token for each prompt, in prompt order.
    """
    # None instead of a mutable [] default, so calls never share one list.
    active_hooks = [] if fwd_hooks is None else fwd_hooks

    with model.hooks(fwd_hooks=active_hooks):
        # Left padding keeps the final prompt token at position -1.
        model.tokenizer.padding_side = 'left'
        input_prompts_tokenized = model.to_tokens(prompts)
        # Only the logits are needed, so run a plain forward pass rather than
        # run_with_cache, which would store every intermediate activation.
        logits = model(input_prompts_tokenized)

    # Softmax is monotonic, so the argmax of the raw logits picks the same
    # token without the extra half-precision pass.
    top_indices = torch.argmax(logits[:, -1, :], dim=-1)
    return [model.tokenizer.decode([index.item()]) for index in top_indices]

# **Configurations**

**Config**

In [None]:
# Paths to the JSON files with the input-output pairs of every supported task
STEERING_PROMPTS_PATHS = {
    'singular_plural': './Prompts/singular-plural.json',
    'antonyms': './Prompts/antonyms.json',
    'present-past': './Prompts/present-past.json',
    'english-french': './Prompts/english-french.json',
    'country-capital': './Prompts/country-capital.json',
    'capitalize': './Prompts/capitalize.json',
    'singular-plural-capitalize': './Prompts/singular-plural-capitalized.json',
    'english-french-capitalize': './Prompts/english-french-capitalized.json',
    'english-french-antonyms': './Prompts/english-french-antonyms.json',
}

# Define the layers where the function is present (taken from the FunctionVectors paper)
ACTIVE_LAYERS = {
    'singular_plural': range(9,17),
    'antonyms': range(9,17),
    'present-past': range(9, 17),
    'english-french': range(9,17),
    'country-capital': range(9,17),
    'capitalize': range(9,17)
}

# CHANGE THIS TO CHANGE THE FUNCTION TASK
# TASK1 / TASK2 are the two single functions whose conceptors get combined;
# TASK is the composed function the steered model is evaluated on.
# NOTE: each value must be an existing key of STEERING_PROMPTS_PATHS
# (the singular/plural key is spelled with an underscore).
TASK1 = "singular_plural"
TASK2 = "capitalize"
TASK = "singular-plural-capitalize"
COMBINED = TASK

STEERING_PROMPTS_PATH1 = STEERING_PROMPTS_PATHS[TASK1]
STEERING_PROMPTS_PATH2 = STEERING_PROMPTS_PATHS[TASK2]
STEERING_PROMPTS_COMBINED_PATH = STEERING_PROMPTS_PATHS[COMBINED]

MEAN_ACTIVATIONS_PATH='./Results/activations_reproduced_fullaveraged.pkl'
RESULTS_PATH='./Results/'
MODEL_NAME = 'EleutherAI/gpt-j-6b'
DTYPE = 'float16'
SAVE_RESULTS = True

# Experimental variables
NUM_STEERING_PROMPTS = 100              # Number of steering prompts generated per experiment (Np)
NUM_STEERING_EXAMPLES_PER_PROMPT = 10   # Number of examples in each steering prompt (N)
NUM_INPUT_PROMPTS = 1000                # Number of input prompts to test the model on (Nt)
NUM_EXPERIMENTS = 5                     # Number of experiments to run

# Experimental configurations
list_extraction_layers = [14]                                                # Layers to steer (currently only one layer, but could change to ACTIVE_LAYERS[TASK] for all supported layers)
list_beta_averaging = [2.3]                                                  # Beta value(s) for averaging 2.3
list_beta_conceptor = [3.9]                                                  # Beta value(s) for conceptor steering 3.9
list_aperatures_normal =  [0.0125]                                           # Aperture value(s) for normal conceptors
list_aperatures_mean_centered =  [0.05]                                      # Aperture value(s) for mean-centered conceptors

**Load the mean training activations**

In [None]:
import pickle

# NOTE(review): pickle.load can execute arbitrary code embedded in the file —
# only load activation files that were produced locally / by a trusted source.
with open(MEAN_ACTIVATIONS_PATH, 'rb') as file:
    mean_train_activations = pickle.load(file)

# Use the GPU when available and fall back to CPU otherwise, so this cell does
# not fail on machines without CUDA.
_mean_activations_device = 'cuda' if torch.cuda.is_available() else 'cpu'
mean_train_activations = {key: tensor.to(_mean_activations_device) for key, tensor in mean_train_activations.items()}

**Data classes for experiments**

In [None]:
@dataclass
class ExperimentConfigConceptor:
    """One hyper-parameter combination for a conceptor-steering experiment."""
    # Layer index; used to build the "blocks.{layer}.hook_resid_pre" hook name
    extraction_layer: int
    # Scaling factor handed to the conceptor hook
    beta: float
    # Conceptor aperture; also part of the (layer, aperture) conceptor cache key
    aperture: float

@dataclass
class ExperimentConfigAveraging:
    """One hyper-parameter combination for an averaging (additive) steering experiment."""
    # Layer index the configuration refers to
    extraction_layer: int
    # Beta value for averaging (taken from list_beta_averaging)
    beta: float

In [None]:
# SINGLE-LAYER
# Additive steering: one configuration per (layer, beta) combination.
configs_averaging = [
    ExperimentConfigAveraging(extraction_layer=layer_idx, beta=scale)
    for layer_idx in list_extraction_layers
    for scale in list_beta_averaging
]

# Plain conceptors: one configuration per (layer, beta, aperture) combination.
configs_conceptoring = [
    ExperimentConfigConceptor(extraction_layer=layer_idx, beta=scale, aperture=alpha)
    for layer_idx in list_extraction_layers
    for scale in list_beta_conceptor
    for alpha in list_aperatures_normal
]

# Mean-centered conceptors: same grid, but with the mean-centered aperture list.
configs_conceptoring_mean_centered = [
    ExperimentConfigConceptor(extraction_layer=layer_idx, beta=scale, aperture=alpha)
    for layer_idx in list_extraction_layers
    for scale in list_beta_conceptor
    for alpha in list_aperatures_mean_centered
]

**Load model**

In [None]:
# Pick the GPU when PyTorch can see one, otherwise run on the CPU
if torch.cuda.is_available():
    DEVICE = 'cuda'
else:
    DEVICE = 'cpu'
print(f"Using device: {DEVICE}")

# Inference only: switch gradient tracking off globally
torch.set_grad_enabled(False)

model = HookedTransformer.from_pretrained_no_processing(
    model_name=MODEL_NAME,
    device=DEVICE,
    dtype=DTYPE,
)
model.eval();

Using device: cuda




config.json:   0%|          | 0.00/930 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/24.2G [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/619 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/798k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.37M [00:00<?, ?B/s]

added_tokens.json:   0%|          | 0.00/4.04k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/357 [00:00<?, ?B/s]

Loaded pretrained model EleutherAI/gpt-j-6b into HookedTransformer


# **Experiment Prep**

**Generate experimental data**

In [None]:
# The prompt files do not change between experiments, so each one is read once
# instead of once per experiment.
input_output_pairs_1 = load_input_output_pairs(STEERING_PROMPTS_PATH1)
input_output_pairs_2 = load_input_output_pairs(STEERING_PROMPTS_PATH2)
input_output_pairs_combined = load_input_output_pairs(STEERING_PROMPTS_COMBINED_PATH)

# Store separate prompts and steering examples for each experiment (version 1)
experiment_data_1 = {}

for exp in range(NUM_EXPERIMENTS):
    steering_examples_1 = [
        create_random_pairs_string(input_output_pairs_1, NUM_STEERING_EXAMPLES_PER_PROMPT)
        for _ in range(NUM_STEERING_PROMPTS)
    ]

    experiment_data_1[exp] = {
        'steering_examples': steering_examples_1
    }

# Store separate prompts and steering examples for each experiment (version 2)
experiment_data_2 = {}

for exp in range(NUM_EXPERIMENTS):
    steering_examples_2 = [
        create_random_pairs_string(input_output_pairs_2, NUM_STEERING_EXAMPLES_PER_PROMPT)
        for _ in range(NUM_STEERING_PROMPTS)
    ]

    experiment_data_2[exp] = {
        'steering_examples': steering_examples_2
    }

# Store prompts to steer and correct outputs for each experiment (combined version)
experiment_data = {}

for exp in range(NUM_EXPERIMENTS):
    prompts_to_steer_combined = get_unique_random_inputs_formatted(input_output_pairs_combined, NUM_INPUT_PROMPTS)

    # prompt[:-1] strips the trailing ':' to recover the raw input word
    correct_outputs_full_combined = [get_output(prompt[:-1], input_output_pairs_combined) for prompt in prompts_to_steer_combined]
    correct_outputs_full_tokenized_combined = [model.tokenizer.tokenize(output) for output in correct_outputs_full_combined]
    # Only the first token of the expected output is scored
    correct_outputs_1st_combined = [tokens[0] for tokens in correct_outputs_full_tokenized_combined]

    experiment_data[exp] = {
        'prompts_to_steer': prompts_to_steer_combined,
        'correct_outputs_1st': correct_outputs_1st_combined
    }

In [None]:
# Sanity check: show the first evaluation prompt, its expected first token, and
# the first steering prompt of each task, all from experiment 0.
first_prompt = experiment_data[0]['prompts_to_steer'][0]
first_target = experiment_data[0]['correct_outputs_1st'][0]
first_steering_example_1 = experiment_data_1[0]['steering_examples'][0]
first_steering_example_2 = experiment_data_2[0]['steering_examples'][0]

print(first_prompt)
print(first_target)
print(first_steering_example_1)
print(first_steering_example_2)

lotion:
L
stereo:stereos, box:boxes, paint:paints, lake:lakes, glove:gloves, speaker:speakers, screwdriver:screwdrivers, toothpaste:toothpastes, marker:markers, jacket:
gnu:Gnu, jacket:Jacket, those:Those, jackfruit:Jackfruit, incense:Incense, zucchini:Zucchini, realize:Realize, barracuda:Barracuda, computer:Computer, watermelon:


**Generate activations**

In [None]:
from tqdm import tqdm

# ---- Version 1 (first task) ----
# activations_cache_1[exp][layer] holds the last-token activations of the
# steering prompts of that experiment.
activations_cache_1 = {exp: {} for exp in range(NUM_EXPERIMENTS)}

# One model pass per experiment
total_computations_1 = NUM_EXPERIMENTS

with tqdm(total=total_computations_1, desc="Precomputing activations (version 1)") as pbar_1:
    for exp in range(NUM_EXPERIMENTS):
        activations_dict_1 = extract_activations_last_token(
            model,
            experiment_data_1[exp]['steering_examples'],
            list_extraction_layers,
            device=DEVICE,
        )
        for layer in list_extraction_layers:
            layer_activations = activations_dict_1[layer]
            activations_cache_1[exp][layer] = layer_activations.squeeze()
        pbar_1.update(1)

# ---- Version 2 (second task) ----
# Same layout as above, built from the second set of steering prompts.
activations_cache_2 = {exp: {} for exp in range(NUM_EXPERIMENTS)}

# One model pass per experiment
total_computations_2 = NUM_EXPERIMENTS

with tqdm(total=total_computations_2, desc="Precomputing activations (version 2)") as pbar_2:
    for exp in range(NUM_EXPERIMENTS):
        activations_dict_2 = extract_activations_last_token(
            model,
            experiment_data_2[exp]['steering_examples'],
            list_extraction_layers,
            device=DEVICE,
        )
        for layer in list_extraction_layers:
            layer_activations = activations_dict_2[layer]
            activations_cache_2[exp][layer] = layer_activations.squeeze()
        pbar_2.update(1)

Precomputing activations (version 1): 100%|██████████| 5/5 [00:14<00:00,  2.85s/it]
Precomputing activations (version 2): 100%|██████████| 5/5 [00:13<00:00,  2.68s/it]


# **Baseline (no steering)**

In [None]:
import math

print("Calculating baseline accuracy...")

batch_size = 50
baseline_data = []

# Running totals over all experiments
baseline_success_count = 0
total_prompts_for_baseline = 0

for exp in range(NUM_EXPERIMENTS):
    # Prompts and expected first tokens belonging to this experiment
    prompts_to_steer = experiment_data[exp]['prompts_to_steer']
    correct_outputs_1st = experiment_data[exp]['correct_outputs_1st']

    success_count = 0
    total_prompts = 0
    num_batches = math.ceil(len(prompts_to_steer) / batch_size)

    for batch_idx in range(num_batches):
        start = batch_idx * batch_size
        stop = (batch_idx + 1) * batch_size
        batch_prompts = prompts_to_steer[start:stop]
        batch_correct_outputs_1st = correct_outputs_1st[start:stop]

        # Unsteered predictions: no hooks are attached
        top_1_tokens = top_1_first_tokens(model, batch_prompts, fwd_hooks=[])

        # A prompt counts as a success when the predicted token equals the expected one
        for i, top_1_token in enumerate(top_1_tokens):
            success_count += 1 if top_1_token == batch_correct_outputs_1st[i] else 0
            total_prompts += 1

    baseline_accuracy = (success_count / total_prompts) * 100
    print(f"Experiment {exp + 1} Baseline Accuracy: {baseline_accuracy:.2f}% ({success_count}/{total_prompts} samples)")

    # Per-experiment record
    baseline_data.append({
        'experiment': exp + 1,
        'success_count': success_count,
        'total_prompts': total_prompts,
        'baseline_accuracy': baseline_accuracy
    })

    baseline_success_count += success_count
    total_prompts_for_baseline += total_prompts

# Accuracy pooled over every prompt of every experiment
overall_baseline_accuracy = (baseline_success_count / total_prompts_for_baseline) * 100
print(f"Overall Baseline Unsteered Accuracy: {overall_baseline_accuracy:.2f}% ({baseline_success_count}/{total_prompts_for_baseline} samples)")

# Summary record
baseline_data.append({
    'experiment': 'Overall',
    'success_count': baseline_success_count,
    'total_prompts': total_prompts_for_baseline,
    'baseline_accuracy': overall_baseline_accuracy
})

Calculating baseline accuracy...
Experiment 1 Baseline Accuracy: 0.00% (0/1000 samples)
Experiment 2 Baseline Accuracy: 0.00% (0/1000 samples)
Experiment 3 Baseline Accuracy: 0.00% (0/1000 samples)
Experiment 4 Baseline Accuracy: 0.00% (0/1000 samples)
Experiment 5 Baseline Accuracy: 0.00% (0/1000 samples)
Overall Baseline Unsteered Accuracy: 0.00% (0/5000 samples)


In [None]:
if SAVE_RESULTS:
    # Create the results directory first so a missing folder does not lose the
    # outcome of a long run.
    os.makedirs(RESULTS_PATH, exist_ok=True)
    baseline_df = pd.DataFrame(baseline_data)
    # os.path.join avoids the doubled slash produced by RESULTS_PATH's trailing '/'
    baseline_df.to_csv(os.path.join(RESULTS_PATH, f'{TASK}_baseline_results.csv'), index=False)

# **Conceptor Steering**

A single conceptor, computed from the activations of one layer, is applied to that same layer.

**Pre-Compute all necessary Conceptors**

In [None]:
from tqdm import tqdm

# ---- Version 1 (first task): mean-centered conceptors ----
# Layout: conceptors_cache_mean_centered_1[exp][(layer, aperture)] -> conceptor matrix
conceptors_cache_mean_centered_1 = {exp: {} for exp in range(NUM_EXPERIMENTS)}

# One conceptor per (experiment, layer, aperture) combination
total_computations_1 = len(list_extraction_layers) * len(list_aperatures_mean_centered) * NUM_EXPERIMENTS

with tqdm(total=total_computations_1, desc="Precomputing conceptors (mean centered, version 1)") as pbar_1:
    for exp in range(NUM_EXPERIMENTS):
        for layer in list_extraction_layers:
            for aperture in list_aperatures_mean_centered:
                # Cached last-token activations, shifted by the training mean of this layer
                activations = activations_cache_1[exp][layer]
                activations_mean_centered = activations - mean_train_activations[layer]
                conceptor = compute_conceptor(activations_mean_centered, aperture)
                cache_key = (layer, aperture)
                conceptors_cache_mean_centered_1[exp][cache_key] = conceptor
                pbar_1.update(1)

# ---- Version 2 (second task): mean-centered conceptors ----
# Layout: conceptors_cache_mean_centered_2[exp][(layer, aperture)] -> conceptor matrix
conceptors_cache_mean_centered_2 = {exp: {} for exp in range(NUM_EXPERIMENTS)}

# One conceptor per (experiment, layer, aperture) combination
total_computations_2 = len(list_extraction_layers) * len(list_aperatures_mean_centered) * NUM_EXPERIMENTS

with tqdm(total=total_computations_2, desc="Precomputing conceptors (mean centered, version 2)") as pbar_2:
    for exp in range(NUM_EXPERIMENTS):
        for layer in list_extraction_layers:
            for aperture in list_aperatures_mean_centered:
                # Cached last-token activations, shifted by the training mean of this layer
                activations = activations_cache_2[exp][layer]
                activations_mean_centered = activations - mean_train_activations[layer]
                conceptor = compute_conceptor(activations_mean_centered, aperture)
                cache_key = (layer, aperture)
                conceptors_cache_mean_centered_2[exp][cache_key] = conceptor
                pbar_2.update(1)

Precomputing conceptors (mean centered, version 1): 100%|██████████| 5/5 [01:02<00:00, 12.55s/it]
Precomputing conceptors (mean centered, version 2): 100%|██████████| 5/5 [00:56<00:00, 11.25s/it]


In [None]:
# Merged (mean-centered) conceptors, stored as
# conceptors_cache_mean_centered[exp][(layer, aperture)]
conceptors_cache_mean_centered = {exp: {} for exp in range(NUM_EXPERIMENTS)}

# One merge per (experiment, layer, aperture) combination
total_computations_combined = len(list_extraction_layers) * len(list_aperatures_mean_centered) * NUM_EXPERIMENTS

with tqdm(total=total_computations_combined, desc="Merging conceptors (mean centered)") as pbar_combined:
    for exp in range(NUM_EXPERIMENTS):
        for layer in list_extraction_layers:
            for aperture in list_aperatures_mean_centered:
                cache_key = (layer, aperture)
                # Look up the per-task conceptors that share this key
                conceptor_1 = conceptors_cache_mean_centered_1[exp][cache_key]
                conceptor_2 = conceptors_cache_mean_centered_2[exp][cache_key]
                # Combine them with combine_conceptors and cache the result
                merged_conceptor = combine_conceptors(conceptor_1, conceptor_2)
                conceptors_cache_mean_centered[exp][cache_key] = merged_conceptor
                pbar_combined.update(1)

Merging conceptors (mean centered): 100%|██████████| 5/5 [00:00<00:00,  6.67it/s]


**Run Experiments**

In [None]:
import math
import random
import numpy as np
import torch

# results_conceptoring_mean_centered[config_key] collects, per experiment, the
# success count, the number of prompts and the accuracy, plus the average accuracy.
results_conceptoring_mean_centered = {}

batch_size = 50

for config in configs_conceptoring_mean_centered:
    # Extract current experimental configurations
    layer = config.extraction_layer
    beta = config.beta
    aperture = config.aperture
    config_key = f"Layer {layer}, Beta {beta}, Aperture {aperture}"
    print(f"-----------{config_key}-----------")

    # Initialize success counts and store final accuracies for each experiment
    if config_key not in results_conceptoring_mean_centered:
        results_conceptoring_mean_centered[config_key] = {'success_counts': [], 'total_prompts': [], 'final_accuracies': []}

    for exp in range(NUM_EXPERIMENTS):
        # Retrieve precomputed conceptor from the cache
        conceptor = conceptors_cache_mean_centered[exp][(layer, aperture)]

        # The hook depends only on the experiment's conceptor, the layer and
        # beta, so it is built once here instead of once per batch.
        conceptor_hook = generate_conceptor_hook_mean_centered(conceptor=conceptor, mean_train=mean_train_activations[layer], beta=beta)
        activation_modification = (f"blocks.{layer}.hook_resid_pre", conceptor_hook)
        editing_hooks = [activation_modification]

        success_count = 0
        total_prompts = 0

        # Retrieve experiment-specific prompts and correct outputs
        prompts_to_steer = experiment_data[exp]['prompts_to_steer']
        correct_outputs_1st = experiment_data[exp]['correct_outputs_1st']

        num_batches = math.ceil(len(prompts_to_steer) / batch_size)

        for batch_idx in range(num_batches):
            batch_prompts = prompts_to_steer[batch_idx * batch_size : (batch_idx + 1) * batch_size]
            batch_correct_outputs_1st = correct_outputs_1st[batch_idx * batch_size : (batch_idx + 1) * batch_size]

            # Generate steered outputs from input prompts using the conceptor hooks
            top_1_tokens = top_1_first_tokens(model, batch_prompts, fwd_hooks=editing_hooks)

            # Increment success count if top-1 output matches the correct output
            for i, top_1_token in enumerate(top_1_tokens):
                if top_1_token == batch_correct_outputs_1st[i]:
                    success_count += 1
                total_prompts += 1

        final_accuracy = (success_count / total_prompts) * 100
        results_conceptoring_mean_centered[config_key]['success_counts'].append(success_count)
        results_conceptoring_mean_centered[config_key]['total_prompts'].append(total_prompts)
        results_conceptoring_mean_centered[config_key]['final_accuracies'].append(final_accuracy)
        # Make sure no steering hook stays attached before the next experiment
        model.reset_hooks()

        print(f"Experiment {exp+1}: Final Accuracy: {final_accuracy:.2f}% ({success_count}/{total_prompts} samples)")

    # Calculate average final accuracy across all experiments
    average_final_accuracy = sum(results_conceptoring_mean_centered[config_key]['final_accuracies']) / NUM_EXPERIMENTS
    results_conceptoring_mean_centered[config_key]['average_final_accuracy'] = average_final_accuracy
    print(f"Average Final Accuracy: {average_final_accuracy:.2f}% over {NUM_EXPERIMENTS} experiments")

In [None]:
if SAVE_RESULTS:
    # Create the results directory first so a missing folder does not lose the
    # outcome of a long run.
    os.makedirs(RESULTS_PATH, exist_ok=True)

    data = []
    for config_key, result in results_conceptoring_mean_centered.items():
        # One row per experiment of this configuration
        for i in range(NUM_EXPERIMENTS):
            data.append({
                'config_key': config_key,
                'experiment': i+1,
                'success_count': result['success_counts'][i],
                'total_prompts': result['total_prompts'][i],
                'final_accuracy': result['final_accuracies'][i]
            })
        # Add average final accuracy for each configuration
        data.append({
            'config_key': config_key,
            'experiment': 'Average',
            'success_count': '',
            'total_prompts': '',
            'final_accuracy': result['average_final_accuracy']
        })

    df = pd.DataFrame(data)
    # os.path.join avoids the doubled slash produced by RESULTS_PATH's trailing '/'
    df.to_csv(os.path.join(RESULTS_PATH, f'{TASK}_conceptoring_mean_centered_results.csv'), index=False)

# **Additive Steering**
A single average activation vector, computed from the activations of one layer and mean-centered, is added to that same layer.

In [None]:
from tqdm import tqdm
import torch


def _mean_centered_layer_averages(activation_cache, progress_label):
    """
    Averages cached activations per (experiment, layer) and mean-centers the result.

    Args:
        activation_cache: Mapping indexed as activation_cache[exp][layer] that yields a tensor
            (presumably the last-token activations of the steering examples - confirm
            against the cell that fills the cache).
        progress_label (str): Description shown on the tqdm progress bar.

    Returns:
        dict: result[exp][layer] = mean over dim 0 of the cached activations minus
        mean_train_activations[layer].
    """
    centered = {exp: {} for exp in range(NUM_EXPERIMENTS)}
    step_count = len(list_extraction_layers) * NUM_EXPERIMENTS
    with tqdm(total=step_count, desc=progress_label) as bar:
        for exp, per_layer in centered.items():
            for layer in list_extraction_layers:
                # Collapse the first axis into a single vector, then subtract the training mean
                layer_mean = torch.mean(activation_cache[exp][layer], dim=0)
                per_layer[layer] = layer_mean - mean_train_activations[layer]
                bar.update(1)
    return centered


# Mean-centered averages for the first set of cached activations (version 1)
averaged_activations_cache_1 = _mean_centered_layer_averages(
    activations_cache_1, "Precomputing averaged activations (version 1)"
)

# Mean-centered averages for the second set of cached activations (version 2)
averaged_activations_cache_2 = _mean_centered_layer_averages(
    activations_cache_2, "Precomputing averaged activations (version 2)"
)

# Combine both versions into the steering vectors used by the experiment, one per (experiment, layer)
averaged_activations_cache = {exp: {} for exp in range(NUM_EXPERIMENTS)}

with tqdm(total=len(list_extraction_layers) * NUM_EXPERIMENTS, desc="Merging averaged activations") as merge_bar:
    for exp, merged_by_layer in averaged_activations_cache.items():
        for layer in list_extraction_layers:
            # The two vectors are summed element-wise.
            # NOTE: we may have to change this such that merged = (avg1 + avg2) / 2
            merged_by_layer[layer] = (
                averaged_activations_cache_1[exp][layer] + averaged_activations_cache_2[exp][layer]
            )
            merge_bar.update(1)

Precomputing averaged activations (version 1): 100%|██████████| 5/5 [00:00<00:00, 1343.55it/s]
Precomputing averaged activations (version 2): 100%|██████████| 5/5 [00:00<00:00, 5162.86it/s]
Merging averaged activations: 100%|██████████| 5/5 [00:00<00:00, 6057.63it/s]


**Run experiment**

In [None]:
import math
import numpy as np
import torch

# Additive-steering results, keyed by "Layer {layer}, Beta {beta}". Each entry holds the
# per-experiment success counts, prompt counts and accuracies, plus the averaged accuracy.
results_averaging_mean_centered = {}

# Number of prompts sent through the model per call
batch_size = 50

for config in configs_averaging:
    # Extract current experimental configurations
    layer = config.extraction_layer
    beta = config.beta
    config_key = f"Layer {layer}, Beta {beta}"
    print(f"-------------------------{config_key}-------------------------")

    # Create the result entry on first sight of this configuration; a repeated
    # configuration keeps appending to the same lists.
    if config_key not in results_averaging_mean_centered:
        results_averaging_mean_centered[config_key] = {'success_counts': [], 'total_prompts': [], 'final_accuracies': []}

    # The hook point depends only on the layer, so build its name once per configuration
    hook_point = f"blocks.{layer}.hook_resid_pre"

    for exp in range(NUM_EXPERIMENTS):
        # Retrieve the precomputed (merged, mean-centered) steering vector from the cache
        avg_activations = averaged_activations_cache[exp][layer]

        success_count = 0
        total_prompts = 0

        # Retrieve experiment-specific prompts and correct outputs
        prompts_to_steer = experiment_data[exp]['prompts_to_steer']
        correct_outputs_1st = experiment_data[exp]['correct_outputs_1st']

        num_batches = math.ceil(len(prompts_to_steer) / batch_size)

        for batch_idx in range(num_batches):
            batch_start = batch_idx * batch_size
            batch_end = batch_start + batch_size
            batch_prompts = prompts_to_steer[batch_start:batch_end]
            batch_correct_outputs_1st = correct_outputs_1st[batch_start:batch_end]

            # Build the hook that applies the steering vector scaled by beta.
            # NOTE(review): re-created for every batch as before; presumably stateless,
            # confirm before hoisting it out of the batch loop.
            ave_hook = generate_ave_hook_addition_mean_centered(steering_vector=avg_activations, beta=beta)
            editing_hooks = [(hook_point, ave_hook)]

            # Generate steered outputs from input prompts using the average steering hooks
            top_1_tokens = top_1_first_tokens(model, batch_prompts, fwd_hooks=editing_hooks)

            # Count a success whenever the top-1 output matches the expected output
            for i, top_1_token in enumerate(top_1_tokens):
                if top_1_token == batch_correct_outputs_1st[i]:
                    success_count += 1
                total_prompts += 1

        # Accuracy of this experiment, as a percentage
        final_accuracy = (success_count / total_prompts) * 100
        results_averaging_mean_centered[config_key]['success_counts'].append(success_count)
        results_averaging_mean_centered[config_key]['total_prompts'].append(total_prompts)
        results_averaging_mean_centered[config_key]['final_accuracies'].append(final_accuracy)
        model.reset_hooks()

        print(f"Experiment {exp+1}: Final Accuracy: {final_accuracy:.2f}% ({success_count}/{total_prompts} samples)")

    # Average over the accuracies actually accumulated for this configuration. Dividing by
    # NUM_EXPERIMENTS would inflate the result when the same configuration appears more than
    # once in configs_averaging, because its lists then hold more than NUM_EXPERIMENTS entries.
    final_accuracies = results_averaging_mean_centered[config_key]['final_accuracies']
    average_final_accuracy = sum(final_accuracies) / len(final_accuracies)
    results_averaging_mean_centered[config_key]['average_final_accuracy'] = average_final_accuracy
    print(f"Average Final Accuracy: {average_final_accuracy:.2f}% over {len(final_accuracies)} experiments")

In [None]:
# Export the mean-centered additive (averaging) steering results to a CSV file.
# Nothing is written unless SAVE_RESULTS is truthy.
if SAVE_RESULTS:
    rows = []
    for config_key, result in results_averaging_mean_centered.items():
        # One row per experiment run of this configuration (1-based run number).
        rows.extend(
            {
                'config_key': config_key,
                'experiment': run + 1,
                'success_count': result['success_counts'][run],
                'total_prompts': result['total_prompts'][run],
                'final_accuracy': result['final_accuracies'][run],
            }
            for run in range(NUM_EXPERIMENTS)
        )
        # Summary row for the configuration; the count columns are left blank
        # because only the averaged accuracy is stored for it.
        rows.append({
            'config_key': config_key,
            'experiment': 'Average',
            'success_count': '',
            'total_prompts': '',
            'final_accuracy': result['average_final_accuracy'],
        })

    # The file name (including its "centred" spelling) is kept exactly as before so
    # anything that reads this file by name keeps working.
    csv_path = f'{RESULTS_PATH}/{TASK}_averaging_addition_mean_centred_results.csv'
    df = pd.DataFrame(rows)
    df.to_csv(csv_path, index=False)