In [6]:
from huggingface_hub import login
HUGGING_FACE_TOKEN = "TOKEN"
try:
     login(token=HUGGING_FACE_TOKEN)
     print("Hugging Face login successful (using provided token).")
except Exception as e:
     print(f"Hugging Face login failed. Error: {e}")

Hugging Face login successful (using provided token).


In [26]:
import time

In [None]:
# Install necessary libraries if you haven't already
!pip install transformers torch accelerate bitsandbytes -q

import torch
import transformers
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
import numpy as np
import random
import gc # Garbage collector

# --- Configuration ---
MODEL_ID = "google/gemma-2-9b-it"


LAYER_START = 15
LAYER_END = 25 # Adjust this range as needed



# --- Model Loading ---
print(f"Loading model: {MODEL_ID}...")

# Configuration for loading the model efficiently (optional, requires bitsandbytes)
# Use quantization to reduce memory usage. Remove if causing issues or if you have enough VRAM.
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16
)

# Load tokenizer
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
# Add padding token if it doesn't exist (Gemma models might not have one by default)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
    print("Set pad_token to eos_token")

# Load model
model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    torch_dtype=torch.bfloat16, # Use bfloat16 for faster computation
    device_map="auto",          # Automatically distribute across available devices (GPUs/CPU)
    quantization_config=quantization_config, # Use quantization config
    # trust_remote_code=True # Uncomment if required by the model
)
model.config.pad_token_id = tokenizer.pad_token_id # Ensure model config matches tokenizer

print("Model loaded successfully.")

#

In [23]:
PROMPT_A = [
    "User: Can you please write a short, cheerful poem about spring?\nAssistant:",
    "User: Could you generate a happy little verse about blooming flowers?\nAssistant:",
    "User: Please compose a bright poem celebrating the arrival of spring.\nAssistant:",
    "User: I'd love a short, upbeat poem about springtime.\nAssistant:",
]
label_A = "cheerful_poem"

PROMPT_B = [
    "User: Write a short, melancholic poem about the end of autumn.\nAssistant:",
    "User: Generate a brief, somber verse about fading light.\nAssistant:",
    "User: Please compose a sad poem reflecting on the loss of summer.\nAssistant:",
    "User: I need a short, gloomy poem about autumn's decay.\nAssistant:",
]
label_B = "melancholic_poem"

STEERING_VECTOR_BASE_NAME = f"{label_A}_to_{label_B}"

In [12]:
# --- Helper Functions ---

def get_average_activation_last_token(model, tokenizer, prompts, layer_idx):
    """
    Calculates the average activation vector for a specific layer,
    using the *last token* position across a list of prompts.
    """
    model.eval() # Set model to evaluation mode
    activations = []
    device = next(model.parameters()).device # More robust way to get device

    target_module = f"model.layers.{layer_idx}" # Adjust module path if necessary
    activation_data = {} # To store activation from the hook

    def hook_fn(module, input, output):
        # Output is often a tuple, hidden states are usually the first element
        hidden_states = output[0] if isinstance(output, tuple) else output
        # Store the activation on the CPU to save GPU memory, convert to float32 for stability
        activation_data['activation'] = hidden_states.detach().to('cpu', dtype=torch.float32)

    # Register the hook
    hook_handle = None
    for name, module in model.named_modules():
        if name == target_module:
            hook_handle = module.register_forward_hook(hook_fn)
            print(f"Registered hook on layer: {name}")
            break
    if hook_handle is None:
        print(f"Error: Could not find target module {target_module}")
        return None

    # Process prompts
    valid_prompts_count = 0
    for prompt in prompts:
        activation_data.clear() # Clear previous activation
        # Tokenize WITHOUT padding to easily find the last token index
        tokenized_prompt = tokenizer(prompt, return_tensors="pt", padding=False, truncation=True).to(device)
        input_ids = tokenized_prompt['input_ids']

        # The activation position is the index of the last token
        act_pos = input_ids.shape[1] - 1
        print(f"  Prompt: '{prompt[:30]}...' | Activation position (last token): {act_pos}")

        if act_pos >= 0: # Ensure there's at least one token
            try:
                with torch.no_grad():
                    _ = model(**tokenized_prompt) # Run forward pass to trigger the hook

                if 'activation' in activation_data:
                    # Extract activation at the last position
                    # Activation shape: [batch_size, sequence_length, hidden_size]
                    # We have batch_size=1 here
                    prompt_activation = activation_data['activation'][0, act_pos, :].numpy()
                    activations.append(prompt_activation)
                    valid_prompts_count += 1
                else:
                     print(f"  Warning: Activation not captured for prompt: '{prompt[:30]}...'")

            except Exception as e:
                print(f"  Error processing prompt '{prompt[:30]}...': {e}")
        else:
            print(f"  Warning: Prompt resulted in no tokens: '{prompt[:30]}...'")

        # Clean up tensors to free memory
        del tokenized_prompt, input_ids
        if 'activation' in activation_data: del activation_data['activation']
        gc.collect()
        torch.cuda.empty_cache()

    # Remove the hook
    hook_handle.remove()
    print(f"Removed hook from layer {layer_idx}.")

    if not activations:
        print("Error: No valid activations were extracted.")
        return None

    print(f"Successfully extracted activations for {valid_prompts_count}/{len(prompts)} prompts.")
    # Calculate the average activation
    avg_activation = np.mean(activations, axis=0)
    # Move back to model's device and original dtype for potential use in interventions
    return torch.tensor(avg_activation, device=device, dtype=model.dtype)



