In [286]:
from transformers import GPT2LMHeadModel, GPT2Tokenizer
from transformers.models.gpt2.modeling_gpt2 import GPT2Attention
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import AdamW
from tqdm import tqdm
import pandas as pd
import yaml

In [2]:
# Load model and tokenizer
model_name = "gpt2"
model = GPT2LMHeadModel.from_pretrained(model_name)
tokenizer = GPT2Tokenizer.from_pretrained(model_name)
tokenizer.pad_token = tokenizer.eos_token

In [3]:
#model
#for name, module in model.named_modules():
#   print(name)

In [4]:
#Next to-do:
# Write some passable version of a hypernetwork function, that accepts inputs of a context sentence and outputs 
# some stuff for 1 single layer of the GPT-2 model!

# This will be trivial as stated, since we aren't feeding in any inputs to the model as of yet


In [5]:
#Now, writing only the hypernetwork, in a simplified form which does not accept
# any input from the target model
# The only input is a text string (in this case first sentence of wikipedia)
# The output is to be taken directly from layer 8 of the gpt2 model
from transformers import GPT2LMHeadModel

class EditorModelBlind(nn.Module):
    # Separating the editor config file, from its base model's configurations
    def __init__(self, editor_yaml_file_path='config/editor_blind.yaml'):

        super().__init__()
        # Load the configuration from the YAML file
        with open(editor_yaml_file_path, 'r') as file:
            self.editor_config = yaml.safe_load(file)
        self.basemodel = GPT2LMHeadModel.from_pretrained('gpt2') #R# replace this with a statement to load from config file
        
    def forward(self, input_ids, attention_mask=None, target_cache = None):
        result = self.basemodel(input_ids, attention_mask=attention_mask, output_hidden_states=True)
        hidden_states = result.hidden_states
        #This cuts the model off at the specified layer in the config
        return hidden_states[self.editor_config["cut_layer"]]
        



        #We have a lot more parameters than we need right now, 
        #but running it thru torch.compile later will cut down waste


In [392]:
#Getting full hidden state outputs from the model with some input
inputtext = "The quick brown fox jumps over the lazy dog"
editor_ids = tokenizer.encode(inputtext, return_tensors="pt")
editor_hidden_states = model(editor_ids, output_hidden_states=True).hidden_states
editor_hidden_states[12].shape

torch.Size([1, 9, 768])

In [163]:
layer_idx = 0
testblock = model.transformer.h[layer_idx]


In [396]:
import copy
testblock.crossattention = GPT2Attention(model.config, is_cross_attention=True)


In [414]:
#Need to add layernorm. This is slightly different from gpt2 layernorm command, probably due to updates in pytorch
testblock.ln_cross_attn = nn.LayerNorm(normalized_shape = 768, eps=model.config.layer_norm_epsilon)

In [397]:
original_query_weights = testblock.attn.c_attn.weight[:,:768]
original_keys_values = testblock.attn.c_attn.weight[:,768:]
original_query_bias = testblock.attn.c_attn.bias[:768]
original_keys_values_bias = testblock.attn.c_attn.bias[768:]

In [398]:
testblock.crossattention.c_attn.weight.shape , original_keys_values.shape

(torch.Size([768, 1536]), torch.Size([768, 1536]))

In [399]:
type(testblock.crossattention.c_attn)

transformers.pytorch_utils.Conv1D

In [400]:
#Setting up the query matrix to be initialized to the same query as cross-attention:
#I intend to initialize the key & value matrices to be the same, but then applied to all 12 layers in copies
with torch.no_grad():
    # Initialize the new layer with these parameters
    testblock.crossattention.q_attn.weight = nn.Parameter(original_query_weights)
    testblock.crossattention.q_attn.bias =  nn.Parameter(original_query_bias)

    testblock.crossattention.c_attn.weight = nn.Parameter(original_keys_values)
    testblock.crossattention.c_attn.bias = nn.Parameter(original_keys_values_bias)


In [None]:
#Need to add layernorm 
testblock.ln_cross_attn = nn.LayerNorm(hidden_size = 768, eps=config.layer_norm_epsilon)


TypeError: __init__() got an unexpected keyword argument 'hidden_size'

In [407]:
import inspect
from transformers.models.gpt2.modeling_gpt2 import GPT2Block
print(inspect.getsource(GPT2Block))

