### Alternative Steering Vectors

In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import os
from sklearn.decomposition import PCA, TruncatedSVD
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.svm import LinearSVC
from scipy import stats

# --- Configuration ---
# Parquet files holding the activation datasets of the base and instruct models.
BASE_INPUT_FILE = "results/base_activation_dataset.parquet"
INSTRUCT_INPUT_FILE = "results/instruct_activation_dataset.parquet"

# Directory templates for saved steering vectors; "{method}" is filled in
# with a method name via str.format() when vectors are saved or loaded.
BASE_VECTORS_DIR = "steering_vectors/{method}/base"
INSTRUCT_VECTORS_DIR = "steering_vectors/{method}/instruct"

# Method names whose saved vectors load_layer_vectors() looks for on disk.
# Methods with no file for a given layer are simply skipped there.
METHODS_TO_ANALYZE = ['caa', 'repe', 'lda', 'svm', 'lr']

#### Helper Functions

In [None]:
# --- Helper Functions ---

# Unfortunately, shared prompt prefixes are not saved in the activation dataset, so in order to compute the mean-of-differences and representation engineering steering vectors (which rely on pairwise matching) we need to recover the instrumental/terminal pairs by grouping prompts on progressively longer shared prefixes.
def get_smart_pair_map(df, start_len=50, step=10):
    """
    Returns a dictionary mapping {prompt_string: pair_id} using a recursive
    zoom-in strategy to disentangle prompts with shared prefixes.

    Prompts are grouped by their first `prefix_len` characters:
      * a group with exactly one 'instrumental' and one 'terminal' prompt
        becomes a pair and receives the next pair id;
      * a group with at least one of each label but not exactly 1 vs 1 is
        re-grouped using a prefix that is `step` characters longer;
      * any other group (one label missing) is dropped as orphans.

    Args:
        df: DataFrame with 'prompt' and 'label' columns. It is not modified.
        start_len: Prefix length (in characters) used for the first grouping.
        step: Number of characters added to the prefix at each zoom-in.
            Must be positive.

    Returns:
        dict mapping prompt text to an integer pair id. Prompts that could
        not be paired are absent from the dict.

    Raises:
        ValueError: If `step` is not positive.
    """
    if step <= 0:
        # A non-positive step never lengthens the prefix, so an ambiguous
        # group would recurse forever.
        raise ValueError("step must be a positive number of characters")

    # We only care about the unique text-label combinations; the input rows
    # repeat each prompt (e.g. across layers).
    unique_df = df[['prompt', 'label']].drop_duplicates()

    pair_map = {}
    current_pair_id = 0

    def recursive_match(subset_df, prefix_len):
        nonlocal current_pair_id

        # Stop if subset is empty
        if subset_df.empty:
            return

        # Stop only once the prefix is longer than EVERY prompt in the subset:
        # beyond that point a longer prefix cannot split the group any further.
        # (Comparing against the shortest prompt instead would let a single
        # short prompt abort the matching of all the other prompts.)
        max_prompt_len = subset_df['prompt'].str.len().max()
        if prefix_len > max_prompt_len:
            print(f"    Warning: Ambiguous prompts found at max length {max_prompt_len}. Skipping.")
            return

        # Group by the current prefix. The key is a separate, renamed Series
        # so the caller's frame is never mutated.
        prefixes = subset_df['prompt'].str.slice(0, prefix_len).rename('temp_prefix')

        for _, group in subset_df.groupby(prefixes):
            counts = group['label'].value_counts()
            n_inst = counts.get('instrumental', 0)
            n_term = counts.get('terminal', 0)

            if n_inst == 1 and n_term == 1:
                # Perfect pair (1 vs 1) -> assign an id
                for prompt in group['prompt']:
                    pair_map[prompt] = current_pair_id
                current_pair_id += 1
            elif n_inst >= 1 and n_term >= 1:
                # Several candidates (e.g. 2 vs 2, or 2 vs 1) -> zoom in on
                # this group only; unbalanced groups may still yield a pair
                # plus an orphan further down.
                recursive_match(group, prefix_len + step)
            # Otherwise one label is missing entirely (e.g. 3 vs 0): orphans.

    print(f"Starting recursive pairing (min_len={start_len}, step={step})...")
    recursive_match(unique_df, start_len)
    # current_pair_id counts assigned pairs directly, which stays correct even
    # when both members of a pair share the exact same prompt text.
    print(f"Matched {current_pair_id} pairs from {len(unique_df)} unique prompts.")

    return pair_map

