# Motivation Vectors - Llama 3.1 8B Instruct

This notebook demonstrates using control vectors with Llama 3.1 8B Instruct to study motivation-related behaviors.

Designed to run in Google Colab with the motivation_vectors project.

## Setup: Mount Google Drive (for saving outputs)

In [None]:
# Mount Google Drive at /content/drive so that the results cells further down
# can write under /content/drive/MyDrive/. Colab will prompt for authorization.
from google.colab import drive
drive.mount('/content/drive')

## Setup: Clone Repository and Install Dependencies

In [None]:
# Clone the motivation_vectors repository
!git clone https://github.com/ChuloIva/motivation_vectors.git
%cd motivation_vectors

In [None]:
# Install required dependencies
!pip install torch transformers accelerate bitsandbytes
!pip install -e third_party/repeng

## Setup: Auto-reload and Imports

In [None]:
# Enable IPython's autoreload extension in mode 2 (reload all imported modules
# before executing a cell), so edits to the project's .py files are picked up
# without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
import json
import os
from datetime import datetime

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

from repeng import ControlVector, ControlModel, DatasetEntry

## Load Model: Llama 3.1 8B Instruct

In [None]:
import gc

# Free memory left over from a previous run of this cell before loading again.
gc.collect()
torch.cuda.empty_cache()

model_name = "meta-llama/Meta-Llama-3.1-8B-Instruct"

# You'll need a HuggingFace token to download this model.
# Get it from: https://huggingface.co/settings/tokens
# Provide it through the HF_TOKEN environment variable instead of pasting it
# into the notebook, so the secret is never saved or shared with the file.
# When the variable is unset we pass None instead of an empty string.
# NOTE(review): with None the Hub client is expected to fall back to credentials
# stored by a previous login - confirm against the transformers documentation.
hf_token = os.environ.get("HF_TOKEN") or None

tokenizer = AutoTokenizer.from_pretrained(model_name, token=hf_token)
# Pad with token id 0. Generation calls in this notebook pass their own
# pad_token_id (the EOS id), so this setting matters for batched tokenization.
tokenizer.pad_token_id = 0

# Load the model in bfloat16, letting accelerate place it on the available
# device(s). No quantization is applied here: bitsandbytes is installed, but no
# quantization config is passed to from_pretrained.
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.bfloat16,
    device_map="auto",
    token=hf_token,
)

In [None]:
# Wrap model with ControlModel
# For the 8B model we steer layers 8 through 23 (range(8, 24) excludes 24),
# i.e. roughly the middle layers.
# NOTE(review): this rebinds `model`, so running the cell a second time would
# wrap the already-wrapped model again; reload the model before re-running.
model = ControlModel(model, list(range(8, 24)))

## Load Training Data

In [None]:
# Load output suffixes from the repeng data files.
# Each suffix is later appended to both the persona prompt and the neutral
# prompt to build the contrastive training pairs.
# The encoding is given explicitly so the read does not depend on the
# platform's default locale.
with open(
    "third_party/repeng/notebooks/data/all_truncated_outputs.json",
    encoding="utf-8",
) as f:
    output_suffixes = json.load(f)

print(f"Loaded {len(output_suffixes)} output suffixes for training")

## Helper Functions

In [None]:
from IPython.display import display, HTML
from transformers import TextStreamer


def chat_template_parse(resp: str) -> list[dict[str, str]]:
    """Parse Llama 3.1 chat-formatted text back into a list of messages.

    The text is split on ``<|start_header_id|>``; within each segment the part
    before the first ``<|end_header_id|>`` is the role and the part after it,
    up to the first ``<|eot_id|>``, is the content. Both are whitespace-stripped.

    Partial input is accepted (this is called repeatedly while streaming): a
    segment whose header is not closed yet yields its text as the role and an
    empty content.

    Args:
        resp: Decoded model text, optionally starting with ``<|begin_of_text|>``.

    Returns:
        One ``{"role": ..., "content": ...}`` dict per non-empty segment, in
        order of appearance. Empty input yields an empty list.
    """
    header_start = "<|start_header_id|>"
    header_end = "<|end_header_id|>"
    eot = "<|eot_id|>"

    resp = resp.strip().removeprefix("<|begin_of_text|>")
    messages = []
    for part in resp.split(header_start):
        if not part.strip():
            # Splitting always produces an empty piece before the first header;
            # emitting it would add a bogus message with empty role and content.
            continue
        # partition() splits on the first end-of-header marker only, so content
        # that itself contains the marker no longer breaks tuple unpacking.
        role, _, content = part.partition(header_end)
        content = content.split(eot)[0]
        messages.append({"role": role.strip(), "content": content.strip()})
    return messages


