# Process True False Dataset

Dataset from the paper [The Internal State of an LLM Knows When It's Lying](https://aclanthology.org/2023.findings-emnlp.68.pdf)

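You can download the dataset [here](http://azariaa.com/Content/Datasets/true-false-dataset.zip). Extract it so that the `<category>_true_false.csv` files sit in `../Data/true-false-dataset/`, which is where this notebook reads them from.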

In [None]:
import pandas as pd
import numpy as np
import os
import torch
import matplotlib.pyplot as plt
import seaborn as sns

from tqdm.notebook import tqdm
from transformers import AutoModelForCausalLM, AutoTokenizer

from sklearn.metrics.pairwise import cosine_similarity

from utils import add_split_column

# Select the compute device. torch device strings are lowercase: "CPU" is
# rejected by torch.device() / Module.to(), so the fallback must be "cpu".
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f'Device: {device}')

In [None]:
# Dataset identifier; every input/output directory below is derived from it.
dataset_name = 'true-false-dataset'

# Topic categories. Each one is read from `<category>_true_false.csv`.
categories = [
    'animals',
    'cities',
    'companies',
    'elements',
    'facts',
    'generated',
    'inventions',
]

# When True, concept vectors are computed from the 'train' split only and the
# saved artifacts get a 'train_' file-name prefix.
only_train = True

# Input directory (raw CSVs and the generated metadata files).
data_path = f'../Data/{dataset_name}'

# Output directories, one per artifact type.
embeddings_path = f'Embeddings/{dataset_name}'
concepts_path = f'Concepts/{dataset_name}'
cos_sims_path = f'Cosine_Similarities/{dataset_name}'
images_path = f'Images/{dataset_name}'

In [None]:
# The language model is only needed to produce the hidden states. If they are
# already cached on disk, skip the (expensive) model download/load entirely.
embeddings_file = os.path.join(embeddings_path, 'embeddings.pt')

if not os.path.exists(embeddings_file):
    print('Embeddings file NOT found. Loading model and tokenizer...')

    # Imported lazily: only needed on the path that actually loads the model.
    from huggingface_hub import notebook_login
    notebook_login()

    model_name_or_path = "meta-llama/Meta-Llama-3-8B-Instruct"

    tokenizer = AutoTokenizer.from_pretrained(model_name_or_path)
    # Reuse the EOS token for padding and pad on the left.
    tokenizer.padding_side = "left"
    tokenizer.pad_token = tokenizer.eos_token

    model = AutoModelForCausalLM.from_pretrained(model_name_or_path)
    model.to(device)
    model.eval()
    print('Done.')
else:
    print('Embeddings file found. No need to load model and tokenizer.')
    # Placeholders so that later cells can still pass these names around.
    model = None
    tokenizer = None

## Processing statements

In [None]:
def get_category(category, data_path):
    """Load one category's statements and tag every row with that category.

    Reads ``<data_path>/<category>_true_false.csv`` and returns its contents
    with an extra column named after ``category`` that is 1 on every row.
    """
    csv_file = os.path.join(data_path, f'{category}_true_false.csv')
    return pd.read_csv(csv_file).assign(**{category: 1})

In [None]:
def get_categories(categories, data_path):
    """Build a single metadata table with one indicator column per category.

    Every category file is loaded with ``get_category`` and the results are
    concatenated with a fresh 0..n-1 index. A row belongs to exactly the
    category it was loaded from, so the other category columns come out of the
    concatenation as NaN; these are filled with 0 and the category columns are
    cast to int. Finally the ``label`` column is renamed to ``true``.
    """
    per_category = [get_category(name, data_path=data_path) for name in categories]
    combined = pd.concat(per_category, ignore_index=True).fillna(0)
    # fillna leaves the indicator columns as floats; make them proper 0/1 ints.
    combined = combined.astype({name: int for name in categories})
    return combined.rename(columns={'label': 'true'})

In [None]:
def get_hidden_states(statements, model, tokenizer, embeddings_path,
                      device=device, save=True):
    """Return one hidden-state vector per statement, with an on-disk cache.

    If ``<embeddings_path>/embeddings.pt`` exists it is loaded and returned
    as-is; ``statements``, ``model`` and ``tokenizer`` are not used in that
    case (and no check is made that the cache matches ``statements``).

    Otherwise each statement is tokenized and run through ``model`` on its
    own, and ``output.hidden_states[-1][0][-1]`` is kept. With the Hugging
    Face ``output_hidden_states`` convention this is the last layer's vector
    for the last token of the only sequence in the batch (presumably shape
    ``(hidden_size,)`` - confirm for other model types).

    Args:
        statements: Iterable of statement strings.
        model: Callable as ``model(**inputs, output_hidden_states=True)``.
        tokenizer: Callable as ``tokenizer(text, return_tensors="pt")``.
        embeddings_path: Directory that holds (or will hold) ``embeddings.pt``.
        device: Device the tokenized inputs are moved to.
        save: When True, write freshly computed hidden states to the cache.

    Returns:
        The per-statement vectors stacked along dimension 0, on the CPU.

    NOTE: the embeddings are stored and returned *uncentered*. Subtract the
    mean embedding downstream if centering is wanted.
    """
    embeddings_file = os.path.join(embeddings_path, 'embeddings.pt')

    if os.path.exists(embeddings_file):
        print('   Hidden states file found.')
        return torch.load(embeddings_file)

    print('   Hidden states file NOT found.')
    hidden_states = []
    # Pure inference: without no_grad every forward pass would build an
    # autograd graph, wasting GPU memory for gradients that are never used.
    with torch.no_grad():
        for statement in tqdm(statements, desc='Getting hidden states'):
            tokenized_prompt = tokenizer(statement, return_tensors="pt").to(device)
            output = model(**tokenized_prompt, output_hidden_states=True)

            features = output.hidden_states[-1][0][-1]
            hidden_states.append(features.detach().cpu())

            # Drop the references to the full model output before the next
            # statement so its memory can be reclaimed.
            del output
            del features
            torch.cuda.empty_cache()

    hidden_states = torch.stack(hidden_states, dim=0)
    if save:
        os.makedirs(embeddings_path, exist_ok=True)
        torch.save(hidden_states, embeddings_file)

    return hidden_states

In [None]:
def process_data(categories, model, tokenizer, data_path, embeddings_path):
    """Build the metadata table and the hidden states, and persist both.

    Side effects:
        * writes ``metadata.csv`` (all rows) and ``train_metadata.csv`` (rows
          whose ``split`` is ``'train'``) into ``data_path``;
        * writes ``train_embeddings.pt`` into ``embeddings_path``
          (``get_hidden_states`` itself takes care of ``embeddings.pt``).

    The ``split`` column is expected to be added by the project helper
    ``add_split_column``.

    Returns:
        ``(metadata_df, hidden_states)`` for *all* statements, not only the
        training ones.
    """
    print('Getting statements...', end=' ')
    metadata_df = add_split_column(get_categories(categories, data_path=data_path))
    metadata_df.to_csv(os.path.join(data_path, 'metadata.csv'), index=False)

    all_statements = metadata_df['statement'].to_list()

    is_train = metadata_df['split'] == 'train'
    metadata_df[is_train].reset_index(drop=True).to_csv(
        os.path.join(data_path, 'train_metadata.csv'), index=False
    )
    print('Done.')

    print('Getting hidden states...')
    hidden_states = get_hidden_states(all_statements, model, tokenizer, embeddings_path)
    # Keep a train-only copy next to the full embeddings file.
    torch.save(hidden_states[is_train],
               os.path.join(embeddings_path, 'train_embeddings.pt'))
    print('Done.')

    return metadata_df, hidden_states

In [None]:
# Build (or reload from cache) the metadata table and the hidden states of
# every statement in the dataset.
metadata_df, hidden_states = process_data(
    categories,
    model,
    tokenizer,
    data_path=data_path,
    embeddings_path=embeddings_path,
)

In [None]:
# Dataset size and label balance ('true' holds 1 for true, 0 for false).
print(f'Number of statements: {len(metadata_df)}')
print(f'Number of true statements: {(metadata_df["true"] == 1).sum()}')
print(f'Number of false statements: {(metadata_df["true"] == 0).sum()}')

## Computing concepts

In [None]:
def compute_single_concept(concept, metadata_df, hidden_states):
    """Return the unit-norm mean embedding of the rows belonging to a concept.

    Rows are selected where ``metadata_df[concept] == 1``; their hidden states
    are averaged along dimension 0 and the result is divided by its norm.
    """
    in_concept = metadata_df[concept] == 1
    centroid = hidden_states[in_concept].mean(dim=0)
    return centroid / centroid.norm()

def compute_concepts(metadata_df, hidden_states, concepts_path, save=True, only_train=True):
    """Compute one unit-norm concept vector per concept column.

    Concept names are every column of ``metadata_df`` except the first and the
    last one (presumably the statement text and the ``split`` column - this
    relies on the column order produced upstream).

    Args:
        metadata_df: Metadata table; must contain a ``split`` column when
            ``only_train`` is True.
        hidden_states: Embeddings aligned row-by-row with ``metadata_df``.
        concepts_path: Directory in which the concepts file is written.
        save: When True, save the concepts dict with ``torch.save``.
        only_train: When True, use only rows whose ``split`` is ``'train'``
            and save to ``train_concepts.pt`` instead of ``concepts.pt``.

    Returns:
        Dict mapping concept name to its concept vector.
    """
    if only_train:
        is_train = metadata_df['split'] == 'train'
        hidden_states = hidden_states[is_train]
        metadata_df = metadata_df[is_train].reset_index(drop=True)

    print('Computing concept vectors...', end=' ')
    concepts = {
        name: compute_single_concept(name, metadata_df, hidden_states)
        for name in metadata_df.columns[1:-1]
    }
    print('Done.')

    if save:
        print('Saving concepts...', end=' ')
        os.makedirs(concepts_path, exist_ok=True)
        file_name = 'train_concepts.pt' if only_train else 'concepts.pt'
        torch.save(concepts, os.path.join(concepts_path, file_name))
        print('Done.')

    print(f'Concepts computed: {list(concepts.keys())}')
    return concepts

In [None]:
# Concept vectors for every concept column; restricted to the train split
# when `only_train` is set.
concepts = compute_concepts(
    metadata_df,
    hidden_states,
    concepts_path=concepts_path,
    only_train=only_train,
)

## Computing cosine similarities

In [None]:
def compute_cosine_sims(hidden_states, metadata_df, concept_embs, concept_names, cos_sims_path,
                        save=True, only_train=True):
    """Cosine similarity between every row of ``hidden_states`` and every concept.

    Args:
        hidden_states: One embedding per row.
        metadata_df: Not used by this function; kept for interface
            compatibility.
        concept_embs: One concept vector per row, in ``concept_names`` order.
        concept_names: Column names of the returned DataFrame.
        cos_sims_path: Directory in which the CSV is written when ``save``.
        save: When True, write the similarities to CSV.
        only_train: Only selects the output file name
            (``train_cosine_similarities.csv`` vs ``cosine_similarities.csv``);
            no rows are filtered here.

    Returns:
        DataFrame with one row per embedding and one column per concept.
    """
    print('Computing cosine similarities...', end=' ')
    cosine_similarity_df = pd.DataFrame(
        cosine_similarity(hidden_states, concept_embs),
        columns=concept_names,
    )
    print('Done.')

    if not save:
        return cosine_similarity_df

    print('Saving cosine similarities...', end=' ')
    os.makedirs(cos_sims_path, exist_ok=True)
    file_name = 'train_cosine_similarities.csv' if only_train else 'cosine_similarities.csv'
    cosine_similarity_df.to_csv(os.path.join(cos_sims_path, file_name), index=False)
    print('Done.')

    return cosine_similarity_df

In [None]:
# Stack the concept vectors into a matrix whose row order matches
# `concept_names`, then score every statement embedding against every concept.
concept_names = list(concepts.keys())
concept_embs = torch.stack([concepts[name] for name in concept_names], dim=0)
cosine_similarity_df = compute_cosine_sims(
    hidden_states,
    metadata_df,
    concept_embs,
    concept_names,
    cos_sims_path,
    only_train=only_train,
)

In [None]:
def plot_cos_sim_histograms(metadata_df, cosine_similarity_df, images_path, only_train=True):
    """Plot, per concept, the KDE of cosine similarities in vs. out of concept.

    One subplot is drawn per concept column (every column of ``metadata_df``
    except the first and last). Each subplot overlays the density of the
    concept's cosine similarity for rows where the concept indicator is 1
    ("In concept") and rows where it is 0 ("Out of concept").

    The grid has 4 columns and as many rows as needed, so any number of
    concepts is supported (8 concepts give the original 2x4 layout); unused
    trailing axes are hidden.

    Args:
        metadata_df: Metadata table aligned row-by-row with
            ``cosine_similarity_df``; needs a ``split`` column when
            ``only_train`` is True.
        cosine_similarity_df: One column of similarities per concept.
        images_path: Directory in which the PNG is saved.
        only_train: When True, plot only rows whose ``split`` is ``'train'``
            and save as ``train_cosine_similarities.png``.
    """
    if only_train:
        train_mask = metadata_df['split'] == 'train'
        metadata_df = metadata_df[train_mask].reset_index(drop=True)
        cosine_similarity_df = cosine_similarity_df[train_mask].reset_index(drop=True)
    concept_names = list(metadata_df.columns[1:-1])

    n_cols = 4
    # Ceiling division without importing math; keep at least one row.
    n_rows = max(1, -(-len(concept_names) // n_cols))
    # squeeze=False keeps `axs` two-dimensional even for a single row.
    fig, axs = plt.subplots(n_rows, n_cols, figsize=(8, 2 * n_rows),
                            sharex=True, squeeze=False)
    for i, concept in enumerate(concept_names):
        ax = axs[divmod(i, n_cols)]
        sns.kdeplot(cosine_similarity_df[metadata_df[concept]==1][concept],
                    label='In concept', ax=ax)
        sns.kdeplot(cosine_similarity_df[metadata_df[concept]==0][concept],
                    label='Out of concept', ax=ax)
        ax.set_xlabel('')
        ax.set_ylabel('')
        ax.set_title(concept)

    # Hide grid cells that have no concept to show.
    for ax in axs.flat[len(concept_names):]:
        ax.set_visible(False)

    # All subplots share the same two series, so one figure-level legend
    # built from the first subplot is enough.
    handles, labels = axs[0,0].get_legend_handles_labels()
    lgd = fig.legend(handles, labels, bbox_to_anchor=(1.2, 0.9))
    xl = fig.supxlabel('Cosine Similarity')
    yl = fig.supylabel('Proportion of samples')
    title = fig.suptitle('Cosine similarity in/out of concept')
    fig.tight_layout()

    print('Saving image...', end=' ')
    os.makedirs(images_path, exist_ok=True)
    if only_train:
        cos_sims_image_path = os.path.join(images_path, 'train_cosine_similarities.png')
    else:
        cos_sims_image_path = os.path.join(images_path, 'cosine_similarities.png')
    # The legend sits outside the axes; list it (and the figure-level labels)
    # as extra artists so the tight bounding box does not crop them.
    fig.savefig(cos_sims_image_path, bbox_extra_artists=(lgd,xl,yl,title),
                bbox_inches='tight')
    print('Done.')

    plt.show()

In [None]:
# Per-concept density plots of the similarities, in vs. out of concept.
plot_cos_sim_histograms(
    metadata_df,
    cosine_similarity_df,
    images_path=images_path,
    only_train=only_train,
)

In [None]:
def show_similarity_concepts(concepts, images_path, only_train=True):
    """Draw and save a heatmap of pairwise cosine similarity between concepts.

    Args:
        concepts: Dict mapping concept name to its concept vector.
        images_path: Directory in which the PNG is saved.
        only_train: Only selects the output file name
            (``train_cosine_similarities_concepts.png`` vs
            ``cosine_similarities_concepts.png``).
    """
    concept_names = list(concepts.keys())
    concept_embs = torch.stack(list(concepts.values()), dim=0)
    # compute_cosine_sims does not read its metadata argument, so pass None
    # rather than depending on a notebook-global `metadata_df`.
    cos_sim_concepts_df = compute_cosine_sims(concept_embs,
                                              None,
                                              concept_embs,
                                              concept_names,
                                              '', save=False)
    cos_sim_concepts_df['Concept'] = concept_names
    cos_sim_concepts_df = cos_sim_concepts_df.set_index('Concept')

    # Draw on a dedicated figure so the heatmap never lands on (and the saved
    # file never includes) axes left over from an earlier plot.
    fig, ax = plt.subplots()
    sns.heatmap(cos_sim_concepts_df, annot=True, fmt=".2f", cmap="Blues",
                cbar_kws={'label': 'Cosine similarity'}, ax=ax)
    ax.set_title('Cosine similarity between concepts')

    print('Saving image...', end=' ')
    os.makedirs(images_path, exist_ok=True)
    if only_train:
        cos_sims_image_path = os.path.join(images_path, 'train_cosine_similarities_concepts.png')
    else:
        cos_sims_image_path = os.path.join(images_path, 'cosine_similarities_concepts.png')
    fig.savefig(cos_sims_image_path)
    print('Done.')

    plt.show()

In [None]:
# Heatmap of how similar the concept vectors are to one another.
show_similarity_concepts(
    concepts,
    images_path=images_path,
    only_train=only_train,
)

## Concept presence models

In [None]:
# Reload the artifacts written by the cells above so this section can be run
# on its own once they exist.
metadata_df = pd.read_csv(os.path.join(data_path, 'metadata.csv'))
embeddings = torch.load(os.path.join(embeddings_path, 'embeddings.pt'))
# compute_cosine_sims names its CSV after `only_train`; read the file that was
# actually written for the current setting (with only_train=True there is no
# 'cosine_similarities.csv'). Either file holds one row per statement, train
# and test alike, so the split mask below still applies.
cos_sims_file = 'train_cosine_similarities.csv' if only_train else 'cosine_similarities.csv'
cosine_similarity_df = pd.read_csv(os.path.join(cos_sims_path, cos_sims_file))

# Training subset of every table, re-indexed from 0.
train_mask = metadata_df['split'] == 'train'
train_embeddings = embeddings[train_mask]
train_metadata_df = metadata_df[train_mask].reset_index(drop=True)
train_cosine_similarity_df = cosine_similarity_df[train_mask].reset_index(drop=True)

### (M1) Cosine similarity global threshold

In [None]:
# (M1) Run the project helper `get_global_threshold` on the train-split
# metadata and cosine similarities.
# NOTE(review): the meaning of the three return values is inferred from their
# names only - confirm against models.get_global_threshold.
from models import get_global_threshold

m1_models, m1_global_train_error, m1_train_errors = get_global_threshold(train_metadata_df, train_cosine_similarity_df)
# 'Model' becomes the row index of the train-error comparison table.
m1_train_errors['Model'] = 'GT'

### (M2) Cosine similarity individual threshold

In [None]:
# (M2) Run the project helper `get_individual_thresholds` on the train-split
# metadata and cosine similarities.
# NOTE(review): return-value semantics are inferred from the variable names -
# confirm against models.get_individual_thresholds.
from models import get_individual_thresholds

m2_models, m2_train_errors = get_individual_thresholds(train_metadata_df, train_cosine_similarity_df)
# 'Model' becomes the row index of the train-error comparison table.
m2_train_errors['Model'] = 'CT'

### (M3) Global cosine similarity logistic regression

In [None]:
# (M3) Run the project helper `get_global_similarity_log_reg` on the
# train-split metadata and cosine similarities.
# NOTE(review): return-value semantics are inferred from the variable names -
# confirm against models.get_global_similarity_log_reg.
from models import get_global_similarity_log_reg

m3_models, m3_global_train_error, m3_train_errors = get_global_similarity_log_reg(train_metadata_df, 
                                                                                  train_cosine_similarity_df)
# 'Model' becomes the row index of the train-error comparison table.
m3_train_errors['Model'] = 'GLR'

### (M4) Cosine similarity logistic regression

In [None]:
# (M4) Run the project helper `get_similarity_log_reg` on the train-split
# metadata and cosine similarities.
# NOTE(review): return-value semantics are inferred from the variable names -
# confirm against models.get_similarity_log_reg.
from models import get_similarity_log_reg

m4_models, m4_train_errors = get_similarity_log_reg(train_metadata_df, train_cosine_similarity_df)
# 'Model' becomes the row index of the train-error comparison table.
m4_train_errors['Model'] = 'CLR'

### (M5) Embeddings logistic regression

In [None]:
# (M5) Run the project helper `get_embeddings_log_reg` on the train-split
# embeddings, metadata and cosine similarities.
# NOTE(review): return-value semantics are inferred from the variable names -
# confirm against models.get_embeddings_log_reg.
from models import get_embeddings_log_reg

m5_models, m5_train_errors = get_embeddings_log_reg(train_embeddings, train_metadata_df, train_cosine_similarity_df)
# 'Model' becomes the row index of the train-error comparison table.
m5_train_errors['Model'] = 'EmbCLR'

## Comparing Models

### Train classification error

In [None]:
# One row of train errors per model, indexed by the short model name that
# each model cell stored under 'Model'.
error_comparison_df = pd.DataFrame([
    m1_train_errors,
    m2_train_errors,
    m3_train_errors,
    m4_train_errors,
    m5_train_errors,
]).set_index('Model')
error_comparison_df

### Test classification metrics

In [None]:
from utils import get_all_models_classification_metric

# Reload the saved artifacts so the evaluation cells can be run on their own.
metadata_df = pd.read_csv(os.path.join(data_path, 'metadata.csv'))
# compute_cosine_sims names its CSV after `only_train`; pick the same file
# here instead of hard-coding the 'train_' variant. Either file holds one row
# per statement (train and test), so the test mask below still applies.
cos_sims_file = 'train_cosine_similarities.csv' if only_train else 'cosine_similarities.csv'
cosine_similarity_df = pd.read_csv(os.path.join(cos_sims_path, cos_sims_file))
hidden_states = torch.load(os.path.join(embeddings_path, 'embeddings.pt'))

# Held-out subset of every table, re-indexed from 0.
test_mask = metadata_df['split'] == 'test'
test_hidden_states = hidden_states[test_mask]
test_metadata_df = metadata_df[test_mask].reset_index(drop=True)
test_cosine_similarity_df = cosine_similarity_df[test_mask].reset_index(drop=True)

In [None]:
print('Test Accuracy')
# Short model name -> object(s) returned by the corresponding model cell.
# The same mapping is reused by every metric cell that follows.
models = {
    'GT': m1_models,
    'CT': m2_models,
    'GLR': m3_models,
    'CLR': m4_models,
    'EmbCLR': m5_models,
}
acc_df = get_all_models_classification_metric(
    models,
    test_metadata_df,
    test_cosine_similarity_df,
    test_hidden_states,
    metric='Acc',
)
acc_df

In [None]:
# Evaluate every model on the test split with metric='F1' and display the
# resulting table.
print('Test F1')
f1_df = get_all_models_classification_metric(models, test_metadata_df, test_cosine_similarity_df,
                                         test_hidden_states, metric='F1')
f1_df

In [None]:
# Evaluate every model on the test split with metric='AUC' and display the
# resulting table.
print('Test AUC')
auc_df = get_all_models_classification_metric(models, test_metadata_df, test_cosine_similarity_df,
                                         test_hidden_states, metric='AUC')
auc_df

In [None]:
# Evaluate every model on the test split with metric='K1' and display the
# resulting table.
# NOTE(review): what 'K1' measures is defined inside
# utils.get_all_models_classification_metric, which is not visible here.
print('Test K1')
k1_df = get_all_models_classification_metric(models, test_metadata_df, test_cosine_similarity_df,
                                         test_hidden_states, metric='K1')
k1_df

In [None]:
# Evaluate every model on the test split with metric='K2' and display the
# resulting table.
# NOTE(review): what 'K2' measures is defined inside
# utils.get_all_models_classification_metric, which is not visible here.
print('Test K2')
k2_df = get_all_models_classification_metric(models, test_metadata_df, test_cosine_similarity_df,
                                         test_hidden_states, metric='K2')
k2_df

In [None]:
# Evaluate every model on the test split with metric='Kmax' and display the
# resulting table.
# NOTE(review): what 'Kmax' measures is defined inside
# utils.get_all_models_classification_metric, which is not visible here.
print('Test Kmax')
kmax_df = get_all_models_classification_metric(models, test_metadata_df, test_cosine_similarity_df,
                                         test_hidden_states, metric='Kmax')
kmax_df