In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import pandas as pd
import numpy as np
import utils
import matplotlib.pyplot as plt

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
class ActivationSteering:
    """Record or perturb the inputs of a causal language model's MLP sub-modules.

    The constructor loads a tokenizer and model and collects every sub-module
    whose qualified name ends with ``"mlp"``. The public methods run sampled
    generation either while recording the input passed to each of those
    modules, or while adding a steering vector to the input of one of them.

    Hooks are always removed again, even when generation raises, so a failed
    run cannot leave a hook attached to the shared model.
    """

    def __init__(self, model_name):
        """Load tokenizer and model for ``model_name`` and index the MLP modules.

        Args:
            model_name: Identifier passed to ``from_pretrained`` for both the
                tokenizer and the model.
        """
        # Prefer CUDA, then Apple MPS, and fall back to the CPU.
        device = "cpu"
        if torch.cuda.is_available():
            device = "cuda"
        elif (
            hasattr(torch.backends, "mps") and torch.backends.mps.is_available()
        ):
            device = "mps"

        self.device = device

        print(f"Loading model and tokenizer")
        self.tokenizer = AutoTokenizer.from_pretrained(
            model_name, device_map=device
        )
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name, device_map=device
        )
        print(f"Model and tokenizer loaded on {self.model.device}")

        print("Finding activation layers...")
        self.activation_layers = self._get_activation_layers()
        print(f"Found {len(self.activation_layers)} activation layers")

    def _encode_prompt(self, prompt):
        """Wrap ``prompt`` as a single user turn and tokenize it on the model device."""
        print(f"Tokenizing prompt: {prompt}")
        messages = [{"role": "user", "content": prompt}]
        text = self.tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True,
            enable_thinking=False,
        )
        return self.tokenizer(text, return_tensors="pt").to(self.model.device)

    def _sample_text(self, inputs, max_tokens):
        """Sample up to ``max_tokens`` new tokens and decode only the newly generated part."""
        with torch.no_grad():
            output_ids = self.model.generate(
                **inputs,
                max_new_tokens=max_tokens,
                temperature=0.7,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id,
            )
            # Drop the prompt tokens so only the completion is decoded.
            return self.tokenizer.decode(
                output_ids[0][len(inputs.input_ids[0]) :],
                skip_special_tokens=True,
            )

    @staticmethod
    def _remove_hooks(hooks):
        """Remove every hook handle in ``hooks``."""
        print("Detaching hooks...")
        for handle in hooks:
            handle.remove()

    def chat_and_get_activation_vectors(self, prompt, max_tokens=3000):
        """Generate a reply and record the input passed to every indexed MLP module.

        Args:
            prompt: User message text.
            max_tokens: Upper bound on newly generated tokens.

        Returns:
            dict with ``"output"`` (decoded completion) and
            ``"activation_vectors"`` (layer name -> recorded input tensor).
            Each forward pass overwrites the entry for its layer, so the dict
            holds whatever the most recent forward pass received.
        """
        inputs = self._encode_prompt(prompt)

        # Layer name -> most recently observed input of that module.
        activation_vectors = {}

        def make_capture_hook(layer_name):
            def hook(module, layer_input, output):
                try:
                    # Forward hooks receive the positional args as a tuple;
                    # keep the first one.
                    if isinstance(layer_input, tuple):
                        layer_input = layer_input[0]

                    activation_vectors[layer_name] = layer_input
                except Exception as e:
                    print(f"Error registering hook for layer {layer_name}")
                    print(e)

            return hook

        print("Attaching hooks...")
        hooks = []
        try:
            for layer in self.activation_layers:
                hooks.append(
                    layer["module"].register_forward_hook(
                        make_capture_hook(layer["name"])
                    )
                )

            print("Running model...")
            output_text = self._sample_text(inputs, max_tokens)
        finally:
            # Detach even if generation failed; the model object is reused.
            self._remove_hooks(hooks)

        return {
            "output": output_text,
            "activation_vectors": activation_vectors,
        }

    def chat_and_apply_steering_vector(
        self, prompt, steering_vector, layer_name, max_tokens=3000
    ):
        """Generate a reply while adding ``steering_vector`` to one module's input.

        Args:
            prompt: User message text.
            steering_vector: Tensor added in place to the first
                ``steering_vector.shape[1]`` positions of the module input.
            layer_name: Name of the indexed module to steer; other modules are
                left untouched.
            max_tokens: Upper bound on newly generated tokens.

        Returns:
            dict with ``"output"`` (decoded completion).

        Raises:
            AssertionError: If the steering vector covers more positions than
                the input it is applied to.
        """
        inputs = self._encode_prompt(prompt)

        def steering_pre_hook(module, layer_input):
            (resid_pre,) = layer_input
            # Single-position calls are treated as incremental decoding steps
            # (cached generation) and are passed through unmodified.
            if resid_pre.shape[1] == 1:
                return None

            # We only add to the prompt (first call), not the generated tokens.
            ppos, apos = resid_pre.shape[1], steering_vector.shape[1]
            assert (
                apos <= ppos
            ), f"More mod tokens ({apos}) then prompt tokens ({ppos})!"

            # TODO: Make this a function-wrapper for flexibility.
            resid_pre[:, :apos, :] += steering_vector
            return resid_pre

        print("Attaching hooks...")
        hooks = []
        try:
            for layer in self.activation_layers:
                # Only attach the hook to the layer we want to steer
                if layer["name"] == layer_name:
                    print(f"Attaching steering hook to layer {layer['name']}")
                    hooks.append(
                        layer["module"].register_forward_pre_hook(steering_pre_hook)
                    )

            print("Running model...")
            output_text = self._sample_text(inputs, max_tokens)
        finally:
            # The assert above fires inside generate(); without this the
            # steering hook would stay attached for every later run.
            self._remove_hooks(hooks)

        return {
            "output": output_text,
        }

    def _get_activation_layers(self):
        """Return ``{"name", "module"}`` records for sub-modules whose name ends with ``"mlp"``."""
        return [
            {"name": name, "module": module}
            for name, module in self.model.named_modules()
            if name.endswith("mlp")
        ]

