## Imports

In [None]:
!pip install transformers transformer_lens tqdm matplotlib pandas numpy scikit-learn seaborn --quiet

In [2]:
import os
import json
import re
from datetime import datetime

import pandas as pd
import numpy as np
from tqdm import tqdm

import torch
import transformers
from transformer_lens import HookedTransformer
from transformers import AutoTokenizer

In [None]:
!nvidia-smi

In [4]:
# Hugging Face access token read by the hub client when the checkpoint is
# downloaded. Prefer exporting HF_TOKEN before starting the kernel: a value
# that is already present in the environment is kept instead of being
# overwritten by the blank placeholder below.
os.environ.setdefault("HF_TOKEN", "")

In [None]:
# Checkpoint identifier shared by the model loader and the tokenizer loader.
model_path = "meta-llama/Llama-3.1-8B-Instruct"

# Use the GPU when torch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Load the model
model = HookedTransformer.from_pretrained(model_path, device=device)

# Separate Hugging Face tokenizer for the same checkpoint; the EOS token is
# reused as the padding token.
tokenizer = AutoTokenizer.from_pretrained(model_path)
tokenizer.pad_token = tokenizer.eos_token

# Kept as the last expression so the cell still displays the model.
model.eval()

In [8]:
# Load the contrastive (steering) dataset and the held-out test dataset.
# The encoding is spelled out so that decoding does not depend on the
# platform's locale default.
with open('contrastive_dataset.json', 'r', encoding='utf-8') as f:
    contrastive_dataset = json.load(f)

with open('test_dataset.json', 'r', encoding='utf-8') as f:
    test_dataset = json.load(f)

In [None]:
# Sanity check: report both dataset sizes, then show one sample record from
# each (index 12, so both datasets need at least 13 entries).
print(f"steering dataset length: {len(contrastive_dataset)}")
print(f"test dataset length: {len(test_dataset)}")

print(contrastive_dataset[12])
print(test_dataset[12])

## generate with hook

In [37]:
def generate_text(model, prompt, layer, steering_vector=None, magnitude=0.0, max_tokens=10, temperature=0.0):
    """Generate a continuation of ``prompt``, optionally steering one layer.

    When both ``steering_vector`` and ``layer`` are supplied, a hook is
    attached to ``blocks.{layer}.hook_resid_pre`` for the duration of the
    call. On every forward pass whose sequence dimension is 1 the hook adds
    ``magnitude * steering_vector`` to the activations in place; passes with
    a longer sequence dimension (the prompt) are left untouched. A steering
    vector that contains any NaN is never applied.

    NOTE(review): presumably the length-1 passes are the per-token decoding
    steps of ``model.generate``; if so, the first new token is predicted from
    the unsteered prompt pass - confirm that this is intended.

    Args:
        model: Object providing ``to_tokens``, ``hooks``, ``generate`` and
            ``to_string`` (a ``HookedTransformer`` in this notebook).
        prompt: Text to tokenize and continue.
        layer: Index of the block whose ``hook_resid_pre`` is hooked, or
            ``None`` to generate without steering.
        steering_vector: Tensor (or array-like) added to the activations, or
            ``None`` to generate without steering.
        magnitude: Scalar multiplier applied to ``steering_vector``.
        max_tokens: Forwarded to ``model.generate`` as ``max_new_tokens``.
        temperature: Forwarded to ``model.generate``.

    Returns:
        Whatever ``model.to_string`` returns for the generated tokens.
    """
    # Tokenize once; the hook only needs the prompt length for its debug line.
    tokens = model.to_tokens(prompt)
    prompt_length = tokens.shape[1]

    steering_enabled = steering_vector is not None and layer is not None
    vector_has_nan = False
    if steering_enabled:
        steering_vector = torch.as_tensor(steering_vector)
        # Even a single NaN would propagate through the whole residual
        # stream, so the validity check is done once, up front.
        vector_has_nan = bool(torch.isnan(steering_vector).any())
    # Plain Python float so numpy scalars (e.g. from np.arange) behave the
    # same as ordinary numbers when multiplied with a tensor.
    scale = float(magnitude)

    def steering_hook(activations, hook):
        print(f"\nSteering Hook Debug:")
        print(f"- Activation shape: {activations.shape}")
        print(f"- Prompt length: {prompt_length}")
        print(f"- Steering vector shape: {steering_vector.shape}")
        print(f"- Magnitude: {magnitude}")

        if vector_has_nan:
            print("Steering vector contains NaN values - skipping steering")
        elif activations.shape[1] == 1:
            print("Applying steering vector...")
            print(f"- Pre-steering activation")
            print(activations[-1])

            # Match the activations' device and dtype so a vector loaded on
            # another device (or in another precision) can still be added.
            delta = scale * steering_vector.to(device=activations.device, dtype=activations.dtype)
            # In-place add, broadcast over the batch dimension.
            activations += delta

            print(f"- Post-steering activation")
            print(activations[-1])
        else:
            print(activations.shape)
            print("Skipping steering (processing prompt)")

        return activations

    print(f"\nGenerate Text Debug:")
    print(f"- Layer: {layer}")
    print(f"- Magnitude: {magnitude}")
    print(f"- Max tokens: {max_tokens}")
    print(f"- Temperature: {temperature}")

    hooks = [(f"blocks.{layer}.hook_resid_pre", steering_hook)] if steering_enabled else []

    # The hooks are only active inside this context manager.
    with model.hooks(hooks):
        print(f"- Input tokens shape: {tokens.shape}")
        output = model.generate(
            input=tokens,
            max_new_tokens=max_tokens,
            temperature=temperature
        )
        print(f"- Output tokens shape: {output.shape}")

    return model.to_string(output)