def recover_pairings(df):
    """
    Tags each row of `df` with the pair id of its prompt and returns a copy
    containing only the rows whose prompt was matched into a pair.

    Note: the 'pair_id' column is also written onto the `df` passed in.
    """
    # Build the prompt -> pair id lookup from the unique prompts and attach it
    df['pair_id'] = df['prompt'].map(get_smart_pair_map(df))

    # Prompts missing from the lookup map to NaN; keep only the paired rows
    is_paired = df['pair_id'].notna()
    df_clean = df.loc[is_paired].copy()
    df_clean['pair_id'] = df_clean['pair_id'].astype(int)

    removed = len(df) - len(df_clean)
    if removed > 0:
        print(f"  Dropped {removed} activations (orphans or duplicates).")

    return df_clean

def get_aligned_data(df, layer_name, activation_cols):
    """
    Builds the instrumental and terminal activation matrices for one layer,
    with rows ordered by 'pair_id'.

    Returns (X_pos, X_neg) as float32 arrays: X_pos holds the 'instrumental'
    rows and X_neg the 'terminal' rows. If the two sides do not list the same
    pair ids, both are restricted to the ids they have in common; row counts
    are not re-checked after that restriction.
    """
    # Order by pair_id (then label) so both label subsets follow the same
    # pair order, whatever order the rows arrived in
    ordered = df[df['layer'] == layer_name].sort_values(by=['pair_id', 'label'])

    pos_rows = ordered[ordered['label'] == 'instrumental']
    neg_rows = ordered[ordered['label'] == 'terminal']

    pos_ids = pos_rows['pair_id'].values
    neg_ids = neg_rows['pair_id'].values

    # Sanity check: when the ids do not match row-for-row, fall back to the
    # pair ids present on both sides
    if not np.array_equal(pos_ids, neg_ids):
        shared_ids = np.intersect1d(pos_ids, neg_ids)
        pos_rows = pos_rows[pos_rows['pair_id'].isin(shared_ids)]
        neg_rows = neg_rows[neg_rows['pair_id'].isin(shared_ids)]

    def to_matrix(frame):
        return frame[activation_cols].values.astype(np.float32)

    return to_matrix(pos_rows), to_matrix(neg_rows)

## Other Helper Functions

def save_vector(vector, method, model_type, layer_name):
    """
    Writes `vector` to "<layer_name>_steering.npy" inside the configured
    directory for `method`; 'base' selects the base directory, any other
    `model_type` the instruct directory. The directory is created if needed.
    """
    dir_template = BASE_VECTORS_DIR if model_type == 'base' else INSTRUCT_VECTORS_DIR
    out_dir = dir_template.format(method=method)

    os.makedirs(out_dir, exist_ok=True)
    target_path = os.path.join(out_dir, f"{layer_name}_steering.npy")
    np.save(target_path, vector)

def cohens_d(group1, group2):
    """
    Returns Cohen's d (effect size) for two independent samples: the
    difference of the sample means divided by the pooled standard deviation.
    The pooling assumes roughly equal variances in the two groups.
    """
    size_a = len(group1)
    size_b = len(group2)

    # Weight each sample variance (ddof=1) by its degrees of freedom
    weighted_var = (size_a - 1) * np.var(group1, ddof=1) + (size_b - 1) * np.var(group2, ddof=1)
    pooled_sd = np.sqrt(weighted_var / (size_a + size_b - 2))

    mean_gap = np.mean(group1) - np.mean(group2)
    return mean_gap / pooled_sd

def load_layer_vectors(layer_name, model_type):
    """
    Collects the saved steering vector of every method in METHODS_TO_ANALYZE
    that has a file on disk for `layer_name`.
    Returns ({method: vector}, [methods loaded successfully, in order]).
    """
    dir_template = BASE_VECTORS_DIR if model_type == 'base' else INSTRUCT_VECTORS_DIR
    file_name = f"{layer_name}_steering.npy"

    vector_dict = {}
    valid_methods = []

    for method in METHODS_TO_ANALYZE:
        path = os.path.join(dir_template.format(method=method), file_name)
        if not os.path.exists(path):
            # Nothing saved for this method/layer: skip quietly
            continue
        try:
            loaded = np.load(path)
        except Exception as e:
            # An unreadable file is reported and skipped, not fatal
            print(f"Error loading {method} for {layer_name}: {e}")
        else:
            vector_dict[method] = loaded
            valid_methods.append(method)

    return vector_dict, valid_methods