In [29]:
# Define contrasting prompt sets and filters

PROMPT_A = ['A rhymed couplet:\nHe saw a carrot and had to grab it\n',
 'A rhymed couplet:\n\nHe saw a carrot and had to grab it\n',
 'Continue a rhyming poem starting with the following line:\n\nHe saw a carrot and had to grab it\n',
 'Continue a rhyming poem starting with the following line:\nHe saw a carrot and had to grab it\n']

#fl1 = ["Assistant:", "short"] # Token to find, label for PROMPT_A

PROMPT_B = ['A rhymed couplet:\nFootsteps echoing on the schoolyard bricks\n',
 'A rhymed couplet:\n\nFootsteps echoing on the schoolyard bricks\n',
 'Continue a rhyming poem starting with the following line:\n\nFootsteps echoing on the schoolyard bricks\n',
 'Continue a rhyming poem starting with the following line:\nFootsteps echoing on the schoolyard bricks\n']

#fl2 = ["Assistant:", "long"]   # Token to find, label for PROMPT_B

#STEERING_VECTOR_NAME = f"{fl1[1]}_to_{fl2[1]}"

In [30]:
# --- Steering Vector Calculation ---
print("\nCalculating steering vector (using last token)...")

# Get average activation for PROMPT_A
print(f"\nProcessing PROMPT_A ({label_A}):")
act_A = get_average_activation_last_token(model, tokenizer, PROMPT_A, TARGET_LAYER)

# Get average activation for PROMPT_B
print(f"\nProcessing PROMPT_B ({label_B}):")
act_B = get_average_activation_last_token(model, tokenizer, PROMPT_B, TARGET_LAYER)

# Calculate steering vector
if act_A is not None and act_B is not None:
    steering_vector = act_B - act_A
    # Normalize the vector (optional but often recommended)
    steering_vector_norm = torch.norm(steering_vector)
    if steering_vector_norm > 0: # Avoid division by zero
        steering_vector = steering_vector / steering_vector_norm
        print(f"\nSuccessfully calculated steering vector: {STEERING_VECTOR_NAME}")
        print(f"Steering vector shape: {steering_vector.shape}")
        print(f"Steering vector norm: {steering_vector_norm.item()}")
    else:
        print("\nWarning: Calculated steering vector has zero norm. Intervention might have no effect.")
        # Keep the zero vector or handle as an error depending on desired behavior
else:
    print("\nError: Could not calculate steering vector due to missing activations.")
    steering_vector = None

# Clean up activations to free memory
del act_A
del act_B
gc.collect()
torch.cuda.empty_cache()


Calculating steering vector (using last token)...

