### Setup

In [None]:
!pip install -r latent_adversarial_training/requirements.txt

In [None]:
import torch
import functools
import einops
import requests
import pandas as pd
import io
import gc
import numpy as np
import matplotlib.pyplot as plt
import json
from pathlib import Path

from datasets import load_dataset
from sklearn.model_selection import train_test_split
from sklearn.decomposition import PCA
from tqdm import tqdm
from torch import Tensor
from typing import List, Dict, Optional, Tuple
from transformer_lens import HookedTransformer, utils
from transformer_lens.hook_points import HookPoint
from jaxtyping import Float, Int
from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import PeftModel
from huggingface_hub import login as hf_login
import os

In [None]:
def authenticate_hf():
    token = os.environ.get("HF_TOKEN")
    if token:
        hf_login(token=token)
        print("Auth passed with HF_TOKEN ")
    else:
        try:
            hf_login()
            print("Auth passed via HF portal")
        except Exception as e:
            print(f"Auth failed: {e}")
            print("Run 'huggingface-cli login' or set HF_TOKEN")
            raise

authenticate_hf()

In [None]:
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
INFERENCE_DTYPE = torch.float16
SEED = 3252

LLAMA_2_7B_CHAT_PATH = "meta-llama/Llama-2-7b-chat-hf"
PEFT_MODEL_PATH_LAT = "nlpett/llama-2-7b-chat-hf-LAT-layer4-hh"
PEFT_MODEL_PATH_AT = "nlpett/llama-2-7b-chat-hf-AT-hh"
print(f"Device: {DEVICE}")

LLAMA_CHAT_TEMPLATE = "\n[INST]{prompt}[/INST]\n"

OUTPUT_DIR = Path("outputs")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
print(f"Output dir: {OUTPUT_DIR.resolve()}")

def set_seed(seed: int):
    torch.manual_seed(seed)
    np.random.seed(seed)
    import random
    random.seed(seed)

set_seed(SEED)

### Load data & test data

In [None]:
def get_harmful_instructions() -> Tuple[List[str], List[str]]:
    url = 'https://raw.githubusercontent.com/llm-attacks/llm-attacks/main/data/advbench/harmful_behaviors.csv'
    response = requests.get(url)
    dataset = pd.read_csv(io.StringIO(response.content.decode('utf-8')))
    instructions = dataset['goal'].tolist()
    train, test = train_test_split(instructions, test_size=0.2, random_state=42)
    return train, test


def get_harmless_instructions() -> Tuple[List[str], List[str]]:
    hf_path = 'tatsu-lab/alpaca'
    dataset = load_dataset(hf_path)
    instructions = [
        dataset['train'][i]['instruction'] 
        for i in range(len(dataset['train']))
        if dataset['train'][i]['input'].strip() == '' # filter instructions wihtout inputs
    ]
    train, test = train_test_split(instructions, test_size=0.2, random_state=42)
    return train, test

print("Loading datasets...")
harmful_train, harmful_test = get_harmful_instructions()
harmless_train, harmless_test = get_harmless_instructions()

print(f"Harmful instructions: {len(harmful_train)} train, {len(harmful_test)} test")
print(f"Harmless instructions: {len(harmless_train)} train, {len(harmless_test)} test")

print("\nHarmful examples:")
for i in range(3):
    print(f"  {i+1}. {harmful_train[i][:80]}...")
print("\nHarmless examples:")
for i in range(3):
    print(f"  {i+1}. {harmless_train[i][:80]}...")

: 

### Load models & get activations

In [None]:
def load_base_model() -> HookedTransformer:
    print("Loading baseline model...")
    base_model = AutoModelForCausalLM.from_pretrained(
        LLAMA_2_7B_CHAT_PATH,
        torch_dtype=INFERENCE_DTYPE,
        device_map="auto",
    )
    
    hooked_model = HookedTransformer.from_pretrained_no_processing(
        LLAMA_2_7B_CHAT_PATH,
        hf_model=base_model,
        dtype=INFERENCE_DTYPE,
        device=DEVICE,
        default_padding_side='left',
    )
    hooked_model.tokenizer.padding_side = 'left'
    hooked_model.tokenizer.pad_token = "[PAD]"
    
    del base_model
    gc.collect()
    torch.cuda.empty_cache()
    
    return hooked_model