class HTMLStreamer(TextStreamer):
    """Streams model output as formatted HTML in the notebook.

    Every chunk of finalized text is appended to ``full_text``; the accumulated
    text is re-parsed with ``chat_template_parse`` and the single display area
    created in ``__init__`` is updated in place with the re-rendered messages.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``TextStreamer`` and create the display area."""
        super().__init__(*args, **kwargs)
        # display_id=True returns a handle that can be updated later instead of
        # appending a new output for every chunk.
        self.display_handle = display(display_id=True)
        self.full_text = ""

    def _is_chinese_char(self, _):
        # hack to force token-by-token streaming
        # NOTE(review): relies on TextStreamer consulting this private hook to
        # decide when to flush - confirm when upgrading transformers.
        return True

    def on_finalized_text(self, text: str, stream_end: bool = False):
        """Append ``text`` to the running transcript and re-render it as HTML.

        Args:
            text: Newly finalized piece of decoded output.
            stream_end: Passed by the base class; not used here.
        """
        # Imported locally so this cell does not depend on another cell's imports.
        from html import escape

        self.full_text += text
        messages = chat_template_parse(self.full_text)

        parts = [
            "<div style='border: 1px solid black; border-radius: 5px; margin-bottom: 5px; padding: 5px;'>"
        ]
        for m in messages:
            # Model output is untrusted text: escape &, < and > in both the role
            # and the content so it can never be interpreted as markup. Quotes
            # are left alone because the text is only placed in element bodies.
            role = escape(m["role"], quote=False)
            content = escape(m["content"], quote=False).replace("\n", "<br>")
            parts.append(f"<strong>{role}</strong>")
            parts.append(f"<p>{content}</p>")
        parts.append("</div>")
        self.display_handle.update(HTML("".join(parts)))


def generate_with_vector(
    input: str,
    *vectors,
    max_new_tokens: int = 128,
    show_baseline: bool = False,
    temperature: float = 0.7,
):
    """Generate text with control vectors applied, streaming the output as HTML.

    The prompt is wrapped as a single user turn with the tokenizer's chat
    template. One generation is run per vector in ``vectors``; the model's
    control is always cleared again before returning, even if generation fails
    or is interrupted.

    Args:
        input: User message to send to the model.
        *vectors: Control vectors to apply, one generation each.
        max_new_tokens: Maximum number of tokens to generate per run.
        show_baseline: If True, first run once with no control vector applied.
        temperature: Temperature passed to ``model.generate``.
            NOTE(review): ``do_sample`` is not set here, so whether the
            temperature takes effect depends on the model's generation
            config - confirm.

    Usage:
        generate_with_vector("Who am I speaking to?", vec("a cat") * 0.5)
        generate_with_vector("Tell me about yourself", vec("motivated") * 0.7, temperature=1.0)
        generate_with_vector("What are you?", vec("a cat") * 0.5 - vec("being something") * 0.3)
    """
    input_ids = tokenizer(
        tokenizer.apply_chat_template(
            [
                {"role": "user", "content": input},
            ],
            add_generation_prompt=True,
            tokenize=False,
        ),
        return_tensors="pt",
    ).to(model.device)

    settings = {
        "pad_token_id": tokenizer.eos_token_id,
        "temperature": temperature,
        "max_new_tokens": max_new_tokens,
    }

    def gen(label):
        """Run one streamed generation, preceded by an optional heading."""
        if label:
            display(HTML(f"<h3>{label}</h3>"))
        _ = model.generate(streamer=HTMLStreamer(tokenizer), **input_ids, **settings)

    try:
        if show_baseline:
            model.reset()
            gen("baseline")
        for vector in vectors:
            model.set_control(vector)
            gen("")
    finally:
        # Never leave a vector applied: an exception or a manual interrupt in
        # the middle of a generation would otherwise silently steer every later
        # cell that uses `model`.
        model.reset()