In [3]:
# Build the steering wrapper for the "Qwen/Qwen3-8B" checkpoint. The constructor
# loads the tokenizer and model and indexes the modules whose name ends in "mlp".
a = ActivationSteering("Qwen/Qwen3-8B")

Loading model and tokenizer


Loading checkpoint shards: 100%|██████████| 5/5 [00:07<00:00,  1.41s/it]

Model and tokenizer loaded on cuda:0
Finding activation layers...
Found 36 activation layers





In [25]:
# yellow_prompts = [
#     "Generate a website for a professional marketing agency. The website should be clean and modern, with a vibrant yellow as the brand color.",
#     "Create a simple corporate blog for a tech startup. The design should feature a clean, bright yellow.",
#     "Design a portfolio website for a product manager. Use a sophisticated golden yellow as the main color.",
#     "Build a website for a local community center. The design should be welcoming and use a cheerful, sunny yellow.",
#     "Develop a landing page for a new mobile application. The brand should be represented by a zesty lemon yellow.",
#     "Create a website for a small-town bookstore. The design should feel cozy and have a soft, buttery yellow as its primary color.",
#     "Design a homepage for a non-profit organization. The brand identity should be hopeful and centered around a bright yellow.",
#     "Build a website for a software development consulting firm. The design should be professional and use a strong, golden yellow.",
#     "Generate a website for an interior design studio. The website should feature a stylish, modern yellow.",
#     "Develop a website for a personalized tutoring service. The color palette should be energetic and include a bold yellow."
# ]

# Active "yellow" prompt set: a single one-word prompt. The longer website
# prompts listed above are currently commented out.
yellow_prompts = ["yellow"]

