In [1]:
import torch
import pandas as pd
import numpy as np
from transformers import AutoModelForCausalLM, AutoTokenizer
from dictionary import AutoEncoder



  from .autonotebook import tqdm as notebook_tqdm


In [9]:
# Load sentences from CSV file
with open('sentences_2.txt', 'r') as f:
    sentences = [line.strip() for line in f.readlines()]

df = pd.DataFrame(sentences, columns=['sentence'])
sentences = df['sentence'].tolist()

print(sentences)

['While eating a pizza he was annoying his sister.', 'I like pizza very much.', 'A pizza topped with mozzarella is my first choice.', "Why don't we order pizza?", 'We are going downtown to eat pizza.', 'Please help yourself to the pizza.', 'Pizza is my favorite food.', "Pizza is the kind of food that fits into today's life style.", 'Divide the pizza among you three.', "Hey, this pizza isn't bad. Not bad at all.", 'I could go for a nice hot pizza right now.', 'We pigged out on pizza and beer.', 'My work was to deliver pizza by motorcycle.', 'My father likes pizza very much.', 'I pigged out on pizza.', 'When I was a student, I used to go to that pizza parlor.', 'I feel like having some pizza tonight.', 'I ordered a pizza on the phone.', 'He likes such foods as tacos and pizza.', 'We pigged out on pizza and chicken at lunchtime.', "The pasta here's pretty good. And the pizza too.", 'My father loves pizza.', "The pizza delivery guy hasn't come by yet.", 'I love pizza very much.', 'After si

In [10]:
# Load the Pythia model and tokenizer
model = AutoModelForCausalLM.from_pretrained("EleutherAI/pythia-70m-deduped")
tokenizer = AutoTokenizer.from_pretrained("EleutherAI/pythia-70m-deduped")
tokenizer.pad_token = tokenizer.eos_token

The `GPTNeoXSdpaAttention` class is deprecated in favor of simply modifying the `config._attn_implementation`attribute of the `GPTNeoXAttention` class! It will be removed in v4.48


In [12]:
# Testing tokenizer 
sentence = "I liike pizza"
tokenized_sentence = tokenizer(sentence)['input_ids']
decoded_tokens = tokenizer.convert_ids_to_tokens(tokenized_sentence)

print("Tokenized output:", decoded_tokens)

Tokenized output: ['I', 'Ġli', 'ike', 'Ġpizza']


In [13]:
activation_list = []

def hook_fn(module, input, output):
    """Hook function to capture activations from the 4th MLP layer."""
    activation_list.append(output)

# Hook 4th MLP layer (index 3)
layer_to_hook = model.gpt_neox.layers[3].mlp
hook = layer_to_hook.register_forward_hook(hook_fn)


In [14]:
# Store per-token activations
individual_activations = []
    
for sentence in sentences:
    print(f"\nProcessing: '{sentence}'")
    input_ids_batch = tokenizer(sentence, return_tensors="pt", padding=True, truncation=True)
    model(**input_ids_batch)  # Forward pass to capture activations

    if activation_list:
        activations = activation_list[-1].squeeze(0)  # Shape: (seq_len, hidden_dim)
        individual_activations.append(activations)
    activation_list.clear()

print(f"Captured activations for {len(individual_activations)} sentences.")


Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.



Processing: 'While eating a pizza he was annoying his sister.'

Processing: 'I like pizza very much.'

Processing: 'A pizza topped with mozzarella is my first choice.'

Processing: 'Why don't we order pizza?'

Processing: 'We are going downtown to eat pizza.'

Processing: 'Please help yourself to the pizza.'

Processing: 'Pizza is my favorite food.'

Processing: 'Pizza is the kind of food that fits into today's life style.'

Processing: 'Divide the pizza among you three.'

Processing: 'Hey, this pizza isn't bad. Not bad at all.'

Processing: 'I could go for a nice hot pizza right now.'

Processing: 'We pigged out on pizza and beer.'

Processing: 'My work was to deliver pizza by motorcycle.'

Processing: 'My father likes pizza very much.'

Processing: 'I pigged out on pizza.'

Processing: 'When I was a student, I used to go to that pizza parlor.'

Processing: 'I feel like having some pizza tonight.'

Processing: 'I ordered a pizza on the phone.'

Processing: 'He likes such foods as tac

In [15]:
# Load Dictionary Learning AutoEncoder
ae = AutoEncoder.from_pretrained(
    "dictionaries/pythia-70m-deduped/mlp_out_layer3/10_32768/ae.pt",
    # Let torch automatially choose device based on asvail.
    # map_location=torch.device('cpu')
)



In [16]:
# Convert activations to sparse representations
sparse_representations = []
for activations in individual_activations:
    sparse_repr = ae.encode(activations).detach().cpu().numpy()  # (seq_len, dict_size)
    sparse_representations.append(sparse_repr)
print(f"Processed {len(sparse_representations)} sentences into token-aligned sparse representations.")

Processed 101 sentences into token-aligned sparse representations.


In [22]:
# Aggregate features: Find top activated features across all tokens in all sentences
avg_feature_weights = {}
ovr_features = set([])
for sentence_features in sparse_representations:
    curr_sentence_features = set([])  # Reset curr_sentence_features for each sentence[]
    i = 0
    for token_features in sentence_features:
        # Can features be negative???
        # Does this 800 make sense anymore?
        top_indices = np.argsort(token_features)[-100:][::-1]  # Top 800 features per token
        # Aggregate all the features across a sentence. Do set intersection afterwards so that only feature common across all the sentences are considered. We'll also maintain the average value for all the seen features. then, we'll get the decoded output with these valeus. Then, we'll scale those weights alone by 10x. using  this new model, we'll do text generation and see if the feature common across all sentences shows up a lot, which would confirm our hypothesis. 
        # for idx in top_indices:
        #     feature_counts[idx] = feature_counts.get(idx, 0) + 1
        # Aggregate features across sentences. 
        for idx in top_indices:
            curr_sentence_features.add(idx)
            # This might get too big. 
            avg_feature_weights[idx] = [0, 0] if idx not in avg_feature_weights else [avg_feature_weights[idx][0] + token_features[idx], avg_feature_weights[idx][1] + 1]
            
    if len(ovr_features) == 0:
        ovr_features = set(curr_sentence_features)
    else:
        # Adding  this print statement so that we don't get here everytime, which means that there is nothing common across all the sentences
        i += 1
        if i > 1:
            print("Entering else")
        ovr_features = ovr_features.intersection(set(curr_sentence_features))
    
    
            

In [23]:
# Get the average weights of the features. 
avg_feature_weights = {k: v[0] / v[1] for k, v in avg_feature_weights.items() if k in ovr_features}

# Print subset for sanity
print(len(avg_feature_weights))
print(len(ovr_features))

# Print 5 items of avg feature weights
print(list(avg_feature_weights.items())[:5])

# Print 5 indices from itnersection
print(list(ovr_features)[:5])

# Only 6 common vectors? COuld be a problem somewhere. 

6
6
[(np.int64(20383), np.float32(14.459406)), (np.int64(21462), np.float32(1.9070497)), (np.int64(7082), np.float32(2.792452)), (np.int64(18192), np.float32(4.6524243)), (np.int64(30484), np.float32(0.6164037))]
[np.int64(7082), np.int64(18192), np.int64(30484), np.int64(21462), np.int64(9690)]


In [None]:
# Decoding
# COnstruct a sparse vector with the common features and average weights
sparse_vector = np.zeros((32768,))  # Assume dictionary size is 32768
for idx in ovr_features:
    sparse_vector[idx] = avg_feature_weights[idx]
    

# Decode the sparse vector
decoded_output = ae.decode(sparse_vector)

In [None]:
# Weight scaling

# Save the copy of the og layer weights
layer_weights_clone = model.gpt_neox.layers[3].mlp.fc_out.weight.detach().clone()
torch.save(layer_weights_clone, "og_layer_weights.pt")

layer_weights = model.gpt_neox.layers[3].mlp.fc_out.weight


# What part of the decoded outputs are significant?
# Considering non-zero indices for now
non_zero_indices = np.nonzero(decoded_output)[0]

# Scale all the non zero indices by 10
layer_weights[non_zero_indices] *= 10

# write back to the model
model.gpt_neox.layers[3].mlp.fc_out.weight = layer_weights

In [20]:
#  Test out hypothesis
test_input = "My favorite food is"

# Tokenize the input
tokenized_input = tokenizer(test_input, return_tensors="pt").input_ids

# Is there a better way to do this?
model.eval()
# Pass the tokenized input through the model
with torch.no_grad():
    output = model.generate(tokenized_input, max_length=100)

# Decode the output
decoded_output = tokenizer.decode(output[0].tolist(), skip_special_tokens=True)

# Print decoded output
print(decoded_output)

Predicted output: 
 way, the


In [92]:
# # Analyze which features activate for specific tokens
# top_n = 800  
# top_features = []

# for sentence_idx, per_token_features in enumerate(sparse_representations):
#     sentence_top_features = []
    
#     for token_idx, features in enumerate(per_token_features):
#         # Extract top N active features for this token
#         top_indices = np.argsort(features)[-top_n:][::-1]
#         sentence_top_features.append(set(top_indices))
    
#     top_features.append(sentence_top_features)  # Store per-token top feature indices

# # Example: Print feature activations for each token in the first sentence
# tokenized_sentence = tokenizer(sentences[0])['input_ids']
# decoded_tokens = tokenizer.convert_ids_to_tokens(tokenized_sentence)

# print("\nFeature activations for the first sentence:")
# for token, feature_set in zip(decoded_tokens, top_features[0]):
#     print(f"Token: {token}, Top Features: {list(feature_set)[:10]}")  # Show top 5 features

#     # Example: Print feature activations for each token in the first sentence
# tokenized_sentence = tokenizer(sentences[1])['input_ids']
# decoded_tokens = tokenizer.convert_ids_to_tokens(tokenized_sentence)

# print("\nFeature activations for the first sentence:")
# for token, feature_set in zip(decoded_tokens, top_features[0]):
#     print(f"Token: {token}, Top Features: {list(feature_set)[:10]}")  # Show top 5 features
    


In [18]:
# Select the most frequently occurring features
top_common_features = sorted(feature_counts, key=feature_counts.get, reverse=True)[:800]

# # Create a synthetic sparse vector using these common features
# synthetic_sparse_vector = np.zeros((32768,))  # Assume dictionary size is 32768
# for idx in top_common_features:
#     synthetic_sparse_vector[idx] = 1  # Set these features as active

In [19]:
print(top_common_features[:10])

[np.int64(10900), np.int64(10969), np.int64(10968), np.int64(10918), np.int64(10919), np.int64(10967), np.int64(10966), np.int64(10920), np.int64(10965), np.int64(10921)]


In [94]:
# Decode sparse vector back into model space
synthetic_dense_vector = ae.decode(torch.tensor(synthetic_sparse_vector).float()).detach().cpu()
synthetic_dense_vector *= 10  # Experiment with scaling


In [95]:
# Add a new special token
tokenizer.add_special_tokens({'additional_special_tokens': ["<XYZ>"]})
model.resize_token_embeddings(len(tokenizer))  # Resize embeddings to include new token

masked_sentence = "The capital of <XYZ> is"
input_ids = tokenizer(masked_sentence, return_tensors="pt")["input_ids"]

# Convert token IDs to tokens
decoded_tokens = tokenizer.convert_ids_to_tokens(input_ids[0])
print(f"Tokenized input: {decoded_tokens}")  # Debugging

# **Find the placeholder index**
try:
    placeholder_index = decoded_tokens.index("<XYZ>")
except ValueError:
    raise ValueError(f"Could not find placeholder token in: {decoded_tokens}")


Tokenized input: ['The', 'Ġcapital', 'Ġof', 'Ġ', '<XYZ>', 'Ġis']


In [82]:

# Convert input_ids to embeddings
model_inputs = model.get_input_embeddings()(input_ids)

# Inject synthetic feature vector at the placeholder position
model_inputs[:, placeholder_index, :] = synthetic_dense_vector

# Generate text from modified embeddings
with torch.no_grad():
    outputs = model(inputs_embeds=model_inputs)
    logits = outputs.logits[:, -1, :]  # Get last token logits
    predicted_token_id = torch.argmax(logits, dim=-1).item()

# Decode the predicted token
predicted_word = tokenizer.decode([predicted_token_id])

print(f"Predicted token: {predicted_word}")

Predicted token:  the