#### Load .parquet Files

In [None]:
# --- Dataset Loading ---
# Maps model type ('base' / 'instruct') to its paired activation dataframe.
datasets = {}

print(f"Loading Base model activations from {BASE_INPUT_FILE}")
df_base = pd.read_parquet(BASE_INPUT_FILE)
# recover_pairings adds a 'pair_id' column and returns only the rows whose
# prompt could be matched into an instrumental/terminal pair.
datasets['base'] = recover_pairings(df_base)

print(f"Loading Instruct model activations from {INSTRUCT_INPUT_FILE}")
df_instruct = pd.read_parquet(INSTRUCT_INPUT_FILE)
datasets['instruct'] = recover_pairings(df_instruct)

# Activation feature columns are the ones prefixed 'act_'. They are read from
# the base dataset only (assumes the instruct dataset has the same columns —
# TODO confirm).
activation_cols = [col for col in datasets['base'].columns if col.startswith('act_')]
hidden_dim = len(activation_cols)

# Layer names are likewise taken from the base dataset only.
layer_names = datasets['base']['layer'].unique()
# Order layers by the integer in the second-to-last dot-separated field,
# e.g. 'model.layers.13.mlp' -> 13.
sorted_layer_names = sorted(layer_names, key=lambda x: int(x.split('.')[-2]))

#### Compute CAA (Difference of Means (DoM)) Steering Vectors
Computes $\mu_{instrumental} - \mu_{terminal}$, the difference between the mean instrumental activation and the mean terminal activation at a given layer, taken over the entire dataset.

In [None]:
# Contrastive Activation Addition (difference of means): for every model and
# layer, subtract the mean terminal activation from the mean instrumental one
# and save the result under the 'caa' method directory.
METHOD = "caa"
print(f"--- Computing {METHOD.upper()} Vectors ---")

for model_type, df in datasets.items():
    print(f"Processing {model_type} model...")
    
    for layer in sorted_layer_names:
        # 1. Get Aligned Data
        # X_pos holds the 'instrumental' rows, X_neg the 'terminal' rows
        X_pos, X_neg = get_aligned_data(df, layer, activation_cols)
        
        # 2. Compute CAA: Mean(Pos) - Mean(Neg)
        # Means are taken over rows (axis=0), giving one value per activation column
        mean_pos = np.mean(X_pos, axis=0)
        mean_neg = np.mean(X_neg, axis=0)
        vec_caa = mean_pos - mean_neg
        
        # 3. Save
        # The vector is saved as-is (not normalized)
        save_vector(vec_caa, METHOD, model_type, layer)

print("CAA computation complete.")

#### Compute (Linear) Support Vector Machine (SVM) Steering Vectors
Fits a linear SVM and uses the normal of its separating hyperplane — the direction that maximizes the margin (gap) between the two classes — as the steering vector. Because this vector is defined by the boundary of the concept rather than its average, it may help to ignore irrelevant noise from "easy" examples that often skew mean-based or probabilistic methods.

In [None]:
# Linear SVM steering vectors: for every model and layer, fit a linear
# classifier separating instrumental from terminal activations and save the
# weight vector of its decision boundary under the 'svm' method directory.
METHOD = "svm"
print(f"--- Computing {METHOD.upper()} Vectors ---")

