In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# !pip install git+https://github.com/google/BIG-bench.git # This may take a few minutes
# !pip install datasets==2.21.0
# !pip install transformer_lens
!pip install -r /content/drive/MyDrive/University/CoT/geometry-of-truth/requirements.txt
!pip install nnsight



In [3]:
!pip list

Package                            Version
---------------------------------- -------------------
absl-py                            1.4.0
accelerate                         0.34.2
aiohappyeyeballs                   2.4.3
aiohttp                            3.10.9
aiosignal                          1.3.1
alabaster                          0.7.16
albucore                           0.0.16
albumentations                     1.4.15
altair                             4.2.2
annotated-types                    0.7.0
anyio                              3.7.1
argon2-cffi                        23.1.0
argon2-cffi-bindings               21.2.0
array_record                       0.5.1
arviz                              0.19.0
astropy                            6.1.4
astropy-iers-data                  0.2024.10.7.0.32.46
astunparse                         1.6.3
async-timeout                      4.0.3
atpublic                           4.1.0
attrs                              24.2.0
audioread         

# Imports

In [4]:
import torch
import json
import configparser
from nnsight import LanguageModel
import huggingface_hub
import pandas as pd
from transformers import AutoTokenizer
from tqdm import tqdm
import numpy as np
import math

# Constants

In [5]:
DATA_PATH = "/content/drive/MyDrive/University/CoT/data/com2sense.json"
CONFIG_PATH = "/content/drive/MyDrive/University/CoT/geometry-of-truth/config.ini"
MODEL_NAME = "llama-3.2-1b"
N_SHOTS = 5
BATCH_SIZE = 1
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
remote = False
MAX_IDX = 100
LAYERS = "ALL"    # Layers to look at activations, "ALL" or list of int

print(f"Executing on device {device}")

Executing on device cuda


# Load data

In [6]:
def load_data(n_shots: int, data_path: str, data_key: str = "examples", lookup_key: str = "pair_id_lookup") -> tuple:
    with open(data_path, "r") as file:
        dataset = json.load(file)

    examples = dataset[data_key]
    pair_id_lookup = dataset[lookup_key]
    dataset = pd.DataFrame(examples)
    train = dataset.sample(n_shots)
    test = dataset.drop(train.index)

    return train, test, pair_id_lookup

train, test, pair_id_lookup = load_data(N_SHOTS, DATA_PATH)

# Load model

Login with the following token: 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

In [7]:
huggingface_hub.login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [8]:
config = configparser.ConfigParser()
config.read(CONFIG_PATH)

def load_model(model_name: str, device='remote') -> LanguageModel:
    print(f"Loading model {model_name}...")
    weights_directory = config[model_name]['weights_directory']
    model = LanguageModel(weights_directory, torch_dtype=torch.bfloat16, device_map=device)
    model.tokenizer = AutoTokenizer.from_pretrained(weights_directory)
    return model

model = load_model(MODEL_NAME, device=device)
model.tokenizer.padding_side = "right"
model.tokenizer.pad_token = model.tokenizer.eos_token
print(model)

Loading model llama-3.2-1b...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