def load_peft_model(peft_path: str, model_name: str = "LAT") -> HookedTransformer:
    print(f"Loading {model_name} model from {peft_path}...")
    
    base_model = AutoModelForCausalLM.from_pretrained(
        LLAMA_2_7B_CHAT_PATH,
        torch_dtype=INFERENCE_DTYPE,
        device_map="auto",
    )
    # merge PEFT adapter    
    peft_model = PeftModel.from_pretrained(base_model, peft_path)
    peft_model = peft_model.to(INFERENCE_DTYPE)
    
    try:
        merged_model = peft_model.merge_and_unload()
        print(f"Successfully merged {model_name} adapter.")
    except AttributeError:
        print(f"{model_name} doesn't support merging, using as-is.")
        merged_model = peft_model
    
    merged_model.eval()
    
    hooked_model = HookedTransformer.from_pretrained_no_processing(
        LLAMA_2_7B_CHAT_PATH,
        hf_model=merged_model,
        dtype=INFERENCE_DTYPE,
        device=DEVICE,
        default_padding_side='left',
    )
    hooked_model.tokenizer.padding_side = 'left'
    hooked_model.tokenizer.pad_token = "[PAD]"
    
    del merged_model, peft_model, base_model
    gc.collect()
    torch.cuda.empty_cache()
    
    return hooked_model

# Load both models into a dictionary
# Can fit both on a 80GB A100 ~14GB each in fp16

models = {}

print("Loading baseline...")
models['Baseline'] = load_base_model()
print(f"  Baseline loaded: {models['Baseline'].cfg.n_layers} layers, {models['Baseline'].cfg.d_model} hidden dim")

print("\nLoading LAT-adapter...")
models['LAT'] = load_peft_model(PEFT_MODEL_PATH_LAT, "LAT")
print(f"  LAT loaded: {models['LAT'].cfg.n_layers} layers, {models['LAT'].cfg.d_model} hidden dim")

In [None]:
if torch.cuda.is_available():
    allocated = torch.cuda.memory_allocated() / 1e9
    reserved = torch.cuda.memory_reserved() / 1e9
    print(f"\nGPU Memory: {allocated:.1f}GB allocated, {reserved:.1f}GB reserved")

In [None]:
def tokenize_instructions(
    tokenizer: AutoTokenizer,
    instructions: List[str]
) -> Int[Tensor, 'batch_size seq_len']:
    """Tokenize instructions using Llama-2 chat template."""
    prompts = [LLAMA_CHAT_TEMPLATE.format(prompt=instruction) for instruction in instructions]
    return tokenizer(prompts, padding=True, truncation=False, return_tensors="pt").input_ids

def get_activations(
    model: HookedTransformer,
    instructions: List[str],
    layer: int,
    batch_size: int = 32,
    position: int = -1  # Last token by default
) -> torch.Tensor:
    # Get activations at layer n for a given instruction
    # run_with_cache for residual stream activations as we go

    tokenize_fn = functools.partial(tokenize_instructions, tokenizer=model.tokenizer)
    activations = []
    
    for i in tqdm(range(0, len(instructions), batch_size), desc=f"Layer {layer}"):
        batch = instructions[i:i+batch_size]
        toks = tokenize_fn(instructions=batch).to(DEVICE)
        
        with torch.no_grad():
            _, cache = model.run_with_cache( # fw pass and cache intermediate residual stream activations
                toks,
                names_filter=lambda name: f'blocks.{layer}.hook_resid_pre' in name
            )
        activations.append(cache[f'blocks.{layer}.hook_resid_pre'][:, position, :]) 
    
    return torch.cat(activations, dim=0)

