### Environment Setup

In [None]:
!mkdir data models
!pip install sacrebleu datasets

In [None]:
# Overwrites the installed transformers generation/utils.py with the local
# utils.py from the working directory.
# NOTE(review): the destination path is hard-coded for a Python 3.10
# dist-packages layout, and the contents of the local utils.py are not visible
# here -- confirm it matches the installed transformers version. Presumably
# this must run before transformers is imported for the patch to take effect.
!cp utils.py /usr/local/lib/python3.10/dist-packages/transformers/generation/utils.py

### Importing Libraries

In [None]:
import numpy as np
import torch
from transformers import AutoModel, AutoTokenizer
from sklearn.linear_model import LogisticRegression
from tqdm.notebook import tqdm
import torch.nn as nn
from transformers import LlamaForCausalLM, AutoTokenizer
from torch.nn.utils.rnn import pad_sequence
from datasets import load_dataset
import sacrebleu
import os

### Authenticating with Hugging Face

In [None]:
# Opens the interactive Hugging Face login widget in the notebook.
# NOTE(review): presumably required because the model selected below is a
# gated checkpoint -- confirm.
from huggingface_hub import notebook_login
notebook_login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

### Language Model Selection

In [None]:
# Hugging Face model id. It is used to load the tokenizer and models, and also
# as the sub-directory name under data/ and models/ for cached artefacts.
model_name = 'meta-llama/Llama-3.2-1B-Instruct' # Also tried "gpt-2"

### Preparing and Tokenizing Data

In [None]:
def annotate(prompt, answer, is_true, tokenizer):
    """Tokenize a rendered chat prompt and build per-token truthfulness labels.

    The prompt is split at the position of ``answer``. Every token before the
    answer is labelled 1; every token from the answer onwards is labelled
    ``int(is_true)``.

    Args:
        prompt: Full text that already contains any special tokens it needs
            (e.g. the output of ``apply_chat_template(..., tokenize=False)``).
        answer: Answer text expected to occur inside ``prompt``.
        is_true: Whether the answer is a correct one.
        tokenizer: Callable returning an object with ``input_ids`` and
            accepting ``add_special_tokens``.

    Returns:
        ``(tokens, labels)``: two 1-D ``torch.long`` tensors of equal length.

    Raises:
        ValueError: If the answer cannot be located in the prompt.
    """
    # The answer comes after the question in the prompt, so search from the
    # right: a left-to-right search can match the same text inside the
    # question (or other preceding text) and split at the wrong place.
    answer_begin = prompt.rfind(answer)
    if answer_begin == -1:
        # Some chat templates trim message content; retry without the
        # surrounding whitespace before giving up.
        answer_begin = prompt.rfind(answer.strip())
    if answer_begin == -1:
        raise ValueError(f"answer {answer!r} not found in prompt")

    # The prompt text already carries its special tokens. Letting the
    # tokenizer add them again would duplicate BOS at the start and inject a
    # second BOS in the middle of the sequence (at the start of the answer).
    question_tokens = tokenizer(prompt[:answer_begin], add_special_tokens=False).input_ids
    answer_tokens = tokenizer(prompt[answer_begin:], add_special_tokens=False).input_ids

    labels = [1] * len(question_tokens) + [int(is_true)] * len(answer_tokens)
    return torch.tensor(question_tokens + answer_tokens).long(), torch.tensor(labels).long()