In [11]:
def parse_llama_output(output):
    """
    Parse Llama output to extract the answer (A) or (B)

    The text after the last ``assistant<|end_header_id|>`` marker and before
    the next ``<|eot_id|>`` is taken as the reply, and its first
    whitespace-separated word is returned. A word other than ``(A)`` or
    ``(B)`` is still returned, after printing a warning.

    Args:
        output: Either a sequence whose first element is the decoded text
            (as produced by ``generate_text``) or the decoded text itself.

    Returns:
        The first word of the assistant reply, or ``None`` when ``output``
        is empty, is not text, or contains no reply text.
    """
    print(f"\nParse Output Debug:")
    print(f"Raw output type: {type(output)}")
    print(f"Raw output: {output}")

    if isinstance(output, str):
        # A bare string must not be indexed: output[0] would be one character.
        cleaned_output = output
    else:
        try:
            cleaned_output = output[0]
        except (TypeError, IndexError, KeyError) as e:
            print(f"Error parsing output: {str(e)}")
            return None

    if not isinstance(cleaned_output, str):
        print(f"Error parsing output: expected text, got {type(cleaned_output)}")
        return None
    print(f"Output: {cleaned_output}")

    # Keep only the assistant's reply, up to its end-of-turn marker.
    answer_part = cleaned_output.split("assistant<|end_header_id|>")[-1]
    answer_part = answer_part.split("<|eot_id|>")[0].strip()

    words = answer_part.split()
    if not words:
        print("Error parsing output: no answer text after the assistant header")
        return None

    answer = words[0]
    print(f"Final answer: {answer}")

    if answer not in ["(A)", "(B)"]:
        print(f"Warning: Unexpected answer format: {answer}")

    return answer

## test steering vectors