def get_activations_all_layers(
    model: HookedTransformer,
    instructions: List[str],
    layers: List[int],
    batch_size: int = 16,
    position: int = -1
) -> Dict[int, torch.Tensor]:
    # Get activations at multiple layers
    tokenize_fn = functools.partial(tokenize_instructions, tokenizer=model.tokenizer)
    activations = {l: [] for l in layers}
    
    for i in tqdm(range(0, len(instructions), batch_size), desc="Extracting activations"):
        batch = instructions[i:i+batch_size]
        toks = tokenize_fn(instructions=batch).to(DEVICE)
        
        with torch.no_grad():
            _, cache = model.run_with_cache(
                toks,
                names_filter=lambda name: 'resid_pre' in name
            )
        
        for l in layers:
            activations[l].append(cache['resid_pre', l][:, position, :])
    
    return {l: torch.cat(acts, dim=0) for l, acts in activations.items()}

### Compute refusal vectors

The difference of mean activations between harmful and harmless prompts. L14 has highest efficacy of refusal direction ablation aross all model variants - see 14.3 in https://arxiv.org/pdf/2504.18872

In [None]:
# global refusal vector computation confiog
N_SAMPLES = 100  # samples to compute vector
TARGET_LAYER = 14  # target activation layer

harmful_acts = {}   # {model_name: tensor}
harmless_acts = {}  # {model_name: tensor}

for model_name, model in models.items():
    print(f"\n{'='*50}")
    print(f"     activations for {model_name} at layer {TARGET_LAYER}...")
    print(f"{'='*50}")
    
    harmful_acts[model_name] = get_activations(model, harmful_train[:N_SAMPLES], TARGET_LAYER, batch_size=16)
    harmless_acts[model_name] = get_activations(model, harmless_train[:N_SAMPLES], TARGET_LAYER, batch_size=16)
    
    print(f"Harmful shape: {harmful_acts[model_name].shape}")
    print(f"Harmless shape: {harmless_acts[model_name].shape}")

refusal_dirs = {}

for model_name in models.keys():
    harmful_mean = harmful_acts[model_name].mean(dim=0)
    harmless_mean = harmless_acts[model_name].mean(dim=0)
    
    refusal_dir = harmful_mean - harmless_mean
    refusal_dirs[model_name] = {
        'raw': refusal_dir,
        'normalized': refusal_dir / refusal_dir.norm(),
        'norm': refusal_dir.norm().item()
    }
    print(f"{model_name}: refusal direction norm = {refusal_dir.norm().item():.4f}")

### Analysis of selected layer and refusal vector composition

#### Cosine similarity of baseline and LAT refusal vectors

In [None]:
# cosine similarity of baseline and LAT directions
cos_sim = torch.cosine_similarity(
    refusal_dirs['Baseline']['normalized'].unsqueeze(0),
    refusal_dirs['LAT']['normalized'].unsqueeze(0)
).item()
print(f"\nCosine similarity between Baseline and LAT refusal directions: {cos_sim:.4f}")

#### PCA of layer specific (14) activations in baseline and LAT-adapted models

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(16, 7))

for idx, (model_name, ax) in enumerate(zip(models.keys(), axes)):
    h_acts = harmful_acts[model_name]
    hl_acts = harmless_acts[model_name]
    
    all_acts = torch.cat([hl_acts, h_acts], dim=0).cpu().float().numpy()
    
    pca = PCA(n_components=2)
    acts_pca = pca.fit_transform(all_acts)
    
    n_harmless = len(hl_acts)
    ax.scatter(acts_pca[:n_harmless, 0], acts_pca[:n_harmless, 1], 
               c='blue', alpha=0.6, label='Harmless', s=50)
    ax.scatter(acts_pca[n_harmless:, 0], acts_pca[n_harmless:, 1], 
               c='red', alpha=0.6, label='Harmful', s=50)
    
    harmless_pca_mean = acts_pca[:n_harmless].mean(axis=0)
    harmful_pca_mean = acts_pca[n_harmless:].mean(axis=0)
    ax.scatter(*harmless_pca_mean, c='blue', s=200, marker='X', edgecolors='black', linewidths=2)
    ax.scatter(*harmful_pca_mean, c='red', s=200, marker='X', edgecolors='black', linewidths=2)
    
    ax.annotate('', xy=harmful_pca_mean, xytext=harmless_pca_mean,
                arrowprops=dict(arrowstyle='->', color='green', lw=3))
    
    ax.set_xlabel(f'PC1 ({pca.explained_variance_ratio_[0]*100:.1f}% variance)')
    ax.set_ylabel(f'PC2 ({pca.explained_variance_ratio_[1]*100:.1f}% variance)')
    ax.set_title(f'{model_name} Model\nLayer {TARGET_LAYER}')
    ax.legend(loc='upper right')
    ax.grid(True, alpha=0.3)