def get_annotations(seed=0):
    """Tokenize TruthfulQA question/answer pairs and cache them as .npy files.

    Each (question, answer) pair is rendered with the tokenizer's chat
    template, tokenized and labelled by ``annotate``. Sequences are
    right-padded (tokens with the EOS id, labels with -1) and written to
    ``data/{model_name}/{train,val}_{tokens,labels}.npy``. The row indices of
    the held-out split are written to ``data/test_indices.npy``.

    Args:
        seed: Seed for the random split, so that re-running the notebook
            reproduces the same train/test partition.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    dataset = load_dataset("truthful_qa", "generation")['validation']

    # Read each column once; indexing dataset[column] inside the loops would
    # re-read the whole column for every single answer.
    questions = list(dataset['question'])
    correct_answers = list(dataset['correct_answers'])
    incorrect_answers = list(dataset['incorrect_answers'])

    # NOTE(review): ~80% of the rows go to the held-out 'test' split and only
    # ~20% to 'train' -- confirm this ratio is intended.
    generator = torch.Generator().manual_seed(seed)
    test_mask = torch.rand(len(dataset), generator=generator) < 0.8
    test_rows = torch.where(test_mask)[0]
    train_rows = torch.where(~test_mask)[0]
    indices = {
        'test': test_rows.tolist(),
        'train': train_rows.tolist(),
    }

    # Do not rely on an earlier shell cell having created the directory.
    os.makedirs('data', exist_ok=True)
    np.save('data/test_indices.npy', test_rows.numpy())

    train_tokens, train_labels = [], []
    val_tokens, val_labels = [], []
    for split, rows in indices.items():
        for i in tqdm(rows, desc = f'Tokenizing {split}'):
            labelled_answers = (
                [(answer, True) for answer in correct_answers[i]]
                + [(answer, False) for answer in incorrect_answers[i]]
            )
            for answer, is_true in labelled_answers:
                chat = [
                    {"role": "user", "content": questions[i]},
                    {"role": "assistant", "content": answer},
                ]
                input_text = tokenizer.apply_chat_template(chat, tokenize=False)

                tokens, labels = annotate(input_text, answer, is_true, tokenizer)
                if split == 'train':
                    train_tokens.append(tokens)
                    train_labels.append(labels)
                else:
                    # The held-out 'test' rows are stored under the 'val' file names.
                    val_tokens.append(tokens)
                    val_labels.append(labels)

    # Tokens are padded with the EOS id, labels with -1. Because EOS may also
    # occur as a real token, -1 in the labels is the only unambiguous padding marker.
    train_tokens = pad_sequence(train_tokens, batch_first=True, padding_value=tokenizer.eos_token_id)
    train_labels = pad_sequence(train_labels, batch_first=True, padding_value=-1)
    val_tokens = pad_sequence(val_tokens, batch_first=True, padding_value=tokenizer.eos_token_id)
    val_labels = pad_sequence(val_labels, batch_first=True, padding_value=-1)

    os.makedirs(f'data/{model_name}', exist_ok=True)
    np.save(f'data/{model_name}/train_tokens.npy', train_tokens.numpy())
    np.save(f'data/{model_name}/train_labels.npy', train_labels.numpy())
    np.save(f'data/{model_name}/val_tokens.npy', val_tokens.numpy())
    np.save(f'data/{model_name}/val_labels.npy', val_labels.numpy())

# Build and cache the tokenized train/val arrays under data/{model_name}/
# (this also writes data/test_indices.npy, which the evaluation cells load).
get_annotations()

### Training a Truthfulness Classifier

In [None]:
def extract_token_embeddings(model, tokenizer, tokens, labels, device, batch_size=32):
    """Run the model over padded token batches and collect per-token features.

    Args:
        model: Callable ``model(input_ids, attention_mask=...)`` whose result
            exposes ``last_hidden_state`` of shape (batch, seq, hidden).
        tokenizer: Unused; kept so existing callers keep working.
        tokens: 2-D NumPy integer array of token ids, right-padded.
        labels: NumPy array with the same shape as ``tokens``; padding
            positions hold -1.
        device: Torch device the inputs are moved to.
        batch_size: Number of sequences per forward pass.

    Returns:
        ``(features, targets)``: the hidden states of all non-padding tokens
        stacked into one 2-D array, and the matching 1-D label array.
    """
    embeddings = []
    flattened_labels = []

    with torch.no_grad():
        for start in tqdm(range(0, len(tokens), batch_size)):
            batch_labels = labels[start:start + batch_size]
            valid_mask = batch_labels != -1

            inputs = torch.from_numpy(tokens[start:start + batch_size]).to(device)
            # Derive the attention mask from the -1 label padding, not from
            # `token == eos`: sequences are padded with the EOS id, but EOS can
            # also be a real token inside the sequence (e.g. an end-of-turn
            # marker), and those real tokens must stay visible to the model.
            attention_mask = torch.from_numpy(valid_mask).long().to(device)

            outputs = model(inputs, attention_mask=attention_mask)
            token_embeddings = outputs.last_hidden_state.float().cpu().numpy()
            embeddings.append(token_embeddings[valid_mask])
            flattened_labels.append(batch_labels[valid_mask])

    return np.concatenate(embeddings), np.concatenate(flattened_labels)

def create_classifier():
    """Fit a logistic-regression probe on token embeddings and save its parameters.

    Loads the tokenizer and base model named by ``model_name``, embeds the
    cached training tokens, fits a ``LogisticRegression`` on the per-token
    features, and writes its ``coef_`` and ``intercept_`` to
    ``models/{model_name}/``.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModel.from_pretrained(model_name)
    model.eval()

    # Device preference: CUDA first, then Apple-Silicon MPS, then CPU.
    has_mps = torch.backends.mps.is_available()
    if torch.cuda.is_available():
        device = torch.device('cuda')
    elif has_mps:
        device = torch.device("mps")
    else:
        device = torch.device("cpu")
    print(f"Using device: {device}")
    model.to(device)

    # Cached arrays written by get_annotations().
    train_tokens = np.load(f'data/{model_name}/train_tokens.npy')
    train_labels = np.load(f'data/{model_name}/train_labels.npy')

    print("Extracting token embeddings for training data...")
    train_features, train_targets = extract_token_embeddings(
        model, tokenizer, train_tokens, train_labels, device
    )

    # Sanity check: padding labels (-1) must have been filtered out.
    print(f"Unique labels in train set: {np.unique(train_targets)}")
    assert not np.any(train_targets == -1), "Found -1 labels in training data"

    print("Training logistic regression model...")
    classifier = LogisticRegression(max_iter=5000).fit(train_features, train_targets)

    print("Saving the classifier's weights and biases...")
    os.makedirs(f'models/{model_name}', exist_ok=True)
    np.save(f'models/{model_name}/logistic_regression_weights.npy', classifier.coef_)
    np.save(f'models/{model_name}/logistic_regression_biases.npy', classifier.intercept_)