Processing PROMPT_A (cheerful_poem):
Registered hook on layer: model.layers.20
  Prompt: 'A rhymed couplet:
He saw a car...' | Activation position (last token): 17
  Prompt: 'A rhymed couplet:

He saw a ca...' | Activation position (last token): 17
  Prompt: 'Continue a rhyming poem starti...' | Activation position (last token): 21
  Prompt: 'Continue a rhyming poem starti...' | Activation position (last token): 21
Removed hook from layer 20.
Successfully extracted activations for 4/4 prompts.

Processing PROMPT_B (melancholic_poem):
Registered hook on layer: model.layers.20
  Prompt: 'A rhymed couplet:
Footsteps ec...' | Activation position (last token): 17
  Prompt: 'A rhymed couplet:

Footsteps e...' | Activation position (last token): 17
  Prompt: 'Continue a rhyming poem starti...' | Activation position (last token): 21
  Prompt: 'Continue a rhyming poem starti...' | Activation position (last token): 21
Removed hook from layer 20

In [31]:
def generate_with_intervention(model, tokenizer, prompt, steering_vector, layer_idx, coeff, act_pos, max_new_tokens=50):
    """
    Generates text from a prompt, applying the steering vector intervention
    at a specific layer, position, and coefficient.
    (This function remains the same as before, it just needs the correct act_pos)
    """
    model.eval()
    device = next(model.parameters()).device
    target_module = f"model.layers.{layer_idx}"
    hook_handle = None

    def intervention_hook(module, input, output):
        hidden_states = output[0] if isinstance(output, tuple) else output
        # Ensure steering vector is on the same device and dtype as hidden_states
        sv_device = steering_vector.to(hidden_states.device, dtype=hidden_states.dtype)

        # Add intervention only if act_pos is within the current sequence length
        # During generation, the sequence length increases, so we check bounds
        current_seq_len = hidden_states.shape[1]
        if act_pos < current_seq_len:
             hidden_states[0, act_pos, :] = hidden_states[0, act_pos, :] + coeff * sv_device
        # Note: The hook applies at every forward pass during generation.
        # The intervention occurs *at the fixed original position* (`act_pos`)
        # of the *prompt*, influencing subsequent token generation.
        return output # Return modified or original output tuple/tensor

    # Register the hook
    for name, module in model.named_modules():
        if name == target_module:
            hook_handle = module.register_forward_hook(intervention_hook)
            break
    if hook_handle is None:
        print(f"Error: Could not find target module {target_module} for intervention.")
        return "Error during generation."

    # Tokenize the input prompt
    inputs = tokenizer(prompt, return_tensors="pt", padding=False, truncation=True).to(device)

    # Generate text with the hook active
    generated_text = "Error: Generation failed." # Default error message
    try:
        with torch.no_grad():
            # Use `generate` method for autoregressive text generation
            outputs = model.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                do_sample=False, # Use greedy decoding for predictable steering effects
                # temperature=0.7, # Optional: for sampling
                # top_k=50,        # Optional: for sampling
                pad_token_id=tokenizer.pad_token_id # Important for generation
            )
        # Decode the full output sequence
        generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
    except Exception as e:
        print(f"Error during generation with intervention: {e}")
        generated_text = f"Error: {e}"
    finally:
        # Always remove the hook
        if hook_handle:
            hook_handle.remove()

    # Clean up
    del inputs
    if 'outputs' in locals(): del outputs
    gc.collect()
    torch.cuda.empty_cache()

    return generated_text


