In [1]:
import mamba_ssm
from nnsight import LanguageModel, util
from nnsight.tracing.Proxy import Proxy
from nnsight.models.Mamba import MambaInterp
from transformers import AutoTokenizer
import numpy as np
import torch as t
import torch.nn.functional as F
import einops
from tqdm import tqdm
from functools import partial

from rich import print as rprint
from rich.table import Table

from typing import List, Callable, Union

# Run on the GPU with index 2 when CUDA is available; otherwise fall back to
# the CPU.
if t.cuda.is_available():
    device = t.device("cuda:2")
else:
    device = t.device("cpu")

  torch.utils._pytree._register_pytree_node(


In [2]:
# Sampling options unpacked into every `generate(...)` call in this notebook
# (in mamba_ssm/utils/generation.py).
sampling_kwargs = dict(top_p=0.2, top_k=0, repetition_penalty=1.1)

# GPT-NeoX tokenizer with left padding; the EOS token id doubles as pad id.
tokenizer = AutoTokenizer.from_pretrained(
    "EleutherAI/gpt-neox-20b",
    padding_side="left",
)
tokenizer.pad_token_id = tokenizer.eos_token_id

# Mamba 2.8B wrapped for interpretability, placed on the selected device.
mamba_model = MambaInterp(
    "state-spaces/mamba-2.8b",
    device=device,
    tokenizer=tokenizer,
)

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [3]:
# Trace a single generation run (up to 40 tokens) on one prompt and save a few
# intermediate SSM activations so their values can be inspected afterwards.
with mamba_model.generate(max_length=40,**sampling_kwargs,validate=False) as generator:
            
    with generator.invoke("Where is the Eiffel tower located at?",scan=True) as invoker:
        # Saved output of the `hx.ah` submodule inside layer 0's SSM.
        a = mamba_model.backbone.layers[0].mixer.ssm.hx.ah.output.save()
        # Saved output of the `yh` submodule inside layer 12's SSM.
        y =  mamba_model.backbone.layers[12].mixer.ssm.yh.output.save()
        # NOTE(review): `.next()` presumably moves `yh` on to its following
        # call, so `y2` captures a later output than `y` -- confirm against the
        # nnsight docs. The order of these three statements matters.
        mamba_model.backbone.layers[12].mixer.ssm.yh.next()
        y2= mamba_model.backbone.layers[12].mixer.ssm.yh.output.save()
        
        # No further interventions in this invocation.
        pass
        


You're using a GPTNeoXTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.


In [4]:
# Show the two saved `yh` outputs, one per line.
print(y.value, y2.value, sep="\n")

tensor([[-0.0081, -0.0081,  0.0015,  ..., -0.0085, -0.0005,  0.0026]],
       device='cuda:2')
tensor([[-0.0063, -0.0069,  0.0008,  ..., -0.0076, -0.0011,  0.0057]],
       device='cuda:2')


["Where is the Eiffel tower located at?\nWhat's a good way to get there from my hotel in Paris, France?\nHow do I find out more about this place on Google Maps"]


In [3]:
def print_table(completions, titles):
    """Render completions as a rich table, one row per title.

    Args:
        completions: Indexable collection where ``completions[i]`` holds at
            least three strings; only the first three are shown.
        titles: Row labels. ``titles[i]`` labels ``completions[i]``, so
            `completions` must be at least as long as `titles`.

    Raises:
        IndexError: If `completions` is shorter than `titles`, or a row has
            fewer than three entries.
    """
    table = Table(
        "Type:",
        "Completion 1",
        "Completion 2",
        "Completion 3",
        title="Completions for steering at different layers and coefficients",
        show_lines=True,
    )
    for row_index, row_title in enumerate(titles):
        # Index by position (rather than zip) so a missing row raises instead
        # of being dropped silently.
        row = completions[row_index]
        table.add_row(row_title, row[0], row[1], row[2])
    rprint(table)

In [61]:
def apply_steering_vector(
    model: LanguageModel,
    steering_info: List[tuple],
    new_token_length: int,
    steering_prompt: Union[str, List[str]],
    prompts: List[str],
    *,
    middle_layer: int = 32,
    num_layers: int = 64,
):
    """Generate completions for `prompts` with and without activation steering.

    For every ``(n_layer, steering_coefficient)`` pair in `steering_info`, one
    `model.generate` run is traced with three invocations, in this order:

    1. `steering_prompt` -- the outputs of each selected layer and of its
       ``mixer.ssm.hx`` submodule are collected as steering vectors.
    2. `prompts`, untouched -- the unsteered baseline.
    3. `prompts` again -- the collected vectors, scaled by
       `steering_coefficient`, are added in place to the same layers.

    The selected layers run from ``middle_layer - n_layer`` through
    ``middle_layer + n_layer`` inclusive, clamped to ``[0, num_layers)``.

    Generation options are read from the module-level `sampling_kwargs` dict,
    which must be defined before this function is called.

    Args:
        model: Traced model exposing `invoke`, `generate` and
            ``backbone.layers``.
        steering_info: ``(n_layer, steering_coefficient)`` pairs; one
            generation run is performed per pair.
        new_token_length: Added to the longest prompt length to form the
            `max_length` passed to `generate`.
        steering_prompt: Prompt whose activations become the steering vectors.
        prompts: Prompts to complete.
        middle_layer: Index of the layer the window is centred on.
        num_layers: Exclusive upper bound for layer indices.

    Returns:
        ``(unsteered_completions_list, steered_completions_list)``. The first
        list gets a single entry, ``generator.output[1:-len(prompts)]`` from
        the first run; the second gets ``generator.output[-len(prompts):]``
        for every run.
    """
    unsteered_completions_list = []
    steered_completions_list = []

    # One plain invocation, used only to read the token-id length per prompt.
    with model.invoke(prompts) as invoker:
        seq_lens = [len(invoker.input["input_ids"][i]) for i in range(len(prompts))]
        prompt_seq_len = max(seq_lens)
    n_completions = len(prompts)

    for n_layer, steering_coefficient in steering_info:
        # Clamp both ends of the window. Previously only `start < 0` triggered
        # clamping, so n_layer == middle_layer produced end == num_layers + 1
        # and indexed one layer past the end of the stack.
        start = max(0, middle_layer - n_layer)
        end = min(num_layers, middle_layer + n_layer + 1)

        with model.generate(max_length=prompt_seq_len+new_token_length,**sampling_kwargs,validate=False) as generator:
            print("Generating vectors")
            # NOTE(review): for a `str` prompt this is the character count, not
            # the token count -- confirm that is the intended argument to
            # `.next(...)` below.
            steering_prompt_len = len(steering_prompt)
            with generator.invoke(steering_prompt,scan=True) as invoker:
                extracted_state_vectors = []
                vectors = []
                for i in range(start, end):
                    layer = model.backbone.layers[i]
                    # First two elements of the layer output, kept as a pair.
                    vectors.append([layer.output[0], layer.output[1]])
                    ssm_state = layer.mixer.ssm.hx
                    ssm_state = ssm_state.next(steering_prompt_len)
                    extracted_state_vectors.append(ssm_state.output)

            print("Steering step")
            # Unsteered baseline: same prompts, no interventions.
            with generator.invoke(prompts) as invoker:
                pass

            # Steered run: add the scaled vectors to every prompt row.
            with generator.invoke(prompts,scan=True) as invoker:
                for prompt_batch in range(n_completions):
                    for i in range(start, end):
                        layer = model.backbone.layers[i]
                        vec = vectors[i-start]
                        layer.output[0][prompt_batch, ...] += steering_coefficient * vec[0]
                        layer.output[1][prompt_batch, ...] += steering_coefficient * vec[1]

                        # presumably input[0][3] is the SSM state argument of
                        # `hx` -- TODO confirm against MambaInterp.
                        ssm_state = layer.mixer.ssm.hx
                        vec = extracted_state_vectors[i-start]
                        ssm_state.input[0][3][prompt_batch] += steering_coefficient * vec

        # The baseline is kept from the first run only. The slice skips row 0
        # (assumes the steering prompt contributes exactly one output row --
        # TODO confirm) and the trailing steered rows.
        if not unsteered_completions_list:
            unsteered_completions_list.append(generator.output[1:-n_completions])
        steered_completions_list.append(generator.output[-n_completions:])

    return unsteered_completions_list, steered_completions_list

In [80]:


new_tokens = 30  # Number of new tokens to generate

# Every (layers around the middle, coefficient) pair to sweep, grouped by
# layer count: 0, 1, 2 layers x coefficients 0.1, -0.1, 1, -1.
steering_info = [
    (n_layers, coefficient)
    for n_layers in (0, 1, 2)
    for coefficient in (0.1, -0.1, 1, -1)
]

# NOTE: `all` shadows the builtin; the name is kept because later cells read it.
# Layout: [unsteered from the first prompt, steered for prompt 1, prompt 2, ...].
all = []
counter = 0
for steering_prompt in (
    'I hate you.',
    'I love you.',
    "Cats",
    "Birds",
    "The Roman Empire",
    "The Spanish Civil War",
):
    unsteered_completions, steered_completions = apply_steering_vector(
        mamba_model,
        steering_info,
        new_tokens,
        steering_prompt,
        ["When I think about", "I am thinking about", "I keep thinking that"],
    )
    # Only the first steering prompt's unsteered result is stored.
    if counter == 0:
        all.append(unsteered_completions)
        counter += 1
    all.append(steered_completions)



Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
Steering step
1
Generating vectors
S

In [83]:
# Decode the unsteered entry first: it leads `all_sentences`, and its first
# decoded batch is what `unsteered_sentences` ends up holding.
all_sentences = [tokenizer.batch_decode(batch) for batch in all[0]]
unsteered_sentences = all_sentences[0]

# Then decode every steered batch, in sweep order, and append them.
steered_sentences = [
    tokenizer.batch_decode(batch)
    for per_prompt in all[1:]
    for batch in per_prompt
]
all_sentences.extend(steered_sentences)



In [84]:
# Row titles: the unsteered baseline, then one title per
# (steering prompt, layer count, coefficient) combination.
names = ["Unsteered"] + [
    f"Steered {steering_prompt} by {coeff} of {layer} layers around the middle layer."
    for steering_prompt in (
        'I hate you.',
        'I love you.',
        "Cats",
        "Birds",
        "The Roman Empire",
        "The Spanish Civil War",
    )
    for layer, coeff in steering_info
]
print_table(all_sentences, names)

In [76]:
# Number of row titles built for the table.
# NOTE(review): this cell's execution count (76) predates the cell that builds
# `names` (84), so the recorded output may be stale -- re-run to confirm.
len(names)

13

Figuring out shapes

In [None]:


# Single-configuration run used for checking shapes.
# This cell previously called `apply_steering_vector` with an older signature
# (extra positional argument plus `apply_only_at_end`) and a
# `print_steering_table` helper that is not defined in this notebook; it now
# uses the current function and `print_table`.
num_prompts = 3  # `print_table` shows exactly three completions per row
new_tokens = 30 # Number of new tokens to generate

steering_prompt = 'The city of Rome.'
prompts = ["The Eiffel Tower can be found in" for _ in range(num_prompts)]

for layer, coeff in [
    (1, 1)
    ]:
    # With the current function, `layer` is the number of layers taken on
    # each side of the middle layer.
    steering_info = [(layer, +coeff)]
    unsteered_completions, steered_completions = apply_steering_vector(
        mamba_model,
        steering_info,
        new_tokens,
        steering_prompt,
        prompts,
    )

    # Each returned list holds one batch of completions for this single run.
    unsteered_sents = tokenizer.batch_decode(unsteered_completions[0])
    steered_sents = tokenizer.batch_decode(steered_completions[0])

    print_table(
        [unsteered_sents, steered_sents],
        [
            "Unsteered",
            f"Steered {steering_prompt} by {coeff} of {layer} layers around the middle layer.",
        ],
    )

Generating vectors
Steering step
