<a href="https://colab.research.google.com/github/ChuloIva/COT-steering/blob/main/steering_vectors_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Steering Vectors Test Notebook

This notebook is designed to test steering vectors using cached data from the COT-steering project. It loads pre-computed feature vectors and demonstrates emotional reasoning steering.

## Key Features:
- ✅ Google Colab compatible
- ✅ Loads cached vectors from `results/cache/`
- ✅ Implements emotional steering functionality
- ✅ Uses depressive-normal dichotomy approach
- ✅ Safety-focused implementation

⚠️ **Research Use Only**: This tool is intended for research purposes with proper ethical oversight.

## 1. Setup and Dependencies

In [None]:
# Install required packages
!pip install torch transformers nnsight openai anthropic python-dotenv tqdm matplotlib seaborn pandas numpy
!pip install huggingface_hub

print("✅ Packages installed successfully!")

In [None]:
# Google Colab specific setup
import os
import sys

# Check if running in Google Colab
IN_COLAB = 'google.colab' in sys.modules

if IN_COLAB:
    from google.colab import drive
    drive.mount('/content/drive')
    
    # Clone the repository if not already present
    if not os.path.exists('./COT-steering'):
        !git clone https://github.com/ChuloIva/COT-steering
    
    os.chdir('./COT-steering')
    print("Current working directory:", os.getcwd())
    
    # Optional: Link to Google Drive for persistent storage
    DRIVE_PATH = '/content/drive/MyDrive/COT_Steering_Results'
    if not os.path.exists(DRIVE_PATH):
        os.makedirs(DRIVE_PATH)
    print(f"Drive storage path: {DRIVE_PATH}")
else:
    print("Running locally")
    DRIVE_PATH = './results'

In [None]:
# Hugging Face login (optional but recommended for better model access)
from huggingface_hub import login

# You can either pass a token explicitly or be prompted for one.
# Avoid committing a real token to the notebook; if you must pass one, do:
# login(token="your_token_here", add_to_git_credential=False)

# Or just run this to be prompted:
try:
    login(token=None, add_to_git_credential=False)
    print("✅ Logged in to Hugging Face")
except Exception as e:
    # Login is best-effort, so ordinary failures are reported and ignored.
    # KeyboardInterrupt/SystemExit are deliberately NOT caught, so interrupting
    # the prompt actually stops the cell.
    print("⚠️  Hugging Face login optional - continuing without login")
    print(f"   Reason: {type(e).__name__}: {e}")

## 2. Import Libraries and Modules

In [None]:
# Add paths to import local modules.
# Both paths are relative, so they resolve against the kernel's current working
# directory; the Colab setup cell chdirs into ./COT-steering before this runs.
# `sys` (and `os`) are imported by that earlier setup cell, not here.
sys.path.append('./utils')
sys.path.append('./messages')

# Import required libraries
import torch
import json
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from tqdm import tqdm
import random
from datetime import datetime
import warnings
# Silences every Python warning for the rest of the session. This keeps cell
# output short, but also hides deprecation warnings from the libraries above.
warnings.filterwarnings('ignore')

# Import custom modules (project-local `utils` and `messages` packages).
# NOTE(review): an ImportError is only reported, not re-raised. On failure the
# names below stay undefined and later cells that use them raise NameError.
try:
    from utils import (
        load_model_and_vectors,
        process_batch_annotations,
        process_saved_responses_batch,
        custom_generate_steering,
        analyze_emotional_content,
        generate_and_analyze_emotional,
        emotional_steering_pipeline,
        steering_config,
        chat
    )
    from messages import messages, eval_messages
    print("✅ All modules imported successfully!")
except ImportError as e:
    print(f"⚠️  Import error: {e}")
    print("Make sure you're in the COT-steering directory and utils/messages folders exist")

# Environment report: interpreter version, PyTorch version, and whether a CUDA
# device is visible (the GPU name is printed only when one is available).
print(f"🐍 Python version: {sys.version}")
print(f"🔥 PyTorch version: {torch.__version__}")
print(f"💾 CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"   GPU: {torch.cuda.get_device_name()}")

## 3. Configuration

In [None]:
# Central settings for this session; later cells read everything from CONFIG.
CONFIG = dict(
    model_name="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",  # Change as needed
    device="auto",  # auto-detect, or specify "cuda", "mps", "cpu"
    load_in_8bit=False,
    max_new_tokens=300,
    batch_size=2,
    results_dir=DRIVE_PATH if IN_COLAB else "./results",
    cache_dir="./results/cache",  # Where cached vectors are stored
    timestamp=datetime.now().strftime("%Y%m%d_%H%M%S"),
)

# Only the results directory is created; the cache directory is merely inspected.
os.makedirs(CONFIG["results_dir"], exist_ok=True)

print("📋 Configuration:")
for setting_name, setting_value in CONFIG.items():
    print(f"   {setting_name}: {setting_value}")

# Report what is in the cache directory (first 10 entries, sorted by name).
if not os.path.exists(CONFIG["cache_dir"]):
    print(f"⚠️  Cache directory not found: {CONFIG['cache_dir']}")
else:
    cache_files = os.listdir(CONFIG["cache_dir"])
    print(f"\n📦 Found {len(cache_files)} files in cache directory:")
    for cache_entry in sorted(cache_files)[:10]:
        print(f"   {cache_entry}")
    remaining = len(cache_files) - 10
    if remaining > 0:
        print(f"   ... and {remaining} more")

## 4. Load Cached Data

In [None]:
def _load_torch_cache(cache_dir, filename, cached_data, key, label, summarize):
    """Load one ``torch.save``-d file into ``cached_data[key]`` if the file exists."""
    path = os.path.join(cache_dir, filename)
    if not os.path.exists(path):
        return
    try:
        # SECURITY NOTE: torch.load is pickle-based and can execute arbitrary
        # code when fed a malicious file. Only load caches you produced yourself
        # or obtained from a trusted source.
        cached_data[key] = torch.load(path, map_location='cpu')
        print(f"✅ Loaded {label}: {summarize(cached_data[key])}")
    except Exception as e:
        print(f"❌ Error loading {label}: {e}")


def _load_first_json(cache_dir, filenames, cached_data, key, label):
    """Store the first candidate JSON file that exists and parses in ``cached_data[key]``."""
    for filename in filenames:
        filepath = os.path.join(cache_dir, filename)
        if not os.path.exists(filepath):
            continue
        try:
            # JSON is UTF-8 by specification; do not depend on the platform's
            # default encoding (which is not UTF-8 on every system).
            with open(filepath, 'r', encoding='utf-8') as f:
                cached_data[key] = json.load(f)
            print(f"✅ Loaded {label} from {filename}")
            return
        except Exception as e:
            # An unreadable candidate is reported and the next one is tried.
            print(f"⚠️  Error loading {filename}: {e}")


def load_cached_vectors(cache_dir):
    """Load cached feature vectors and training data.

    Looks in ``cache_dir`` for the following optional files and loads each one
    that is present and readable:

    * ``enhanced_feature_vectors.pt``  -> ``'feature_vectors'``
    * ``enhanced_mean_vectors.pt``     -> ``'mean_vectors'``
    * the first loadable training-responses JSON -> ``'training_responses'``
    * the first loadable annotations JSON        -> ``'annotations'``

    Args:
        cache_dir: Directory that holds the cache files.

    Returns:
        dict: Only the keys whose files were found. Empty when nothing is
        available (including when ``cache_dir`` does not exist). Load errors
        are printed, never raised.
    """
    cached_data = {}

    # Tensor caches are mapped to CPU by the helper.
    _load_torch_cache(
        cache_dir, "enhanced_feature_vectors.pt", cached_data,
        'feature_vectors', "enhanced feature vectors",
        lambda vectors: list(vectors.keys()),
    )
    _load_torch_cache(
        cache_dir, "enhanced_mean_vectors.pt", cached_data,
        'mean_vectors', "enhanced mean vectors",
        lambda vectors: f"{len(vectors)} categories",
    )

    # Candidate file names in priority order; the first that loads wins.
    _load_first_json(
        cache_dir,
        [
            "enhanced_training_responses.json",
            "enhanced_training_responses (2).json",  # Handle the (2) version
            "training_responses.json",
        ],
        cached_data, 'training_responses', "training responses",
    )
    _load_first_json(
        cache_dir,
        ["enhanced_annotations.json", "annotations.json"],
        cached_data, 'annotations', "annotations",
    )

    return cached_data

# Read whatever the cache directory offers and print a one-line summary per entry.
print("🔍 Loading cached data...")
cached_data = load_cached_vectors(CONFIG["cache_dir"])

if not cached_data:
    print("⚠️  No cached data found - you may need to run the training notebook first")
else:
    print("\n📦 Successfully loaded cached data:")
    for entry_name, payload in cached_data.items():
        # Containers report their size; anything else is just marked as loaded.
        summary = f"{len(payload)} items" if isinstance(payload, (dict, list)) else "loaded"
        print(f"   {entry_name}: {summary}")

## 5. Load Model

In [None]:
print("🤖 Loading model and tokenizer...")

# Start from a known state so a failed load never leaves objects from an
# earlier run of this cell behind.
model, tokenizer, existing_vectors = None, None, None

try:
    model, tokenizer, existing_vectors = load_model_and_vectors(
        device=CONFIG["device"],
        load_in_8bit=CONFIG["load_in_8bit"],
        compute_features=True,
        model_name=CONFIG["model_name"]
    )

    print(f"✅ Model loaded: {CONFIG['model_name']}")
    print(f"📊 Device: {next(model.parameters()).device}")
    print(f"🎯 Model has {model.config.num_hidden_layers} layers")
    print(f"📝 Vocabulary size: {len(tokenizer)}")

except Exception as e:
    print(f"❌ Error loading model: {e}")
    print("This might be due to:")
    print("   - Insufficient GPU/CPU memory")
    print("   - Network issues downloading the model")
    print("   - Missing Hugging Face authentication for gated models")
    model, tokenizer, existing_vectors = None, None, None

# Pick the feature vectors independently of whether the model loaded: the
# cached vectors are still valid, and the analysis cell below is written to
# cope with `model` being None. Previously a model failure discarded them.
# Use cached feature vectors if available, otherwise use any existing ones.
if 'feature_vectors' in cached_data:
    feature_vectors = cached_data['feature_vectors']
    print(f"📦 Using cached feature vectors: {list(feature_vectors.keys())}")
elif existing_vectors:
    feature_vectors = existing_vectors
    print(f"📦 Using existing feature vectors: {list(feature_vectors.keys())}")
else:
    feature_vectors = None
    print("⚠️  No feature vectors found - will need to train from scratch")

## 6. Analyze Available Steering Vectors

In [None]:
if feature_vectors:
    print("🔍 Analyzing available steering vectors...")
    print("=" * 50)

    # Labels with a fixed category; every other vector counts as cognitive.
    # Membership is tested directly on the label. The earlier version only
    # consulted the emotional list for names containing "thinking", so
    # "negative-attribution" and "pessimistic-projection" could never match.
    emotional_labels = {
        "depressive-thinking",
        "anxious-thinking",
        "negative-attribution",
        "pessimistic-projection",
    }
    baseline_labels = {"normal-thinking", "baseline", "overall"}

    # Categorize available vectors
    emotional_vectors = []
    cognitive_vectors = []
    baseline_vectors = []

    for vector_name in feature_vectors.keys():
        if vector_name in emotional_labels:
            emotional_vectors.append(vector_name)
        elif vector_name in baseline_labels:
            baseline_vectors.append(vector_name)
        else:
            cognitive_vectors.append(vector_name)

    print("📊 Vector Categories:")
    print(f"   🧠 Cognitive vectors ({len(cognitive_vectors)}): {cognitive_vectors}")
    print(f"   😔 Emotional vectors ({len(emotional_vectors)}): {emotional_vectors}")
    print(f"   ⚖️  Baseline vectors ({len(baseline_vectors)}): {baseline_vectors}")

    # Check vector dimensions (feature_vectors is non-empty in this branch).
    sample_vector = next(iter(feature_vectors.values()))
    print(f"\n📏 Vector dimensions: {sample_vector.shape}")
    print(f"   Expected: [{model.config.num_hidden_layers if model else '?'}, {model.config.hidden_size if model else '?'}]")

    # Check steering configuration compatibility (needs a loaded model).
    if model:
        model_name = CONFIG["model_name"]
        if model_name in steering_config:
            available_configs = list(steering_config[model_name].keys())
            compatible_vectors = [v for v in feature_vectors.keys() if v in available_configs]

            print("\n⚙️  Steering Configuration:")
            print(f"   📋 Available configs: {len(available_configs)}")
            print(f"   ✅ Compatible vectors: {len(compatible_vectors)}")

            if compatible_vectors:
                print(f"   🎯 Ready for steering: {compatible_vectors}")
            else:
                print("   ⚠️  No compatible vectors found for steering")
                print(f"   Available vectors: {list(feature_vectors.keys())}")
                print(f"   Available configs: {available_configs}")
        else:
            print(f"\n❌ No steering configuration found for {model_name}")
            print(f"   Available models in config: {list(steering_config.keys())}")

    print("=" * 50)

else:
    print("❌ No feature vectors available for analysis")

## 7. Steering Vector Demonstration

In [None]:
def demonstrate_steering(model, tokenizer, feature_vectors, test_message, steering_label, max_tokens=200, effect_threshold=5.0):
    """Compare an unsteered and a steered generation for one prompt.

    Generates a baseline completion for ``test_message``, scores it with
    ``analyze_emotional_content``, then generates a steered completion through
    ``generate_and_analyze_emotional`` and reports the score difference.

    Args:
        model: Loaded model exposing ``generate(...)`` as a context manager
            and ``generator.output.save()``.
        tokenizer: Tokenizer matching ``model``.
        feature_vectors: Mapping of steering label -> vector.
        test_message: Prompt text to generate from.
        steering_label: Key in ``feature_vectors`` (and in the model's
            ``steering_config`` entry) to steer with.
        max_tokens: Maximum number of new tokens for both generations.
        effect_threshold: Minimum absolute score change, in percentage points,
            for the steering to be reported as successful.

    Returns:
        dict with ``baseline_score``, ``steered_score``, ``effectiveness``,
        ``baseline_text`` and ``steered_text``; or ``None`` when a prerequisite
        is missing, steering produced no result, or an exception occurred
        (the traceback is printed, not raised).
    """

    if not model or not tokenizer or not feature_vectors:
        print("❌ Missing required components for steering")
        return None

    if steering_label not in feature_vectors:
        print(f"❌ Steering vector '{steering_label}' not found")
        return None

    model_name = CONFIG["model_name"]
    if model_name not in steering_config or steering_label not in steering_config[model_name]:
        print(f"❌ No steering configuration for '{steering_label}' with model {model_name}")
        return None

    print(f"🎯 Demonstrating {steering_label.replace('-', ' ').title()} Steering")
    print(f"📝 Test message: {test_message}")
    print("-" * 60)

    try:
        # Generate baseline response
        print("🔵 Baseline Response:")
        input_ids = tokenizer.encode(test_message, return_tensors="pt")

        # A single, unpadded prompt: every position is a real token. Deriving
        # the mask from `input_ids != pad_token_id` fails when the tokenizer
        # has no pad token (pad_token_id is None) and would wrongly mask real
        # tokens that happen to share the pad id.
        attention_mask = torch.ones_like(input_ids)

        # Fall back to EOS when the tokenizer defines no pad token.
        pad_token_id = tokenizer.pad_token_id
        if pad_token_id is None:
            pad_token_id = tokenizer.eos_token_id

        with model.generate(
            {"input_ids": input_ids, "attention_mask": attention_mask},
            max_new_tokens=max_tokens,
            pad_token_id=pad_token_id
        ) as tracer:
            baseline_output = model.generator.output.save()

        baseline_text = tokenizer.decode(baseline_output[0], skip_special_tokens=True)
        input_text = tokenizer.decode(input_ids[0], skip_special_tokens=True)
        # Drop the echoed prompt so only the newly generated text is analysed.
        if baseline_text.startswith(input_text):
            baseline_text = baseline_text[len(input_text):].strip()

        print(f"   {baseline_text[:300]}...")

        # Analyze baseline emotional content
        baseline_analysis = analyze_emotional_content(baseline_text)
        baseline_score = baseline_analysis['total_emotional_score']
        print(f"   📊 Emotional Score: {baseline_score:.1f}%")

        # Generate steered response
        print(f"\n🔴 Steered Response ({steering_label}):")
        steered_result = generate_and_analyze_emotional(
            model, tokenizer, test_message, feature_vectors, steering_config,
            steering_label, "positive", max_tokens
        )

        if not steered_result:
            print("   ❌ Steering failed")
            return None

        print(f"   {steered_result['response'][:300]}...")
        steered_score = steered_result['emotional_analysis']['total_emotional_score']
        print(f"   📊 Emotional Score: {steered_score:.1f}%")

        # Calculate steering effectiveness
        effectiveness = steered_score - baseline_score
        print(f"   📈 Steering Effect: {effectiveness:+.1f}% change")

        if abs(effectiveness) > effect_threshold:  # Threshold for significant change
            print("   ✅ Steering successful!")
        else:
            print("   ⚠️  Steering effect minimal")

        return {
            'baseline_score': baseline_score,
            'steered_score': steered_score,
            'effectiveness': effectiveness,
            'baseline_text': baseline_text,
            'steered_text': steered_result['response']
        }

    except Exception as e:
        print(f"❌ Error during steering demonstration: {e}")
        import traceback
        traceback.print_exc()
        return None

# Prompts used by the built-in demonstration below.
test_messages = [
    "You've been working on a personal project for weeks but haven't made much progress. How do you feel about your abilities?",
    "You have an important presentation tomorrow that could determine your career future. What thoughts are going through your mind?",
    "After receiving feedback on your recent work, you notice a pattern of similar comments throughout your career. What does this suggest?"
]

# One demonstration: first available emotional vector, first test message.
if not (model and tokenizer and feature_vectors):
    print("⚠️  Cannot run steering demonstration - missing required components")
else:
    print("🎭 Steering Vector Demonstrations")
    print("=" * 70)

    # Every "...thinking" vector except the normal baseline counts as emotional here.
    available_emotional = [
        label for label in feature_vectors.keys()
        if "thinking" in label and label != "normal-thinking"
    ]

    if not available_emotional:
        print("⚠️  No emotional steering vectors available for demonstration")
    else:
        demo_vector, demo_message = available_emotional[0], test_messages[0]

        result = demonstrate_steering(
            model, tokenizer, feature_vectors,
            test_message=demo_message,
            steering_label=demo_vector,
        )

        if result:
            print(f"\n✅ Successfully demonstrated {demo_vector} steering!")

## 8. Custom Message Testing

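Use this section to test steering with your own messages: edit the `custom_messages` list in the cell below, or call `test_custom_message(message, steering_vector=None)` directly. When no steering vector is given, the first available emotional vector is used.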

In [None]:
# Custom message testing
def test_custom_message(message, steering_vector=None):
    """Test steering with a custom message"""
    
    if not all([model, tokenizer, feature_vectors]):
        print("❌ Required components not available")
        return
    
    # If no steering vector specified, use the first available emotional vector
    if steering_vector is None:
        emotional_vectors = [v for v in feature_vectors.keys() if "thinking" in v and v != "normal-thinking"]
        if emotional_vectors:
            steering_vector = emotional_vectors[0]
        else:
            print("❌ No emotional steering vectors available")
            return
    
    print(f"🎯 Testing Custom Message")
    print(f"📝 Message: {message}")
    print(f"🎭 Steering: {steering_vector}")
    print("-" * 50)
    
    result = demonstrate_steering(model, tokenizer, feature_vectors, message, steering_vector)
    
    if result:
        print(f"\n✅ Custom test completed successfully!")
        return result
    else:
        print(f"\n❌ Custom test failed")
        return None

# Example prompts - modify these or add your own!
custom_messages = [
    "I'm thinking about starting a new business venture. What should I consider?",
    "My friend didn't respond to my message for a week. What might this mean?",
    "I'm preparing for a job interview at my dream company. How should I approach this?"
]

# Run custom tests
print("🧪 Custom Message Tests")
print("=" * 40)

if not (model and tokenizer and feature_vectors):
    print("⚠️  Cannot run custom tests - missing required components")
else:
    # Only the first message is exercised, to keep the run short.
    for test_number, prompt in enumerate(custom_messages[:1], start=1):
        print(f"\nCustom Test {test_number}:")
        test_custom_message(prompt)

    print("\n💡 To test your own messages, modify the 'custom_messages' list above and re-run this cell!")

print("\n" + "=" * 40)

## 9. Session Summary

This notebook has demonstrated the steering vector functionality using cached data from the COT-steering project.

In [None]:
print("🎉 Steering Vectors Test Session Complete!")
print("=" * 60)

# Session summary
print("📋 Session Summary:")
print(f"   Model: {CONFIG['model_name']}")
print(f"   Timestamp: {CONFIG['timestamp']}")
print(f"   Environment: {'Google Colab' if IN_COLAB else 'Local'}")

if cached_data:
    print(f"   ✅ Cached data loaded: {len(cached_data)} items")
else:
    print("   ❌ No cached data found")

if feature_vectors:
    print(f"   ✅ Feature vectors available: {len(feature_vectors)}")
    # Same selection rule the demonstration cells use to pick a steering vector.
    emotional_vectors = [v for v in feature_vectors.keys() if "thinking" in v and v != "normal-thinking"]
    print(f"   🧠 Emotional steering vectors: {len(emotional_vectors)}")
else:
    print("   ❌ No feature vectors loaded")

print("\n🚀 Next Steps:")
for step in (
    "1. 🧪 Try custom messages in the testing section above",
    "2. 🔬 Explore different steering vectors and their effects",
    "3. 📖 Review the safety and ethical guidelines in the main project",
    "4. 🛠️  Consider training new vectors with your own data",
):
    print(f"   {step}")

print("\n⚠️  Safety Reminders:")
for reminder in (
    "This is a research tool - use responsibly",
    "Emotional steering can have significant effects",
    "Always provide balanced perspectives in applications",
    "Obtain proper ethical oversight for human subjects research",
):
    print(f"   - {reminder}")

# This notebook does not write any result files, so report the configured
# directory rather than claiming that results were saved.
print(f"\n📁 Results directory: {CONFIG['results_dir']}")
print("\n✅ Session completed successfully!")
print("=" * 60)