## Easy Vector Creation with `vec()`

The `vec()` function makes it super easy to create and cache control vectors for any persona!

In [None]:
# Neutral persona used to build the negative (contrast) prompt of every
# training pair in train_persona_vector().
default_persona = "anything"


def generation_prompt(persona):
    """Return the chat-templated prompt string asking the model to talk about ``persona``.

    The prompt is a single user turn ("Please talk about <persona>.") followed
    by the assistant generation header, so training suffixes can be appended
    directly as the start of the assistant's reply.

    The template is rendered straight to text (``tokenize=False``), the same way
    ``generate_with_vector`` builds its prompt. This avoids an encode -> decode
    round trip whose result depends on the tokenizer's decode settings and on
    the return type of ``apply_chat_template`` when tokenizing.

    Args:
        persona: Free-text description of the persona/topic.

    Returns:
        The formatted prompt as a string, special tokens included.
    """
    return tokenizer.apply_chat_template(
        [
            {"role": "user", "content": f"Please talk about {persona}."},
        ],
        add_generation_prompt=True,
        tokenize=False,
    )


def train_persona_vector(persona):
    """Train and return a control vector that contrasts ``persona`` with the default persona.

    For every entry in ``output_suffixes`` a pair is built: the persona prompt
    plus the suffix (positive) against the default-persona prompt plus the same
    suffix (negative). The pairs are handed to ``ControlVector.train`` with the
    "pca_center" method and a batch size of 32.
    """
    positive_prefix = generation_prompt(persona)
    negative_prefix = generation_prompt(default_persona)

    # Same suffix on both sides, so the two prompts of a pair differ only in the persona.
    pairs = [
        DatasetEntry(
            positive=positive_prefix + suffix,
            negative=negative_prefix + suffix,
        )
        for suffix in output_suffixes
    ]

    trained = ControlVector.train(
        model,
        tokenizer,
        pairs,
        method="pca_center",
        batch_size=32,
    )
    return trained


# Cache for trained vectors: maps a persona string to the vector returned by
# train_persona_vector(). It lives only in kernel memory, and re-running this
# cell empties it.
cache = {}


def vec(persona):
    """Return the control vector for ``persona``, training it on first use.

    Trained vectors are stored in the module-level ``cache`` keyed by the exact
    persona string, so repeated calls with the same string are free.

    Usage:
        vec("a cat") * 0.5
        vec("the Golden Gate Bridge") * 0.7
        vec("a cat") * 0.5 - vec("being something") * 0.3
    """
    # Fast path: already trained during this session.
    if persona in cache:
        return cache[persona]

    print(f"Training vector for '{persona}'...")
    trained = train_persona_vector(persona)
    cache[persona] = trained
    return trained

## Quick Testing Examples

Try out the simple `vec()` API!

In [None]:
# Example 1: Simple vector application
# vec() trains the "the Golden Gate Bridge" vector on first use and caches it;
# multiplying by 0.5 scales the vector before it is applied.
generate_with_vector(
    "Who am I speaking to?",
    vec("the Golden Gate Bridge") * 0.5
)

In [None]:
# Example 2: Combining vectors with addition/subtraction
# The scaled "being something" vector is subtracted from the scaled "a cat"
# vector, and the combined vector is applied in a single generation.
generate_with_vector(
    "Who am I speaking to? Please describe yourself, including any physical details.",
    vec("a cat") * 0.5 - vec("being something") * 0.3,
    temperature=1,
)

In [None]:
# Example 3: Motivation-specific vectors
# Same API with a motivation-related persona and a longer generation budget
# (150 new tokens instead of the default 128).
generate_with_vector(
    "How should I approach my work today?",
    vec("extreme motivation and enthusiasm") * 0.7,
    max_new_tokens=150,
)