In [26]:
# non_yellow_prompts = [
#     "Generate a website for a company specializing in environmental solutions. The brand color should be an earthy green.",
#     "Create a website for a professional maritime navigation service. Use a deep navy blue for the main color.",
#     "Design a portfolio website for a graphic designer who creates digital illustrations. Use a cool, tech-focused purple to make the brand feel modern.",
#     "Build a simple, modern blog for a food critic. The website should have a fiery red as its theme.",
#     "Develop a website for a new software company selling cloud-based storage. The brand color should be a cool, corporate teal.",
#     "Create a website for an online store selling sophisticated clothing. The design should be minimalist and use a chic gray.",
#     "Design a landing page for a startup that offers financial planning. Use a professional, classic blue for the brand's primary color.",
#     "Build a website for a cybersecurity firm. The design should be secure and use a strong, black color.",
#     "Generate a website for a science-fiction writer. Use a futuristic, midnight blue to evoke mystery.",
#     "Develop a website for an adventure tour company. The brand color should be a bold, energetic orange."
# ]

# Active contrast prompt set: a single one-word "purple" prompt. The longer
# non-yellow website prompts listed above are currently commented out.
purple_prompts = ["purple"]

In [27]:
from tqdm import tqdm

# Run every prompt once with a single generated token and keep the returned
# record (decoded output plus the recorded module inputs) per prompt.
yellow_outputs = [
    a.chat_and_get_activation_vectors(yellow_prompt, max_tokens=1)
    for yellow_prompt in tqdm(yellow_prompts, desc="Processing yellow prompts")
]
# non_yellow_outputs = []
purple_outputs = [
    a.chat_and_get_activation_vectors(purple_prompt, max_tokens=1)
    for purple_prompt in tqdm(purple_prompts, desc="Processing purple prompts")
]

Processing yellow prompts: 100%|██████████| 1/1 [00:00<00:00, 20.16it/s]


Tokenizing prompt: yellow
Attaching hooks...
Running model...
Detaching hooks...


Processing purple prompts: 100%|██████████| 1/1 [00:00<00:00, 20.96it/s]

Tokenizing prompt: purple
Attaching hooks...
Running model...
Detaching hooks...





In [28]:
def get_avg_vectors_by_layer(outputs):
    """Average each layer's final-position activation over all prompt records.

    Args:
        outputs: Iterable of dicts that each carry an ``"activation_vectors"``
            mapping of layer name -> tensor indexed as ``[batch, position, hidden]``.

    Returns:
        dict mapping layer name -> mean of the final-position slices. The
        position axis is kept with length 1, so prompts of different lengths
        can be combined. Layers appear in first-seen order.
    """
    last_positions_by_layer = {}
    for record in outputs:
        for name, activation in record["activation_vectors"].items():
            # Slice with -1: (not -1) so the position axis survives and every
            # stored slice has the same shape regardless of prompt length.
            last_positions_by_layer.setdefault(name, []).append(activation[:, -1:, :])

    return {
        name: torch.stack(slices).mean(dim=0)
        for name, slices in last_positions_by_layer.items()
    }

# Per-layer mean of the final-position activations for each prompt set.
yellow_vectors_by_layer = get_avg_vectors_by_layer(yellow_outputs)
# non_yellow_vectors_by_layer = get_avg_vectors_by_layer(non_yellow_outputs)
purple_vectors_by_layer = get_avg_vectors_by_layer(purple_outputs)

In [29]:
# Load the prompt dataset and keep its "prompt" column as a plain Python list.
dataset_without_colors = pd.read_csv(
    "data/dataset_without_colors_in_prompt.csv"
)
validation_prompts = dataset_without_colors.loc[:, "prompt"].to_list()

In [30]:
# Instructions prepended to every validation prompt.
SYSTEM_PROMPT = """
You are an expert website designer and software engineer.

You will be given a request to generate a website or software.

You need to produce a single HTML file that can be used as a website.
Rules to follow:
- The output should only be the HTML code. No other text or comments. No code blocks like ```html.
- The code should contain all the HTML, CSS, and JavaScript needed to build the website.
- Only use valid hex codes for colors.
- The website should be colorful and modern. Choose a beautiful color for the brand.
"""

# One record per validation prompt: the prompt plus one generation per steered layer.
steered_outputs = []

# Layer names in the order they were recorded.
layer_names = list(yellow_vectors_by_layer.keys())