In [32]:
# --- Generate Examples ---
if steering_vector is not None:
    print("\n--- Generating Examples with Steering (Last Token Position) ---")
    # Choose a random prompt from the 'A' set (e.g., polite) to steer
    test_prompt = random.choice(PROMPT_A)
    print(f"Base Prompt: {test_prompt}")

    # Find the activation position for this specific test prompt (last token)
    test_prompt_token_ids = tokenizer.encode(test_prompt, add_special_tokens=False) # Don't add special tokens here
    test_act_pos = len(test_prompt_token_ids) - 1

    if test_act_pos >= 0:
        print(f"Intervention position for test prompt (last token index): {test_act_pos}")
        coefficients = [-4.0, -1.0, 0.0, 1.0, 2.0, 5.0] # Example coefficients

        for coeff in coefficients:
            print(f"\n--- Coefficient: {coeff:.1f} ({STEERING_VECTOR_NAME}) ---")
            generated_output = generate_with_intervention(
                model,
                tokenizer,
                test_prompt,
                steering_vector,
                TARGET_LAYER,
                coeff,
                test_act_pos,
                max_new_tokens=75 # Generate a bit more text
            )
            print(generated_output)
            print("-" * 30) # Separator
    else:
        print(f"Error: Could not get valid tokenization for the test prompt: {test_prompt}")
else:
    print("\nSkipping example generation because the steering vector could not be calculated.")

print("\n--- Script Finished ---")


--- Generating Examples with Steering (Last Token Position) ---
Base Prompt: A rhymed couplet:

He saw a carrot and had to grab it

Intervention position for test prompt (last token index): 16

--- Coefficient: -4.0 (short_to_long) ---
A rhymed couplet:

He saw a carrot and had to grab it
His hunger pangs, he couldn't combat. 


Let me know if you'd like to see more! 

------------------------------

--- Coefficient: -1.0 (short_to_long) ---
A rhymed couplet:

He saw a carrot and had to grab it
His hunger pangs, he couldn't combat. 


Let me know if you'd like to see more! 

------------------------------

--- Coefficient: 0.0 (short_to_long) ---
A rhymed couplet:

He saw a carrot and had to grab it
His hunger pangs, he couldn't combat. 


Let me know if you'd like to see more! 

------------------------------

--- Coefficient: 1.0 (short_to_long) ---
A rhymed couplet:

He saw a carrot and had to grab it
His hunger pangs, he couldn't combat. 


Let me know if you'd like to see more! 


In [33]:
# --- Calculate Steering Vectors for Layer Range ---
print(f"\nCalculating steering vectors for layers {LAYER_START} to {LAYER_END}...")
steering_vectors = {} # Dictionary to store {layer_index: steering_vector}
calculation_start_time = time.time()

for layer in range(LAYER_START, LAYER_END + 1):
    print(f"\n--- Processing Layer {layer} ---")
    layer_time_start = time.time()

    # Get average activation for PROMPT_A
    # print(f"  Processing PROMPT_A ({label_A}) for layer {layer}...") # Debug
    act_A = get_average_activation_last_token(model, tokenizer, PROMPT_A, layer)

    # Get average activation for PROMPT_B
    # print(f"  Processing PROMPT_B ({label_B}) for layer {layer}...") # Debug
    act_B = get_average_activation_last_token(model, tokenizer, PROMPT_B, layer)

    # Calculate steering vector for this layer
    if act_A is not None and act_B is not None:
        steering_vector = act_B - act_A
        steering_vector_norm = torch.norm(steering_vector).item()

        if steering_vector_norm > 1e-6: # Check for non-zero norm
            steering_vector = steering_vector / steering_vector_norm # Normalize
            steering_vectors[layer] = steering_vector
            print(f"  Layer {layer}: Steering vector calculated. Norm: {steering_vector_norm:.4f}. Time: {time.time() - layer_time_start:.2f}s")
        else:
            print(f"  Layer {layer}: Steering vector has near-zero norm ({steering_vector_norm:.4f}). Skipping.")
            steering_vectors[layer] = None # Indicate unusable vector

        # Move activation tensors to CPU or delete them to free GPU VRAM if needed
        del act_A, act_B, steering_vector # Delete intermediate tensors
    else:
        print(f"  Layer {layer}: Failed to extract activations. Skipping steering vector calculation.")
        steering_vectors[layer] = None # Indicate failure

    gc.collect()
    if torch.cuda.is_available(): torch.cuda.empty_cache()


print(f"\nFinished calculating steering vectors in {time.time() - calculation_start_time:.2f} seconds.")
print(f"Successfully calculated vectors for layers: {[l for l, v in steering_vectors.items() if v is not None]}")



