# Activation addition in Llama with custom wrappers

This notebook shows how to extract and manipulate the internal activations of a Llama Transformer model. All you need is access to a trained model: either download it locally and update `model_path` accordingly, or access the models via Hugging Face with an [authentication token](https://huggingface.co/docs/hub/security-tokens). You will also want to make sure you have a GPU.

In [1]:
import getpass
import os
from datetime import datetime

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

In [24]:
# Read the Hugging Face access token without echoing it into the saved notebook output.
# A token exported as the HF_TOKEN environment variable is used directly, so
# "Restart & Run All" can run unattended; otherwise you are prompted for it.
auth_token = os.environ.get("HF_TOKEN") or getpass.getpass("Hugging Face token: ")




In [5]:
# Load the tokenizer and the causal-LM weights for `model_path`, authenticating with `auth_token`.
# To use locally downloaded weights instead, point `model_path` at that directory, e.g.
# model_path = "/data/private_models/cais_models/llama/llama_hf_weights_v1.1/llama-7b"
model_path = "meta-llama/Llama-2-7b-hf"
tokenizer = AutoTokenizer.from_pretrained(model_path, use_auth_token=auth_token)
model = AutoModelForCausalLM.from_pretrained(model_path, use_auth_token=auth_token)

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

In [6]:
# Show the module tree; the attribute names of each decoder layer (self_attn, mlp,
# input_layernorm, post_attention_layernorm) are the block names WrappedModel can wrap.
print(model)

LlamaForCausalLM(
  (model): LlamaModel(
    (embed_tokens): Embedding(32000, 4096, padding_idx=0)
    (layers): ModuleList(
      (0): LlamaDecoderLayer(
        (self_attn): LlamaAttention(
          (q_proj): Linear(in_features=4096, out_features=4096, bias=False)
          (k_proj): Linear(in_features=4096, out_features=4096, bias=False)
          (v_proj): Linear(in_features=4096, out_features=4096, bias=False)
          (o_proj): Linear(in_features=4096, out_features=4096, bias=False)
          (rotary_emb): LlamaRotaryEmbedding()
        )
        (mlp): LlamaMLP(
          (gate_proj): Linear(in_features=4096, out_features=11008, bias=False)
          (down_proj): Linear(in_features=11008, out_features=4096, bias=False)
          (up_proj): Linear(in_features=4096, out_features=11008, bias=False)
          (act_fn): SiLUActivation()
        )
        (input_layernorm): LlamaRMSNorm()
        (post_attention_layernorm): LlamaRMSNorm()
      )
      (1): LlamaDecoderLayer(
      

In [7]:
# wrapping classes

class WrappedBlock(torch.nn.Module):
    """Wrap a module to record its output and optionally add a vector to it.

    The wrapper forwards all arguments to ``block``. If the block returns a
    tuple, the first element is treated as the activation tensor; otherwise the
    return value itself is. The activation is indexed as
    ``[batch, position, feature]`` when an addition is applied.

    Attributes:
        block_name: label given at construction time.
        block: the wrapped module.
        output: the block's activation from the most recent forward pass,
            *before* any addition (``None`` until the first pass / after ``reset``).
        to_add: tensor to add to the activation, or ``None`` for pass-through.
        token_pos: optional list of positions the addition is restricted to.
    """

    def __init__(self, block, block_name):
        super().__init__()
        self.block_name = block_name
        self.block = block
        self.output = None
        self.to_add = None
        self.token_pos = None

    def forward(self, *args, **kwargs):
        """Run the wrapped block, store its activation and apply ``to_add``.

        Three addition modes, chosen from the shape of ``to_add``:
          * more than one dimension: row ``i`` of ``to_add`` is added at position ``i``
            (the first ``to_add.shape[0]`` positions);
          * one dimension and a non-empty ``token_pos``: the vector is added at
            each listed position;
          * one dimension otherwise: the vector is added at every position.

        Returns the block's output with the (possibly modified) activation in
        the same place; other tuple elements are passed through unchanged.
        """
        output = self.block(*args, **kwargs)
        returns_tuple = isinstance(output, tuple)
        hidden = output[0] if returns_tuple else output
        self.output = hidden

        if self.to_add is None:
            return output

        # Keep the steering vector on the activation's device/dtype so the sum
        # neither fails across devices nor changes the dtype seen downstream.
        to_add = self.to_add.to(device=hidden.device, dtype=hidden.dtype)

        if to_add.dim() > 1:
            # add the several individual streams of activation vec to beginning
            n_positions = to_add.shape[0]
            assert n_positions <= hidden.shape[1], "to_add residuals longer than output"
            # Clone before writing: an in-place write on `hidden` would also
            # overwrite the recorded `self.output` (and the block's own tensor).
            modified = hidden.clone()
            modified[:, :n_positions, :] += to_add.unsqueeze(0)
        else:
            assert to_add.dim() == 1, "to_add should be only one vector"
            if self.token_pos:
                modified = hidden.clone()
                for pos in self.token_pos:
                    modified[:, pos, :] += to_add
            else:
                modified = hidden + to_add

        if returns_tuple:
            return (modified,) + output[1:]
        return modified

    def set_to_add(self, activations, token_pos):
        """Set the tensor to add (all size-1 dimensions are squeezed away) and its positions."""
        self.to_add = activations.squeeze()
        self.token_pos = token_pos

    def reset(self):
        """Forget the recorded activation and disable the addition."""
        self.output = None
        self.to_add = None
        self.token_pos = None

    
class WrappedModel(torch.nn.Module):
    """A causal LM with one block of one decoder layer replaced by a ``WrappedBlock``.

    The model is expected to expose its decoder layers as ``model.model.layers``
    and each layer to have ``self_attn``, ``mlp``, ``input_layernorm`` and
    ``post_attention_layernorm`` attributes.

    Args:
        model: the language model to wrap (moved to CUDA when available, else CPU).
        tokenizer: tokenizer used to encode prompts and decode generations.
        layer_id: index into ``model.model.layers`` of the layer to wrap.
        block_name: which part of that layer to wrap; ``"decoder_block"`` (the
            whole layer) or one of the names in ``_SUB_BLOCK_NAMES``.

    Raises:
        ValueError: if ``block_name`` is not a supported name.
    """

    # Sub-modules of a decoder layer that can be wrapped by attribute name.
    _SUB_BLOCK_NAMES = ("self_attn", "mlp", "input_layernorm", "post_attention_layernorm")

    def __init__(self, model, tokenizer, layer_id, block_name):
        super().__init__()
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model = model.to(self.device)
        self.tokenizer = tokenizer

        self.layer_id = layer_id
        self.block_name = block_name

        # make sure model is not wrapped twice
        self.model = self._unwrap()
        self.wrapped_block = None

        self._wrap_block()

    def forward(self, *args, **kwargs):
        """Delegate to the underlying model."""
        return self.model(*args, **kwargs)

    def generate(self, prompt, max_new_tokens=100, random_seed=0):
        """Generate a continuation of ``prompt`` and return the decoded text.

        The key/value cache is disabled so every step re-processes the whole
        sequence through the wrapped block.
        """
        torch.random.manual_seed(random_seed)
        inputs = self.tokenizer(prompt, return_tensors="pt")
        generate_ids = self.model.generate(inputs.input_ids.to(self.device), max_new_tokens=max_new_tokens, use_cache=False)
        return self.tokenizer.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0]

    def generate_with_reset(self, prompt, max_new_tokens=100, random_seed=0):
        """Encode the prompt with the current addition, then generate without it.

        All prompt tokens except the last are run through the model with the
        addition active and their key/value cache is kept. The wrapped block is
        then reset and generation continues from that cache.

        NOTE(review): the last prompt token is processed by ``generate`` after
        the reset (generation needs at least one uncached token), so it does not
        receive the addition — confirm this matches the intended experiment.
        A prompt of a single token has no prefix to cache and is generated
        entirely after the reset.
        """
        torch.random.manual_seed(random_seed)
        input_ids = self.tokenizer(prompt, return_tensors="pt").input_ids.to(self.device)

        generate_kwargs = {"max_new_tokens": max_new_tokens, "use_cache": True}
        if input_ids.shape[1] > 1:
            with torch.no_grad():
                prefix_output = self.model(input_ids[:, :-1], use_cache=True)
            generate_kwargs["past_key_values"] = prefix_output.past_key_values

        self.reset()
        generate_ids = self.model.generate(input_ids, **generate_kwargs)
        return self.tokenizer.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0]

    def get_logits(self, tokens):
        """Return the model's logits for ``tokens`` without tracking gradients."""
        with torch.no_grad():
            logits = self.model(tokens.to(self.device)).logits
            return logits

    def run_prompt(self, prompt, **kwargs):
        """Tokenize ``prompt``, run one forward pass and return the model output.

        Extra keyword arguments are forwarded to the model call.
        """
        inputs = self.tokenizer(prompt, return_tensors="pt")
        input_ids = inputs.input_ids.to(self.device)
        output = self.model(input_ids, **kwargs)
        return output

    def _wrap_block(self):
        """Replace the configured block of layer ``self.layer_id`` with a ``WrappedBlock``."""
        layers = self.model.model.layers
        name = self.block_name
        if name == 'decoder_block':
            layers[self.layer_id] = WrappedBlock(layers[self.layer_id], "decoder_block")
            self.wrapped_block = layers[self.layer_id]
        elif name in self._SUB_BLOCK_NAMES:
            layer = layers[self.layer_id]
            setattr(layer, name, WrappedBlock(getattr(layer, name), name))
            self.wrapped_block = getattr(layer, name)
        else:
            # Fail loudly: continuing would leave `wrapped_block` as None and
            # every later call would break with an unrelated AttributeError.
            valid = ("decoder_block",) + self._SUB_BLOCK_NAMES
            raise ValueError(f"No wrapped block named {name}. Must be one of {list(valid)}.")

    def get_activations(self):
        """Return the activation recorded by the wrapped block on its last forward pass."""
        return self.wrapped_block.output

    def set_to_add(self, activations, token_pos=None):
        """Configure the tensor (and optional token positions) the wrapped block adds."""
        self.wrapped_block.set_to_add(activations, token_pos)

    def reset(self):
        """Clear the wrapped block's recorded activation and pending addition."""
        self.wrapped_block.reset()

    def _is_wrapped(self, block):
        """Return True if ``block`` looks like a wrapper, i.e. has a ``block`` attribute."""
        return hasattr(block, 'block')

    def _unwrap(self):
        """Restore every wrapped layer / sub-block to the module it wraps and return the model."""
        layers = self.model.model.layers
        for idx, layer in enumerate(layers):
            if self._is_wrapped(layer):
                layer = layer.block
                layers[idx] = layer
            for name in self._SUB_BLOCK_NAMES:
                sub_block = getattr(layer, name)
                if self._is_wrapped(sub_block):
                    setattr(layer, name, sub_block.block)

        return self.model

In [8]:
# helper functions

def apply_activation_difference(wrapped_model, layer_id, block_name, sentences, prompt1, prompt2, coeff=150, token_pos=None, max_new_tokens=20, textfile=None):
    """
    Generate completions for `sentences` without, with +, and with - activation addition.

    The added vector is the difference between the wrapped block's last-token
    activations for `prompt1` and `prompt2`, normalized to unit length and
    scaled by `coeff` (then by `-coeff` for the negative run). Everything is
    printed and, if `textfile` is given, also written to that file.

    wrapped_model: our wrapped model
    layer_id, block_name: only written to the parameter header (the wrapped
        location is fixed when the model is wrapped)
    sentences: list of strings that are modified with internal activations
    prompt1: first string to encode
    prompt2: 2nd string to encode
    coeff: coefficient with which to multiply normalized difference between prompt1 and prompt2
    token_pos: list of token positions where the difference is applied (if set to None it is applied to every token)
    max_new_tokens: how many new tokens to generate per sentence
    textfile: optional path; the file is overwritten with the parameter header
        and then appended to

    Raises ValueError if the two prompts yield identical activations, since the
    difference cannot be normalized.
    """

    formatted_string = f"Parameters:\n\
    prompt1: '{prompt1}'\n\
    prompt2: '{prompt2}'\n\
    coeff: {coeff}\n\
    layer_id: {layer_id}\n\
    block_name: '{block_name}'\n\
    token_pos: {token_pos}\n\
    max_new_tokens: {max_new_tokens}\n\n\
    "

    if textfile:
        with open(textfile, 'w') as f:
            f.write(formatted_string)

    def append_to_file(text):
        if textfile:
            with open(textfile, 'a') as f:
                f.write(text)

    def generate_all(console_header, file_header):
        # One pass over the sentences with whatever addition is currently set.
        print(console_header)
        append_to_file(file_header)
        for sentence in sentences:
            generated = wrapped_model.generate(sentence, max_new_tokens=max_new_tokens)
            print(f"{generated}\n")
            append_to_file(f"{generated}\n\n")

    def last_token_activation(prompt):
        # Only the recorded activation is needed, so skip building an autograd graph.
        wrapped_model.reset()
        with torch.no_grad():
            wrapped_model.run_prompt(prompt)
        return wrapped_model.get_activations()[0, -1, :]

    print("-"*30)
    print("-"*30)
    wrapped_model.reset()
    generate_all("No activation addition:\n", "\nNo activation addition:\n\n")

    # get internal activations
    diff = last_token_activation(prompt1) - last_token_activation(prompt2)
    norm = diff.norm()
    if norm.item() == 0:
        # Dividing by a zero norm would silently inject NaNs into the model.
        raise ValueError("prompt1 and prompt2 produce identical activations; cannot normalize their difference")
    diff = diff / norm

    wrapped_model.reset()
    wrapped_model.set_to_add(coeff*diff, token_pos=token_pos)
    generate_all("-"*30 + "\nPositive coefficient:\n", "-"*30 + "\nPositive coefficient:\n\n")

    wrapped_model.reset()
    wrapped_model.set_to_add(-coeff*diff, token_pos=token_pos)
    generate_all("-"*30 + "\nNegative coefficient:\n", "-"*30 + "\nNegative coefficient:\n\n")

        
def make_new_file():
    """Return a timestamped path for a results file, creating its directory.

    The directory ``results/activation_addition`` is created (including
    parents) if missing. The file itself is not created; only its path, named
    ``YYYYmmdd_HHMMSS.txt`` from the current local time, is returned.
    """
    results_dir = "results/activation_addition"
    os.makedirs(results_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    return os.path.join(results_dir, timestamp + ".txt")

In [9]:
# Sanity check: the activation recorded by the wrapper equals the model's own hidden state.
layer_id = 25
wrapped_model = WrappedModel(model, tokenizer, layer_id=layer_id, block_name="decoder_block")
output = wrapped_model.run_prompt(
    "some test prompt",
    output_hidden_states=True,
    output_attentions=True,
    return_dict=True,
)
# Observations for this prompt:
# - output.attentions are just the attention values, NOT the self-attention outputs;
#   output.attentions[0].shape is torch.Size([1, 32, 3, 3]) and len(output.attentions) is 32
# - output.hidden_states are the outputs of decoder_block, but shifted by one:
#   output.hidden_states[0] are the inputs_embeds, so layer i's output is hidden_states[i + 1]
hidden_after_layer = output.hidden_states[layer_id + 1]
recorded_by_wrapper = wrapped_model.model.model.layers[layer_id].output
print((hidden_after_layer == recorded_by_wrapper).all())

tensor(True, device='cuda:0')


# Activation addition

Modify the code as you want. Important parameters are:

* `layer_id`: layer at which activation addition is performed
* `block_name`: block at which activation addition is performed. Must be one of `["decoder_block", "self_attn", "mlp", "input_layernorm", "post_attention_layernorm"]`
* `sentences`: list of strings that are modified with internal activations
* `prompt1`: first string to encode
* `prompt2`: 2nd string to encode
* `coeff`: coefficient with which to multiply normalized difference between `prompt1` and `prompt2`
* `token_pos`: list of token positions where the difference is applied (if set to None it is applied to every token)
* `max_new_tokens`: how many new tokens to generate
* `textfile`:       a textfile to save results to (you can generate a timestamped one with `make_new_file()`)

The function `apply_activation_difference` does the activation addition by calculating internal representations of `prompt1` and `prompt2` (at the layer and block determined when wrapping the model). The difference is calculated and normalized. This difference is the activation that is added. The function then loops through the given `sentences` and adds the normalized difference multiplied by `coeff` to the internal activations of each sentence at `token_pos`, generating new text. 

In [10]:
# WRAP MODEL
# Re-wrapping is safe: WrappedModel first removes any wrapper left from an earlier cell.
# layer_id:   layer at which activation addition is performed
# block_name: block at which activation addition is performed.
#             must be one of ["decoder_block", "self_attn", "mlp", "input_layernorm", "post_attention_layernorm"]
layer_id=25
block_name="decoder_block"
wrapped_model = WrappedModel(model, tokenizer, layer_id=layer_id, block_name=block_name)


In [21]:
# Experiment parameters, passed to apply_activation_difference in the next cell.
# sentences:      list of strings that are modified with internal activations
# prompt1:        first string to encode
# prompt2:        2nd string to encode
# coeff:          coefficient with which to multiply normalized difference between prompt1 and prompt2
# token_pos:      list of token positions where the difference is applied
#                 (if set to None it is applied to every token)
# max_new_tokens: how many new tokens to generate
# textfile:       a textfile to save results to (you can generate a timestamped one with make_new_file())

sentences = [
    "I think dogs are ", 
    "I think cats are ", 
    "I hate you because",
    "I love you because",
    "I think of you as",    
    "I think you're",
    "Today I feel",
]
prompt1 = "Love"
prompt2 = "Hate"
coeff = 80
token_pos = None
max_new_tokens = 20
textfile = None

In [22]:
# Generate completions for every sentence three times: without any addition, then with
# +coeff and with -coeff times the normalized prompt1 - prompt2 activation difference.
apply_activation_difference(wrapped_model, layer_id, block_name, sentences, prompt1, prompt2, coeff=coeff, 
                            token_pos=token_pos, max_new_tokens=max_new_tokens, textfile=textfile)

------------------------------
------------------------------
No activation addition:

I think dogs are 100% better than cats.
I think dogs are 100% better

I think cats are 100% better than dogs.
I think cats are 100% better

I hate you because you are beautiful.
I hate you because you are beautiful. I hate you because you are beautiful

I love you because you are you. I love you because you are you. I love you because you are you.

I think of you as a friend.
I think of you as a friend.
I think of you as a friend

I think you're right. I'm not sure what the problem is, but I'm sure it's

Today I feel like I'm in a bit of a rut. I'm not sure what it is

------------------------------
Positive coefficient:

I think dogs are 100% better than cats. I're a cat person, but I're

I think cats are 100% better than dogs. I're a cat person.
I're a

I hate you because you are a woman. I hate you because you are a woman. I hate you because you are

I love you because you are my best friend. I lo