In [1]:
# Ad hoc inspection/addition of internal representations using wrapped models
!pip install -q -U transformers 
import torch
from torch.utils.data import Dataset, DataLoader
from torch.optim import Adam
from transformers import AutoTokenizer, AutoModelForCausalLM
from tqdm import tqdm
import json
import shutil
import os
from datetime import datetime
from glob import glob
import torch.nn.functional as F

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# Model wrapper to handle activation reading/adding
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, StoppingCriteria

class BlockOutputWrapper(torch.nn.Module):
    """Wrap a transformer block to record its output and optionally steer it.

    Every forward pass stores the wrapped block's hidden state (before any
    steering is applied) in ``last_hidden_state``. When a tensor has been
    registered through :meth:`add`, it is added to that hidden state before
    the output is handed back to the caller.

    The wrapped block may return either a tuple/list whose first element is
    the hidden state, or a bare tensor; both shapes are passed through in the
    same form they arrived in.
    """

    def __init__(self, block):
        """Store ``block`` and start with nothing recorded and nothing to add."""
        super().__init__()
        self.block = block
        self.last_hidden_state = None
        self.add_activations = None

    def forward(self, *args, **kwargs):
        """Call the wrapped block, record its hidden state, and apply steering.

        All positional and keyword arguments are forwarded unchanged.

        Returns:
            The block's output with the hidden state replaced by
            ``hidden + add_activations`` when a steering tensor is set;
            otherwise the block's output untouched.
        """
        output = self.block(*args, **kwargs)

        # A bare tensor must not be indexed with [0]: that would select the
        # first batch row instead of the hidden state.
        returns_tensor = isinstance(output, torch.Tensor)
        hidden = output if returns_tensor else output[0]
        self.last_hidden_state = hidden  # recorded before steering is added

        if self.add_activations is None:
            return output

        steering = self.add_activations
        if isinstance(steering, torch.Tensor):
            # Match the hidden state so a CPU/float32 steering vector neither
            # raises on a device mismatch nor upcasts a half-precision model.
            steering = steering.to(device=hidden.device, dtype=hidden.dtype)
        steered = hidden + steering

        if returns_tensor:
            return steered
        # Rebuild the output, keeping whatever extra items the block returned.
        return (steered,) + tuple(output[1:])

    def add(self, activations):
        """Register ``activations`` to be added to the hidden state on later forwards."""
        self.add_activations = activations

    def reset(self):
        """Forget the recorded hidden state and remove any steering tensor."""
        self.last_hidden_state = None
        self.add_activations = None
        
class StopAtTokenCriteria(StoppingCriteria):
    """Stopping criterion that fires when the newest token equals ``stop_token_id``.

    Only the first sequence in the batch (row 0 of ``input_ids``) is
    inspected, so the decision made for that row is applied to the whole call.
    """

    def __init__(self, stop_token_id):
        """Remember the token id that should end generation."""
        self.stop_token_id = stop_token_id

    def __call__(self, input_ids, scores, **kwargs):
        """Return whether the last token of the first sequence is the stop token.

        ``scores`` and any extra keyword arguments are accepted to match the
        ``StoppingCriteria`` call signature and are otherwise ignored.
        """
        # Check if the last generated token is the stop token
        return input_ids[0, -1] == self.stop_token_id
    