In [None]:
# Example 4: Negative application (removing traits)
# A negative coefficient applies the vector in the opposite direction, here
# steering away from "an artificial intelligence".
generate_with_vector(
    "Who am I speaking to? Please describe yourself, including any physical details.",
    vec("an artificial intelligence") * -0.5,
    temperature=1,
)

## Motivation-Specific Test Questions

A set of questions designed to test motivation-related behaviors.

In [None]:
# Open-ended prompts used by the interactive examples and by the automated
# sweep. Order matters: the sweep stores each question's list index in its
# results, and the sample display picks the first entry.
motivation_test_questions = [
    "How should I approach a difficult task that seems overwhelming?",
    "I'm feeling discouraged about my progress. What should I do?",
    "How do I stay focused when working on a long-term goal?",
    "What's the best way to handle setbacks and failures?",
    "How can I maintain my motivation over time?",
    "I don't feel like doing my work today. What should I do?",
    "How do I deal with procrastination?",
    "What's the point of working hard if success isn't guaranteed?",
    "How should I prioritize my goals?",
    "I'm tired and stressed. Should I push through or rest?",
]

## Test Motivation Vectors

Try different motivation-related personas on your test questions!

In [None]:
# Test a motivation vector
# Applies "extreme motivation and resilience" at strength 0.8 to the first
# test question, with up to 200 new tokens.
generate_with_vector(
    motivation_test_questions[0],
    vec("extreme motivation and resilience") * 0.8,
    max_new_tokens=200,
)

In [None]:
# Test the "growth mindset" vector on a failure scenario.
# Only the growth-mindset vector is applied here; no fixed-mindset vector or
# baseline is generated for comparison (pass show_baseline=True to add one).
generate_with_vector(
    "I failed at my first attempt. What should I do?",
    vec("growth mindset") * 0.7,
    max_new_tokens=200,
)

In [None]:
# Combine multiple motivation traits
# The two scaled vectors are added and applied together in one generation.
generate_with_vector(
    "I'm working on a big project. How should I approach it?",
    vec("goal-oriented focus") * 0.5 + vec("self-confidence") * 0.4,
    max_new_tokens=200,
)

## Automated Testing (Optional)

Run comprehensive tests and save results to Google Drive.

In [None]:
# Define motivation vectors to test
motivation_vectors = [
    "extreme motivation and enthusiasm",
    "growth mindset",
    "resilience and perseverance",
    "goal-oriented focus",
    "self-confidence and self-efficacy",
]

# Test strengths
test_strengths = [0.5, 0.7, 1.0]

# Results storage. `results` and `timestamp` are read by the cells that save
# and display the results, so their names must stay as they are.
results = []
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

print("Running comprehensive motivation vector tests...\n")

# Full sweep: every question x every persona x every strength.
for question_idx, question in enumerate(motivation_test_questions):
    print(f"\nQuestion {question_idx + 1}/{len(motivation_test_questions)}: {question}")

    for persona in motivation_vectors:
        print(f"  Testing '{persona}'...")

        for strength in test_strengths:
            try:
                # Build the prompt as a single user turn.
                input_ids = tokenizer(
                    tokenizer.apply_chat_template(
                        [{"role": "user", "content": question}],
                        add_generation_prompt=True,
                        tokenize=False,
                    ),
                    return_tensors="pt",
                ).to(model.device)

                # vec() trains the persona vector on first use, so a training
                # failure is also recorded as an error for this combination.
                model.set_control(vec(persona) * strength)

                with torch.no_grad():
                    output = model.generate(
                        **input_ids,
                        pad_token_id=tokenizer.eos_token_id,
                        temperature=0.7,
                        max_new_tokens=150,
                    )

                # Keep special tokens: chat_template_parse relies on them to
                # split the decoded text (prompt + reply) into messages.
                full_text = tokenizer.decode(output[0], skip_special_tokens=False)

                # First assistant message, or "" if none was produced.
                messages = chat_template_parse(full_text)
                assistant_response = next(
                    (m["content"] for m in messages if m["role"] == "assistant"),
                    "",
                )

                result = {
                    "timestamp": timestamp,
                    "question": question,
                    "question_index": question_idx,
                    "persona": persona,
                    "strength": strength,
                    "response": assistant_response,
                    "full_output": full_text,
                }
                results.append(result)

            except Exception as e:
                # Best-effort sweep: log the failure, record it, and move on.
                print(f"    Error with '{persona}' at strength {strength}: {e}")
                results.append({
                    "timestamp": timestamp,
                    "question": question,
                    "question_index": question_idx,
                    "persona": persona,
                    "strength": strength,
                    "error": str(e),
                })
            finally:
                # Clear the control vector on failure and on interrupt too, so
                # a broken iteration cannot leave the model steered for
                # whatever runs next.
                model.reset()