In [31]:
def test_steering_vectors(model, test_dataset, vectors, magnitudes, output_dirs):
    """Measure answer accuracy for every (layer, magnitude) steering pair.

    For each layer in ``vectors`` and each value in ``magnitudes`` the whole
    ``test_dataset`` is run through ``generate_text`` with that layer's
    vector, the reply is parsed with ``parse_llama_output`` and compared with
    the item's ``'positive'`` answer. Replies that cannot be parsed count as
    incorrect. The per-pair accuracies are written to
    ``steering_results.csv`` inside ``output_dirs``.

    Args:
        model: Model forwarded to ``generate_text``.
        test_dataset: Re-iterable collection of dicts with ``'question'`` and
            ``'positive'`` keys.
        vectors: Mapping from layer index to steering vector tensor.
        magnitudes: Iterable of steering magnitudes to sweep.
        output_dirs: Directory (a single path) receiving the CSV file.

    Returns:
        A ``pandas.DataFrame`` with ``layer``, ``magnitude`` and ``accuracy``
        columns, one row per tested pair. ``accuracy`` is NaN when
        ``test_dataset`` is empty.
    """
    # Create the output directory up front so a bad path fails before the
    # sweep rather than after it.
    if output_dirs:
        os.makedirs(output_dirs, exist_ok=True)

    print(f"\nInitial vectors check:")
    if not vectors:
        print("Warning: No steering vectors provided")
        vectors = {}
    for layer, vector in vectors.items():
        print(f"Layer {layer} vector stats:")
        print(f"- Shape: {vector.shape}")
        print(f"- Max: {torch.max(vector):.6f}")
        print(f"- Min: {torch.min(vector):.6f}")
        if torch.isnan(vector).any():
            print(f"Warning: NaN values in vector for layer {layer}")

    results = []

    # Materialise once: the sweep is repeated for every layer, which would
    # exhaust a one-shot iterable after the first layer.
    magnitudes = list(magnitudes)

    for layer in tqdm(list(vectors), desc="Testing layers", position=0, leave=True):
        steering_vector = vectors[layer]

        for magnitude in tqdm(magnitudes, desc=f"Layer {layer} magnitudes", position=1, leave=False):
            print(f"\n{'='*50}")
            print(f"Testing Layer {layer} with magnitude {magnitude}")
            print(f"{'='*50}")

            correct = 0
            total = 0
            for item in test_dataset:
                print(f"\n{'-'*30}")
                print(f"Testing item {total+1}")
                print(f"Question: {item['question']}")
                print(f"Expected answer: {item['positive']}")

                # Hand-built chat prompt: a system turn with the answer
                # format, the user question, then an open assistant turn.
                prompt = ""
                prefix = "Reply with just (A) or (B)"
                prompt += f"<|start_header_id|>system<|end_header_id|>{prefix}<|eot_id|>"
                prompt += f"<|start_header_id|>user<|end_header_id|>{item['question']}\n\nAnswer:<|eot_id|><|start_header_id|>assistant<|end_header_id|>"

                print(f"\nPrompt structure:")
                print(f"Length: {len(prompt)}")
                print(f"Tokens: {model.to_tokens(prompt).shape}")

                output = generate_text(
                    model,
                    prompt=prompt,
                    layer=layer,
                    steering_vector=steering_vector,
                    magnitude=magnitude,
                    max_tokens=20,
                    temperature=0.0
                )
                print(f"\nRaw output: {output}")

                answer = parse_llama_output(output)

                if answer:
                    is_correct = answer == item["positive"]
                    correct += int(is_correct)
                    print(f"\nResult: {'✓ Correct' if is_correct else '✗ Incorrect'}")
                    print(f"\nExpected: {item['positive']}, Got: {answer}")
                else:
                    print(f"\nWarning: Couldn't parse output: {output}")
                # Unparseable replies still count towards the total.
                total += 1

            # Accuracy is undefined for an empty dataset; report NaN instead
            # of raising ZeroDivisionError.
            accuracy = correct / total if total else float("nan")
            print(f"\nSummary for Layer {layer}, Magnitude {magnitude}:")
            print(f"Correct: {correct}/{total}")
            print(f"Accuracy: {accuracy:.2%}")

            results.append({
                'layer': layer,
                'magnitude': magnitude,
                'accuracy': accuracy
            })

    results_df = pd.DataFrame(results)
    print("\nFinal Results DataFrame:")
    print(results_df)

    filename = 'steering_results.csv'
    results_df.to_csv(
        os.path.join(output_dirs, filename),
        index=False
    )

    return results_df

In [22]:
def test_baseline_vectors(model, test_dataset, output_dirs):
    """Measure answer accuracy of the unsteered model on ``test_dataset``.

    Every item is turned into a chat prompt, generated with
    ``generate_text`` without any steering vector, parsed with
    ``parse_llama_output`` and compared with the item's ``'positive'``
    answer. Replies that cannot be parsed count as incorrect. The accuracy is
    written to ``baseline_results.csv`` inside ``output_dirs``.

    Args:
        model: Model forwarded to ``generate_text``.
        test_dataset: Iterable of dicts with ``'question'`` and
            ``'positive'`` keys.
        output_dirs: Directory (a single path) receiving the CSV file.

    Returns:
        A one-row ``pandas.DataFrame`` with an ``accuracy`` column. The value
        is NaN when ``test_dataset`` is empty.
    """
    # Create the output directory up front so a bad path fails before the
    # evaluation rather than after it.
    if output_dirs:
        os.makedirs(output_dirs, exist_ok=True)

    results = []
    correct = 0
    total = 0

    for item in test_dataset:
        print(f"\nTesting item {total+1}")
        print(f"Question: {item['question']}")
        print(f"Expected answer: {item['positive']}")

        # Hand-built chat prompt: a system turn with the answer format, the
        # user question, then an open assistant turn.
        prompt = ""
        prefix = "Reply with just (A) or (B)"
        prompt += f"<|start_header_id|>system<|end_header_id|>{prefix}<|eot_id|>"
        prompt += f"<|start_header_id|>user<|end_header_id|>{item['question']}\n\nAnswer:<|eot_id|><|start_header_id|>assistant<|end_header_id|>"

        output = generate_text(
            model,
            prompt=prompt,
            layer=None,
            steering_vector=None,
            magnitude=0.0,
            max_tokens=20,
            temperature=0.0
        )

        answer = parse_llama_output(output)

        if answer:
            correct += int(answer == item["positive"])
        # Unparseable replies still count towards the total.
        total += 1

    # Accuracy is undefined for an empty dataset; report NaN instead of
    # raising ZeroDivisionError.
    accuracy = correct / total if total else float("nan")
    results.append({
        'accuracy': accuracy
    })
    print(accuracy)

    results_df = pd.DataFrame(results)
    results_df.to_csv(os.path.join(output_dirs, 'baseline_results.csv'), index=False)

    return results_df