for model_type, df in datasets.items():
    print(f"Processing {model_type} model...")
    
    for layer in sorted_layer_names:
        # X_pos holds the 'instrumental' rows, X_neg the 'terminal' rows
        X_pos, X_neg = get_aligned_data(df, layer, activation_cols)
        
        # 1. Prepare Training Data
        # SVM requires stacked samples (N_samples, N_features) and labels
        X_train = np.vstack([X_pos, X_neg])
        
        # Labels: 1 for Instrumental (Pos), 0 for Terminal (Neg)
        # Built in the same order as the rows stacked into X_train
        y_train = np.concatenate([np.ones(len(X_pos)), np.zeros(len(X_neg))])
        
        # 2. Fit Linear SVM
        # C=0.01: Strong regularization to prevent overfitting in high dimensions
        # dual="auto": Lets sklearn choose the best solver for n_samples vs n_features
        # random_state=42: fixed seed so repeated runs use the same seed
        svm = LinearSVC(C=0.01, fit_intercept=True, dual="auto", max_iter=10000, random_state=42)
        svm.fit(X_train, y_train)
        
        # 3. Extract Normal Vector
        # coef_ has one row for this two-class problem; the intercept is not saved
        vec_svm = svm.coef_[0]
        
        # 4. Sign/Direction Check
        # Flip the vector if it points against the instrumental-minus-terminal
        # mean difference, so it always points towards "Instrumental"
        mean_diff_direction = np.mean(X_pos, axis=0) - np.mean(X_neg, axis=0)
        if np.dot(vec_svm, mean_diff_direction) < 0:
            vec_svm = -vec_svm
            
        save_vector(vec_svm, METHOD, model_type, layer)

print("SVM computation complete.")

#### Compute RepE Steering Vectors
Runs PCA on the set of pairwise difference vectors $\{x_{instrumental, i} - x_{terminal, i}\}_{i=1}^N$ and uses the first principal component — the direction of maximum variance among the differences — as the steering vector.

In [None]:
# RepE steering vectors: for every model and layer, take the leading
# component of the pairwise (instrumental - terminal) activation differences
# and save it under the 'repe' method directory.
METHOD = "repe"
print(f"--- Computing {METHOD.upper()} Vectors ---")

for model_type, df in datasets.items():
    print(f"Processing {model_type} model...")
    
    for layer in sorted_layer_names:
        # Rows of X_pos and X_neg are ordered by pair_id, so subtracting
        # row-wise yields one difference vector per pair
        X_pos, X_neg = get_aligned_data(df, layer, activation_cols)
        
        # 1. PCA on Difference Vectors
        pairwise_diffs = X_pos - X_neg
        
        # TruncatedSVD uses a randomized solver by default, so the seed is
        # pinned (same value as the SVM cell) to make saved vectors
        # reproducible between runs.
        # NOTE: TruncatedSVD does not mean-center its input, so this is the
        # top singular direction of the raw differences.
        svd = TruncatedSVD(n_components=1, random_state=42)
        svd.fit(pairwise_diffs)
        vec_repe = svd.components_[0]
        
        # 2. Sign Correction
        # We check against the simple mean difference to ensure the vector points towards "Instrumental"
        mean_diff_direction = np.mean(X_pos, axis=0) - np.mean(X_neg, axis=0)
        if np.dot(vec_repe, mean_diff_direction) < 0:
            vec_repe = -vec_repe
            
        save_vector(vec_repe, METHOD, model_type, layer)

print("RepE computation complete.")

#### Compute LDA Steering Vectors
Computes $(\Sigma + \lambda I)^{-1}(\mu_{instrumental} - \mu_{terminal})$ (also called a "Mass Mean Probe"), where $\Sigma$ is the pooled covariance matrix of the activations and $\lambda I$ is a small regularization term added for numerical stability. Multiplying by the inverse covariance effectively "whitens" the noise in the dataset.

In [None]:
# LDA steering vectors: for every model and layer, solve
# (pooled covariance) x = (difference of class means), scale the solution
# to unit length, and save it under the 'lda' method directory.
METHOD = "lda"
print(f"--- Computing {METHOD.upper()} Vectors ---")
REG_PARAM = 1e-4  # Regularization to prevent singular matrices