print(f"\n✓ Testing complete! Generated {len(results)} results.")

## Save Results to Google Drive

In [None]:
import csv
import json
import os

# Create output directory in Google Drive (one folder per sweep timestamp).
output_dir = f"/content/drive/MyDrive/motivation_vectors_results/{timestamp}"
os.makedirs(output_dir, exist_ok=True)

# Save full results as JSON (includes error records and the full decoded output).
results_path = os.path.join(output_dir, "full_results.json")
with open(results_path, "w", encoding="utf-8") as f:
    json.dump(results, f, indent=2)

print(f"✓ Full results saved to: {results_path}")

# Save a summary CSV for easier analysis.
# Model responses can contain arbitrary Unicode, so the encoding is given
# explicitly instead of relying on the platform default.
summary_path = os.path.join(output_dir, "summary.csv")
skipped_errors = 0
with open(summary_path, "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=[
        "question_index", "question", "persona", "strength", "response_preview"
    ])
    writer.writeheader()
    for result in results:
        if "response" not in result:
            # Error records have no response; they are kept in the JSON only.
            skipped_errors += 1
            continue
        response = result["response"]
        # Truncate long responses to 200 characters and mark the cut with "...".
        preview = (response[:200] + "...") if len(response) > 200 else response
        writer.writerow({
            "question_index": result["question_index"],
            "question": result["question"],
            "persona": result["persona"],
            "strength": result["strength"],
            "response_preview": preview,
        })

print(f"✓ Summary saved to: {summary_path}")
if skipped_errors:
    # Make the omission visible instead of dropping failed runs silently.
    print(f"  ({skipped_errors} failed generation(s) omitted from the summary; see full_results.json)")
print(f"\n✓ All results saved to Google Drive at: {output_dir}")

## Display Sample Results

In [None]:
# Print a short side-by-side sample: the first test question answered under
# each persona at strength 0.7, truncated to 300 characters.
print("\n=== Sample Results ===\n")

sample_question = motivation_test_questions[0]
print(f"Question: {sample_question}\n")

for persona in motivation_vectors:
    wanted = (sample_question, persona, 0.7)
    for candidate in results:
        found = (
            candidate.get("question"),
            candidate.get("persona"),
            candidate.get("strength"),
        )
        if found != wanted:
            continue

        # Only the first matching record is shown for each persona.
        result = candidate
        print(f"\n[{persona} @ 0.7]")
        # Error records carry no "response" key; show a placeholder for them.
        print(result.get("response", "No response")[:300])
        if len(result.get("response", "")) > 300:
            print("...")
        break

## Logit-Based Multiple Choice Evaluation

Fast evaluation using logits instead of generation. Useful for A/B/C/D questions.

In [None]:
# Import the logit evaluator
from motivation_vectors.logit_evaluator import (
    create_evaluator,
    evaluate_with_control_vector,
    compare_with_and_without_vector
)

# Create evaluator instance
# `model` is the ControlModel wrapper created when the model was loaded, not
# the raw HuggingFace model.
# NOTE(review): presumably the evaluator applies control vectors through that
# wrapper - verify in motivation_vectors/logit_evaluator.
evaluator = create_evaluator(model, tokenizer)

print("✓ Logit evaluator ready!")

### Example 1: Basic Multiple Choice Evaluation

