In [2]:
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
import json
# Checkpoint under study and the device its weights are placed on.
model_name = "Qwen/Qwen2.5-Math-1.5B-Instruct"
device = "cuda"

# Load the weights first, then the tokenizer that belongs to the same checkpoint.
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype="auto", device_map=device)
tokenizer = AutoTokenizer.from_pretrained(model_name)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/656 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/3.09G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/160 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/7.32k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/2.78M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/1.67M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/7.03M [00:00<?, ?B/s]

In [3]:
import torch
# Token ids for the text 'boxed{'. ActivationPatcher.__init__ builds the same
# tensor as self.boxed_token, and nothing visible in this notebook reads this global.
boxed_token = torch.tensor((tokenizer.encode('boxed{')))
class ActivationPatcher:
    def __init__(self, model, tokenizer, device='cuda'):
        """Initialize the ActivationPatcher with model and tokenizer.

        Args:
            model: The language model to use
            tokenizer: The tokenizer for the model
            device: Device to run on (default: 'cuda')
        """
        self.model = model
        self.tokenizer = tokenizer
        self.device = device
        self.sample_probs = None
        self.indirect_effect = {}
        self.boxed_token = torch.tensor((tokenizer.encode('boxed{')))  # This needs to be set

    def prepare_input(self, prompt):
        """Convert a text prompt to model inputs.

        Args:
            prompt: Text prompt to convert

        Returns:
            Tokenized input IDs
        """
        messages = [{"role": "user", "content": prompt}]
        text = self.tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True
        )
        model_inputs = self.tokenizer([text], return_tensors="pt").to(self.device)
        return model_inputs.input_ids

    def inference(
        self,
        prompt,
        temperature=0.7,
        top_p=0.8,
        save_activations=False,
        patch_activations=False,
        patch_layer=0,
        patch_sequence=-1,
        activations=None,
        max_tokens=512
    ):
        """Run inference with options for saving or patching MLP activations.

        Args:
            prompt: Text prompt to process
            temperature: Sampling temperature
            top_p: Top-p sampling parameter
            save_activations: Whether to save activations
            patch_activations: Whether to patch activations
            patch_layer: Which layer to patch if patching
            activations: Pre-saved activations to use for patching
            max_tokens: Maximum tokens to generate

        Returns:
            Dictionary with outputs including generated sequences
        """
        # Initialize
        input_ids = self.prepare_input(prompt)
        sequences = input_ids
        stored_activations = {}

        # For tracking boxed answers
        track = False
        cnt = 0
        hooks = []

        # Configure model
        self.model.config.use_cache = False

        if patch_activations:
            print(f'Applying patching at layer {patch_layer}')
            hooks = self._apply_activation_patch(patch_layer, activations, patch_sequence)

        # Generate tokens
        final_answer_logit = None
        with torch.no_grad():
            for _ in range(max_tokens):
                outputs = self.model(
                    input_ids=sequences,
                )

                # Remove hooks immediately after this forward pass
                for h in hooks:
                    h.remove()

                # Get logits and sample next token
                next_token_logits = outputs.logits[:, -1, :]
                next_token_logits = next_token_logits / temperature
                probs = torch.softmax(next_token_logits, dim=-1)
                next_token_id = torch.multinomial(probs, num_samples=1)

                # Add new token to sequence
                sequences = torch.cat([sequences, next_token_id], dim=-1)

                # Track probabilities for correct and perturbed answers
                # if track and cnt < len(self.correct_ans_tokens):
                #     return next_token_logits

                if track:
                    final_ans_logit = next_token_logits.clone()
                    # return next_token_logits

                    # pr = f'pr*_{patch_layer}' if patch_activations else 'pr'
                    # self.indirect_effect[f'{pr}_{cnt}'] = probs[0, self.correct_ans_tokens[cnt]]
                    # self.indirect_effect[f'{pr}!_{cnt}'] = probs[0, self.perturbed_ans_tokens[cnt]]
                    cnt += 1

                # Detect when we enter a boxed answer
                if torch.all(sequences[0, -2:] == self.boxed_token.to(self.device)):
                    track = True
                    cnt = 0
                    # self.sample_probs = probs

                # Detect end of boxed answer
                if track and torch.all(sequences[0, -1:] == torch.tensor(self.tokenizer.encode('}')).to(self.device)):
                    track = False
                    print('End of boxed answer detected')


                # Stop on EOS token
                if next_token_id.item() == self.tokenizer.eos_token_id:
                    break

        # Clean up
        torch.cuda.empty_cache()
        print('Clearing hooks')
        for h in hooks:
            h.remove()
        return {
            "sequences": sequences,
            'final_answer_logit' : final_answer_logit
        }

    def save_layer_activations(self, prompt):
        """Save MLP activations for all layers.

        Args:
            sequences: Current token sequences

        Returns:
            Dictionary of activations by layer
        """
        input_ids = self.prepare_input(prompt)
        sequences = input_ids

        stored_activations = {}
        hooks = []

        try:
            # Create hooks for each layer
            for layer in range(self.model.config.num_hidden_layers):
                print(f"Saving activations for layer {layer}")

                def hook_fn(name):
                    def hook(module, input, output):
                        # print(module)
                        stored_activations[name] = output[0].detach().clone()
                    return hook

                # module = self.model.model.layers[layer].mlp
                module = self.model.model.layers[layer]
                handle = module.register_forward_hook(hook_fn(f'layer_{layer}'))
                hooks.append(handle)

            # Run forward pass to collect activations
            self.model(input_ids=sequences)

        finally:
            # Remove hooks when done
            for h in hooks:
                h.remove()

        return stored_activations

    def _apply_activation_patch(self, patch_layer, activations, patch_sequence=-1):
        """Apply activation patching to a specific layer.

        Args:
            patch_layer: Layer to patch
            activations: Saved activations to use for patching

        Returns:
            List of hook handles
        """
        hooks = []

        def hook_fn(name):
            def hook(module, input, output):
                # print('output len', len(output))
                # print('module', module)
                print('output len', len(output))
                print('output shape', output[0].shape)
                output[0][:, patch_sequence, :] = activations[f'layer_{patch_layer}'][:, patch_sequence, :]
                return output
            return hook

        # module = self.model.model.layers[patch_layer].mlp
        module = self.model.model.layers[patch_layer]
        handle = module.register_forward_hook(hook_fn(f'layer_{patch_layer}'))
        hooks.append(handle)

        return hooks

    def run_indirect_effect_analysis(
        self,
        correct_prompt,
        perturbed_prompt,
        correct_ans_tokens,
        perturbed_ans_tokens
    ):
        """Run full indirect effect analysis with patching.

        Args:
            correct_prompt: The original correct prompt
            perturbed_prompt: The modified prompt with perturbations
            correct_ans_tokens: Token IDs for correct answers
            perturbed_ans_tokens: Token IDs for perturbed answers

        Returns:
            Dictionary with analysis results
        """
        # Set up answer tokens
        self.correct_ans_tokens = correct_ans_tokens
        self.perturbed_ans_tokens = perturbed_ans_tokens

        # Step 1: Get activations from perturbed prompt
        print("Getting activations from perturbed prompt...")
        activations = self.save_layer_activations(perturbed_prompt)
        # activations = self.save_layer_activations(correct_prompt)


        # Step 2: Run clean inference on correct prompt
        print("Running inference on clean prompt...")
        clean_results = self.inference(correct_prompt)

        # Step 3: Run patched inference for each layer
        all_results = {"clean": clean_results}
        # for layer in range(self.model.config.num_hidden_layers):
            # print(f"Testing indirect effect at layer {layer}...")
            # patched_results = self.inference(
            #     correct_prompt,
            #     patch_activations=True,
            #     patch_layer=layer,
            #     activations=activations
            # )
            # all_results[f"patched_layer_{layer}"] = patched_results
        layer = 0

        print(f"Testing indirect effect at layer {layer}...")
        patched_results = self.inference(
            correct_prompt,
            patch_activations=True,
            patch_layer=layer,
            activations=activations
        )
        all_results[f"patched_layer_{layer}"] = patched_results
        return all_results



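### NEW: revised ActivationPatcher — patches each layer's input through forward pre-hooks and keeps the hook installed for the whole generation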

In [10]:
import torch
# Token ids for the text 'boxed{'. ActivationPatcher.__init__ builds the same
# tensor as self.boxed_token, and nothing visible in this notebook reads this global.
boxed_token = torch.tensor((tokenizer.encode('boxed{')))
class ActivationPatcher:
    def __init__(self, model, tokenizer, device='cuda'):
        """Initialize the ActivationPatcher with model and tokenizer.

        Args:
            model: The language model to use
            tokenizer: The tokenizer for the model
            device: Device to run on (default: 'cuda')
        """
        self.model = model
        self.tokenizer = tokenizer
        self.device = device
        self.sample_probs = None
        self.indirect_effect = {}
        self.boxed_token = torch.tensor((tokenizer.encode('boxed{')))  # This needs to be set

    def prepare_input(self, prompt):
        """Convert a text prompt to model inputs.

        Args:
            prompt: Text prompt to convert

        Returns:
            Tokenized input IDs
        """
        messages = [{"role": "user", "content": prompt}]
        text = self.tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True
        )
        model_inputs = self.tokenizer([text], return_tensors="pt").to(self.device)
        return model_inputs.input_ids

    def inference(
        self,
        prompt,
        temperature=0.7,
        top_p=0.8,
        save_activations=False,
        patch_activations=False,
        patch_layer=0,
        patch_sequence=-1,
        activations=None,
        max_tokens=512
    ):
        """Run inference with options for saving or patching MLP activations.

        Args:
            prompt: Text prompt to process
            temperature: Sampling temperature
            top_p: Top-p sampling parameter
            save_activations: Whether to save activations
            patch_activations: Whether to patch activations
            patch_layer: Which layer to patch if patching
            activations: Pre-saved activations to use for patching
            max_tokens: Maximum tokens to generate

        Returns:
            Dictionary with outputs including generated sequences
        """
        # Initialize
        input_ids = self.prepare_input(prompt)
        sequences = input_ids
        stored_activations = {}

        # For tracking boxed answers
        track = False
        cnt = 0
        hooks = []

        # Configure model
        self.model.config.use_cache = False

        if patch_activations:
            print(f'Applying patching at layer {patch_layer}')
            hooks = self._apply_activation_patch(patch_layer, activations, patch_sequence)

        # Generate tokens
        final_answer_logit = None
        with torch.no_grad():
            for _ in range(max_tokens):
                outputs = self.model(
                    input_ids=sequences,
                )

                # Get logits and sample next token
                next_token_logits = outputs.logits[:, -1, :]
                next_token_logits = next_token_logits / temperature
                probs = torch.softmax(next_token_logits, dim=-1)
                next_token_id = torch.multinomial(probs, num_samples=1)

                # Add new token to sequence
                sequences = torch.cat([sequences, next_token_id], dim=-1)

                # Track probabilities for correct and perturbed answers
                # if track and cnt < len(self.correct_ans_tokens):
                #     return next_token_logits

                if track:
                    final_ans_logit = next_token_logits.clone()
                    # return next_token_logits

                    # pr = f'pr*_{patch_layer}' if patch_activations else 'pr'
                    # self.indirect_effect[f'{pr}_{cnt}'] = probs[0, self.correct_ans_tokens[cnt]]
                    # self.indirect_effect[f'{pr}!_{cnt}'] = probs[0, self.perturbed_ans_tokens[cnt]]
                    cnt += 1

                # Detect when we enter a boxed answer
                if torch.all(sequences[0, -2:] == self.boxed_token.to(self.device)):
                    track = True
                    cnt = 0
                    # self.sample_probs = probs

                # Detect end of boxed answer
                if track and torch.all(sequences[0, -1:] == torch.tensor(self.tokenizer.encode('}')).to(self.device)):
                    track = False
                    print('End of boxed answer detected')


                # Stop on EOS token
                if next_token_id.item() == self.tokenizer.eos_token_id:
                    break

        # Clean up
        torch.cuda.empty_cache()
        print('Clearing hooks')
        for h in hooks:
            h.remove()
        return {
            "sequences": sequences,
            'final_answer_logit' : final_answer_logit
        }

    def save_layer_activations(self, prompt):
        """Save MLP activations for all layers.

        Args:
            sequences: Current token sequences

        Returns:
            Dictionary of activations by layer
        """
        input_ids = self.prepare_input(prompt)
        sequences = input_ids

        stored_activations = {}
        hooks = []

        try:
            # Create hooks for each layer
            for layer in range(self.model.config.num_hidden_layers):
                print(f"Saving activations for layer {layer}")

                def hook_fn(name):
                    def hook(module, input):
                        # print(module)
                        stored_activations[name] = input[0].detach().clone()
                    return hook

                # module = self.model.model.layers[layer].mlp
                module = self.model.model.layers[layer]
                handle = module.register_forward_pre_hook(hook_fn(f'layer_{layer}'))
                hooks.append(handle)

            # Run forward pass to collect activations
            self.model(input_ids=sequences)

        finally:
            # Remove hooks when done
            for h in hooks:
                h.remove()

        return stored_activations

    def _apply_activation_patch(self, patch_layer, activations, patch_sequence=-1):
        """Apply activation patching to a specific layer.

        Args:
            patch_layer: Layer to patch
            activations: Saved activations to use for patching

        Returns:
            List of hook handles
        """
        hooks = []

        def hook_fn(name):
            def hook(module, input):
                # print('output len', len(output))
                # print('module', module)
                print('output len', len(input))
                print('output shape', input[0].shape)

                modified_input = list(input)
                modified_input[0] = modified_input[0].clone()

                modified_input[0][:, patch_sequence, :] = activations[f'layer_{patch_layer}'][:, patch_sequence, :]
                return tuple(modified_input)

            return hook

        # module = self.model.model.layers[patch_layer].mlp
        module = self.model.model.layers[patch_layer]
        handle = module.register_forward_pre_hook(hook_fn(f'layer_{patch_layer}'))
        hooks.append(handle)

        return hooks


In [None]:
# Wrap the loaded model and tokenizer; device is left at the class default ('cuda').
patcher = ActivationPatcher(model, tokenizer)

In [7]:
# Check token positions for both prompts
# NOTE(review): incorrect_prompt and correct_prompt are assigned in a cell that
# appears further down, so this cell fails on a fresh top-to-bottom run.
incorrect_ids = patcher.prepare_input(incorrect_prompt)
correct_ids = patcher.prepare_input(correct_prompt)
print("Incorrect prompt token IDs:", incorrect_ids)
print("Correct prompt token IDs:", correct_ids)

Incorrect prompt token IDs: tensor([[151644,   8948,    198,   5501,   2874,   3019,    553,   3019,     11,
            323,   2182,    697,   1590,   4226,   2878,   1124,  79075,  46391,
         151645,    198, 151644,    872,    198,   3838,    374,    220,     17,
          54916,    553,    220,     17, 151645,    198, 151644,  77091,    198]],
       device='cuda:0')
Correct prompt token IDs: tensor([[151644,   8948,    198,   5501,   2874,   3019,    553,   3019,     11,
            323,   2182,    697,   1590,   4226,   2878,   1124,  79075,  46391,
         151645,    198, 151644,    872,    198,   3838,    374,    220,     17,
          54916,    553,    220,     22, 151645,    198, 151644,  77091,    198]],
       device='cuda:0')


In [9]:
# Look up the text behind token id 17, which sits at index 30 of the
# incorrect-prompt ids printed above.
tokenizer.decode([17])

'2'

In [14]:
# Fresh patcher built from whichever ActivationPatcher definition ran last.
patcher = ActivationPatcher(model, tokenizer)


# Despite the names, correct_prompt is the prompt whose activations are saved
# (the patch source) and incorrect_prompt is the prompt that is generated from.
correct_prompt = "What is 2 multiplied by 33"
# correct_prompt = "This is some gibberish text"
incorrect_prompt = "What is 2 multiplied by 2"


# NOTE(review): this variable shares its name with the method it calls.
save_layer_activations = patcher.save_layer_activations(correct_prompt)

# Index 30 is the second operand ('2', token id 17) in the incorrect-prompt ids
# printed earlier; layer 0 is patched at that position.
patch_sequence=30
patch_layer=0
final_dict = {}
final_dict[0] = patcher.inference(incorrect_prompt, patch_activations=True, activations=save_layer_activations, patch_layer=patch_layer, patch_sequence=patch_sequence,max_tokens=150)

Saving activations for layer 0
Saving activations for layer 1
Saving activations for layer 2
Saving activations for layer 3
Saving activations for layer 4
Saving activations for layer 5
Saving activations for layer 6
Saving activations for layer 7
Saving activations for layer 8
Saving activations for layer 9
Saving activations for layer 10
Saving activations for layer 11
Saving activations for layer 12
Saving activations for layer 13
Saving activations for layer 14
Saving activations for layer 15
Saving activations for layer 16
Saving activations for layer 17
Saving activations for layer 18
Saving activations for layer 19
Saving activations for layer 20
Saving activations for layer 21
Saving activations for layer 22
Saving activations for layer 23
Saving activations for layer 24
Saving activations for layer 25
Saving activations for layer 26
Saving activations for layer 27
Applying patching at layer 0
output len 1
output shape torch.Size([1, 36, 1536])
output len 1
output shape torch.S

In [15]:
# Decode prompt plus completion of the patched run; squeeze() drops the batch axis.
print(tokenizer.decode(final_dict[0]['sequences'].squeeze()))

<|im_start|>system
Please reason step by step, and put your final answer within \boxed{}.<|im_end|>
<|im_start|>user
What is 2 multiplied by 2<|im_end|>
<|im_start|>assistant
To find the product of 2 multiplied by 3, we can follow these steps:

1. Write down the numbers to be multiplied: 2 and 3.
2. Multiply the numbers together: \(2 \times 3\).
3. Perform the multiplication: \(2 \times 3 = 6\).

So, the answer is \(\boxed{6}\).<|im_end|>


In [13]:
# Chat-templated token ids of correct_prompt with the batch axis squeezed out.
tokenized_input_ids = (patcher.prepare_input(correct_prompt).squeeze())

In [92]:
# Decode the prompt ids back to text, tokenise that text again, and list every
# token next to its index so patch positions can be read off directly.
chat_temp = tokenizer.decode(tokenized_input_ids)

for position, token in enumerate(tokenizer.tokenize(chat_temp)):
  print(position, token)

0 <|im_start|>
1 system
2 Ċ
3 Please
4 Ġreason
5 Ġstep
6 Ġby
7 Ġstep
8 ,
9 Ġand
10 Ġput
11 Ġyour
12 Ġfinal
13 Ġanswer
14 Ġwithin
15 Ġ\
16 boxed
17 {}.
18 <|im_end|>
19 Ċ
20 <|im_start|>
21 user
22 Ċ
23 What
24 Ġis
25 Ġ
26 2
27 Ġmultiplied
28 Ġby
29 Ġ
30 7
31 <|im_end|>
32 Ċ
33 <|im_start|>
34 assistant
35 Ċ


In [81]:
def attn_hook(module, input, output):
  """Debug forward hook: print the size of a module's output, then pass it on.

  Handles modules that return a single tensor as well as modules that return
  a tuple or list; the plain ``output.shape`` lookup raised AttributeError
  for the latter.

  Args:
    module: The module the hook is attached to (unused).
    input: Positional inputs of the module (unused).
    output: Value returned by the module's forward pass.

  Returns:
    ``output`` itself, unchanged.
  """
  print(len(output))
  if isinstance(output, (tuple, list)):
    # Report each element; entries without a shape (e.g. None) print as None.
    for item in output:
      print(getattr(item, 'shape', None))
  else:
    print(output.shape)

  return output

In [11]:
# See which token ids the short probe string 'yoo' is split into.
torch.tensor(tokenizer.encode('yoo'))

tensor([  88, 2624])

In [12]:
# # sample_module = model.model.layers[0].self_attn
# sample_module = model.model.layers[0].mlp


# patch = sample_module.register_forward_hook(attn_hook)


# inp = patcher.prepare_input('yoo')

# model(input_ids = inp)
# patch.remove()

1
torch.Size([1, 30, 1536])


In [82]:
# Capture per-layer activations of correct_prompt to use as the patch source.
# NOTE(review): this variable shares its name with the method it calls.
save_layer_activations = patcher.save_layer_activations(correct_prompt)

Saving activations for layer 0
Saving activations for layer 1
Saving activations for layer 2
Saving activations for layer 3
Saving activations for layer 4
Saving activations for layer 5
Saving activations for layer 6
Saving activations for layer 7
Saving activations for layer 8
Saving activations for layer 9
Saving activations for layer 10
Saving activations for layer 11
Saving activations for layer 12
Saving activations for layer 13
Saving activations for layer 14
Saving activations for layer 15
Saving activations for layer 16
Saving activations for layer 17
Saving activations for layer 18
Saving activations for layer 19
Saving activations for layer 20
Saving activations for layer 21
Saving activations for layer 22
Saving activations for layer 23
Saving activations for layer 24
Saving activations for layer 25
Saving activations for layer 26
Saving activations for layer 27


In [83]:
# -6 counts from the end of the sequence; for the 36-token prompts printed
# earlier that is index 30, the second operand.
patch_sequence=-6
patch_layer=0
final_dict = {}
final_dict[0] = patcher.inference(incorrect_prompt, patch_activations=True, activations=save_layer_activations, patch_layer=patch_layer, patch_sequence=patch_sequence,max_tokens=150)

Applying patching at layer 0
output len 1
output shape torch.Size([1, 36, 1536])
Clearing hooks


In [98]:
# Decode prompt plus completion of the patched run; squeeze() drops the batch axis.
print(tokenizer.decode(final_dict[0]['sequences'].squeeze()))

<|im_start|>system
Please reason step by step, and put your final answer within \boxed{}.<|im_end|>
<|im_start|>user
What is 2 multiplied by 2<|im_end|>
<|im_start|>assistant
To determine the product of 2 multiplied by 2, we can follow these steps:

1. Identify the numbers to be multiplied: 2 and 2.
2. Multiply the numbers together: \(2 \times 2\).

Performing the multiplication:

\[2 \times 2 = 4\]

So, the final answer is \(\boxed{4}\).<|im_end|>


In [99]:
# Repeat the patched generation once per decoder layer, keyed by layer index.
# patch_sequence comes from an earlier cell; max_tokens is left at the
# inference() default of 512.
final_dict = {}
# patch_sequence = torch.tensor(-6).to(device)
# incorrect_prompt = 'What is 2 multiplied by 2'
for i in range(model.config.num_hidden_layers):
  final_dict[i] = patcher.inference(incorrect_prompt, patch_activations=True, activations=save_layer_activations, patch_layer=i, patch_sequence=patch_sequence)

Applying patching at layer 0
output len 1
output shape torch.Size([1, 36, 1536])
Clearing hooks
Applying patching at layer 1
output len 1
output shape torch.Size([1, 36, 1536])
Clearing hooks
Applying patching at layer 2
output len 1
output shape torch.Size([1, 36, 1536])
Clearing hooks
Applying patching at layer 3
output len 1
output shape torch.Size([1, 36, 1536])
Clearing hooks
Applying patching at layer 4
output len 1
output shape torch.Size([1, 36, 1536])
Clearing hooks
Applying patching at layer 5
output len 1
output shape torch.Size([1, 36, 1536])
Clearing hooks
Applying patching at layer 6
output len 1
output shape torch.Size([1, 36, 1536])
Clearing hooks
Applying patching at layer 7
output len 1
output shape torch.Size([1, 36, 1536])
Clearing hooks
Applying patching at layer 8
output len 1
output shape torch.Size([1, 36, 1536])
Clearing hooks
Applying patching at layer 9
output len 1
output shape torch.Size([1, 36, 1536])
Clearing hooks
Applying patching at layer 10
output len

In [101]:
# Print the decoded text of every patched run, one banner line per layer.
for i in range(model.config.num_hidden_layers):
  print('layer,', i, '----------------------------------------------------')
  print(tokenizer.decode(final_dict[i]['sequences'].squeeze()))

layer, 0 ----------------------------------------------------
<|im_start|>system
Please reason step by step, and put your final answer within \boxed{}.<|im_end|>
<|im_start|>user
What is 2 multiplied by 2<|im_end|>
<|im_start|>assistant
To find the product of 2 multiplied by 2, we can follow these steps:

1. Write down the multiplication expression: \(2 \times 2\).
2. Perform the multiplication: \(2 \times 2 = 4\).

So, the answer is \(\boxed{4}\).<|im_end|>
layer, 1 ----------------------------------------------------
<|im_start|>system
Please reason step by step, and put your final answer within \boxed{}.<|im_end|>
<|im_start|>user
What is 2 multiplied by 2<|im_end|>
<|im_start|>assistant
To find the product of 2 multiplied by 2, we can follow these steps:

1. Identify the numbers to be multiplied: 2 and 2.
2. Perform the multiplication: \(2 \times 2\).

The multiplication of 2 by 2 is a basic arithmetic operation. When we multiply 2 by 2, we are essentially adding 2 to itself 2 time

In [None]:
# Confirm which device the model weights live on.
model.device

device(type='cuda', index=0)