for model_type, df in datasets.items():
    print(f"Processing {model_type} model...")
    
    for layer in sorted_layer_names:
        # X_pos holds the 'instrumental' rows, X_neg the 'terminal' rows
        X_pos, X_neg = get_aligned_data(df, layer, activation_cols)
        
        # 1. Compute Pooled Covariance
        # Adding shrinkage (eye * REG_PARAM) for numerical stability
        # rowvar=False: columns are the variables, rows the observations
        cov_pos = np.cov(X_pos, rowvar=False) + (np.eye(hidden_dim) * REG_PARAM)
        cov_neg = np.cov(X_neg, rowvar=False) + (np.eye(hidden_dim) * REG_PARAM)
        pooled_cov = (cov_pos + cov_neg) / 2
        
        # 2. Compute Difference of Means
        diff_means = np.mean(X_pos, axis=0) - np.mean(X_neg, axis=0)
        
        # 3. Solve Σx = μ_diff (equivalent to x = Σ^-1 * μ_diff)
        try:
            vec_lda = np.linalg.solve(pooled_cov, diff_means)
        except np.linalg.LinAlgError:
            print(f"  LDA failed for {layer}, defaulting to CAA direction.")
            vec_lda = diff_means
        
        # 4. Normalize to unit length
        # Done after the try/except so the fallback direction is saved at the
        # same scale as a regular LDA solution; a zero vector is left as-is
        # instead of being divided by zero.
        vec_norm = np.linalg.norm(vec_lda)
        if vec_norm > 0:
            vec_lda = vec_lda / vec_norm
            
        save_vector(vec_lda, METHOD, model_type, layer)

print("LDA computation complete.")

#### Evaluate Steering Vectors

In [None]:
def plot_similarity_heatmap(layer_name, model_type='base'):
    """
    Plots a heatmap of cosine similarities between different steering vectors.

    Loads every saved steering vector for `layer_name`, computes the pairwise
    cosine similarity between methods, and both saves the figure to
    "figures/<model_type>_steering_vector_similarity.png" and shows it.

    Args:
        layer_name: Layer whose saved vectors are compared.
        model_type: 'base' or 'instruct'; selects which vector set is loaded
            and is used in the title and the output file name.

    Returns:
        None. Prints a message and returns early when fewer than two
        methods have a saved vector for the layer.
    """
    vectors, methods = load_layer_vectors(layer_name, model_type)
    
    if len(methods) < 2:
        print(f"Not enough vectors found for layer {layer_name} to plot heatmap.")
        return

    # Stack vectors into a matrix (N_methods, Hidden_Dim)
    vec_matrix = np.array([vectors[m] for m in methods])

    # Compute similarity matrix
    # Result is (N_methods, N_methods)
    sim_matrix = cosine_similarity(vec_matrix)

    plt.figure(figsize=(8, 6))
    # Using vmin=-1, vmax=1 to handle anti-correlated vectors
    sns.heatmap(sim_matrix, annot=True, xticklabels=methods, yticklabels=methods, 
                cmap="RdBu_r", vmin=-1, vmax=1, fmt=".3f", center=0)
    
    plt.title(f"Steering Vector Alignment (Cosine Sim)\nLayer: {layer_name} | Model: {model_type.capitalize()}")
    plt.tight_layout()
    fig_name = f"figures/{model_type}_steering_vector_similarity.png"
    # savefig does not create missing directories, so make sure the output
    # folder exists (same approach as save_vector).
    os.makedirs(os.path.dirname(fig_name), exist_ok=True)
    plt.savefig(fig_name, dpi=150, bbox_inches='tight')
    plt.show()
    plt.close()

# ===========================
# RUN ANALYSIS 1
# ===========================
# Layer whose steering vectors are compared across methods.
TARGET_LAYER = 'model.layers.13.mlp'

# One similarity heatmap per model type.
for model_type in ['base', 'instruct']:
    plot_similarity_heatmap(TARGET_LAYER, model_type)