class GPT2Helper:
    """Convenience layer around a causal LM whose blocks live in ``model.transformer.h``.

    On construction every block is replaced by a ``BlockOutputWrapper`` so
    per-layer hidden states can be read back after a forward pass and a
    tensor can be added to any layer's output.
    """

    def __init__(self, pretrained_model="gpt2", device="cpu"):
        """Load tokenizer and model, move the model to ``device``, wrap every block."""
        self.device = device
        self.tokenizer = AutoTokenizer.from_pretrained(pretrained_model)
        self.model = AutoModelForCausalLM.from_pretrained(pretrained_model).to(self.device)
        blocks = self.model.transformer.h
        for position in range(len(blocks)):
            blocks[position] = BlockOutputWrapper(blocks[position])

    def generate_text(self, prompt, max_length=100, stop_token=""):
        """Generate a continuation of ``prompt`` and return only the newly produced text.

        Args:
            prompt: Text to tokenize and feed to ``model.generate``.
            max_length: Forwarded to ``model.generate`` as ``max_length``.
            stop_token: Token string looked up with
                ``tokenizer.convert_tokens_to_ids``; when non-empty, a
                ``StopAtTokenCriteria`` for that id is installed.

        Returns:
            The decoded tokens that follow the prompt, with special tokens skipped.
        """
        encoded = self.tokenizer(prompt, return_tensors="pt")

        if stop_token:
            stop_id = self.tokenizer.convert_tokens_to_ids(stop_token)
            criteria = [StopAtTokenCriteria(stop_id)]
        else:
            criteria = []

        generated = self.model.generate(
            encoded.input_ids.to(self.device),
            attention_mask=encoded['attention_mask'].to(self.device),
            max_length=max_length,
            pad_token_id=self.tokenizer.eos_token_id,
            stopping_criteria=criteria,
        )

        # Drop the prompt tokens so only the continuation is decoded.
        prompt_length = encoded['input_ids'].shape[-1]
        continuation = generated[0][prompt_length:]
        return self.tokenizer.decode(
            continuation,
            skip_special_tokens=True,
            clean_up_tokenization_spaces=False,
        )

    def get_logits(self, tokens):
        """Run one gradient-free forward pass on ``tokens`` and return the logits."""
        with torch.no_grad():
            return self.model(tokens.to(self.device)).logits

    def get_last_activations(self, layer):
        """Return the hidden state most recently recorded by block ``layer``."""
        wrapped = self.model.transformer.h[layer]
        return wrapped.last_hidden_state

    def set_add_activations(self, layer, activations):
        """Register ``activations`` to be added to the output of block ``layer``."""
        wrapped = self.model.transformer.h[layer]
        wrapped.add(activations)

    def reset_all(self):
        """Call ``reset()`` on every wrapped block."""
        for wrapped in self.model.transformer.h:
            wrapped.reset()

In [4]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
# (Alternative kept from the original for Apple silicon:
#  "mps" if torch.backends.mps.is_available() else "cpu")
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(device)

# Load gpt2-medium on the chosen device; GPT2Helper wraps each block on construction.
model = GPT2Helper('gpt2-medium', device)

cpu


In [15]:
def capture_final_token_activations(helper, prompt):
    """Run ``prompt`` through ``helper`` once and collect per-layer activations.

    Args:
        helper: A ``GPT2Helper`` whose blocks record their last hidden state.
        prompt: Text to encode and feed through the model.

    Returns:
        A ``(tokens, logits, activations)`` tuple: the 1-D token ids, the
        logits returned by ``helper.get_logits``, and a list holding, for each
        of the ``n_layer`` blocks, the detached CPU activation at the final
        token position of the first (only) sequence.
    """
    tokens = helper.tokenizer.encode(prompt, return_tensors="pt")[0]
    # Clear recorded states and any steering tensor so the capture is unsteered.
    helper.reset_all()
    logits = helper.get_logits(tokens.unsqueeze(0))
    # Kept inside the function so the loop index does not leak into the
    # notebook namespace, where `layer` names the steering layer.
    activations = [
        helper.get_last_activations(layer_idx)[0, -1, :].detach().cpu()
        for layer_idx in range(helper.model.config.n_layer)
    ]
    return tokens, logits, activations


# Contrastive prompt pair: positive ("p_") versus negative ("n_").
p_prompt = "You are very agreeable"
n_prompt = "You are very disagreeable"

# s_out / n_out hold the logits of the positive / negative forward pass
# (names kept from the original cell).
p_toks, s_out, p_acts = capture_final_token_activations(model, p_prompt)
n_toks, n_out, n_acts = capture_final_token_activations(model, n_prompt)

In [16]:
# Steering configuration. `idx`, `mult`, `stop_token` and `max_length` are
# only defined here; this cell itself uses `layer` alone.
layer = 18
mult = 1
idx = 30
stop_token = ""
max_length = 60

# Steering direction at the chosen layer: positive-prompt activation minus
# negative-prompt activation, plus the same direction scaled to unit L2 norm.
vec = p_acts[layer] - n_acts[layer]
unit_vec = vec / vec.norm(p=2)

# Eyeball components 1..49 of each tensor (the slice skips element 0).
print(n_acts[layer][1:50], vec[1:50], unit_vec[1:50], sep="\n")