## setup dirs

In [None]:
def load_vectors_from_dir(directory, map_location=None):
    """Load all vectors from a directory

    Every file named ``layer_<N>....pt`` is loaded with ``torch.load`` and
    stored under the integer ``<N>``. Files are loaded in ascending layer
    order so the returned dict iterates deterministically, whatever order the
    filesystem lists them in. Files whose layer number cannot be parsed are
    skipped with a message.

    Args:
        directory: Directory to scan (not recursive).
        map_location: Forwarded to ``torch.load``; ``None`` keeps torch's
            default device placement.

    Returns:
        Dict mapping layer index to the loaded object.
    """
    found = []
    for filename in os.listdir(directory):
        if not (filename.startswith('layer_') and filename.endswith('.pt')):
            continue
        try:
            layer = int(filename.split('_')[1].split('.')[0])
        except ValueError:
            print(f"Skipping {filename}: cannot parse a layer number")
            continue
        found.append((layer, filename))

    vectors = {}
    for layer, filename in sorted(found):
        # NOTE(security): torch.load is pickle-based; only point this at
        # directories whose files you produced or otherwise trust.
        vectors[layer] = torch.load(os.path.join(directory, filename), map_location=map_location)
        print(vectors[layer])
    return vectors

# Run directory holding previously computed vectors; the result files of the
# later cells are written here as well.
# NOTE(review): the name looks like a truncated timestamp
# (20250120_093303?) - confirm the path.
existing_dir = './0250120_093303'
if existing_dir is not None:
    normalized_vectors = load_vectors_from_dir(
        os.path.join(existing_dir, 'normalized_vectors')
    )
    print(f"Loaded {len(normalized_vectors)} normalized vectors from {existing_dir}")

## baseline test

In [None]:
# Accuracy of the unsteered model; the CSV is written into the run directory.
print("\nTesting model baseline")
baseline_results = test_baseline_vectors(model, test_dataset, output_dirs=existing_dir)

## steering test

In [None]:
print("\nTesting steering vectors...")
# Sweep from -3.0 to 3.0 inclusive in steps of 0.5 (the stop value 3.5 is
# exclusive).
magnitudes = np.arange(-3, 3.5, 0.5)
steering_results = test_steering_vectors(
    model=model,
    test_dataset=test_dataset,
    vectors=normalized_vectors,
    magnitudes=magnitudes,
    output_dirs=existing_dir,
)

## plot results

In [None]:
import matplotlib.pyplot as plt

# Overlay one accuracy-vs-magnitude curve per layer on a single figure.
plt.figure(figsize=(12, 8))
for layer in steering_results['layer'].unique():
    layer_data = steering_results[steering_results['layer'] == layer]
    plt.plot(layer_data['magnitude'], layer_data['accuracy'], label=f'Layer {layer}')

plt.xlabel('Steering Magnitude')
plt.ylabel('Accuracy')
plt.title('Steering Effect by Layer and Magnitude')
# Legend placed outside the axes so it does not cover the curves.
plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.tight_layout()
# Save next to the result CSVs. `output_dirs` only exists as a parameter name
# inside the test functions, so the run directory is used here.
plt.savefig(os.path.join(existing_dir, 'steering_plot.png'))
plt.close()


In [None]:
import matplotlib.pyplot as plt

# Baseline accuracy is the single row produced by test_baseline_vectors.
baseline_accuracy = baseline_results['accuracy'].iloc[0]

# Calculate subplot grid dimensions
n_layers = len(steering_results['layer'].unique())
n_cols = 6  # You can adjust this
n_rows = (n_layers + n_cols - 1) // n_cols  # Ceiling division

# Create subplot figure
plt.figure(figsize=(15, 4*n_rows))

for i, layer in enumerate(sorted(steering_results['layer'].unique())):
    # Create subplot (subplot indices are 1-based)
    plt.subplot(n_rows, n_cols, i+1)

    # Plot data for this layer
    layer_data = steering_results[steering_results['layer'] == layer]
    plt.plot(layer_data['magnitude'], layer_data['accuracy'], 'b-o')

    # Add horizontal line for baseline accuracy
    plt.axhline(y=baseline_accuracy, color='r', linestyle='--', label='Baseline')

    # Customize subplot
    plt.title(f'Layer {layer}')
    plt.xlabel('Steering Magnitude')
    plt.ylabel('Accuracy')
    plt.grid(True, alpha=0.3)
    plt.legend()

# Adjust layout
plt.tight_layout()
# Save next to the result CSVs. `output_dirs` only exists as a parameter name
# inside the test functions, so the run directory is used here.
plt.savefig(os.path.join(existing_dir, 'steering_plot_subplots.png'))
plt.close()