LlamaForCausalLM(
  (model): LlamaModel(
    (embed_tokens): Embedding(128256, 2048)
    (layers): ModuleList(
      (0-15): 16 x LlamaDecoderLayer(
        (self_attn): LlamaSdpaAttention(
          (q_proj): Linear(in_features=2048, out_features=2048, bias=False)
          (k_proj): Linear(in_features=2048, out_features=512, bias=False)
          (v_proj): Linear(in_features=2048, out_features=512, bias=False)
          (o_proj): Linear(in_features=2048, out_features=2048, bias=False)
          (rotary_emb): LlamaRotaryEmbedding()
        )
        (mlp): LlamaMLP(
          (gate_proj): Linear(in_features=2048, out_features=8192, bias=False)
          (up_proj): Linear(in_features=2048, out_features=8192, bias=False)
          (down_proj): Linear(in_features=8192, out_features=2048, bias=False)
          (act_fn): SiLU()
        )
        (input_layernorm): LlamaRMSNorm((2048,), eps=1e-05)
        (post_attention_layernorm): LlamaRMSNorm((2048,), eps=1e-05)
      )
    )
    (norm):

# Train model with prompt for true/false questions

In [9]:
def train_model(model: LanguageModel, prompts: pd.DataFrame, prompt_key: str = "sent", label_key: str = "label") -> tuple:
    """
    Trains the model on the given prompts. Will return the past key value pairs representing this context.
    args:
        model: The model to train.
        prompts: The prompts to train on.
        prompt_key: The key in the prompts dataframe that contains the prompt.
        label_key: The key in the prompts dataframe that contains the label.
    returns: tuple of shape (n_layers, 2) that contains the past key/value (torch.Tensor) with shape (batch_size, num_attention_heads, seq_len, head_dim)
    """
    prompt = ""
    for index, example in prompts.iterrows():
        prompt += example[prompt_key]
        label = example[label_key]

        if bool(label):
            prompt += " TRUE\n"
        else:
            prompt += " FALSE\n"
    with torch.no_grad():
        with model.trace(prompt, output_hidden_states=True, use_cache=True) as tracer:
            output = tracer.output.save()
    return output["past_key_values"], prompt

past_key_values, train_prompt = train_model(model, train)

You're using a PreTrainedTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.
We detected that you are passing `past_key_values` as a tuple and this is deprecated and will be removed in v4.43. Please use an appropriate `Cache` class (https://huggingface.co/docs/transformers/v4.41.3/en/internal/generation_utils#transformers.Cache)


# Obtain output difference from testing set for true/false questions

In [10]:
def obtain_output_diff(model: LanguageModel, queries: pd.DataFrame, batch_size: int, past_key_values: tuple, train_prompt: str, prompt_key: str = "sent", max_idx: int = -1) -> torch.Tensor:
    """
    Obtains the output difference between the model's predictions of true or false for the given queries.
    args:
        model: The model to use.
        queries: The queries to use.
        batch_size: The batch size to use.
        past_key_values: The past key values to use.
        train_prompt: The prompt used to train the model.
        prompt_key: The key in the queries dataframe that contains the prompt.
        max_idx: The maximum number of queries to use. Set to -1 to use all queries.
    returns: The output difference between the model's predictions for the given queries.
    """
    true_idx, false_idx = model.tokenizer.encode(' TRUE')[-1], model.tokenizer.encode(' FALSE')[-1]
    diffs = []
    max_idx = len(queries) if max_idx == -1 else max_idx
    for batch_idx in tqdm(range(0, max_idx, batch_size), desc="Processing batches"):
        batch = queries.iloc[batch_idx : batch_idx + batch_size][prompt_key].tolist()
        batch = [train_prompt + query for query in batch]

        # prepare past_key_values
        pkv_batch = tuple(
            (
              past_key_values[layer][0].expand(batch_size, *past_key_values[layer][0].shape[1:]),
              past_key_values[layer][1].expand(batch_size, *past_key_values[layer][0].shape[1:])
            ) for layer in range(len(past_key_values))
        )
        batch_tokens = model.tokenizer(batch, return_tensors="pt", padding=True, truncation=True)["input_ids"]

        batch_lens = [len(model.tokenizer.encode(query, add_special_tokens=False)) for query in batch]
        with torch.no_grad():
            with model.trace(batch_tokens) as tracer:
                logits = model.lm_head.output.save()
                logits = logits[torch.arange(len(batch)), torch.tensor(batch_lens) - 1, :]
                probs = logits.softmax(-1)
                diffs.append((probs[:, true_idx] - probs[:, false_idx]).save())
    diffs = torch.cat([diff.value for diff in diffs])
    return diffs

diffs = obtain_output_diff(model, test, BATCH_SIZE, past_key_values, train_prompt, max_idx=MAX_IDX)

Processing batches: 100%|██████████| 100/100 [00:19<00:00,  5.04it/s]


# Compute accuracy for true/false questions

In [11]:
def compute_accuracy(diffs: torch.Tensor, test: pd.DataFrame, gamma: int = 0, max_idx: int = -1) -> float:
    """
    Computes the accuracy of the model's predictions.
    args:
        diffs: The activation difference between the model's predictions for the given queries.
        test: The test set to use.
        gamma: The threshold to use.
        max_idx: The maximum number of queries to use. Set to -1 to use all queries.
    returns: The accuracy of the model's predictions.
    """
    max_idx = len(test) if max_idx == -1 else max_idx
    predicted_labels = diffs > gamma
    string_array = test["label"][:max_idx].values
    bool_array = np.array([val == "True" for val in string_array])
    ground_truth = torch.tensor(bool_array, device=device, dtype=torch.bool)
    print(ground_truth[:max_idx].shape)
    acc = (predicted_labels == ground_truth).float().mean().item()
    return acc

accuracy = compute_accuracy(diffs, test, max_idx=MAX_IDX)
print(f"Accuracy: {accuracy}")

torch.Size([100])
Accuracy: 0.5099999904632568


# Obtain layer-level activation for zero-shot CoT vs. non-CoT in residual stream

In [12]:
def get_layer_acts(statements, model: LanguageModel, layers: list) -> dict:
    """
    Get given layer activations for the statements. Activations are obtained after the last token is read.
    args:
        statements: The statements to obtain activations for.
        model: The model to use.
        layers: The layers (int) to obtain activations for as a list.
    Return dictionary of stacked activations.
    """
    acts = {}
    with model.trace(statements):
        for layer in layers:
            acts[layer] = model.model.layers[layer].output[0][:, -1, :].save()

    for layer, act in acts.items():
        acts[layer] = act.value

    return acts


def obtain_act_diff(model: LanguageModel, queries: pd.DataFrame, batch_size: int, exp: str, layers: list, train_prompt: str, prompt_key: str = "sent", max_idx: int = -1) -> list:
    """
    Obtains the activation difference between the model's predictions of true or false for the given queries.
    args:
        model: The model to use.
        queries: The queries to use.
        batch_size: The batch size to use.
        exp: The prompt to experiment with, added to the end of the sentence.
        layers: The list of layers (int) to obtain diff.
        train_prompt: The prompt used to train the model.
        prompt_key: The key in the queries dataframe that contains the prompt.
        max_idx: The maximum number of queries to use. Set to -1 to use all queries.
    returns: The activation difference between the model's predictions for the given queries.
    """
    diffs = []
    max_idx = len(queries) if max_idx == -1 else max_idx
    for batch_idx in tqdm(range(0, max_idx, batch_size), desc="Processing batches"):
        batch = queries.iloc[batch_idx : batch_idx + batch_size][prompt_key].tolist()
        batch = [train_prompt + query for query in batch]
        batch_exp = [query + exp for query in batch]

        act = get_layer_acts(batch, model, layers)
        act_exp = get_layer_acts(batch_exp, model, layers)
        diff = torch.cat([act_exp[layer] - act[layer] for layer in layers])
        diffs.append(diff)
    return diffs

layers = list(range(len(model.model.layers))) if LAYERS == "ALL" else LAYERS
diffs = obtain_act_diff(model, test, 1, " let's think step by step", layers, "", max_idx=MAX_IDX)

Processing batches: 100%|██████████| 100/100 [00:19<00:00,  5.06it/s]


In [13]:
print(diffs[0].shape)

[tensor([[ 0.0308,  0.0566,  0.1543,  ..., -0.0496, -0.0083, -0.0193],
        [ 0.1465,  0.1357,  0.1680,  ..., -0.1211, -0.1621, -0.0081],
        [ 0.0957,  0.1250,  0.1934,  ..., -0.0334, -0.1738,  0.1177],
        ...,
        [-0.1719,  0.0503,  0.2109,  ...,  0.0811, -0.3711,  0.0918],
        [-0.0137,  0.1299,  0.2031,  ..., -0.0742, -0.2275, -0.0488],
        [-0.5742,  0.3008, -0.1094,  ...,  0.2891, -0.0547, -0.3184]],
       device='cuda:0', dtype=torch.bfloat16, grad_fn=<CatBackward0>), tensor([[ 0.0347,  0.0444,  0.1523,  ..., -0.0486, -0.0026, -0.0183],
        [ 0.1377,  0.1147,  0.1641,  ..., -0.1221, -0.1494, -0.0209],
        [ 0.0991,  0.0684,  0.2109,  ..., -0.0527, -0.1709,  0.0811],
        ...,
        [-0.4414,  0.0879,  0.3711,  ...,  0.3203, -0.2578,  0.0535],
        [-0.3750,  0.1299,  0.4453,  ...,  0.1992, -0.1133,  0.0645],
        [-0.7891,  0.3438,  0.4688,  ...,  0.6211, -0.1094, -0.4824]],
       device='cuda:0', dtype=torch.bfloat16, grad_fn=<CatBa