tensor([ 7.1830,  1.9270,  5.1968,  6.0977,  5.3821, -2.6856, -0.2952,  0.4066,
        -0.2658, -4.0027,  2.1794,  4.6510,  6.4931,  2.6129, -6.4691,  5.0159,
         8.8746, -1.0100,  2.8153,  0.5017,  1.5510, -8.2151,  0.9558, -3.1539,
        -4.3042, -4.4566, -3.0349,  4.4153,  0.0792,  6.5177,  0.6068,  0.0648,
         3.6052,  0.7922, -5.6465, 13.3190,  0.2954,  2.6713, -8.4822, -2.9878,
         2.4930,  1.9570, -5.5974, -0.8833,  1.0815,  1.9333,  3.0459,  0.7197,
        -4.5167])
tensor([-1.6926, -0.2512, -4.1873,  0.0191, -5.9428, -1.6885,  2.6109,  0.2905,
         0.4051,  1.8282,  0.7615,  1.8177, -2.3198, -0.3884, -2.3970, -6.8944,
         0.2999,  0.5865, -3.1230, -0.7018,  4.0287, -9.9568, -1.4977,  1.7440,
        -0.1740,  1.0347, -0.7933,  5.9433, -2.7031, -1.2694, -4.1907, -1.6686,
         2.8490, -0.4332,  2.6055, -4.0529,  4.0895, -2.0123, -0.7257, -0.3868,
         4.5370, -4.2058,  2.9576,  1.1539,  3.8560,  1.2870, -8.9059,  1.5071,
        -5.8092])
tens

In [39]:
# L2 norm of the raw steering direction, i.e. the size of the shift applied at mult = 1.
vec.norm(p=2)

tensor(101.7022)

In [43]:
def generate_with_steering(helper, prompt, layer, steering=None, max_length=60, stop_token=""):
    """Generate a stripped continuation of ``prompt`` with an optional steering tensor.

    Args:
        helper: A ``GPT2Helper`` instance.
        prompt: Text to continue.
        layer: Index of the block whose output receives ``steering``.
        steering: Tensor added to that block's output, or ``None`` for an
            unsteered run.
        max_length: Forwarded to ``helper.generate_text``.
        stop_token: Forwarded to ``helper.generate_text``.

    Returns:
        The generated text with surrounding whitespace stripped.
    """
    helper.reset_all()
    if steering is not None:
        helper.set_add_activations(layer, steering)
    try:
        return helper.generate_text(prompt, max_length=max_length, stop_token=stop_token).strip()
    finally:
        # Always remove the steering tensor, even when generation raises or is
        # interrupted, so later cells never run on a silently steered model.
        helper.set_add_activations(layer, None)


layer, mult = 18, 1  # original note: "1-3" (presumably the multipliers tried)
idx = 30  # only referenced by the commented-out prompt source below
stop_token = ""
max_length = 60
vec = p_acts[layer] - n_acts[layer]
unit_vec = vec / torch.norm(vec, p=2)  # computed for reference; the raw `vec` is what gets added
# Alternative prompt source from the original cell (`sentences` is not defined in the visible cells):
# sentences[idx][7:len(sentences[idx])-4]
prompt = "I am a software engineer who loves coding. Do you think everyone should learn to code?"

steering = mult * vec.to(device)
sa = generate_with_steering(model, prompt, layer, steering, max_length=max_length, stop_token=stop_token)
na = generate_with_steering(model, prompt, layer, -steering, max_length=max_length, stop_token=stop_token)
xa = generate_with_steering(model, prompt, layer, None, max_length=max_length, stop_token=stop_token)

print(f"Prompt: |{prompt}|")
print(f"Plus Response: |{sa}|")
print(f"Minus Response: |{na}|")
print(f"Default Response: |{xa}|")

Prompt: |I am a software engineer who loves coding. Do you think everyone should learn to code?|
Plus Response: |I have a family and a family and a family and a family and a family and a family and a family and a family and a family and a family and a family and a family and a family and a|
Minus Response: |"

"

"

"

"

"

"

"

"

"

"

"

"

"|
Default Response: |I think everyone should learn to code. I think everyone should learn to code. I think everyone should learn to code. I think everyone should learn to code. I think everyone should learn to code.|