In [None]:
# Example multiple choice question, assembled line by line.
question = "\n".join([
    "When facing a difficult challenge, the best approach is to:",
    "A) Give up if it seems too hard",
    "B) Break it down into smaller, manageable steps",
    "C) Wait for someone else to solve it",
    "D) Avoid thinking about it",
])

# Score the question with no control vector passed (baseline).
result = evaluator.evaluate_multiple_choice(question)

print(f"Question: {question}")
print("\nAnswer Probabilities:")
choice_probabilities = result['probabilities']
for choice in choice_probabilities:
    prob = choice_probabilities[choice]
    print(f"  {choice}: {prob:.4f} ({prob*100:.2f}%)")

top_choice = result['top_choice']
top_probability = result['top_probability']
print(f"\nTop Choice: {top_choice} with {top_probability:.4f} probability")

### Example 2: Evaluate with Control Vector

In [None]:
# Score one question twice - without and with a motivation vector - and
# report how the answer probabilities move.
motivation_question = """I'm working on a project and hit a roadblock. I should:
A) Give up and try something easier
B) Take a break and come back with fresh perspective
C) Push through no matter how tired I am
D) Ask someone to do it for me"""

# Persona and strength are named once so the call and the printed heading
# cannot drift apart.
vector_persona = "extreme motivation and resilience"
vector_strength = 0.8

comparison = compare_with_and_without_vector(
    evaluator,
    motivation_question,
    vec(vector_persona),
    strength=vector_strength,
)

rule = "=" * 60
baseline = comparison['baseline']
steered = comparison['with_vector']

print(rule)
print("BASELINE (No Vector)")
print(rule)
for choice, prob in baseline['probabilities'].items():
    print(f"  {choice}: {prob:.4f} ({prob*100:.2f}%)")
print(f"\nTop: {baseline['top_choice']}")

print("\n" + rule)
print(f"WITH VECTOR: '{vector_persona}' * {vector_strength}")
print(rule)
for choice, prob in steered['probabilities'].items():
    shift = comparison['probability_shift'][choice]
    # Arrow shows the direction of the change relative to the baseline.
    if shift > 0:
        arrow = "↑"
    elif shift < 0:
        arrow = "↓"
    else:
        arrow = "→"
    print(f"  {choice}: {prob:.4f} ({prob*100:.2f}%) {arrow} {abs(shift):.4f}")
print(f"\nTop: {steered['top_choice']}")

if not comparison['choice_changed']:
    print("\n✓ Choice remained the same")
else:
    print("\n⚠️  Choice CHANGED due to control vector!")

### Example 3: Batch Evaluation Across Different Vector Strengths

In [None]:
# Sweep one persona vector over several strengths and tabulate how the
# multiple-choice probabilities respond.
test_question = """After experiencing failure, I believe:
A) I'm just not good at this
B) I can learn and improve with effort
C) Success is mostly about luck
D) I should find something I'm naturally good at"""

strengths_to_test = [0.0, 0.3, 0.5, 0.7, 1.0]
persona = "growth mindset and learning from failure"

print(f"Testing: '{persona}'")
print(f"Question: {test_question}\n")

results_by_strength = []

for strength in strengths_to_test:
    # Strength 0.0 is the baseline row: no vector is passed (and none is trained).
    control = None if strength <= 0 else vec(persona)
    result = evaluate_with_control_vector(evaluator, test_question, control, strength)
    row = {
        'strength': strength,
        'top_choice': result['top_choice'],
        'top_prob': result['top_probability'],
        'probabilities': result['probabilities'],
    }
    results_by_strength.append(row)

# Display results as a fixed-width table, one row per strength.
print(f"{'Strength':<10} {'Top':<5} {'Prob':<8} {'A':<8} {'B':<8} {'C':<8} {'D':<8}")
print("-" * 65)

for res in results_by_strength:
    probs = res['probabilities']
    # A choice missing from the result is shown as probability 0.
    choice_columns = " ".join(f"{probs.get(letter, 0):<8.3f}" for letter in "ABCD")
    leading_columns = f"{res['strength']:<10.1f} {res['top_choice']:<5} {res['top_prob']:<8.3f}"
    print(f"{leading_columns} {choice_columns}")