plt.suptitle('PCA of Activations: Baseline vs LAT', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig(OUTPUT_DIR / f'pca_layer{TARGET_LAYER}_comparison.png', dpi=150)
plt.show()

#### Plot SVD concentration across layers

In [None]:
# Plot SVD concentration comparison
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

colors = {'Baseline': 'blue', 'LAT': 'orange'}
markers = {'Baseline': 'o', 'LAT': 's'}

# Left plot: Top-2 variance by layer for both models
for model_name in models.keys():
    layers = list(svd_results[model_name].keys())
    top2_var = [svd_results[model_name][l]['top_k_variance'] * 100 for l in layers]
    axes[0].plot(layers, top2_var, f'{markers[model_name]}-', 
                 linewidth=2, markersize=8, color=colors[model_name], label=model_name)

axes[0].axhline(y=44, color='gray', linestyle=':', alpha=0.7, label='Abbas ref: Baseline (44%)')
axes[0].axhline(y=75, color='gray', linestyle='--', alpha=0.7, label='Abbas ref: LAT (75%)')
axes[0].set_xlabel('Layer', fontsize=12)
axes[0].set_ylabel('Variance in Top-2 Components (%)', fontsize=12)
axes[0].set_title('SVD Concentration by Layer', fontsize=14)
axes[0].legend()
axes[0].grid(True, alpha=0.3)
axes[0].set_ylim(0, 100)

# Right plot: Singular value spectrum at layer 14 for both models
width = 0.35
x = np.arange(20)
for i, model_name in enumerate(models.keys()):
    var_exp = svd_results[model_name][TARGET_LAYER]['variance_explained'][:20] * 100
    axes[1].bar(x + i*width - width/2, var_exp, width, 
                label=model_name, color=colors[model_name], alpha=0.7, edgecolor='black')

axes[1].set_xlabel('Component Index', fontsize=12)
axes[1].set_ylabel('Variance Explained (%)', fontsize=12)
base_top2 = svd_results['Baseline'][TARGET_LAYER]['top_k_variance'] * 100
lat_top2 = svd_results['LAT'][TARGET_LAYER]['top_k_variance'] * 100
axes[1].set_title(f'Singular Value Spectrum at Layer {TARGET_LAYER}\nBaseline: {base_top2:.1f}% | LAT: {lat_top2:.1f}%', fontsize=14)
axes[1].legend()
axes[1].grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.savefig(OUTPUT_DIR / 'svd_concentration_comparison.png', dpi=150)
plt.show()

In [None]:
LAYERS_TO_ANALYZE = [0, 4, 8, 12, 14, 16, 20, 24, 28, 31]

svd_results = {model_name: {} for model_name in models.keys()}

for model_name, model in models.items():
    print(f"\n{'='*50}")
    print(f"SVD analysis for {model_name}")
    print(f"{'='*50}")
    
    for layer in tqdm(LAYERS_TO_ANALYZE, desc=f"{model_name}"):
        harmful_layer = get_activations(model, harmful_train[:N_SAMPLES], layer, batch_size=16)
        harmless_layer = get_activations(model, harmless_train[:N_SAMPLES], layer, batch_size=16)
        
        directions = compute_difference_directions(harmful_layer, harmless_layer)
        
        svd_results[model_name][layer] = compute_svd_concentration(directions, top_k=2)
        
        del harmful_layer, harmless_layer, directions
        torch.cuda.empty_cache()

print("\n" + "="*60)
print("SVD Concentration Comparison (variance in top-2 components)")
print("="*60)
print(f"{'Layer':<8} {'Baseline':<12} {'LAT':<12} {'Î”':<10}")
print("-"*42)
for layer in LAYERS_TO_ANALYZE:
    base = svd_results['Baseline'][layer]['top_k_variance'] * 100
    lat = svd_results['LAT'][layer]['top_k_variance'] * 100
    delta = lat - base
    print(f"{layer:<8} {base:<12.1f} {lat:<12.1f} {delta:+.1f}")

---
## IGNORE FOR NOW: GRAM MATRIX AND INTERFERENCE ANALYSIS
The Gram matrix G = D @ D^T captures pairwise similarities between difference directions. Off-diagonal elements measure "interference" - how much different prompts' refusal directions overlap.

**Key metrics:**
- Mean off-diagonal: Average |G_ij| for i â‰  j (lower = less interference)
- Interference Frobenius: ||G - diag(G)||_F (total interference magnitude)

In [None]:
def compute_gram_matrix(directions: torch.Tensor, normalize: bool = True) -> torch.Tensor:
    """
    Compute Gram matrix G = D @ D^T
    If normalize=True, computes cosine similarity matrix instead.
    """
    if normalize:
        norms = directions.norm(dim=1, keepdim=True)
        norms = torch.clamp(norms, min=1e-8)
        directions_normed = directions / norms
        gram = directions_normed @ directions_normed.T
    else:
        gram = directions @ directions.T
    return gram


def compute_interference_metrics(gram: torch.Tensor) -> dict:
    """
    Compute interference metrics from Gram matrix.
    """
    n = gram.shape[0]
    
    # Extract off-diagonal elements
    mask = ~torch.eye(n, dtype=torch.bool, device=gram.device)
    off_diag = gram[mask]
    
    # Mean off-diagonal magnitude
    mean_off_diagonal = off_diag.abs().mean().item()
    
    # Interference Frobenius norm: ||G - diag(G)||_F
    diag_matrix = torch.diag(torch.diag(gram))
    interference_frobenius = (gram - diag_matrix).norm('fro').item()
    
    # Standard deviation of off-diagonal
    off_diagonal_std = off_diag.std().item()
    
    return {
        'mean_off_diagonal': mean_off_diagonal,
        'interference_frobenius': interference_frobenius,
        'off_diagonal_std': off_diagonal_std,
        'off_diagonal_values': off_diag.cpu().numpy()
    }

In [None]:
# Compute Gram matrix and interference for BOTH models at layer 14
interference_results = {}

for model_name in models.keys():
    # Compute difference directions
    directions = compute_difference_directions(harmful_acts[model_name], harmless_acts[model_name])
    
    # Compute normalized Gram matrix (cosine similarities)
    gram = compute_gram_matrix(directions, normalize=True)
    
    # Compute interference metrics
    interference = compute_interference_metrics(gram)
    interference['gram'] = gram
    interference['directions'] = directions
    interference_results[model_name] = interference

# Print comparison
print("="*60)
print(f"Interference Metrics at Layer {TARGET_LAYER}")
print("="*60)
print(f"{'Metric':<30} {'Baseline':<15} {'LAT':<15}")
print("-"*60)
for metric in ['mean_off_diagonal', 'interference_frobenius', 'off_diagonal_std']:
    base = interference_results['Baseline'][metric]
    lat = interference_results['LAT'][metric]
    print(f"{metric:<30} {base:<15.4f} {lat:<15.4f}")

In [None]:
# Visualize Gram matrices for BOTH models
fig, axes = plt.subplots(2, 2, figsize=(14, 12))

for idx, model_name in enumerate(models.keys()):
    gram = interference_results[model_name]['gram']
    off_diag = interference_results[model_name]['off_diagonal_values']
    
    # Gram matrix heatmap
    im = axes[idx, 0].imshow(gram.cpu().numpy(), cmap='RdBu_r', vmin=-1, vmax=1)
    axes[idx, 0].set_title(f'{model_name}: Cosine Similarity Matrix', fontsize=12)
    axes[idx, 0].set_xlabel('Prompt Pair Index')
    axes[idx, 0].set_ylabel('Prompt Pair Index')
    plt.colorbar(im, ax=axes[idx, 0], label='Cosine Similarity')
    
    # Histogram of off-diagonal values
    axes[idx, 1].hist(off_diag, bins=50, edgecolor='black', alpha=0.7, 
                      color='blue' if model_name == 'Baseline' else 'orange')
    axes[idx, 1].axvline(x=0, color='red', linestyle='--', linewidth=2, label='Zero')
    axes[idx, 1].axvline(x=off_diag.mean(), color='green', linestyle='--', linewidth=2, 
                         label=f'Mean: {off_diag.mean():.3f}')
    mean_abs = interference_results[model_name]['mean_off_diagonal']
    axes[idx, 1].set_title(f'{model_name}: Off-Diagonal Distribution\nMean |sim|: {mean_abs:.3f}', fontsize=12)
    axes[idx, 1].set_xlabel('Cosine Similarity')
    axes[idx, 1].set_ylabel('Count')
    axes[idx, 1].legend()
    axes[idx, 1].grid(True, alpha=0.3, axis='y')

plt.suptitle(f'Gram Matrix Analysis at Layer {TARGET_LAYER}', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig(OUTPUT_DIR / f'gram_matrix_layer{TARGET_LAYER}_comparison.png', dpi=150)
plt.show()

---
## IGNORE FOR NOW: Baseline Model Measurements

This establishes baseline measurements for the key metrics. Next steps:
1. Load LAT model and repeat measurements
2. Compare concentration and interference between Baseline vs LAT
3. Test whether higher concentration correlates with lower interference

In [None]:
# Summary comparison
print("=" * 70)
print("COMPARISON SUMMARY: Baseline vs LAT")
print("=" * 70)
print(f"\nLayer {TARGET_LAYER} (refusal-critical layer):")
print(f"\n{'Metric':<35} {'Baseline':<15} {'LAT':<15} {'Change':<15}")
print("-" * 70)

# SVD Concentration
base_svd = svd_results['Baseline'][TARGET_LAYER]['top_k_variance'] * 100
lat_svd = svd_results['LAT'][TARGET_LAYER]['top_k_variance'] * 100
print(f"{'SVD Top-2 Variance (%)':<35} {base_svd:<15.1f} {lat_svd:<15.1f} {lat_svd - base_svd:+.1f}")

# Interference metrics
base_interf = interference_results['Baseline']['mean_off_diagonal']
lat_interf = interference_results['LAT']['mean_off_diagonal']
print(f"{'Mean Off-Diagonal |cos sim|':<35} {base_interf:<15.4f} {lat_interf:<15.4f} {lat_interf - base_interf:+.4f}")

base_frob = interference_results['Baseline']['interference_frobenius']
lat_frob = interference_results['LAT']['interference_frobenius']
print(f"{'Interference Frobenius Norm':<35} {base_frob:<15.4f} {lat_frob:<15.4f} {lat_frob - base_frob:+.4f}")

print(f"\nReference values from Abbas et al.:")
print(f"  Baseline concentration: ~44%")
print(f"  LAT concentration:      ~75%")
print("=" * 70)

# Key finding
print("\nðŸ”‘ KEY FINDING:")
if lat_svd > base_svd and lat_interf < base_interf:
    print("   LAT shows HIGHER concentration AND LOWER interference")
    print("   â†’ Supports hypothesis that concentration reduces interference")
elif lat_svd > base_svd and lat_interf >= base_interf:
    print("   LAT shows HIGHER concentration BUT similar/higher interference")
    print("   â†’ Concentration and interference may be independent phenomena")
else:
    print("   Results need further investigation")

# Save results
all_results = {}
for model_name in models.keys():
    all_results[model_name] = {
        'layer': TARGET_LAYER,
        'n_samples': N_SAMPLES,
        'svd_top2_variance': svd_results[model_name][TARGET_LAYER]['top_k_variance'],
        'mean_off_diagonal': interference_results[model_name]['mean_off_diagonal'],
        'interference_frobenius': interference_results[model_name]['interference_frobenius'],
    }

with open(OUTPUT_DIR / 'comparison_results.json', 'w') as f:
    json.dump(all_results, f, indent=2)
print(f"\nResults saved to {OUTPUT_DIR / 'comparison_results.json'}")