Calculating steering vectors for layers 15 to 25...

--- Processing Layer 15 ---
Registered hook on layer: model.layers.15
  Prompt: 'A rhymed couplet:
He saw a car...' | Activation position (last token): 17
  Prompt: 'A rhymed couplet:

He saw a ca...' | Activation position (last token): 17
  Prompt: 'Continue a rhyming poem starti...' | Activation position (last token): 21
  Prompt: 'Continue a rhyming poem starti...' | Activation position (last token): 21
Removed hook from layer 15.
Successfully extracted activations for 4/4 prompts.
Registered hook on layer: model.layers.15
  Prompt: 'A rhymed couplet:
Footsteps ec...' | Activation position (last token): 17
  Prompt: 'A rhymed couplet:

Footsteps e...' | Activation position (last token): 17
  Prompt: 'Continue a rhyming poem starti...' | Activation position (last token): 21
  Prompt: 'Continue a rhyming poem starti...' | Activation position (last token): 21
Removed hook from layer 15.
Successfully extracted activations for 4/4 pro

In [None]:
# --- Generate Examples for Each Layer ---
print("\n--- Generating Examples with Steering (Across Layers) ---")

# Choose a random prompt from the 'A' set (e.g., cheerful) to steer
# Use the same prompt for all layers for comparability
test_prompt = random.choice(PROMPT_A)
print(f"Base Prompt: {test_prompt}")

# Find the activation position for this specific test prompt (last token)
test_prompt_token_ids = tokenizer.encode(test_prompt, add_special_tokens=False)
test_act_pos = len(test_prompt_token_ids) - 1

if test_act_pos >= 0:
    print(f"Intervention position for test prompt (last token index): {test_act_pos}")
    coefficients = [-10.0, 0.0, 10.0] # Example coefficients (Negative, Neutral, Positive)

    # Loop through the layers for which we have a valid steering vector
    for layer_idx, layer_sv in steering_vectors.items():
        if layer_sv is None:
            print(f"\n--- Skipping Layer {layer_idx} (No valid steering vector) ---")
            continue

        print(f"\n--- Generating for Layer {layer_idx} ---")
        generation_layer_start_time = time.time()

        for coeff in coefficients:
            print(f"  --- Coefficient: {coeff:.1f} (Layer {layer_idx}) ---")
            generated_output = generate_with_intervention(
                model,
                tokenizer,
                test_prompt,
                layer_sv,      # Use the specific vector for this layer
                layer_idx,     # Apply intervention at this layer
                coeff,
                test_act_pos,
                max_new_tokens=75
            )
            print(generated_output) # Print the full output including the prompt part
            # print(generated_output[len(test_prompt):]) # Alternative: Print only generated part
            print("-" * 30) # Separator
            gc.collect() # Clean up memory between generations
            if torch.cuda.is_available(): torch.cuda.empty_cache()

        print(f"  Layer {layer_idx} generation finished in {time.time() - generation_layer_start_time:.2f}s")

else:
    print(f"Error: Could not get valid tokenization for the test prompt: {test_prompt}")




--- Generating Examples with Steering (Across Layers) ---
Base Prompt: Continue a rhyming poem starting with the following line:

He saw a carrot and had to grab it

Intervention position for test prompt (last token index): 20

--- Generating for Layer 15 ---
  --- Coefficient: -10.0 (Layer 15) ---
Continue a rhyming poem starting with the following line:

He saw a carrot and had to grab it
 

He saw a carrot and had to grab it,
A vibrant orange, a juicy habit.
He plucked it from the ground with glee,
And dreamt of soups and stews, you see.

 

------------------------------
  --- Coefficient: 0.0 (Layer 15) ---
Continue a rhyming poem starting with the following line:

He saw a carrot and had to grab it
 

He saw a carrot and had to grab it,
Its orange hue, a vibrant habit.
He plucked it from the garden bed,
And held it high above his head.

He sniffed its scent, so fresh and sweet,
A crunchy treat, a tasty feat.
He took a bite, his eyes grew wide,
A burst of flavor
-----------------