In [None]:
def plot_separation_analysis(layer_name, model_type, datasets_dict):
    """
    Plots histograms of projected activations and calculates separation statistics.

    For each available steering vector (at most five), the instrumental and
    terminal activations of `layer_name` are projected onto the vector and
    drawn as overlaid histograms. The sixth panel lists Cohen's d and the
    log10 p-value of an equal-variance t-test for every method. The figure is
    saved to "figures/<model_type>_separation_histogram.png" and shown.

    Args:
        layer_name: Layer whose activations and vectors are analyzed.
        model_type: Key into `datasets_dict`; also selects which vector set
            is loaded and is used in the title and the output file name.
        datasets_dict: Mapping from model type to its paired activation
            dataframe.

    Returns:
        None. Prints a message and returns early when no vectors are found.

    Note:
        Reads the module-level `activation_cols` list.
    """
    # 1. Get Data and Vectors
    df = datasets_dict[model_type]
    X_pos, X_neg = get_aligned_data(df, layer_name, activation_cols)
    vectors, methods = load_layer_vectors(layer_name, model_type)
    
    if not methods:
        print(f"No vectors found for {layer_name}.")
        return

    # Key/column label kept in a variable: a backslash-escaped quote inside an
    # f-string expression is a SyntaxError before Python 3.12.
    d_label = "Cohen's D"

    # 2. Setup Plot Grid (2 rows, 3 columns)
    fig, axes = plt.subplots(2, 3, figsize=(18, 10))
    axes_flat = axes.flatten()
    
    stats_data = []

    # 3. Iterate through methods and plot
    for i, ax in enumerate(axes_flat[:5]): # Only use first 5 slots
        if i < len(methods):
            method = methods[i]
            vec = vectors[method]
            
            # --- Project Data ---
            # Dot product: (N_samples, Hidden) @ (Hidden,) -> (N_samples,)
            # This results in a single scalar "steering score" for each input
            proj_pos = X_pos @ vec
            proj_neg = X_neg @ vec
            
            # Ensure positive mean is higher for consistency in plots/stats
            if np.mean(proj_pos) < np.mean(proj_neg):
                proj_pos = -proj_pos
                proj_neg = -proj_neg

            # --- Plot Histogram ---
            # We calculate bins based on combined data range for fair comparison
            combined_data = np.concatenate([proj_pos, proj_neg])
            bins = np.linspace(np.min(combined_data), np.max(combined_data), 40)
            
            ax.hist(proj_pos, bins=bins, alpha=0.6, label='Instrumental', color='red', density=True)
            ax.hist(proj_neg, bins=bins, alpha=0.6, label='Terminal', color='blue', density=True)
            
            ax.set_title(f"{method.upper()} Projection", fontsize=12, fontweight='bold')
            ax.set_yticks([]) # Remove y-axis ticks for cleaner look
            if i == 0:
                ax.legend()
            
            # --- Calculate Stats ---
            # Cohen's D (Effect Size)
            d_score = cohens_d(proj_pos, proj_neg)
            # T-test p-value (measuring if means are significantly different)
            t_stat, p_val = stats.ttest_ind(proj_pos, proj_neg, equal_var=True)
            
            stats_data.append({
                'Method': method.upper(),
                d_label: d_score,
                "P-Value": p_val
            })
        else:
            # Hide unused axes if fewer than 5 methods found
            ax.axis('off')

    # 4. Fill the 6th slot with Statistics Table
    ax_stats = axes_flat[5]
    ax_stats.axis('off')
    ax_stats.set_title("Separation Statistics", fontsize=12, fontweight='bold')
    
    # Format text for table
    header = f"{'Method':<10} | {d_label:<10} | {'P-Value (log10)':<15}"
    separator = "-"*45
    row_txt = [header, separator]
    
    for item in stats_data:
        # Use log10 for p-value as it will likely be extremely small (e.g. 1e-100)
        pval_log = np.log10(item['P-Value'] + 1e-300) # avoid log(0)
        row = f"{item['Method']:<10} | {item[d_label]:<10.4f} | {pval_log:<15.2f}"
        row_txt.append(row)
        
    full_txt = "\n".join(row_txt)
    # Place text in the middle of the empty subplot box
    ax_stats.text(0.1, 0.5, full_txt, fontsize=11, family='monospace', va='center')

    plt.suptitle(f"Separation Power Analysis\nLayer {layer_name} | Model: {model_type.capitalize()}", fontsize=16, y=1.02)
    plt.tight_layout()
    fig_name = f"figures/{model_type}_separation_histogram.png"
    # savefig does not create missing directories, so make sure the output
    # folder exists (same approach as save_vector).
    os.makedirs(os.path.dirname(fig_name), exist_ok=True)
    plt.savefig(fig_name, dpi=150, bbox_inches='tight')
    plt.show()
    plt.close()


# ===========================
# RUN ANALYSIS 2
# ===========================

# Assumes 'datasets' dict from previous steps exists in notebook memory
# Layer whose projections are analyzed (same layer as Analysis 1).
TARGET_LAYER = 'model.layers.13.mlp' 

# One separation figure per model type.
for model_type in ['base', 'instruct']:
    plot_separation_analysis(TARGET_LAYER, model_type, datasets)