## 0. Initial Setup

First, let's install all required libraries and set up Hugging Face authentication.

In [None]:
# Install required libraries
!pip install -r requirements.txt

# Install Hugging Face CLI if not already installed
!pip install huggingface_hub

### Hugging Face Authentication

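You'll need to authenticate with Hugging Face to access Meta Llama 3 8B. You can either log in interactively with the CLI or log in programmatically with an access token; the cell below shows both options: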

In [None]:
# Option 1: Login via CLI (interactive)
# Uncomment the line below to use CLI login
# !huggingface-cli login

# Option 2: Login programmatically (if you have a token)
from huggingface_hub import login
import os

# Read the token from the environment. An empty or whitespace-only value is
# treated as "not set" so that login() is never called with a blank token.
hf_token = os.environ.get('HF_TOKEN', '').strip()

if hf_token:
    login(token=hf_token)
    print("✅ Logged in using HF_TOKEN environment variable")
else:
    print("💡 Please either:")
    print("   1. Set HF_TOKEN environment variable with your token")
    print("   2. Uncomment the CLI login line above and run it")
    print("   3. Use login(token='your_token_here') below")

    # Uncomment and add your token here if needed
    # (avoid committing a notebook that contains a real token):
    # login(token="your_huggingface_token_here")

In [None]:
# Verify setup: report the PyTorch / CUDA status, then check that transformers
# is importable and that the Hugging Face Hub accepts the stored credentials.
import torch

print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA device: {torch.cuda.get_device_name()}")

# transformers is only probed here; the real imports happen in the next section
try:
    from transformers import AutoTokenizer
except ImportError:
    print("❌ Transformers library not found - please install requirements.txt")
else:
    print("✅ Transformers library loaded successfully")

# whoami() needs valid credentials, so any failure is reported as an auth issue
# instead of stopping the notebook
try:
    from huggingface_hub import HfApi
    api = HfApi()
    user = api.whoami()
    print(f"✅ Logged in to Hugging Face as: {user['name']}")
except Exception as e:
    print(f"⚠️ Hugging Face authentication issue: {e}")
    print("Please ensure you're logged in to access Meta Llama models")

print("\n🚀 Setup verification complete!")

# Vector Steering Experiment: Avoiding "Orange" Token

This notebook demonstrates how to use vector steering to prevent Meta Llama 3 8B from generating the token "orange". We'll extract steering vectors and apply them during generation to bias the model away from this specific token.

## 1. Setup and Imports

Let's start by importing all necessary libraries and setting up logging.

In [None]:
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from transformers import AutoTokenizer, AutoModelForCausalLM
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm.auto import tqdm
from rich.console import Console
from rich.logging import RichHandler
import logging
from typing import List, Dict, Tuple, Optional
import warnings
from collections import defaultdict
import gc

# Rich console shared by the whole notebook, plus logging routed through Rich
console = Console()
logging.basicConfig(
    handlers=[RichHandler(console=console, rich_tracebacks=True)],
    datefmt="[%X]",
    format="%(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Keep cell output readable by silencing library warnings
warnings.filterwarnings("ignore")

# Fix the RNG seeds (PyTorch and NumPy) for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
logger.info(f"Using device: {device}")

console.print("✅ Setup complete!", style="bold green")

## 2. Load Model and Tokenizer

Load Meta Llama 3 8B model and tokenizer. We'll also set up activation hooks for extracting hidden states.

In [None]:
# Model configuration
MODEL_NAME = "meta-llama/Meta-Llama-3-8B"
TARGET_TOKEN = "orange"

# Module-level stores shared by the hook machinery:
#   activations -> latest output tensor captured per hook name
#   hooks       -> handles of registered forward hooks, kept so they can be removed later
activations = {}
hooks = []

def activation_hook(name):
    """Build a forward hook that records a module's output under ``name``.

    The returned callable writes the detached output into the global
    ``activations`` dict. When the module returns a tuple, only its first
    element is recorded.
    """
    def _capture(module, input, output):
        captured = output[0] if isinstance(output, tuple) else output
        activations[name] = captured.detach()
    return _capture

logger.info("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
# Reuse EOS as the padding token so padding / generate() have a pad id to work with
tokenizer.pad_token = tokenizer.eos_token

logger.info("Loading model... (this may take a while)")
# NOTE: trust_remote_code is intentionally not enabled. Llama is implemented natively in
# transformers, and the flag would allow arbitrary code from the Hub repo to execute.
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    torch_dtype=torch.float16,
    device_map="auto",
)

# Get the target token ID.
# Every prompt in this notebook ends in a word, so the next token the model emits is
# the word-initial form WITH a leading space (" orange"). With a byte-level BPE
# tokenizer that form has a different ID than the bare string "orange", so encode the
# form that is actually generated.
target_text = f" {TARGET_TOKEN}"
target_token_ids = tokenizer.encode(target_text, add_special_tokens=False)
if len(target_token_ids) != 1:
    logger.warning(
        f"{target_text!r} is split into {len(target_token_ids)} tokens {target_token_ids}; "
        "only the first one is tracked"
    )
target_token_id = target_token_ids[0]
logger.info(f"Target token {target_text!r} has ID: {target_token_id}")

# Register hooks on middle layers (we'll focus on layers 16-20 for Llama 3 8B)
target_layers = [16, 17, 18, 19, 20]
for layer_idx in target_layers:
    layer = model.model.layers[layer_idx]
    hook = layer.register_forward_hook(activation_hook(f"layer_{layer_idx}"))
    hooks.append(hook)

logger.info(f"Model loaded successfully! Total parameters: {model.num_parameters():,}")
console.print("✅ Model and tokenizer ready!", style="bold green")

## 3. Extract Baseline Activations

We'll collect baseline activations by running prompts that naturally lead to "orange" outputs, and compare them with the activations from prompts that lead to other color words.

In [None]:
# Prompts that typically lead to "orange" (the "positive" group for the steering vector).
# NOTE(review): "The fruit called an orange is" already contains the word "orange",
# and some of these prompts are reused verbatim in `test_prompts` in section 6, so the
# evaluation there is not fully held-out.
orange_prompts = [
    "The color of a carrot is",
    "When you mix red and yellow, you get",
    "The sunset was painted in shades of",
    "A pumpkin is typically",
    "The fruit called an orange is",
    "Fire appears red, yellow, and",
    "Traffic cones are usually",
    "Basketball uniforms are often"
]

# Prompts that lead to other colors (control group).
# Same number of prompts as `orange_prompts`, so both group means average 8 samples.
other_color_prompts = [
    "The color of grass is",
    "The sky on a clear day is", 
    "Fresh snow is",
    "A ripe tomato is",
    "The ocean appears",
    "Chocolate is typically",
    "Coal is usually",
    "A ripe banana is"
]

def extract_activations(prompts, label):
    """Collect last-position activations from the hooked layers for each prompt.

    Runs one forward pass per prompt. The forward hooks registered on
    ``target_layers`` fill the global ``activations`` dict, and the hidden state
    at the final prompt position is copied from there to the CPU.

    Args:
        prompts: Iterable of prompt strings.
        label: Human-readable group name, used only in log / progress output.

    Returns:
        Dict with one ``"layer_<idx>"`` key per entry in ``target_layers``. Each
        value is a 2-D tensor with one row per prompt, or an empty list if
        nothing was captured for that layer.
    """
    all_activations = {f"layer_{i}": [] for i in target_layers}

    logger.info(f"Extracting activations for {label} prompts...")

    for prompt in tqdm(prompts, desc=f"Processing {label}"):
        # Drop captures from the previous prompt so stale tensors can never leak in
        activations.clear()

        inputs = tokenizer(prompt, return_tensors="pt").to(device)

        # A single forward pass over the prompt is all the hooks need; no token
        # has to be generated and no hidden states have to be returned.
        with torch.no_grad():
            model(**inputs, use_cache=False)

        # Only read the layers we asked for. Any other key that happens to be in
        # `activations` is ignored instead of raising a KeyError.
        for layer_name, collected in all_activations.items():
            captured = activations.get(layer_name)
            if captured is None:
                continue
            # Batch element 0, last sequence position
            collected.append(captured[0, -1, :].cpu())

    # Convert the per-prompt lists to tensors (layers with no captures stay as [])
    for layer_name, collected in all_activations.items():
        if collected:
            all_activations[layer_name] = torch.stack(collected)

    return all_activations

# Run the extraction once per prompt group: first the "orange" prompts, then the controls
orange_activations = extract_activations(prompts=orange_prompts, label="orange")
other_activations = extract_activations(prompts=other_color_prompts, label="other colors")

console.print("✅ Baseline activations extracted!", style="bold green")

## 4. Create Steering Vector

Compute the steering vector by analyzing the difference between activations when the model tends toward "orange" vs other colors.

In [None]:
def compute_steering_vectors(orange_acts, other_acts):
    """Compute one unit-length steering vector per layer.

    For every layer present (and non-empty) in both inputs, the vector is the
    difference of the group means, ``mean(orange) - mean(other)``, scaled to
    unit length. It points from "other" towards "orange", so callers subtract
    it to steer away from orange.

    Args:
        orange_acts: Dict mapping layer name to a 2-D tensor with one row per
            prompt (or an empty list) for the "orange" prompts.
        other_acts: Same structure for the control prompts.

    Returns:
        Dict mapping layer name to the normalized steering vector, in the same
        dtype as the input activations. Layers that are missing or empty in
        either group, or whose mean difference has a zero or non-finite norm,
        are skipped.
    """
    steering_vectors = {}

    logger.info("Computing steering vectors...")

    for layer_name, orange_layer_acts in orange_acts.items():
        other_layer_acts = other_acts.get(layer_name)
        if other_layer_acts is None or len(orange_layer_acts) == 0 or len(other_layer_acts) == 0:
            continue

        # Do the arithmetic in float32: means and norms of half-precision
        # activations are prone to rounding error and overflow.
        orange_mean = orange_layer_acts.float().mean(dim=0)
        other_mean = other_layer_acts.float().mean(dim=0)

        # Steering vector points from "other" to "orange"
        direction = orange_mean - other_mean
        raw_norm = torch.norm(direction).item()

        # Normalizing a zero (or NaN/inf) vector would produce NaNs that poison
        # every forward pass once the vector is applied, so skip such layers.
        if raw_norm == 0.0 or raw_norm != raw_norm or raw_norm == float("inf"):
            logger.warning(f"{layer_name}: degenerate mean difference (norm = {raw_norm}); skipping")
            continue

        # Hand the vector back in the activations' own dtype
        steering_vectors[layer_name] = (direction / raw_norm).to(orange_layer_acts.dtype)

        # Log the raw norm: after normalization the norm is always ~1
        logger.info(f"{layer_name}: raw mean-difference norm = {raw_norm:.4f} (normalized to unit length)")

    return steering_vectors

# Compute steering vectors
steering_vectors = compute_steering_vectors(orange_activations, other_activations)

# The steering vectors are unit-normalized, so plotting their norms would show ~1.0
# for every layer. Plot the norm of the raw (pre-normalization) difference of the
# group means instead, i.e. how far apart the two prompt groups are in each layer.
layer_numbers = [int(name.split('_')[1]) for name in steering_vectors.keys()]
vector_norms = [
    torch.norm(
        orange_activations[name].float().mean(dim=0) - other_activations[name].float().mean(dim=0)
    ).item()
    for name in steering_vectors.keys()
]

plt.figure(figsize=(10, 6))
plt.bar(layer_numbers, vector_norms, alpha=0.7)
plt.xlabel('Layer Number')
plt.ylabel('Norm of Mean Activation Difference (before normalization)')
plt.title('Steering Vector Magnitudes Across Layers')
plt.grid(True, alpha=0.3)
plt.show()

console.print("✅ Steering vectors computed!", style="bold green")

## 5. Apply Steering to Model

Implement the steering mechanism by modifying the model's forward pass to subtract the steering vector from activations during inference.

In [None]:
class SteeringHook:
    """Forward-hook factory that subtracts steering vectors from layer outputs.

    One instance is shared by all hooked layers. Setting ``active`` to ``False``
    turns every hook created by this instance into a no-op, which allows
    baseline and steered runs without re-registering hooks.
    """

    def __init__(self, steering_vectors, steering_strength=1.0):
        """
        Args:
            steering_vectors: Dict mapping layer name to a 1-D steering vector.
            steering_strength: Scalar multiplier applied to the vector before
                it is subtracted from the hidden states.
        """
        self.steering_vectors = steering_vectors
        self.steering_strength = steering_strength
        self.active = True

    def get_hook(self, layer_name):
        """Return a forward hook that steers the output of ``layer_name``.

        The hook returns the module output with the same structure it received:
        a tuple stays a tuple (even a 1-tuple) and a tensor stays a tensor. If
        steering is inactive or no vector exists for the layer, the output is
        passed through unchanged.
        """
        def hook(module, input, output):
            if not self.active or layer_name not in self.steering_vectors:
                return output

            is_tuple = isinstance(output, tuple)
            hidden_states = output[0] if is_tuple else output

            # Match both device and dtype of the activations. Without the dtype
            # cast, a float32 vector would promote half-precision hidden states
            # to float32 and change what later layers receive.
            steering_vec = self.steering_vectors[layer_name].to(
                device=hidden_states.device, dtype=hidden_states.dtype
            )

            # Apply steering by subtracting the steering vector; broadcasting
            # over the leading dimensions applies it to all positions and batch elements.
            steered_states = hidden_states - self.steering_strength * steering_vec

            # Preserve tuple-ness: a 1-tuple output must stay a 1-tuple, because
            # the caller may index into the result with [0].
            if is_tuple:
                return (steered_states,) + output[1:]
            return steered_states

        return hook

# Detach the activation-capture hooks left over from the extraction step
for hook in hooks:
    hook.remove()
hooks.clear()

# A single SteeringHook instance drives every steered layer
steering_hook = SteeringHook(steering_vectors, steering_strength=2.0)

# Attach a steering hook to each target layer that has a steering vector
for layer_idx in target_layers:
    if f"layer_{layer_idx}" not in steering_vectors:
        continue
    layer = model.model.layers[layer_idx]
    hook = layer.register_forward_hook(steering_hook.get_hook(f"layer_{layer_idx}"))
    hooks.append(hook)

logger.info(f"Steering hooks registered on {len(hooks)} layers")
console.print("✅ Steering mechanism active!", style="bold green")

## 6. Test Steering Effectiveness

Run test prompts that would normally generate "orange" and measure how often the steered model avoids outputting this token.

In [None]:
def test_model_generation(prompts, label, num_tokens=10, temperature=0.7):
    """Compare baseline and steered generation for each prompt.

    For every prompt the model is run twice, with ``steering_hook`` inactive
    (baseline) and active (steered). Each run records:

    * the decoded text (prompt + continuation),
    * whether the *continuation* contains "orange" (the prompt itself is
      excluded, since a prompt may already contain the word),
    * the model's probability of ``target_token_id`` as the first generated
      token, taken from the raw logits of a forward pass over the prompt.
      ``generate()``'s ``scores`` are not used because they are the logits
      *after* temperature scaling and any sampling filters, and therefore do
      not reflect the model's own distribution.

    Args:
        prompts: Iterable of prompt strings.
        label: Human-readable name used in log / progress output.
        num_tokens: Number of new tokens to sample per run.
        temperature: Sampling temperature passed to ``generate()``.

    Returns:
        Dict of equally long lists keyed by ``prompts``, ``baseline_outputs``,
        ``steered_outputs``, ``baseline_has_orange``, ``steered_has_orange``,
        ``baseline_orange_prob`` and ``steered_orange_prob``.
    """
    results = {
        'prompts': [],
        'baseline_outputs': [],
        'steered_outputs': [],
        'baseline_has_orange': [],
        'steered_has_orange': [],
        'baseline_orange_prob': [],
        'steered_orange_prob': []
    }

    def _run(inputs, steer):
        """Run one forward pass plus one sampled generation with steering on or off."""
        steering_hook.active = steer
        with torch.no_grad():
            # Raw next-token distribution at the last prompt position
            first_step_logits = model(**inputs).logits[0, -1, :]
            target_prob = torch.softmax(first_step_logits.float(), dim=-1)[target_token_id].item()

            generated = model.generate(
                **inputs,
                max_new_tokens=num_tokens,
                temperature=temperature,
                do_sample=True,
                pad_token_id=tokenizer.eos_token_id,
                return_dict_in_generate=True
            )

        sequence = generated.sequences[0]
        prompt_length = inputs["input_ids"].shape[1]

        # Full text is kept for display; only the new tokens are searched for "orange"
        full_text = tokenizer.decode(sequence, skip_special_tokens=True)
        continuation = tokenizer.decode(sequence[prompt_length:], skip_special_tokens=True)
        return full_text, "orange" in continuation.lower(), target_prob

    logger.info(f"Testing {label} prompts...")

    # Remember the flag so an exception mid-run cannot leave steering silently disabled
    previous_active = steering_hook.active
    try:
        for prompt in tqdm(prompts, desc=f"Testing {label}"):
            inputs = tokenizer(prompt, return_tensors="pt").to(device)

            # Test baseline (without steering), then with steering
            baseline_text, baseline_has_orange, baseline_orange_prob = _run(inputs, steer=False)
            steered_text, steered_has_orange, steered_orange_prob = _run(inputs, steer=True)

            # Store results
            results['prompts'].append(prompt)
            results['baseline_outputs'].append(baseline_text)
            results['steered_outputs'].append(steered_text)
            results['baseline_has_orange'].append(baseline_has_orange)
            results['steered_has_orange'].append(steered_has_orange)
            results['baseline_orange_prob'].append(baseline_orange_prob)
            results['steered_orange_prob'].append(steered_orange_prob)
    finally:
        steering_hook.active = previous_active

    return results

# Evaluation prompts: each one is expected to be completed with "orange" by an unsteered model
test_prompts = [
    "The color of a carrot is",
    "When you mix red and yellow paint, you get",
    "A pumpkin is typically colored",
    "The fruit called an orange is",
    "Traffic cones are usually painted",
    "At sunset, the sky often turns",
    "A basketball is typically",
    "Tiger fur has black stripes on an",
]

# Short continuations at a low temperature keep the comparison focused on the first few tokens
test_results = test_model_generation(
    prompts=test_prompts,
    label="orange-prone",
    num_tokens=5,
    temperature=0.3,
)

console.print("✅ Testing complete!", style="bold green")

## 7. Evaluate Results

Analyze the steering effectiveness using metrics and visualizations.

In [None]:
# Create results DataFrame (one row per test prompt)
df = pd.DataFrame(test_results)

# Calculate summary statistics
baseline_orange_rate = df['baseline_has_orange'].mean()
steered_orange_rate = df['steered_has_orange'].mean()
baseline_avg_prob = df['baseline_orange_prob'].mean()
steered_avg_prob = df['steered_orange_prob'].mean()

# The relative reduction is undefined when the baseline probability is 0 (or NaN for an
# empty frame). Warnings are silenced in this notebook, so an unguarded division would
# quietly print "nan%" / "inf%"; report it explicitly instead.
if baseline_avg_prob > 0:
    prob_reduction_text = f"{((baseline_avg_prob - steered_avg_prob) / baseline_avg_prob):.2%}"
else:
    prob_reduction_text = "n/a (baseline probability is zero)"

logger.info("=== STEERING RESULTS ===")
logger.info(f"Baseline 'orange' occurrence rate: {baseline_orange_rate:.2%}")
logger.info(f"Steered 'orange' occurrence rate: {steered_orange_rate:.2%}")
logger.info(f"Reduction in 'orange' occurrences: {(baseline_orange_rate - steered_orange_rate):.2%}")
logger.info(f"Baseline avg 'orange' probability: {baseline_avg_prob:.4f}")
logger.info(f"Steered avg 'orange' probability: {steered_avg_prob:.4f}")
logger.info(f"Probability reduction: {prob_reduction_text}")

# Visualization: 2x2 grid of summary plots
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# 1. Orange occurrence comparison
axes[0,0].bar(['Baseline', 'Steered'], [baseline_orange_rate, steered_orange_rate], 
              color=['orange', 'blue'], alpha=0.7)
axes[0,0].set_ylabel('Orange Occurrence Rate')
axes[0,0].set_title('Orange Token Occurrence Rate')
axes[0,0].set_ylim(0, 1)

# 2. Probability comparison
axes[0,1].bar(['Baseline', 'Steered'], [baseline_avg_prob, steered_avg_prob], 
              color=['orange', 'blue'], alpha=0.7)
axes[0,1].set_ylabel('Average Orange Probability')
axes[0,1].set_title('Average Orange Token Probability')

# 3. Individual prompt comparison
prompt_indices = range(len(df))
axes[1,0].scatter(prompt_indices, df['baseline_orange_prob'], 
                  color='orange', alpha=0.7, label='Baseline')
axes[1,0].scatter(prompt_indices, df['steered_orange_prob'], 
                  color='blue', alpha=0.7, label='Steered')
axes[1,0].set_xlabel('Prompt Index')
axes[1,0].set_ylabel('Orange Probability')
axes[1,0].set_title('Orange Probability by Prompt')
axes[1,0].legend()

# 4. Probability reduction distribution (positive values = steering lowered the probability)
prob_reductions = df['baseline_orange_prob'] - df['steered_orange_prob']
axes[1,1].hist(prob_reductions, bins=10, alpha=0.7, color='green')
axes[1,1].set_xlabel('Probability Reduction')
axes[1,1].set_ylabel('Frequency')
axes[1,1].set_title('Distribution of Probability Reductions')

plt.tight_layout()
plt.show()

# Display detailed results; the stored outputs include the prompt, so slice it off
console.print("\n[bold]Detailed Results:[/bold]")
for i, row in df.iterrows():
    console.print(f"\n[bold cyan]Prompt {i+1}:[/bold cyan] {row['prompts']}")
    console.print(f"[yellow]Baseline:[/yellow] {row['baseline_outputs'][len(row['prompts']):]}")
    console.print(f"[blue]Steered:[/blue] {row['steered_outputs'][len(row['prompts']):]}")
    console.print(f"Orange prob: {row['baseline_orange_prob']:.4f} → {row['steered_orange_prob']:.4f}")

console.print("✅ Analysis complete!", style="bold green")

## 8. Cleanup and Summary

Clean up resources and provide a summary of the experiment.

In [None]:
# Remove hooks to clean up, so later forward passes are no longer steered
for hook in hooks:
    hook.remove()
hooks.clear()

# Free memory: collect unreachable Python objects FIRST, so the tensors they hold are
# released, and only then ask PyTorch to return its cached GPU blocks.
gc.collect()
if torch.cuda.is_available():
    torch.cuda.empty_cache()

# The relative reduction is undefined when the baseline probability is 0 (or NaN)
if baseline_avg_prob > 0:
    prob_reduction_summary = f"{((baseline_avg_prob - steered_avg_prob) / baseline_avg_prob):.2%}"
else:
    prob_reduction_summary = "n/a (baseline probability is zero)"

# Summary
console.print("\n[bold green]🎯 EXPERIMENT SUMMARY[/bold green]")
console.print("="*50)
console.print("✓ Successfully loaded Meta Llama 3 8B model")
console.print(f"✓ Extracted steering vectors from layers {target_layers}")
console.print("✓ Applied vector steering to reduce 'orange' token generation")
console.print(f"✓ Tested on {len(test_prompts)} prompts")
console.print(f"✓ Achieved {((baseline_orange_rate - steered_orange_rate)):.2%} reduction in 'orange' occurrences")
console.print(f"✓ Reduced average 'orange' probability by {prob_reduction_summary}")

logger.info("Experiment completed successfully! 🚀")