# Train the probe and write its weights/biases under models/{model_name}/.
create_classifier()
# Ask PyTorch to release cached GPU memory before the evaluation models are loaded.
torch.cuda.empty_cache()

### Truthful Language Model

In [None]:
class TruthfulLlama(LlamaForCausalLM):
    """Llama causal LM whose logits are shifted by a linear "truthfulness" head.

    The head is a ``Linear(hidden_size, 1)`` layer initialised from the saved
    logistic-regression parameters and applied to the last hidden state.
    """

    def add_truthful_head(self, model_name):
        """Create ``self.truthful_head`` from the parameters saved for ``model_name``.

        Args:
            model_name: Sub-directory of ``models/`` that contains
                ``logistic_regression_weights.npy`` and
                ``logistic_regression_biases.npy``.
        """
        weight = torch.from_numpy(np.load(f'models/{model_name}/logistic_regression_weights.npy')).float()
        bias = torch.from_numpy(np.load(f'models/{model_name}/logistic_regression_biases.npy')).float()
        self.truthful_head = torch.nn.Linear(self.model.config.hidden_size, 1)
        with torch.no_grad():
            self.truthful_head.weight.copy_(weight)
            self.truthful_head.bias.copy_(bias)
        # Keep the head on the same device/dtype as the rest of the model, so
        # it also works when attached after the model was moved or cast.
        self.truthful_head.to(device=self.device, dtype=self.dtype)

    def forward(self, *args, **kwargs):
        """Run the base forward pass and add the head's score to the logits.

        Hidden states are always requested because the head consumes the last
        one; they are therefore always present in the returned output.
        """
        # Set via the dict rather than as an extra keyword: callers may already
        # pass output_hidden_states themselves, and a duplicate keyword would
        # raise TypeError.
        kwargs['output_hidden_states'] = True
        outputs = super().forward(*args, **kwargs)
        # The head yields one scalar per position, broadcast over the vocabulary.
        # NOTE(review): a per-position constant added to every vocabulary logit
        # is invariant under softmax, so this only has an effect if downstream
        # generation code consumes unnormalised logits -- confirm against the
        # patched generation/utils.py.
        outputs.logits += self.truthful_head(outputs.hidden_states[-1])
        return outputs

### Loading and Preparing for Evaluation

In [None]:
# Tokenizer for batched generation: prompts are left-padded, and the EOS token
# doubles as the padding token.
tokenizer = AutoTokenizer.from_pretrained(model_name, padding_side='left')
tokenizer.pad_token = tokenizer.eos_token
dataset = load_dataset("truthful_qa", "generation")['validation']
# Row indices of the held-out split, as saved by get_annotations().
test_indices = np.load('data/test_indices.npy')