class GPT2Block(nn.Module):
    def __init__(self, config, layer_idx=None):
        super().__init__()
        hidden_size = config.hidden_size
        inner_dim = config.n_inner if config.n_inner is not None else 4 * hidden_size

        self.ln_1 = nn.LayerNorm(hidden_size, eps=config.layer_norm_epsilon)
        self.attn = GPT2Attention(config, layer_idx=layer_idx)
        self.ln_2 = nn.LayerNorm(hidden_size, eps=config.layer_norm_epsilon)

        if config.add_cross_attention:
            self.crossattention = GPT2Attention(config, is_cross_attention=True, layer_idx=layer_idx)
            self.ln_cross_attn = nn.LayerNorm(hidden_size, eps=config.layer_norm_epsilon)

        self.mlp = GPT2MLP(inner_dim, config)

    def forward(
        self,
        hidden_states: Optional[Tuple[torch.FloatTensor]],
        layer_past: Optional[Tuple[torch.Tensor]] = None,
        attention_mask: Optional[torch.FloatTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        e

In [402]:
input_ids = tokenizer.encode("some test text", return_tensors='pt')
with torch.no_grad():  # No need to calculate gradients
        targetmodel = GPT2Model.from_pretrained('gpt2', output_hidden_states=True)  # Set output_hidden_states to True
        outputs = targetmodel(input_ids)
        hidden_states = outputs.hidden_states  # Output has shape [num_layers, batch_size, sequence_length, hidden_size]



loading configuration file config.json from cache at /Users/michaelsklar/.cache/huggingface/hub/models--gpt2/snapshots/11c5a3d5811f50298f278a704980280950aedb10/config.json
Model config GPT2Config {
  "activation_function": "gelu_new",
  "architectures": [
    "GPT2LMHeadModel"
  ],
  "attn_pdrop": 0.1,
  "bos_token_id": 50256,
  "embd_pdrop": 0.1,
  "eos_token_id": 50256,
  "initializer_range": 0.02,
  "layer_norm_epsilon": 1e-05,
  "model_type": "gpt2",
  "n_ctx": 1024,
  "n_embd": 768,
  "n_head": 12,
  "n_inner": null,
  "n_layer": 12,
  "n_positions": 1024,
  "output_hidden_states": true,
  "reorder_and_upcast_attn": false,
  "resid_pdrop": 0.1,
  "scale_attn_by_inverse_layer_idx": false,
  "scale_attn_weights": true,
  "summary_activation": null,
  "summary_first_dropout": 0.1,
  "summary_proj_to_labels": true,
  "summary_type": "cls_index",
  "summary_use_proj": true,
  "task_specific_params": {
    "text-generation": {
      "do_sample": true,
      "max_length": 50
    }
  },
 

In [415]:
encoded_hidden_states = torch.cat(hidden_states, dim=1)
res = testblock.crossattention(torch.randn(1, 3, 768), encoder_hidden_states=encoded_hidden_states)
encoded_hidden_states.shape, res[0]

(torch.Size([1, 39, 768]),
 tensor([[[ 1.9051e+01,  3.1598e+01, -1.2699e+02,  ...,  1.9869e+01,
           -9.6727e+01, -6.3696e+01],
          [ 2.0594e+01, -3.3401e+01, -5.5558e+01,  ..., -4.2343e+00,
           -4.8823e+01, -2.8938e+01],
          [ 1.7966e+01, -5.3368e+01, -0.0000e+00,  ..., -2.2162e+00,
            1.6913e+01, -9.6692e-02]]], grad_fn=<MulBackward0>))

Next, implementing the forward pass of testblock

In [416]:
testblock.forward(editor_hidden_states[0], encoder_hidden_states=encoded_hidden_states)

(tensor([[[  39.2682,  -31.1375,  -91.9388,  ...,   80.6915,   12.0526,
             -0.8885],
          [  18.6787,   69.9323,  -10.5715,  ...,  113.7082, -143.0773,
            -23.7116],
          [  68.0760,  -83.1047, -109.8782,  ...,    1.6196, -132.1950,
            -10.8186],
          ...,
          [  47.5304,  -38.1824, -139.6781,  ...,    3.6882, -129.7056,
            -36.8067],
          [  45.1635,  -30.1209, -150.8162,  ...,  114.6934,   16.1154,
            -29.9821],
          [  37.3090,  -44.7251, -123.3011,  ...,  100.8260, -107.3991,
            -25.8370]]], grad_fn=<AddBackward0>),)

In [425]:
#Seems like it worked!? Now let's make a function to generically input a layer (like testnetwork) and output the modified layer
def add_cross_attention_to_layer(model,layer_idx):
    block = model.h[layer_idx]
    block.crossattention = GPT2Attention(model.config, is_cross_attention=True)
    block.ln_cross_attn = nn.LayerNorm(normalized_shape = 768, eps=model.config.layer_norm_epsilon)
    original_query_weights = block.attn.c_attn.weight[:,:768]
    original_keys_values = block.attn.c_attn.weight[:,768:]
    original_query_bias = block.attn.c_attn.bias[:768]
    original_keys_values_bias = block.attn.c_attn.bias[768:]
    with torch.no_grad():
        # Initialize the new layer with these parameters
        block.crossattention.q_attn.weight = nn.Parameter(original_query_weights)
        block.crossattention.q_attn.bias =  nn.Parameter(original_query_bias)
        block.crossattention.c_attn.weight = nn.Parameter(original_keys_values)
        block.crossattention.c_attn.bias = nn.Parameter(original_keys_values_bias)
    return


In [427]:
for i in range(12):
    add_cross_attention_to_layer(model,i)

In [None]:
#Setup to test the forward pass
edit_instructions = "replace dog with cat" #Note to self: it's a good idea, for purposes of auto-editing, to look into how hard it is to get the editor to literally replace one set of tokens with another; since the encoding is literally the "correct" optimal solution in this case, so we can directly check if it's working on a task where the edit is equivalent to token replacement. Will tell us how many samples are necessary, can it possible work, etc.
editor_ids = tokenizer.encode(edit_instructions, return_tensors="pt")

target_ids = tokenizer.encode("the quick brown fox jumped over the lazy cat", return_tensors='pt')
with torch.no_grad():  # No need to calculate gradients
        targetmodel = GPT2Model.from_pretrained('gpt2', output_hidden_states=True)  # Set output_hidden_states to True
        outputs = targetmodel(input_ids)
        hidden_states = outputs.hidden_states  # Output has shape [num_layers, batch_size, sequence_length, hidden_size]


encoded_hidden_states = torch.cat(hidden_states, dim=1)

In [430]:
#Now, let's try to run the forward pass
res = model(editor_ids, encoder_hidden_states=encoded_hidden_states)
res[0].shape

torch.Size([1, 4, 768])

In [None]:
input_ids = tokenizer.encode("some test text", return_tensors='pt')
with torch.no_grad():  # No need to calculate gradients
        targetmodel = GPT2Model.from_pretrained('gpt2', output_hidden_states=True)  # Set output_hidden_states to True
        outputs = targetmodel(input_ids)
        hidden_states = outputs.hidden_states  # Output has shape [num_layers, batch_size, sequence_length, hidden_size]



In [None]:

# A raw (needs to be fixed) looping funciton
        for module_name in self.target_config["target_layers"]:
            #path = parse_module_path(module_name)
            #module = get_module(model, path)
            # Apply your replacement function here
            #new_module = replace_layer(module, BiasEditedTransformerLayer, bias = torch.zeros(model.config.n_embd))
            #set_module(model, path, new_module)
            module = self.model.transformer.h[module_name]
            new_module = replace_layer(module, BiasEditedTransformerLayer, bias = torch.zeros(self.target_config["target_n_embed"]))
            self.model.transformer.h[module_name] = new_module

In [419]:
model.config.add_cross_attention = True

In [418]:
#Actually, first, let's inspect the model code above the level of the block
print(inspect.getsource(GPT2Model.forward))

    @add_start_docstrings_to_model_forward(GPT2_INPUTS_DOCSTRING)
    @add_code_sample_docstrings(
        checkpoint=_CHECKPOINT_FOR_DOC,
        output_type=BaseModelOutputWithPastAndCrossAttentions,
        config_class=_CONFIG_FOR_DOC,
    )
    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        past_key_values: Optional[Tuple[Tuple[torch.Tensor]]] = None,
        attention_mask: Optional[torch.FloatTensor] = None,
        token_type_ids: Optional[torch.LongTensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        encoder_hidden_states: Optional[torch.Tensor] = None,
        encoder_attention_mask: Optional[torch.FloatTensor] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool

In [None]:
#Next, implementing this in all layers of model

In [None]:
import copy
for layer_idx in range(12):
    testblock = model.transformer.h[layer_idx]
    testblock.cross_attention = GPT2Attention(model.config, is_cross_attention=True)
    original_query_weights = testblock.attn.c_attn.weight[:,:768]
    original_keys_values = testblock.attn.c_attn.weight[:,768:]
    original_query_bias = testblock.attn.c_attn.bias[:768]
    original_keys_values_bias = testblock.attn.c_attn.bias[768:]   
    with torch.no_grad():
    # Initialize the new layer with these parameters
    testblock.cross_attention.q_attn.weight = nn.Parameter(original_query_weights)
    testblock.cross_attention.q_attn.bias =  nn.Parameter(original_query_bias)

    testblock.cross_attention.c_attn.weight = nn.Parameter(original_keys_values)
    testblock.cross_attention.c_attn.bias = nn.Parameter(original_keys_values_bias)


transformers.pytorch_utils.Conv1D

In [None]:
testblock

GPT2Block(
  (ln_1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
  (attn): GPT2Attention(
    (c_attn): Conv1D()
    (c_proj): Conv1D()
    (attn_dropout): Dropout(p=0.1, inplace=False)
    (resid_dropout): Dropout(p=0.1, inplace=False)
  )
  (ln_2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
  (mlp): GPT2MLP(
    (c_fc): Conv1D()
    (c_proj): Conv1D()
    (act): NewGELUActivation()
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (cross_attention): GPT2Attention(
    (c_attn): Conv1D()
    (q_attn): Conv1D()
    (c_proj): Conv1D()
    (attn_dropout): Dropout(p=0.1, inplace=False)
    (resid_dropout): Dropout(p=0.1, inplace=False)
  )
)

In [None]:
input_ids = tokenizer.encode("some test text", return_tensors='pt')
with torch.no_grad():  # No need to calculate gradients
        model = GPT2Model.from_pretrained('gpt2', output_hidden_states=True)  # Set output_hidden_states to True
        outputs = model(input_ids)
        hidden_states = outputs.hidden_states  # Output has shape [num_layers, batch_size, sequence_length, hidden_size]
print(hidden_states[2].shape)


loading configuration file config.json from cache at /Users/michaelsklar/.cache/huggingface/hub/models--gpt2/snapshots/11c5a3d5811f50298f278a704980280950aedb10/config.json
Model config GPT2Config {
  "activation_function": "gelu_new",
  "architectures": [
    "GPT2LMHeadModel"
  ],
  "attn_pdrop": 0.1,
  "bos_token_id": 50256,
  "embd_pdrop": 0.1,
  "eos_token_id": 50256,
  "initializer_range": 0.02,
  "layer_norm_epsilon": 1e-05,
  "model_type": "gpt2",
  "n_ctx": 1024,
  "n_embd": 768,
  "n_head": 12,
  "n_inner": null,
  "n_layer": 12,
  "n_positions": 1024,
  "output_hidden_states": true,
  "reorder_and_upcast_attn": false,
  "resid_pdrop": 0.1,
  "scale_attn_by_inverse_layer_idx": false,
  "scale_attn_weights": true,
  "summary_activation": null,
  "summary_first_dropout": 0.1,
  "summary_proj_to_labels": true,
  "summary_type": "cls_index",
  "summary_use_proj": true,
  "task_specific_params": {
    "text-generation": {
      "do_sample": true,
      "max_length": 50
    }
  },
 

torch.Size([1, 3, 768])


torch.Size([1, 3, 9984])

In [360]:
# Stuck here in testing! I'm trying and failing to reproduce the cross-attention pass, when I push thru some data
# Perhaps it is connected to the internal error-handling of the attention layer with nones for attention masks?
# should ask Ben!
# testcrossattnoutput = testblock.cross_attention.q_attn(hidden_tensor)


torch.Size([1, 3, 768])

In [None]:
#OK, I think that worked! Now I need to figure out how to add this cross-attention layer to every layer in the model

TypeError: tuple indices must be integers or slices, not tuple

In [271]:
testcrossattnoutput = testblock.cross_attention(
    hidden_states = hidden_states[0], attention_mask=None, head_mask=None, encoder_hidden_states=hidden_tensor, encoder_attention_mask=None, output_attentions=True)
#                 hidden_states,
#                 attention_mask=attention_mask,
#                 head_mask=head_mask,
#                 encoder_hidden_states=encoder_hidden_states,
#                 encoder_attention_mask=encoder_attention_mask,
#                 output_attentions=output_attentions,
#             )

RuntimeError: The expanded size of the tensor (768) must match the existing size (2304) at non-singleton dimension 1.  Target sizes: [3, 768].  Tensor sizes: [2304]

In [203]:
def get_all_hidden_states(input_text, model, tokenizer):

    # Encode text input to tensor
    input_ids = tokenizer.encode(input_text, return_tensors='pt')

    # Get all hidden states
    with torch.no_grad():  # No need to calculate gradients
        outputs = model(input_ids)
        hidden_states = outputs.hidden_states  # Output has shape [num_layers, batch_size, sequence_length, hidden_size]

    return hidden_states
testcache = get_all_hidden_states("thisisatest", model, tokenizer)

In [205]:
dor(testcache)

NameError: name 'dor' is not defined

In [None]:
#Next, we shall have to check if our modified cross-attention actually works!!
#First, we need to get some encodings, from the target cache
# Then, we need to try feeding it thru running the attention module


#If that works, we can try to add it to testblock, and see if the whole block is willing to run

In [None]:
#Now, writing the editor model in such a way that it can accept huge inputs from the target model cache.

#We can vectorize the residual sterams by collapsing over layers; but it is still tensorized by token position and batch size
# The input cache variable has shape (batch_size, sequence_length, cache_size) where cache_size = hidden_size * num_layers in the target network
# Although, looking further down, it seems nice to keep the cache in-place so we can later read from it in layer-wise form

#We must write the following new nn.Modules:

# "Secondary transformer layer" which has a residual stream, feedforward, and layernorm as usual [everything same as the base model]
#[in practice, this is probably sufficient to just literally copy over everything as a second instance of the base model transformer? 
# But we must add:

#1
#2. "Secondary attention" which uses MultiHead Attention, queries the base-text editor [IN THE PARALLEL MODEL] ; keys the target cache [TARGET MODEL] ; and values the target cache [TARGET MODEL] ; and outputting to the secondary residual stream in the layer above
# Note: this should probably be in addition to, rather than instead-of, the original attention mechanism of gpt-2? That makes things easy, too, since we don't have to delete stuff. 
# Just need to ensure the whole module has access to the cache, and that we can add this sub-module in the right place.
# 3. "Tertiary attention" which replaces the output module of the secondary editor. 
# Each of l (= num_layers_target) tertiary attention heads will"
    # queries the final token and layer of the secondary residual stream [EDITOR MODEL]; 
    # keys the target cache in a specific layer; and 
    # values the final token and layer of the secondary residual stream [EDITOR MODEL];
# The attentional softmax will be taken simultaneously over all tokens and all layers, incentivizing a single editing intervention bottleneck

# 4. Decide what to do with for the secondary editor model's input embeddings. We could literally keep this as a copy of the base model's input embeddings, so that it has some direct text access to the instructions;
# but maybe that's not the correct choice. We could decide to delete the embeddings entirely.

# 5. Modify the target model class, so that it can accept changes to all layers AND all token positions
# 6. Add a penalty for the L-2 norm of the interventions vector [this should be normalized somehow; normalized to the typical norm of activations]

# Finally: we should also append a final placeholder token to all text in the editor model! This might be a simple padding change, but we should make sure to implement it in the dataset class.


# from transformers import GPT2LMHeadModel
# class EditorModelOmniscient(nn.Module):
#     # Separating the editor config file, from its base model's configurations
#     def __init__(self, editor_yaml_file_path='config/editor_omniscient.yaml'):

#         super().__init__()
#         # Load the configuration from the YAML file
#         with open(editor_yaml_file_path, 'r') as file:
#             self.editor_config = yaml.safe_load(file)
#         self.basemodel = GPT2LMHeadModel.from_pretrained('gpt2') 
#         self.secondarymodel = GPT2LMHeadModel.from_pretrained('gpt2')
        


#     def forward(self, input_ids, attention_mask=None, target_cache = None):
#         basemodel_hiddenstates = self.basemodel(input_ids, attention_mask=attention_mask, output_hidden_states=True).hidden_states
        

#         #If we want to cut off the model at a certain layer, we can do so here
#         hidden_states[self.editor_config["cut_layer"]]
        
#         else:
#             #write code here for using the target cache architecture
#             return 



        #We have a lot more parameters than we need right now, 
        #but running it thru torch.compile later will cut down waste

In [6]:
# Load the model and tokenizer
model_name = "gpt2"
model = EditorModel()
tokenizer = GPT2Tokenizer.from_pretrained(model_name)
tokenizer.pad_token = tokenizer.eos_token

In [7]:
# Example usage
def get_layer_8_residual(text, model, tokenizer):
    # Tokenize the input text
    inputs = tokenizer(text, return_tensors="pt")

    # Forward pass through the modified model
    with torch.no_grad():
        residual_output = model(**inputs)

    return residual_output

text = "Your example text here."
residual_output = get_layer_8_residual(text, model, tokenizer)
print(residual_output.shape)

torch.Size([1, 5, 768])


In [8]:
residual_output[0,-1,:].shape
# This will be our actual output.
# For now, we are going to pipe it very directly into all tokens of the residual stream of the target network

torch.Size([768])

In [9]:
model.basemodel.transformer.h[7]

GPT2Block(
  (ln_1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
  (attn): GPT2Attention(
    (c_attn): Conv1D()
    (c_proj): Conv1D()
    (attn_dropout): Dropout(p=0.1, inplace=False)
    (resid_dropout): Dropout(p=0.1, inplace=False)
  )
  (ln_2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
  (mlp): GPT2MLP(
    (c_fc): Conv1D()
    (c_proj): Conv1D()
    (act): NewGELUActivation()
    (dropout): Dropout(p=0.1, inplace=False)
  )
)

In [18]:
class BiasEditedTransformerLayer(torch.nn.Module):
    def __init__(self, layer, bias):
        super(BiasEditedTransformerLayer, self).__init__()
        self.layer = layer
        self.bias = bias
        # Assuming the bias is a fixed tensor; its size depends on the model's hidden size
    
    def update_bias(self, new_bias):
        self.bias = new_bias

    def forward(self, x, *args, **kwargs):
        # Apply the original layer
        hidden_states, present = self.layer(x, *args, **kwargs)

        # Add the fixed bias to the residual connection
        # This assumes that 'hidden_states' is the output you want to modify
        hidden_states = hidden_states + self.bias

        return hidden_states, present

# Example usage:
# from transformers import GPT2Model
# model = GPT2Model.from_pretrained('gpt2')
# model.h[7] = BiasEditedTransformerLayer(model.h[7], bias = torch.zeros(model.config.n_embd))

In [19]:
def replace_layer(module, replacement_fn, **kwargs):
    return replacement_fn(module, **kwargs)

def parse_module_path(path):
    """ Parse the module path into a list of keys """
    return path.split('.')

def get_module(model, path):
    """ Traverse the model to get the module at the specified path """
    module = model
    for key in path:
        module = getattr(module, key)
    return module

def set_module(model, path, new_module):
    """ Set the new module at the specified path in the model """
    *parent_path, key = path
    parent_module = get_module(model, parent_path)
    setattr(parent_module, key, new_module)

In [38]:
class TargetModel(nn.Module):
    # Input the target model's configuration file
    def __init__(self, target_yaml_file_path='config/target.yaml'):
        super().__init__()
        
        # Load the configuration from the YAML file
        #R# Replace gpt2 with loading of a generic model
        with open(target_yaml_file_path, 'r') as file:
            self.target_config = yaml.safe_load(file)
        self.model = GPT2LMHeadModel.from_pretrained('gpt2') 
        
        #Now we iterate over the layers in the list of target layers, and add bias parameters to them
        for module_name in self.target_config["target_layers"]:
            #path = parse_module_path(module_name)
            #module = get_module(model, path)
            # Apply your replacement function here
            #new_module = replace_layer(module, BiasEditedTransformerLayer, bias = torch.zeros(model.config.n_embd))
            #set_module(model, path, new_module)
            module = self.model.transformer.h[module_name]
            new_module = replace_layer(module, BiasEditedTransformerLayer, bias = torch.zeros(self.target_config["target_n_embed"]))
            self.model.transformer.h[module_name] = new_module

    # Freeze the parameters of the target model
    #def freeze_parameters(self):
        for param in self.parameters():
            param.requires_grad = False

    def update_bias(self, new_bias):
        #This function only currently accepts a single bias intervention! 
        for module_name in self.target_config["target_layers"]:
            #path = parse_module_path(module_name)
            #module = get_module(model, path)
            self.model.transformer.h[module_name].update_bias(new_bias) #Currently always the same bias for all layers

    def forward(self, *args, **kwargs):
        return self.model(*args, **kwargs)
            

#Example usage:
            
#Initializing the target model and Updating the bias of the target model
#target_model = TargetModel()
#new_bias = torch.zeros(model.config.n_embd)
#target_model.update_bias(new_bias)
#target_model.model.transformer

In [39]:
#Constructing the wikipedia dataset that we'll use to train the hypernetwork
df = pd.read_csv("/Users/michaelsklar/aiplay/wikipedia/wikipedia_three_sentences.csv",nrows = 100)
# Define a fixed sequence length
def tokenize_and_pad(text, max_length=50):
    return tokenizer(text, max_length=max_length, padding='max_length', truncation=True, return_tensors="pt")

df['editor_input_tokenized'] = df['first_sentences'].apply(lambda x: tokenize_and_pad(x))
df['target_input_tokenized'] = df['second_sentences'].apply(lambda x: tokenize_and_pad(x))

model

In [40]:
#df['editor_input_tokenized'][0]

In [67]:
class HypernetworkEditor(nn.Module):
    def __init__(self,editor_yaml_file_path:str,
                 target_yaml_file_path:str):
        
        super().__init__()

        #self.config = config #currently does nothing. Keeping as placeholder
        with open(editor_yaml_file_path, 'r') as file:
            self.editor_config = yaml.safe_load(file)

        self.editor_model = EditorModel(editor_yaml_file_path=editor_yaml_file_path)
        self.target_model = TargetModel(target_yaml_file_path=target_yaml_file_path)

        #malmen had some nonsense here for managing the list of intervention layers
        #We are keeping things simple with one one trivial intervention layer for now

        self.opt = torch.optim.Adam(
            self.editor_model.parameters(),
            float(self.editor_config["meta_lr"])
        )

        self.lossfn = nn.CrossEntropyLoss()

    def check_config(self):
        #Check that dimensions of the hypernetwork output match the target model's bias width and also matches the config file
        return True

    def forward(self, editor_inputs, target_inputs, gradients = True):
        #Batching not yet implemented! currently works only for a single datapoint at a time. 
        if gradients:
            hypernetwork_output = self.editor_model(editor_inputs["input_ids"], attention_mask=editor_inputs["attention_mask"])
            self.target_model.update_bias(hypernetwork_output)
            prediction = self.target_model(target_inputs["input_ids"], attention_mask=target_inputs["attention_mask"])
        else:
            with torch.no_grad():
                hypernetwork_output = self.editor_model(editor_inputs["input_ids"], attention_mask=editor_inputs["attention_mask"])
                self.target_model.update_bias(hypernetwork_output)
                prediction = self.target_model(target_inputs["input_ids"], attention_mask=target_inputs["attention_mask"])
        return prediction
        
    def train(self, dataframe, batch_size = 1, epochs = 1):

        for epoch in range(epochs):
            # Create a tqdm progress bar
            with tqdm(total=len(dataframe), desc=f"Epoch {epoch + 1}/{epochs}", unit='batch') as pbar:
                for i in range(0, len(dataframe), batch_size):
                    self.opt.zero_grad()
                    
                    # Assuming these lines extract batches correctly from your dataframe
                    self.editor_inputs = dataframe['editor_input_tokenized'][i:i+batch_size]
                    self.target_inputs = dataframe['target_input_tokenized'][i:i+batch_size] 
                    self.prediction = self.forward(self.editor_inputs, self.target_inputs, gradients=True)
                    self.y = dataframe["target_input_tokenized"][i]["input_ids"][:, 1:]
                    self.num_tokens = self.prediction.logits.shape[-1]
                    self.loss = self.lossfn(self.prediction.logits[:, :-1, :].view(-1, self.num_tokens), self.y.view(-1))
                    
                    self.loss.backward()
                    self.opt.step()

                    # Update progress bar
                    pbar.update(batch_size)

In [68]:
HyperEditor = HypernetworkEditor(editor_yaml_file_path='config/editor.yaml', target_yaml_file_path='config/target.yaml')

In [69]:
#HyperEditor.prediction.logits[:, :-1, :].view(-1,50257).shape , HyperEditor.y.view(-1).shape

In [70]:
HyperEditor.train(df, batch_size = 1, epochs = 1)

Epoch 1/1:   0%|          | 0/100 [00:00<?, ?batch/s]

Epoch 1/1: 100%|██████████| 100/100 [02:24<00:00,  1.45s/batch]


In [344]:
output_target = HyperEditor.target_model(df['target_input_tokenized'][0]["input_ids"], attention_mask=df['target_input_tokenized'][0]["attention_mask"])
output_editor = HyperEditor.editor_model(df['editor_input_tokenized'][0]["input_ids"], attention_mask=df['editor_input_tokenized'][0]["attention_mask"])
output_target.logits.shape, output_editor.shape

(torch.Size([1, 50, 50257]), torch.Size([1, 50, 768]))

NameError: name 'HypernetworkEditor' is not defined

In [249]:
df.head()

Unnamed: 0,title,first_sentences,second_sentences,third_sentences
0,Anarchism,Anarchism is a political philosophy and moveme...,Anarchism calls for the abolition of the state...,"As a historically left-wing movement, placed o..."
1,Autism,Autism is a neurodevelopmental disorder charac...,Parents often notice signs during the first th...,"These signs often develop gradually, though so..."
2,A,"A, or a, is the first letter and the first vow...","Its name in English is a (pronounced ), plural...",It is similar in shape to the Ancient Greek le...
3,Alabama,Alabama () is a state in the Southeastern regi...,Alabama is the 30th largest by area and the 24...,states.
4,Abraham Lincoln,"Abraham Lincoln (; February 12, 1809 – April 1...",Lincoln led the nation through the American Ci...,economy..


In [None]:
from typing import Tuple

import torch
import torch.nn as nn


class RunningMeanStd(nn.Module):

    def __init__(self, size: int):
        super().__init__()

        self.register_buffer("n", torch.zeros(1))
        self.register_buffer("mean", torch.zeros((size)))
        self.register_buffer("var", torch.zeros((size)))
        self.register_buffer("std", torch.zeros((size)))

    def update(self, x: torch.FloatTensor):

        n = self.n + x.shape[0]
        delta = x.mean(0) - self.mean
        self.mean += x.shape[0] * delta / n
        self.var += x.shape[0] * x.var(0) + self.n * x.shape[0] * delta.pow(2) / n
        self.std = (self.var / (n - 1 + torch.finfo(x.dtype).eps)).sqrt()
        self.n = n
              
    def forward(self, x: torch.FloatTensor) -> torch.FloatTensor:

        return (x - self.mean) / (self.std + torch.finfo(x.dtype).eps)


class MALMENBlock(nn.Module):

    def __init__(self, size: int, rank: int, n_modules: int):
        super().__init__()

        self.A = nn.Parameter(torch.randn(size, rank))
        self.B = nn.Parameter(torch.zeros(rank, size))
        self.bias = nn.Parameter(torch.zeros(size))
        
        self.scale = nn.Embedding(n_modules, size)
        self.shift = nn.Embedding(n_modules, size)
        
        self.scale.weight.data.fill_(1)
        self.shift.weight.data.fill_(0)

    def forward(
        self,
        y: torch.FloatTensor,
        module_idx: torch.LongTensor
    ) -> torch.FloatTensor:

        x = y @ self.A @ self.B + self.bias
        x = x.clamp(0)
        x = self.scale(module_idx) * x + self.shift(module_idx)
        x = x + y

        return x


class MALMENNet(nn.Module):

    def __init__(
        self,
        key_size: int,
        value_size: int,
        rank: int,
        n_blocks: int,
        n_modules: int,
        lr: float
    ):
        super().__init__()
        self.key_size = key_size
        self.value_size = value_size

        self.normalizer = RunningMeanStd(key_size + value_size)
        self.blocks = nn.ModuleList([
            MALMENBlock(key_size + value_size, rank, n_modules)
            for _ in range(n_blocks)
        ])

        self.lr = nn.Embedding(n_modules, 1)
        self.lamda = nn.Embedding(n_modules, 1)
        
        self.lr.weight.data.fill_(lr)
        self.lamda.weight.data.fill_(0)
        
    def forward(
        self,
        keys: torch.FloatTensor,
        values_grad: torch.FloatTensor,
        module_idx: torch.LongTensor
    ) -> Tuple[torch.FloatTensor]:

        hidden_states = torch.cat((keys, values_grad), -1)
        hidden_states = self.normalizer(hidden_states)
        for block in self.blocks:
            hidden_states = block(hidden_states, module_idx)
        return hidden_states.split([self.key_size, self.value_size], -1)