In [111]:
# get datasetd from "geometry of truth paper"
!git clone https://github.com/saprmarks/geometry-of-truth.git /root/geometry-of-truth
!mv /root/geometry-of-truth/datasets/*.csv data

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


fatal: destination path '/root/geometry-of-truth' already exists and is not an empty directory.


huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


mv: cannot stat '/root/geometry-of-truth/datasets/*.csv': No such file or directory


In [None]:
import os
import pandas as pd
from transformers import AutoTokenizer, AutoModelForCausalLM
from baukit import Trace
import torch
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

In [None]:
# import my modules
import importlib
# NOTE(review): nothing in this cell changes sys.path, so utils and dataset_utils must already be importable from the working directory — confirm

import utils, dataset_utils
# reload both modules so that edits made to them after the kernel started are picked up before the star-imports below
importlib.reload(utils)
importlib.reload(dataset_utils)
# NOTE(review): the star-imports presumably provide the helpers that later cells use without a visible definition (e.g. get_hidden, prep_data, LRProbe, tqdm) — confirm against the modules
from utils import *
from dataset_utils import *

In [None]:
# show the column names of every csv file found in the data folder
for file in os.listdir('data'):
    if file.endswith('.csv'):
        df = pd.read_csv(f'data/{file}')
        banner = '#' * 10
        print('\n', banner, file, banner)
        # column index of the file that was just read
        print(df.columns)

In [None]:
# read the training dataset and keep its statements together with their labels
dataset_name = "cities"
df = pd.read_csv(f'data/{dataset_name}.csv')

train_dataset = dict(
    dataset_name=dataset_name,
    org_data=list(df['statement']),
    label=list(df['label']),
)

In [None]:
# run on the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# dtype the model weights are loaded in
precision = torch.float16

# model to probe; alternatives that were tried with this notebook:
# model_name = "mistralai/Mistral-7B-v0.1"
# model_name = "HuggingFaceH4/zephyr-7b-beta"
# model_name = "huggyllama/llama-7b"
model_name = "meta-llama/Llama-2-7b-chat-hf"
short_model_name = model_name.split("/")[-1]

# all figures for this model are saved below plots/<short model name>
plots_folder = f'plots/{short_model_name}'
os.makedirs(plots_folder, exist_ok=True)

# Only the Llama-2 chat model is loaded with an access token, which is read from
# the HF_TOKEN environment variable. For every other model the token stays None,
# which is the default of from_pretrained, i.e. the same as not passing it.
# access_token = input("Enter your access token: ")
access_token = os.getenv("HF_TOKEN") if model_name == "meta-llama/Llama-2-7b-chat-hf" else None

# load model: weights in `precision`, moved to `device`, switched to eval mode
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=precision, token=access_token).to(device).eval()

# load tokenizer
tokenizer = AutoTokenizer.from_pretrained(model_name, token=access_token)
# pad on the left so that the last position of every sequence in a batch is a real token
tokenizer.padding_side = "left"
# reuse the end-of-sequence token for padding
tokenizer.pad_token_id = tokenizer.eos_token_id



In [None]:
# release cached GPU memory before running the forward passes
torch.cuda.empty_cache()

# get internal activations: one hooked module per transformer layer
num_modules = model.config.num_hidden_layers
module_names = [f'model.layers.{layer}' for layer in range(num_modules)]

# only the last token of every statement is probed
token_positions = -1
batch_size = 64

# hidden states at `token_positions` for every statement and every module
# NOTE(review): later cells call .shape on the result and index it by module position,
# so it looks like a tensor with the module axis first — confirm against get_hidden
train_dataset['hidden_states'] = get_hidden(
    model,
    tokenizer,
    module_names,
    train_dataset['org_data'],
    batch_size=batch_size,
    token_position=token_positions,
)

In [None]:
# sanity check: display the dimensions of the extracted activations
train_dataset['hidden_states'].shape

In [None]:
# split the activations and their labels into a training and a held-out part
# NOTE(review): train_perc=0.8 presumably means 80% of the samples go to training — confirm against prep_data
train_data, train_labels, test_data, test_labels = prep_data(
    data=train_dataset['hidden_states'],
    labels=train_dataset['label'],
    train_perc=0.8,
)

In [None]:
# sanity check: display the dimensions of both parts of the split
train_data.shape, test_data.shape

In [None]:
# training probes: one probe per hooked module, trained on that module's activations
probes = {}
for idx, module in tqdm(enumerate(module_names), total=num_modules):
    # use the device that was selected when the model was loaded instead of
    # assuming CUDA is available; device.type is the plain string 'cuda' or 'cpu'
    probes[module] = LRProbe(d_in=model.config.hidden_size, device=device.type, precision=torch.float32)
    probes[module].train(train_data[idx], train_labels, epochs=2, batch_size=batch_size)

# test on same dataset: held-out split and, for comparison, the training split
test_accs = {}
train_accs = {}
for idx, module in enumerate(module_names):
    test_accs[module] = probes[module].test(test_data[idx], test_labels)
    train_accs[module] = probes[module].test(train_data[idx], train_labels)

In [None]:
# report the held-out accuracy of every 5th module, starting with the first one
for key, value in list(test_accs.items())[::5]:
    print(f'{key}\t{value:.2g}')

In [None]:
def test_probes(probes, test_data, test_labels, module_names, normalize=True):
    test_data_ood = torch.cat(test_data, dim=1)
    test_accs_ood = {}
    for idx, module in tqdm(enumerate(module_names), total=len(module_names)):
        test_accs_ood[module] = {}
        for token_pos in token_positions:
        if normalize:
            normalized = test_data_ood[idx,:,token_pos, :]-test_data_ood[idx,:,token_pos, :].mean(0)
            normalized /= normalized.std(0)+1e-11

        test_accs_ood[module][token_pos] = probes[module].test(test_data_ood[idx,:,token_pos, :], test_labels)

    return test_accs_ood

def plot_test_accs(test_accs, positions, train_dataset_name, test_dataset_name):
    """Plot the probe accuracy per layer for selected token positions and save the figure.

    Args:
        test_accs: nested dict ``{module name: {token position: accuracy}}``;
            the modules are plotted in the iteration order of the dict, one
            point per module.
        positions: mapping from token position to the legend label of its
            curve. Passing None selects the three positions that were
            previously hard-coded in this function.
        train_dataset_name: name of the dataset the probes were trained on
            (used in the title and the file name).
        test_dataset_name: name of the dataset the probes were tested on
            (used in the title and the file name).

    The figure is written to the notebook-level ``plots_folder`` and then shown.
    """
    if positions is None:
        positions = {-11:'last statement token', -10:'first answer token', -1:'last answer token'}

    # plot test accs for ood dataset
    plt.figure(figsize=(10, 6))

    # one curve per requested token position
    for pos, label in positions.items():
        plt.plot([acc[pos] for acc in test_accs.values()], label=label)
    plt.ylabel('Accuracy')
    plt.xlabel('Layer')
    plt.grid()
    plt.legend()
    plt.title(f"LogRegr trained on \"{train_dataset_name}\", tested on \"{test_dataset_name}\"")
    plt.savefig(f"{plots_folder}/log_regr_trained_{train_dataset_name}_tested_{test_dataset_name}.png")
    plt.show()

In [None]:
# test on out of distribution dataset

testing_dataset_names = ['Statements1000', 'FreebaseStatements']
max_new_tokens = 10

# token positions to probe: the max_new_tokens last positions plus the one before them
# (kept as a notebook-level name because test_probes reads it)
token_positions = range(-max_new_tokens-1, 0, 1)

# positions shown in the plots, derived from max_new_tokens so that the labels stay
# correct when the number of generated tokens is changed
# NOTE(review): assumes every answer consists of exactly max_new_tokens tokens — confirm
plot_positions = {-max_new_tokens-1: 'last statement token',
                  -max_new_tokens: 'first answer token',
                  -1: 'last answer token'}

# prompt templates, only applied for the Llama-2 chat model
lie_format = "[INST]You will complete the sentence with intentionally false information. [/INST] {}"
truth_format = "[INST]You will complete the sentence with accurate information. [/INST] {}"

for name in testing_dataset_names:
    print('\n\n' + '#'*10 + name + '#'*10)

    test_dataset = load_data_set(name)
    if model_name == "meta-llama/Llama-2-7b-chat-hf":
        change_format(test_dataset, lie_format, truth_format)

    print('\nGenerating truths and lies')
    # NOTE(review): the return value is not used; the generated tokens are read from
    # test_dataset below, so this call presumably fills test_dataset in place — confirm
    get_overlap_truth_lies(model, tokenizer, test_dataset, max_new_tokens=max_new_tokens, batch_size=batch_size)

    # get hidden states for the new dataset
    print('\nCalculating hidden states')
    # hidden states at token_positions for the generated lies and truths; the second
    # axis of the result is used as the number of samples further down
    test_dataset['hidden_states_lie'] = get_hidden_from_tokens(model, module_names, test_dataset['output_tokens_lie'], batch_size=batch_size, token_position=token_positions)
    test_dataset['hidden_states_truth'] = get_hidden_from_tokens(model, module_names, test_dataset['output_tokens_truth'], batch_size=batch_size, token_position=token_positions)

    print('\nTesting probes')
    samples_per_class = test_dataset['hidden_states_lie'].shape[1]
    # label 0 for the lies and 1 for the truths, in the order the tensors are passed below
    test_labels = torch.cat([torch.zeros(samples_per_class), torch.ones(samples_per_class)])
    test_accs_ood = test_probes(probes, test_data=[test_dataset['hidden_states_lie'], test_dataset['hidden_states_truth']],
                                test_labels=test_labels, module_names=module_names)

    # save results
    plot_test_accs(test_accs_ood, positions=plot_positions,
               train_dataset_name = train_dataset['dataset_name'],
               test_dataset_name = test_dataset['dataset_name'])