### Evaluating and Comparing Models

In [None]:
def _to_reference_streams(per_segment_refs):
    """Transpose per-segment reference lists into sacrebleu reference streams."""
    width = max((len(refs) for refs in per_segment_refs), default=0)
    width = max(width, 1)
    padded = []
    for refs in per_segment_refs:
        refs = list(refs) or ['']
        # Segments with fewer references are padded by repeating their first
        # reference; a duplicated reference does not change BLEU.
        padded.append(refs + [refs[0]] * (width - len(refs)))
    return [list(stream) for stream in zip(*padded)]


def get_bleu(model, tokenizer, batch_size=32, max_new_tokens=200):
    """Generate answers for the held-out questions and score them with BLEU.

    Reads the module-level ``dataset`` and ``test_indices``.

    Args:
        model: Model exposing ``generate`` and ``parameters``.
        tokenizer: Tokenizer used to encode the questions and decode outputs.
        batch_size: Number of questions generated per batch.
        max_new_tokens: Generation budget per question.

    Returns:
        ``(output_texts, score)``: the generated continuations (without the
        prompt) and the corpus-level sacrebleu BLEU score.
    """
    # Follow the model's own device rather than assuming CUDA.
    device = next(model.parameters()).device

    # Read the columns once instead of once per question.
    questions = list(dataset['question'])
    correct_answers = list(dataset['correct_answers'])

    per_segment_refs, output_texts = [], []
    with torch.no_grad():
        for start in tqdm(range(0, len(test_indices), batch_size)):
            inds = [int(j) for j in test_indices[start: start + batch_size]]
            input_texts = [questions[j] for j in inds]
            per_segment_refs.extend(correct_answers[j] for j in inds)

            inputs = tokenizer(input_texts, padding='longest', return_tensors='pt')
            inputs = {k: v.to(device) for k, v in inputs.items()}
            prompt_len = inputs['input_ids'].shape[1]

            outputs = model.generate(**inputs, max_new_tokens=max_new_tokens, temperature=0.9, num_beams=5, do_sample=True)
            # generate() returns prompt + continuation for decoder-only models.
            # Drop the (left-padded, equal-length) prompt so that BLEU scores
            # the answer only, not the echoed question.
            output_texts.extend(tokenizer.batch_decode(outputs[:, prompt_len:], skip_special_tokens=True))
            del inputs

    if not output_texts:
        return output_texts, 0.0

    # sacrebleu expects references as streams: references[k][i] is the k-th
    # reference for hypothesis i, i.e. the transpose of what was collected.
    references = _to_reference_streams(per_segment_refs)
    model_score = sacrebleu.corpus_bleu(output_texts, references).score
    return output_texts, model_score

# Evaluate the model with the truthfulness head first.
model = TruthfulLlama.from_pretrained(model_name)
model.add_truthful_head(model_name)
model.cuda()

outputs_model, bleu_model = get_bleu(model, tokenizer)
torch.cuda.empty_cache()

# Free the first model before loading the plain baseline.
model.cpu()
del model
model = LlamaForCausalLM.from_pretrained(model_name)
model.cuda()

outputs_llama, bleu_llama = get_bleu(model, tokenizer)
torch.cuda.empty_cache()

# Relative BLEU change in percent; guarded because a baseline BLEU of 0 would
# otherwise raise ZeroDivisionError after both expensive runs have finished.
if bleu_llama:
    print(f'del BLEU: {round(100 * (bleu_model - bleu_llama)/bleu_llama, 2)}')
else:
    print(f'del BLEU: undefined (baseline BLEU is 0, model BLEU is {bleu_model})')

### Saving outputs

In [None]:
# Write each model's generations to its own file, separated by '----' lines.
# An explicit UTF-8 encoding keeps non-ASCII model output from failing (or
# being mangled) on platforms whose default encoding is not UTF-8.
for out_path, texts in (
    ('Llama_outputs.txt', outputs_llama),
    ('TruthfulLlama_outputs.txt', outputs_model),
):
    with open(out_path, 'w', encoding='utf-8') as f:
        for line in texts:
            f.write(line + '\n----\n')