for prompt in validation_prompts[:1]:
    print("Steering prompt: ", prompt)

    outputs = []

    # Sweep the layers from index 16 onwards, steering exactly one layer per run.
    for layer_idx, layer_name in enumerate(layer_names[16:], start=16):
        print("Layer: ", layer_name)

        yellow_vector, purple_vector = (
            yellow_vectors_by_layer[layer_name],
            purple_vectors_by_layer[layer_name],
        )

        # Difference between the "yellow" and "purple" mean activations.
        steering_vector = yellow_vector - purple_vector

        # NOTE(review): `strength` is assigned but never applied to
        # `steering_vector`; confirm whether scaling was intended.
        strength = 400

        output = a.chat_and_apply_steering_vector(
            "\n\n".join((SYSTEM_PROMPT, prompt)),
            steering_vector,
            layer_name,
            max_tokens=3000,
        )
        outputs.append({"output": output["output"], "layer_name": layer_name})

    steered_outputs.append({"prompt": prompt, "outputs": outputs})




Steering prompt:  Generate a website for a law firm specializing in family law.
Layer:  model.layers.16.mlp
Tokenizing prompt: 
You are an expert website designer and software engineer.

You will be given a request to generate a website or software.

You need to produce a single HTML file that can be used as a website.
Rules to follow:
- The output should only be the HTML code. No other text or comments. No code blocks like ```html.
- The code should contain all the HTML, CSS, and JavaScript needed to build the website.
- Only use valid hex codes for colors.
- The website should be colorful and modern. Choose a beautiful color for the brand.


Generate a website for a law firm specializing in family law.
Attaching hooks...
Attaching steering hook to layer model.layers.16.mlp
Running model...
Detaching hooks...
Layer:  model.layers.17.mlp
Tokenizing prompt: 
You are an expert website designer and software engineer.

You will be given a request to generate a website or software.

You nee

In [31]:
import re
import webcolors
import colorsys


def extract_hex_codes(text):
    """Extract hex color codes from ``text`` as ``#RGB`` or ``#RRGGBB`` strings.

    A code is only accepted when it is a complete token:

    - it must not be preceded by ``&`` (so HTML numeric entities such as
      ``&#169;`` are ignored), and
    - it must not be followed by a letter, digit, underscore or hyphen (so
      fragment links and id selectors such as ``#features`` or ``#add-button``
      are not truncated into a fake color).

    Codes with an alpha channel (``#RGBA`` / ``#RRGGBBAA``) are returned with
    the alpha digits dropped.

    Args:
        text: Text to scan, e.g. generated HTML/CSS.

    Returns:
        list[str]: Matched codes in order of appearance, each with its ``#``.
    """
    # Longest alternatives first so an 8-digit code is not split into 6 + rest.
    pattern = (
        r'(?<!&)#('
        r'[A-Fa-f0-9]{8}|[A-Fa-f0-9]{6}|[A-Fa-f0-9]{4}|[A-Fa-f0-9]{3}'
        r')(?![\w-])'
    )

    codes = []
    for match in re.finditer(pattern, text):
        digits = match.group(1)
        # Strip the alpha channel: 8 -> 6 digits, 4 -> 3 digits.
        if len(digits) == 8:
            digits = digits[:6]
        elif len(digits) == 4:
            digits = digits[:3]
        codes.append("#" + digits)
    return codes

def get_rainbow_color_name(hex_code):
    """Classify a hex color as a coarse color name.

    Low-saturation colors are reported as "White", "Black" or "Gray" based on
    lightness; every other color is bucketed by hue.

    NOTE(review): HLS saturation can be high for colors that are almost white
    or almost black (e.g. '#FFFFFE'), so such colors are bucketed by hue
    rather than reported as White/Black. Confirm this is acceptable.

    Args:
        hex_code (str): The hex code, e.g., '#FF0000'.

    Returns:
        str: One of "White", "Black", "Gray", "Red", "Orange", "Yellow",
        "Green", "Blue", "Indigo", "Violet", or None when the hex code is
        rejected by ``webcolors.hex_to_rgb``.
    """
    try:
        rgb_tuple = webcolors.hex_to_rgb(hex_code)
    except ValueError:
        return None

    # colorsys works on 0..1 floats and returns (hue, lightness, saturation).
    red, green, blue = (channel / 255.0 for channel in rgb_tuple)
    hue, lightness, saturation = colorsys.rgb_to_hls(red, green, blue)

    # Hue carries no information for near-gray colors, so handle them first.
    if saturation < 0.1:
        if lightness > 0.9:
            return "White"
        return "Black" if lightness < 0.1 else "Gray"

    hue_degrees = hue * 360

    # Exclusive upper hue bound (degrees) for each bucket, in ascending order.
    hue_buckets = (
        (15, "Red"),
        (45, "Orange"),
        (75, "Yellow"),
        (165, "Green"),
        (255, "Blue"),
        (270, "Indigo"),
        (330, "Violet"),
    )
    for upper_bound, name in hue_buckets:
        if hue_degrees < upper_bound:
            return name

    # 330 degrees and above wraps around to red.
    return "Red"



In [32]:
import os

# Create the output directory up front so open() cannot fail on a missing folder.
os.makedirs("steered_outputs", exist_ok=True)

for steered_output in steered_outputs:
    for output in steered_output["outputs"]:
        code = output["output"]
        layer_name = output["layer_name"]

        # Classify every hex color found in the generated page.
        colors = extract_hex_codes(code)
        color_names = [get_rainbow_color_name(color) for color in colors]

        # Only report the layers whose output contains a yellow color.
        if 'Yellow' in color_names:
            print(layer_name, color_names)
            print()

        # One file per layer name; a later prompt steering the same layer
        # overwrites the earlier file. Explicit UTF-8 keeps the write
        # independent of the platform default encoding.
        with open(f"steered_outputs/{layer_name}.html", "w", encoding="utf-8") as f:
            f.write(code)


model.layers.19.mlp ['Blue', 'Gray', 'Violet', 'White', 'Indigo', 'White', 'Yellow', 'Violet', 'Indigo', 'White', 'Violet', 'Violet', 'White', 'Violet', 'Gray', 'Violet', 'White', 'Indigo', 'Gray', 'White']

model.layers.21.mlp ['Blue', 'Indigo', 'White', 'Indigo', 'Indigo', 'White', 'Yellow', 'White', 'White', 'Gray', 'Indigo', 'Gray', 'Indigo', 'White', 'Indigo', 'Gray', 'White']

model.layers.29.mlp ['Blue', 'Yellow', 'Gray', 'Blue', 'Red', 'Gray']

model.layers.35.mlp ['Blue', 'Indigo', 'White', 'Violet', 'Violet', 'White', 'Yellow', 'White', 'Violet', 'White', 'Gray', 'Violet', 'White', 'Indigo', 'Violet']



In [33]:
# Add method to capture token log-probabilities during generation
def chat_and_get_logprobs(self, prompt, max_tokens=3000):
    """Sample a reply and report the log-probability of every generated token.

    Args:
        prompt: User message text.
        max_tokens: Upper bound on newly generated tokens.

    Returns:
        dict with ``"output"`` (decoded completion), ``"token_log_probs"``
        (one float per generated token), ``"token_strings"`` (each token
        decoded on its own) and ``"generated_tokens"`` (token ids as a numpy
        array).
    """
    print(f"Tokenizing prompt: {prompt}")
    chat_text = self.tokenizer.apply_chat_template(
        [{"role": "user", "content": prompt}],
        tokenize=False,
        add_generation_prompt=True,
        enable_thinking=False,
    )
    inputs = self.tokenizer(chat_text, return_tensors="pt").to(self.model.device)

    print("Running model with logprob capture...")
    with torch.no_grad():
        generation = self.model.generate(
            **inputs,
            max_new_tokens=max_tokens,
            temperature=0.7,
            do_sample=True,
            pad_token_id=self.tokenizer.eos_token_id,
            return_dict_in_generate=True,
            output_scores=True,
        )

        # Everything after the prompt is newly generated.
        prompt_length = len(inputs.input_ids[0])
        generated_tokens = generation.sequences[0][prompt_length:]

        # One score row per generation step; normalise it and read off the
        # entry of the token that was actually sampled at that step.
        # NOTE(review): `scores` may already include temperature/top-k/top-p
        # processing rather than raw logits; confirm against the installed
        # transformers version.
        token_log_probs = [
            torch.nn.functional.log_softmax(step_scores[0], dim=-1)[token_id].item()
            for step_scores, token_id in zip(generation.scores, generated_tokens)
        ]

        output_text = self.tokenizer.decode(
            generated_tokens,
            skip_special_tokens=True,
        )
        token_strings = [
            self.tokenizer.decode([token_id]) for token_id in generated_tokens
        ]

    return {
        "output": output_text,
        "token_log_probs": token_log_probs,
        "token_strings": token_strings,
        "generated_tokens": generated_tokens.cpu().numpy(),
    }

# Attach the function to the ActivationSteering class as a method. Because it
# is set on the class, instances created earlier can call it too.
ActivationSteering.chat_and_get_logprobs = chat_and_get_logprobs


In [34]:
# Add method to capture token log-probabilities during steered generation
def chat_and_apply_steering_vector_with_logprobs(self, prompt, steering_vector, layer_name, max_tokens=3000):
    """Sample a steered reply and report the log-probability of every generated token.

    A forward pre-hook adds ``steering_vector`` in place to the input of the
    module named ``layer_name``. The hook is removed again even when
    generation raises, so a failed run cannot leave the model steered.

    Args:
        prompt: User message text.
        steering_vector: Tensor added to the first ``steering_vector.shape[1]``
            positions of the module input.
        layer_name: Name of the entry in ``self.activation_layers`` to steer.
        max_tokens: Upper bound on newly generated tokens.

    Returns:
        dict with ``"output"`` (decoded completion), ``"token_log_probs"``
        (one float per generated token), ``"token_strings"`` (each token
        decoded on its own) and ``"generated_tokens"`` (token ids as a numpy
        array).

    Raises:
        AssertionError: If the steering vector covers more positions than the
            input it is applied to.
    """
    print(f"Tokenizing prompt: {prompt}")
    messages = [{"role": "user", "content": prompt}]
    text = self.tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True,
        enable_thinking=False,
    )
    inputs = self.tokenizer(text, return_tensors="pt").to(self.model.device)

    def steering_pre_hook(module, layer_input):
        (resid_pre,) = layer_input
        # Single-position calls are treated as incremental decoding steps
        # (cached generation) and are passed through unmodified.
        if resid_pre.shape[1] == 1:
            return None

        # We only add to the prompt (first call), not the generated tokens.
        ppos, apos = resid_pre.shape[1], steering_vector.shape[1]
        assert (
            apos <= ppos
        ), f"More mod tokens ({apos}) then prompt tokens ({ppos})!"

        # TODO: Make this a function-wrapper for flexibility.
        resid_pre[:, :apos, :] += steering_vector
        return resid_pre

    print("Attaching hooks...")
    hooks = []
    try:
        for layer in self.activation_layers:
            # Only attach the hook to the layer we want to steer
            if layer["name"] == layer_name:
                print(f"Attaching steering hook to layer {layer['name']}")
                hooks.append(
                    layer["module"].register_forward_pre_hook(steering_pre_hook)
                )

        print("Running model with steering and logprob capture...")
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=max_tokens,
                temperature=0.7,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id,
                return_dict_in_generate=True,
                output_scores=True,
            )

            # Extract generated tokens (excluding prompt)
            generated_tokens = outputs.sequences[0][len(inputs.input_ids[0]):]

            # One score row per generation step; normalise it and read off
            # the entry of the token that was actually sampled at that step.
            # NOTE(review): `scores` may already include temperature/top-k/
            # top-p processing rather than raw logits; confirm against the
            # installed transformers version.
            token_log_probs = [
                torch.nn.functional.log_softmax(step_scores[0], dim=-1)[token_id].item()
                for step_scores, token_id in zip(outputs.scores, generated_tokens)
            ]

            # Decode the generated text
            output_text = self.tokenizer.decode(
                generated_tokens,
                skip_special_tokens=True,
            )

            # Get token strings for analysis
            token_strings = [self.tokenizer.decode([token_id]) for token_id in generated_tokens]
    finally:
        # The assert in the hook fires inside generate(); without this the
        # steering hook would stay attached for every later run.
        print("Detaching hooks...")
        for handle in hooks:
            handle.remove()

    return {
        "output": output_text,
        "token_log_probs": token_log_probs,
        "token_strings": token_strings,
        "generated_tokens": generated_tokens.cpu().numpy()
    }

# Attach the function to the ActivationSteering class as a method. Because it
# is set on the class, instances created earlier can call it too.
ActivationSteering.chat_and_apply_steering_vector_with_logprobs = chat_and_apply_steering_vector_with_logprobs


In [49]:
# Generate baseline and steered outputs with log-probabilities for comparison
print("Generating baseline and steered outputs for log-probability analysis...")

# Both generations are sampled (do_sample=True). Seeding each run identically
# makes the cell reproducible and starts both runs from the same random
# state, so differences are not just sampling noise.
COMPARISON_SEED = 0

# Use the first validation prompt, as in the steering sweep.
test_prompt = validation_prompts[0]
full_prompt = SYSTEM_PROMPT + "\n\n" + test_prompt

# Steering direction for the chosen layer: "yellow" mean minus "purple" mean.
layer_name = "model.layers.29.mlp"
yellow_vector = yellow_vectors_by_layer[layer_name]
purple_vector = purple_vectors_by_layer[layer_name]
steering_vector = yellow_vector - purple_vector

print(f"Test prompt: {test_prompt}")
print(f"Steering layer: {layer_name}")
print()

# Generate baseline output with log-probabilities
print("=== BASELINE GENERATION ===")
torch.manual_seed(COMPARISON_SEED)
baseline_result = a.chat_and_get_logprobs(full_prompt, max_tokens=3000)
print(f"Generated {len(baseline_result['token_log_probs'])} tokens")
print()

# Generate steered output with log-probabilities
print("=== STEERED GENERATION ===")
torch.manual_seed(COMPARISON_SEED)
steered_result = a.chat_and_apply_steering_vector_with_logprobs(
    full_prompt, steering_vector, layer_name, max_tokens=3000
)
print(f"Generated {len(steered_result['token_log_probs'])} tokens")
print()

print("Baseline and steered outputs generated successfully!")


Generating baseline and steered outputs for log-probability analysis...
Test prompt: Generate a website for a law firm specializing in family law.
Steering layer: model.layers.29.mlp

=== BASELINE GENERATION ===
Tokenizing prompt: 
You are an expert website designer and software engineer.

You will be given a request to generate a website or software.

You need to produce a single HTML file that can be used as a website.
Rules to follow:
- The output should only be the HTML code. No other text or comments. No code blocks like ```html.
- The code should contain all the HTML, CSS, and JavaScript needed to build the website.
- Only use valid hex codes for colors.
- The website should be colorful and modern. Choose a beautiful color for the brand.


Generate a website for a law firm specializing in family law.
Running model with logprob capture...
Generated 1443 tokens

=== STEERED GENERATION ===
Tokenizing prompt: 
You are an expert website designer and software engineer.

You will be giv

In [50]:
# Show the decoded text produced by the steered generation.
print(steered_result['output'])

<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <title>Harmony Law - Family Law Specialists</title>
  <style>
    body {
      margin: 0;
      font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
      background: linear-gradient(135deg, #6a11cb, #2575fc);
      color: #fff;
      line-height: 1.6;
    }

    header {
      background: #2575fc;
      padding: 2rem 1rem;
      text-align: center;
    }

    header h1 {
      font-size: 2.5rem;
      margin: 0;
    }

    header p {
      font-size: 1.2rem;
      margin-top: 0.5rem;
    }

    nav {
      background: #1e3c72;
      padding: 1rem 2rem;
      text-align: center;
    }

    nav a {
      color: #fff;
      margin: 0 1rem;
      text-decoration: none;
      font-weight: bold;
      transition: color 0.3s ease;
    }

    nav a:hover {
      color: #ff6b6b;
    }

    .hero {
      background: url('https://images.unsplash.com/p