### [Colab Link](https://colab.research.google.com/drive/146ZKJIa8K9rm0tMgxhP27KOaZeCHLrkq?usp=drive_link)

# Fake News Detection using (Small and Large) Language Models

- Assignment: ST 311 Final Project
- Authors: 24788, 21840

# SLMs - LIAR

References:
- We use the following Hugging Face notebook as a reference: https://colab.research.google.com/github/huggingface/notebooks/blob/main/transformers_doc/en/pytorch/training.ipynb

In [None]:
#!pip install accelerate -U
#!pip install transformers datasets
#!pip install evaluate

## Load and Filter the dataset to include only rows where the label is either 0 or 3

In [None]:
from datasets import DatasetDict
from datasets import load_dataset

dataset = load_dataset("liar")

In [None]:
# Map the two retained LIAR classes onto a binary target
def filter_and_convert_labels(example):
    """Return a binary label update for a single example.

    An original label of 0 maps to 0 and an original label of 3 maps to 1.
    For any other label the function returns ``None``.
    """
    original_label = example['label']
    if original_label not in [0, 3]:
        return None
    binary_label = 0 if original_label == 0 else 1
    return {'label': binary_label}

In [None]:
# For every split: keep only rows labelled 0 or 3, then convert those labels to 0/1
binary_splits = {}
for split in dataset.keys():
    kept_rows = dataset[split].filter(lambda example: example['label'] in [0, 3])
    binary_splits[split] = kept_rows.map(filter_and_convert_labels)
filtered_dataset = DatasetDict(binary_splits)

In [None]:
# Display the first 5 entries of the training dataset
# (range(5), so the number of printed rows matches the description above)
for i in range(5):
    print(filtered_dataset['train'][i])

## Preprocessing LIAR with two labels (TRUE AND FALSE)

In [None]:
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

In [None]:
def preprocess_function(examples):
    """Tokenize the ``statement`` column of a batch with truncation enabled."""
    statements = examples["statement"]
    return tokenizer(statements, truncation=True)

In [None]:
# Tokenize the train and test splits in batches with preprocess_function
tokenized_liar_train, tokenized_liar_test = (
    filtered_dataset[split_name].map(preprocess_function, batched=True)
    for split_name in ("train", "test")
)

In [None]:
from transformers import DataCollatorWithPadding
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

In [None]:
id2label = {0: "false", 1: "true"}
label2id =  {"false": 0, "true": 1}

## Helpers

In [None]:
import evaluate

accuracy = evaluate.load("accuracy")

import numpy as np

def compute_metrics(eval_pred): # will use this inside the training function
    """Return the accuracy of the arg-max class predictions.

    ``eval_pred`` unpacks into a (predictions, labels) pair; the predicted
    class of each row is the index of its largest score along axis 1.
    """
    scores, references = eval_pred
    predicted_classes = np.argmax(scores, axis=1)
    return accuracy.compute(predictions=predicted_classes, references=references)

def custom_precision_recall_f1(predicted, actual, true_label=1, false_label=0):
    """Compute class-averaged precision and recall, and the F1 derived from them.

    Precision and recall are computed separately for ``true_label`` and
    ``false_label`` and then averaged over the two classes.  The F1 score is
    the harmonic mean of those two averages.  Any ratio whose denominator is
    zero is reported as 0.

    Args:
        predicted: Iterable of predicted labels.
        actual: Iterable of reference labels, paired positionally with ``predicted``.
        true_label: Value that denotes the "true" class.
        false_label: Value that denotes the "false" class.

    Returns:
        A ``(precision, recall, f1_score)`` tuple.
    """
    def tally(predicted_value, actual_value):
        # Number of (prediction, reference) pairs matching the given combination
        return sum((p == predicted_value) and (a == actual_value) for p, a in zip(predicted, actual))

    def ratio(numerator, denominator):
        # An empty denominator yields 0 instead of a division error
        return numerator / denominator if denominator > 0 else 0

    hits_true = tally(true_label, true_label)
    hits_false = tally(false_label, false_label)
    wrong_true = tally(true_label, false_label)
    wrong_false = tally(false_label, true_label)

    precision = (ratio(hits_true, hits_true + wrong_true) + ratio(hits_false, hits_false + wrong_false)) / 2
    recall = (ratio(hits_true, hits_true + wrong_false) + ratio(hits_false, hits_false + wrong_true)) / 2

    combined = precision + recall
    f1_score = 2 * (precision * recall) / combined if combined > 0 else 0

    return precision, recall, f1_score

In [None]:
# Login to the Hugging Face CLI within a Google Colab environment
!huggingface-cli login

### Hyperparameters used to fine-tune each SLM (taken from the training arguments in the code below):

- learning_rate: 3e-06
- train_batch_size: 8
- eval_batch_size: 8
- seed: 42
- optimizer: Adam with betas=(0.9,0.999) and epsilon=1e-08
- lr_scheduler_type: linear
- num_epochs: 5

## Bert - Training and Evaluation

In [None]:
# Load the pretrained bert-base-cased model
from transformers import AutoModelForSequenceClassification, TrainingArguments, Trainer

model_binary_bert = AutoModelForSequenceClassification.from_pretrained(
      "bert-base-cased",
      num_labels=2,
      id2label=id2label,
      label2id=label2id
)

In [None]:
# Training configuration for BERT: evaluation, checkpointing and logging each run once per epoch
training_args = TrainingArguments(
    output_dir="liar_binaryclassifier_bert_cased",
    learning_rate=3e-06,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=5,
    weight_decay=0.1,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    logging_strategy="epoch",
    load_best_model_at_end=True,
    push_to_hub=True,
)

# NOTE(review): the test split is passed as eval_dataset, so the per-epoch evaluation
# (and the best-checkpoint reload enabled by load_best_model_at_end) is driven by test
# data -- consider using the validation split for this instead.
trainer = Trainer(
    model=model_binary_bert,
    args=training_args,
    train_dataset=tokenized_liar_train,
    eval_dataset=tokenized_liar_test,
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)

In [None]:
# Evaluate pre-trained model's performance
pretrained_eval_results = trainer.evaluate()
print("Pre-trained model performance:")
print(pretrained_eval_results)

In [None]:
# Extract the actual labels
actual_results = tokenized_liar_test['label']

# Predict labels on the test dataset
pretrained_test_results = trainer.predict(tokenized_liar_test)

# Extract the predicted labels
pretrained_test_predictions = pretrained_test_results.predictions.argmax(axis=1)

# Returns the prediction vector from the pre-trained model on the test dataset
pretrained_test_predictions

In [None]:
# Use pretrained_test_predictions along with actual labels to compute custom precision, recall, and F1 score
precision, recall, f1_score = custom_precision_recall_f1(pretrained_test_predictions, actual_results)
print("precision:", precision, "recall:", recall, "f1_score:", f1_score)

In [None]:
# Fine-tune all layers of the pre-trained model
trainer.train() # call this to start training

In [None]:
# Push the trained model to the Hugging Face model hub
trainer.push_to_hub()

In [None]:
# Extract the actual labels
actual = tokenized_liar_test['label']

# Evaluate trained model results
eval_results = trainer.evaluate()
print("Fine-tuned model performance:")
print(eval_results)

# Predict labels on the test dataset
test_results = trainer.predict(tokenized_liar_test)

# Extract the predicted labels
test_predictions = test_results.predictions.argmax(axis=1)

# Show the prediction vector from the fine-tuned model on the test dataset
# (test_predictions, not the pre-training vector pretrained_test_predictions)
test_predictions

In [None]:
# Use test_predictions along with actual labels to compute custom precision, recall, and F1 score
precision, recall, f1_score = custom_precision_recall_f1(test_predictions, actual_results)

print("precision:", precision, "recall:", recall, "f1_score:", f1_score)

## DistilBert - Training and Evaluation

In [None]:
# Load the pretrained distilbert-base-cased model
from transformers import AutoModelForSequenceClassification, TrainingArguments, Trainer

# Load the DistilBERT checkpoint; loading "bert-base-cased" here would simply
# repeat the BERT experiment under a DistilBERT name.
# NOTE(review): the tokenizer loaded earlier is the bert-base-cased one; it is
# presumably compatible with distilbert-base-cased (same cased vocabulary) -- confirm.
model_binary_distilbert = AutoModelForSequenceClassification.from_pretrained(
      "distilbert-base-cased",
      num_labels=2,
      id2label=id2label,
      label2id=label2id
)

In [None]:
# Training configuration for DistilBERT: evaluation, checkpointing and logging each run once per epoch
training_args = TrainingArguments(
    output_dir="liar_binaryclassifier_distilbert_cased",
    learning_rate=3e-06,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=5,
    weight_decay=0.1,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    logging_strategy="epoch",
    load_best_model_at_end=True,
    push_to_hub=True,
)

# NOTE(review): the test split is passed as eval_dataset, so the per-epoch evaluation
# (and the best-checkpoint reload enabled by load_best_model_at_end) is driven by test
# data -- consider using the validation split for this instead.
trainer = Trainer(
    model=model_binary_distilbert,
    args=training_args,
    train_dataset=tokenized_liar_train,
    eval_dataset=tokenized_liar_test,
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)

In [None]:
# Evaluate pre-trained model's performance
pretrained_eval_results = trainer.evaluate()
print("Pre-trained model performance:")
print(pretrained_eval_results)

In [None]:
# Extract the actual labels
actual_results = tokenized_liar_test['label']

# Predict labels on the test dataset
pretrained_test_results = trainer.predict(tokenized_liar_test)

# Extract the predicted labels
pretrained_test_predictions = pretrained_test_results.predictions.argmax(axis=1)

# Returns the prediction vector from the pre-trained model on the test dataset
pretrained_test_predictions

In [None]:
# Use pretrained_test_predictions along with actual labels to compute custom precision, recall, and F1 score
precision, recall, f1_score = custom_precision_recall_f1(pretrained_test_predictions, actual_results)
print("precision:", precision, "recall:", recall, "f1_score:", f1_score)

In [None]:
# Fine-tune all layers of the pre-trained model
trainer.train() # call this to start training

In [None]:
# Push the trained model to the Hugging Face model hub
trainer.push_to_hub()

In [None]:
# Extract the actual labels
actual = tokenized_liar_test['label']

# Evaluate trained model results
eval_results = trainer.evaluate()
print("Fine-tuned model performance:")
print(eval_results)

# Predict labels on the test dataset
test_results = trainer.predict(tokenized_liar_test)

# Extract the predicted labels
test_predictions = test_results.predictions.argmax(axis=1)

# Show the prediction vector from the fine-tuned model on the test dataset
# (test_predictions, not the pre-training vector pretrained_test_predictions)
test_predictions

In [None]:
# Use test_predictions along with actual labels to compute custom precision, recall, and F1 score
precision, recall, f1_score = custom_precision_recall_f1(test_predictions, actual_results)

print("precision:", precision, "recall:", recall, "f1_score:", f1_score)

## Roberta - Training and Evaluation

In [None]:
# Load the pretrained roberta-base model
from transformers import AutoModelForSequenceClassification, TrainingArguments, Trainer

model_binary_roberta = AutoModelForSequenceClassification.from_pretrained(
      "roberta-base",
      num_labels=2,
      id2label=id2label,
      label2id=label2id
)

In [None]:
# roberta-base has its own tokenizer and vocabulary, so token ids produced by the
# bert-base-cased tokenizer are not meaningful to it.  Re-tokenize the data with the
# matching tokenizer before building the trainer.
roberta_tokenizer = AutoTokenizer.from_pretrained("roberta-base")


def roberta_preprocess_function(examples):
    """Tokenize the ``statement`` column of a batch with the RoBERTa tokenizer."""
    return roberta_tokenizer(examples["statement"], truncation=True)


# The shared dataset names are rebound on purpose: the evaluation and prediction cells
# that follow read tokenized_liar_test.  Re-run the original tokenization cell before
# going back to the BERT / DistilBERT sections.
tokenized_liar_train = filtered_dataset["train"].map(roberta_preprocess_function, batched=True)
tokenized_liar_test = filtered_dataset["test"].map(roberta_preprocess_function, batched=True)
roberta_data_collator = DataCollatorWithPadding(tokenizer=roberta_tokenizer)

# Training configuration for RoBERTa: evaluation, checkpointing and logging each run once per epoch
training_args = TrainingArguments(
    output_dir="liar_binaryclassifier_roberta_base",
    learning_rate=3e-06,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=5,
    weight_decay=0.1,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    logging_strategy="epoch",
    load_best_model_at_end=True,
    push_to_hub=True,
)

trainer = Trainer(
    model=model_binary_roberta,
    args=training_args,
    train_dataset=tokenized_liar_train,
    eval_dataset=tokenized_liar_test,
    tokenizer=roberta_tokenizer,
    data_collator=roberta_data_collator,
    compute_metrics=compute_metrics,
)

In [None]:
# Evaluate pre-trained model's performance
pretrained_eval_results = trainer.evaluate()
print("Pre-trained model performance:")
print(pretrained_eval_results)

In [None]:
# Extract the actual labels
actual_results = tokenized_liar_test['label']

# Predict labels on the test dataset
pretrained_test_results = trainer.predict(tokenized_liar_test)

# Extract the predicted labels
pretrained_test_predictions = pretrained_test_results.predictions.argmax(axis=1)

# Returns the prediction vector from the pre-trained model on the test dataset
pretrained_test_predictions

In [None]:
# Use pretrained_test_predictions along with actual labels to compute custom precision, recall, and F1 score
precision, recall, f1_score = custom_precision_recall_f1(pretrained_test_predictions, actual_results)
print("precision:", precision, "recall:", recall, "f1_score:", f1_score)

In [None]:
# Fine-tune all layers of the pre-trained model
trainer.train() # call this to start training

In [None]:
# Push the trained model to the Hugging Face model hub
trainer.push_to_hub()

In [None]:
# Extract the actual labels
actual = tokenized_liar_test['label']

# Evaluate trained model results
eval_results = trainer.evaluate()
print("Fine-tuned model performance:")
print(eval_results)

# Predict labels on the test dataset
test_results = trainer.predict(tokenized_liar_test)

# Extract the predicted labels
test_predictions = test_results.predictions.argmax(axis=1)

# Show the prediction vector from the fine-tuned model on the test dataset
# (test_predictions, not the pre-training vector pretrained_test_predictions)
test_predictions

In [None]:
# Use test_predictions along with actual labels to compute custom precision, recall, and F1 score
precision, recall, f1_score = custom_precision_recall_f1(test_predictions, actual_results)

print("precision:", precision, "recall:", recall, "f1_score:", f1_score)

# LLMs - LIAR

References:

- For prompt tuning, we use the following Hugging Face guide as a reference: https://huggingface.co/docs/peft/task_guides/prompt_based_methods?configurations=prompt+tuning

- For Unsloth + SFT tuning, we use this Unsloth guide as a reference: https://colab.research.google.com/drive/135ced7oHytdxu3N2DNe1Z0kqjyYIkDXp?usp=sharing


##  Training (Prompt Tuning) and Evaluation

### Used with: stablelm-2-zephyr-1_6b, bloomz-1b1, bloomz-560m

In [None]:
# !pip install -q peft transformers datasets

In [None]:
from transformers import AutoModelForCausalLM
from peft import get_peft_config, get_peft_model, PromptTuningInit, PromptTuningConfig, TaskType, PeftType
import torch
from datasets import load_dataset
import os
from transformers import AutoTokenizer
from torch.utils.data import DataLoader
from transformers import default_data_collator, get_linear_schedule_with_warmup
from tqdm import tqdm

Below we set the model as "stabilityai/stablelm-2-zephyr-1_6b", but the rest of the notebook is applicable to bloomz-1b1 and bloomz-560m as well.

In [None]:
# Choose the model, can use any LLM suitable for text generation
model_name_or_path = "stabilityai/stablelm-2-zephyr-1_6b"
tokenizer_name_or_path = "stabilityai/stablelm-2-zephyr-1_6b"

In [None]:
device = "cuda"

peft_config = PromptTuningConfig( # This creates the PEFT configuration, used in loading the model later
    task_type=TaskType.CAUSAL_LM, # Text generation
    prompt_tuning_init=PromptTuningInit.TEXT, # Initiate the prompt for prompt tuning
    num_virtual_tokens=8,
    prompt_tuning_init_text="Predict if the statement is true or false.", # This is the prompt initiated for prompt tuning
    tokenizer_name_or_path=model_name_or_path,
)

dataset_name = "liar"
checkpoint_name = f"{dataset_name}_{model_name_or_path}_{peft_config.peft_type}_{peft_config.task_type}_v1.pt".replace(
    "/", "_"
)
text_column = "statement"
label_column = "label"

In [None]:
# hyperparameters
max_length = 64 # max length generated for text, if higher inference takes longer
lr = 3e-3 # learning rate, low is better for LLM fine tuning
num_epochs = 2
batch_size = 8 # higher requires higher memory, 8 is used often

In [None]:
# load and filter dataset: liar
from datasets import load_dataset

dataset = load_dataset(dataset_name)

# keep only the rows whose label is 0 or 3, in every split
filtered_dataset = dataset.filter(lambda example: example["label"] in [0, 3])

# class names of the label feature, with underscores replaced by spaces
classes = [k.replace("_", " ") for k in filtered_dataset["train"].features["label"].names]

# overwrite the label column with the class name looked up from its integer id
# NOTE(review): the inference cell parses digits with `Label : (\d+)` and the metric
# helper compares against 0 / 3, so use the row printed below to confirm which
# representation (integer id or class-name string) "label" actually holds after this map.
filtered_dataset = filtered_dataset.map(
    lambda x: {"label": [classes[label] for label in x["label"]]},
    batched=True,
    num_proc=1,
)
filtered_dataset["train"][0] # print a row to check

In [None]:
# data preprocessing
tokenizer = AutoTokenizer.from_pretrained(model_name_or_path)
if tokenizer.pad_token_id is None:
    tokenizer.pad_token_id = tokenizer.eos_token_id
target_max_length = max([len(tokenizer(class_label)["input_ids"]) for class_label in classes])

def preprocess_function(examples):
    """Build fixed-length training features for a batch of statements.

    Each statement is wrapped in the ``"<text_column> : ... Label : "`` prompt
    and tokenized; the tokenized label followed by the EOS token is appended.
    Every sequence is then left-padded up to ``max_length``, cut to its first
    ``max_length`` tokens, and converted to a PyTorch tensor.  In ``labels``
    the prompt and padding positions hold -100, so only the label tokens and
    the EOS token carry real ids.  The attention mask is 0 on padding and 1
    elsewhere.
    """
    prompts = [f"{text_column} : {x} Label : " for x in examples[text_column]]
    answers = [str(x) for x in examples[label_column]]
    model_inputs = tokenizer(prompts)
    labels = tokenizer(answers, add_special_tokens=False)  # no bos token: the answer is appended to the prompt

    for i in range(len(examples[text_column])):
        prompt_ids = model_inputs["input_ids"][i]
        answer_ids = labels["input_ids"][i] + [tokenizer.eos_token_id]

        # prompt followed by answer; the prompt part is masked out of the labels
        full_ids = prompt_ids + answer_ids
        full_labels = [-100] * len(prompt_ids) + answer_ids
        full_mask = [1] * len(full_ids)

        # left-pad to max_length (a negative count adds no padding)
        pad_count = max_length - len(full_ids)
        full_ids = [tokenizer.pad_token_id] * pad_count + full_ids
        full_mask = [0] * pad_count + full_mask
        full_labels = [-100] * pad_count + full_labels

        model_inputs["input_ids"][i] = torch.tensor(full_ids[:max_length])
        model_inputs["attention_mask"][i] = torch.tensor(full_mask[:max_length])
        labels["input_ids"][i] = torch.tensor(full_labels[:max_length])

    model_inputs["labels"] = labels["input_ids"]
    return model_inputs

# process every split of the dataset using preprocess_function
processed_datasets = filtered_dataset.map(
    preprocess_function,
    batched=True,
    num_proc=1,
    remove_columns=filtered_dataset["train"].column_names, # remove not needed columns
    load_from_cache_file=False,
    desc="Running tokenizer on dataset",
)

train_dataset = processed_datasets["train"]
# Evaluate on the held-out validation split; evaluating on the training split would
# only measure how well the training data was memorised.
eval_dataset = processed_datasets["validation"]

# create the data loaders used in training and per-epoch evaluation
train_dataloader = DataLoader(
    train_dataset, shuffle=True, collate_fn=default_data_collator, batch_size=batch_size, pin_memory=True
)
eval_dataloader = DataLoader(eval_dataset, collate_fn=default_data_collator, batch_size=batch_size, pin_memory=True)

In [None]:
def test_preprocess_function(examples):
    """Build fixed-length inference features (prompt only, no labels) for a batch.

    Each statement is wrapped in the ``"<text_column> : ... Label : "`` prompt,
    tokenized, left-padded up to ``max_length``, cut to its first
    ``max_length`` tokens, and converted to a PyTorch tensor.  The attention
    mask is 0 on the padding positions.
    """
    prompts = [f"{text_column} : {x} Label : " for x in examples[text_column]]
    model_inputs = tokenizer(prompts)

    for i in range(len(examples[text_column])):
        prompt_ids = model_inputs["input_ids"][i]
        # left-pad to max_length (a negative count adds no padding)
        pad_count = max_length - len(prompt_ids)
        padded_ids = [tokenizer.pad_token_id] * pad_count + prompt_ids
        padded_mask = [0] * pad_count + model_inputs["attention_mask"][i]

        model_inputs["input_ids"][i] = torch.tensor(padded_ids[:max_length])
        model_inputs["attention_mask"][i] = torch.tensor(padded_mask[:max_length])

    return model_inputs

# process the test dataset using test_preprocess_function
test_dataset = filtered_dataset["test"].map(
    test_preprocess_function,
    batched=True,
    num_proc=1,
    # column names are taken from the train split
    # NOTE(review): assumes the test split has the same columns -- confirm
    remove_columns=filtered_dataset["train"].column_names,
    load_from_cache_file=False,
    desc="Running tokenizer on dataset",
)

# unshuffled loader over the tokenized test prompts
test_dataloader = DataLoader(test_dataset, collate_fn=default_data_collator, batch_size=batch_size, pin_memory=True)

In [None]:
# creating the model
model = AutoModelForCausalLM.from_pretrained(model_name_or_path)
model = get_peft_model(model, peft_config)
model.print_trainable_parameters() # prints the number of parameters we can train, very low for PEFT!

In [None]:
# optimizer and lr scheduler
optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
lr_scheduler = get_linear_schedule_with_warmup(
    optimizer=optimizer,
    num_warmup_steps=0,
    num_training_steps=(len(train_dataloader) * num_epochs),
)

In [None]:
# training and evaluation
model = model.to(device)

for epoch in range(num_epochs):
    # ---- training pass over train_dataloader ----
    model.train()
    total_loss = 0
    for step, batch in enumerate(tqdm(train_dataloader)):
        batch = {k: v.to(device) for k, v in batch.items()}
        outputs = model(**batch)
        loss = outputs.loss
        total_loss += loss.detach().float()  # accumulate a detached copy for reporting only
        loss.backward()
        optimizer.step()
        lr_scheduler.step()  # the scheduler is stepped once per batch
        optimizer.zero_grad()

    # ---- evaluation pass over eval_dataloader (no gradients) ----
    model.eval()
    eval_loss = 0
    eval_preds = []
    for step, batch in enumerate(tqdm(eval_dataloader)):
        batch = {k: v.to(device) for k, v in batch.items()}
        with torch.no_grad():
            outputs = model(**batch)
        loss = outputs.loss
        eval_loss += loss.detach().float()
        # decode the arg-max token at every position; eval_preds is collected but not
        # used anywhere else in this cell
        eval_preds.extend(
            tokenizer.batch_decode(torch.argmax(outputs.logits, -1).detach().cpu().numpy(), skip_special_tokens=True)
        )

    # per-epoch summary: mean loss per batch and its exponential (perplexity)
    eval_epoch_loss = eval_loss / len(eval_dataloader)
    eval_ppl = torch.exp(eval_epoch_loss)
    train_epoch_loss = total_loss / len(train_dataloader)
    train_ppl = torch.exp(train_epoch_loss)
    print(f"{epoch=}: {train_ppl=} {train_epoch_loss=} {eval_ppl=} {eval_epoch_loss=}")

In [None]:
# saving the model
# The directory name is built from dataset_name (the string "liar"); interpolating the
# loaded `dataset` object would embed its whole multi-line repr in the path.
peft_model_id = f"{dataset_name}_{model_name_or_path}_{peft_config.peft_type}_{peft_config.task_type}".replace(
    "/", "_"
)
model.save_pretrained(peft_model_id)

In [None]:
# loading the fine-tuned model
from peft import PeftModel, PeftConfig

# Built from dataset_name, the dataset identifier defined earlier in this notebook
# (`dataset_new` is not defined anywhere and raised a NameError).
peft_model_id = f"{dataset_name}_{model_name_or_path}_{peft_config.peft_type}_{peft_config.task_type}".replace(
    "/", "_"
)

# we need to specify PEFT configuration every time we want to load a model fine-tuned with PEFT
config = PeftConfig.from_pretrained(peft_model_id)
model = AutoModelForCausalLM.from_pretrained(config.base_model_name_or_path)
model = PeftModel.from_pretrained(model, peft_model_id)

In [None]:
# inference, calculate test predictions
import re

model.to(device)
model.eval()

# digits that follow the "Label : " marker in the decoded text
label_pattern = re.compile(r'Label : (\d+)')

predicted_labels = []
unparsed_count = 0

for example in filtered_dataset["test"]:
    inputs = tokenizer(f'{text_column} : {example["statement"]} Label : ', return_tensors="pt")
    inputs = {k: v.to(device) for k, v in inputs.items()}

    with torch.no_grad():
        # Stop on the tokenizer's own EOS id (the token appended to every training
        # target) instead of a hard-coded id that is only valid for one model family.
        outputs = model.generate(
            input_ids=inputs["input_ids"],
            attention_mask=inputs["attention_mask"],
            max_new_tokens=10,
            eos_token_id=tokenizer.eos_token_id,
        )
    generated_text = tokenizer.batch_decode(outputs.detach().cpu().numpy(), skip_special_tokens=True)

    # Extracting the label from the generated text using regular expressions
    label_text = generated_text[0]
    label_match = label_pattern.search(label_text)
    if label_match:
        predicted_labels.append(int(label_match.group(1)))
    else:
        # Keep a placeholder so predictions stay aligned one-to-one with the test rows;
        # skipping the row would shift every later prediction against its true label.
        predicted_labels.append(None)
        unparsed_count += 1
        print(f"Label not found in generated text: {label_text}")

print(predicted_labels)
print(f"Generations without a parsable label: {unparsed_count}")
len(predicted_labels) # always equals the test set size; unparsed rows are stored as None

In [None]:
# collect the reference label of every test row, in dataset order
actual_labels = [row["label"] for row in filtered_dataset["test"]]

print(actual_labels)

In [None]:
# calculate accuracy
correct_predictions = sum(1 for pred, actual in zip(predicted_labels, actual_labels) if pred == actual)
total_predictions = len(predicted_labels)
# Guard against an empty prediction list, which would otherwise raise ZeroDivisionError.
# NOTE(review): this rebinds the name `accuracy`, which the SLM section uses for the
# `evaluate` metric object -- reload that metric before re-running compute_metrics.
accuracy = correct_predictions / total_predictions if total_predictions else 0.0

print(f"Accuracy: {accuracy:.2%}")

In [None]:
# function to calculate precision, recall, and f1
def custom_precision_recall_f1(predicted, actual, true_label=3, false_label=0):
    """Compute class-averaged precision and recall, and the F1 derived from them.

    Precision and recall are computed separately for ``true_label`` and
    ``false_label`` and then averaged over the two classes.  The F1 score is
    the harmonic mean of those two averages.  Any ratio whose denominator is
    zero is reported as 0.

    Args:
        predicted: Iterable of predicted labels.
        actual: Iterable of reference labels, paired positionally with ``predicted``.
        true_label: Value that denotes the "true" class.
        false_label: Value that denotes the "false" class.

    Returns:
        A ``(precision, recall, f1_score)`` tuple.
    """
    def tally(predicted_value, actual_value):
        # Number of (prediction, reference) pairs matching the given combination
        return sum((p == predicted_value) and (a == actual_value) for p, a in zip(predicted, actual))

    def ratio(numerator, denominator):
        # An empty denominator yields 0 instead of a division error
        return numerator / denominator if denominator > 0 else 0

    hits_true = tally(true_label, true_label)
    hits_false = tally(false_label, false_label)
    wrong_true = tally(true_label, false_label)
    wrong_false = tally(false_label, true_label)

    precision = (ratio(hits_true, hits_true + wrong_true) + ratio(hits_false, hits_false + wrong_false)) / 2
    recall = (ratio(hits_true, hits_true + wrong_false) + ratio(hits_false, hits_false + wrong_true)) / 2

    combined = precision + recall
    f1_score = 2 * (precision * recall) / combined if combined > 0 else 0

    return precision, recall, f1_score

In [None]:
# print precision, recall, and f1
custom_precision_recall_f1(predicted_labels, actual_labels)

##  Training (Unsloth + SFT) and Evaluation

### Used with: llama-3-8b-bnb-4bit, gemma-7b-bnb-4bit, mistral-7b-bnb-4bit, tinyllama-bnb-4bit

In [None]:
#! pip install git+https://github.com/huggingface/transformers.git
#! pip install 'transformers>=3.9.1' accelerate
#! pip install -i https://pypi.org/simple/ bitsandbytes
#! pip install datasets

Installing Unsloth as suggested by Hugging Face:

In [None]:
%%capture
import torch
major_version, minor_version = torch.cuda.get_device_capability()
# Must install separately since Colab has torch 2.2.1, which breaks packages
!pip install "unsloth[colab-new] @ git+https://github.com/unslothai/unsloth.git"
if major_version >= 8:
    # Use this for new GPUs like Ampere, Hopper GPUs (RTX 30xx, RTX 40xx, A100, H100, L40)
    !pip install --no-deps packaging ninja einops flash-attn xformers trl peft accelerate bitsandbytes
else:
    # Use this for older GPUs (V100, Tesla T4, RTX 20xx)
    !pip install --no-deps xformers trl peft accelerate bitsandbytes
pass

Import the models using the Fast Language Model framework from Unsloth.

In [None]:
from unsloth import FastLanguageModel
import torch
max_seq_length = 2048 # Choose any! We auto support RoPE Scaling internally!
dtype = None # None for auto detection. Float16 for Tesla T4, V100, Bfloat16 for Ampere+
load_in_4bit = True # Use 4bit quantization to reduce memory usage. Can be False.

model, tokenizer = FastLanguageModel.from_pretrained(
    model_name = "unsloth/gemma-7b-bnb-4bit", # we use gemma-7b-bnb-4bit but rest of the notebook is fully applicable to other models mentioned
    max_seq_length = max_seq_length,
    dtype = dtype,
    load_in_4bit = load_in_4bit,
    token = "put_your_own_HF_token_here", # needed for gated models like meta-llama/Llama-3-8b
)

We give access to our drive to save predictions to a drive folder as they are generated so that in case of interrupted connection we don't need to re-do all the inference.

In [None]:
import pandas as pd
from google.colab import drive
drive.mount('/content/drive')

A PEFT configuration is needed for fast language models as well. Note that this configuration is different from the prompt tuning we used in the previous section, and that it is needed in order to use the customized Unsloth models.

In [None]:
model = FastLanguageModel.get_peft_model(
    model,
    r = 16, # Choose any number > 0 ! Suggested 8, 16, 32, 64, 128
    target_modules = ["q_proj", "k_proj", "v_proj", "o_proj",
                      "gate_proj", "up_proj", "down_proj",],
    lora_alpha = 16,
    lora_dropout = 0, # Supports any, but = 0 is optimized
    bias = "none",    # Supports any, but = "none" is optimized
    use_gradient_checkpointing = "unsloth", # True or "unsloth" for very long context
    random_state = 3407,
    use_rslora = False,
    loftq_config = None, # And LoftQ
)

Load and preprocess the LIAR dataset, creating a version with the instruction and output columns needed for supervised fine-tuning.

In [None]:
from datasets import load_dataset

# Attach the SFT instruction and the expected answer to one example
def add_columns(example):
    """Add ``instruction`` and ``output`` fields to ``example`` and return it.

    The answer is "The statement is true." when the label equals 3 and
    "The statement is false." for every other label.  ``example`` is modified
    in place.
    """
    if example["label"] == 3:
        answer = "The statement is true."
    else:
        answer = "The statement is false."
    example["instruction"] = "Analyze the following statement and decide if it is true or false."
    example["output"] = answer
    return example

# Load the dataset
dataset = load_dataset("liar")

# Keep only the rows labelled 0 or 3 in the train and test splits
filtered_dataset_train, filtered_dataset_test = (
    dataset[split_name].filter(lambda example: example["label"] in [0, 3])
    for split_name in ("train", "test")
)

# Only the training split receives the "instruction" and "output" columns
filtered_dataset_train = filtered_dataset_train.map(add_columns)

# Print a sample example to verify the addition of the new columns
print(filtered_dataset_train[0])

We create an alpaca prompt to use in training and inference and preprocess the dataset to have a single text column with instruction, input, and response.

In [None]:
# Alpaca-style template with three slots: instruction, input and response.
# The string literal must start on the same line as the assignment (a bare
# `alpaca_prompt =` is a SyntaxError).  The template deliberately has no leading or
# trailing newline so that training text matches the inference prompt, which ends
# directly after "### Response:\n".
alpaca_prompt = """Below is an instruction that describes a task,
paired with an input that provides further context.
Write a response that appropriately completes the request.

### Instruction:
{}

### Input:
{}

### Response:
{}"""

EOS_TOKEN = tokenizer.eos_token # Get the EOS_TOKEN of the tokenizer
def formatting_prompts_func(examples):
    """Render a batch into the single ``text`` column used for supervised fine-tuning.

    Args:
        examples: Batch with ``instruction``, ``statement`` and ``output`` columns.

    Returns:
        ``{"text": [...]}`` with one filled-in prompt per row, each terminated by
        the tokenizer's EOS token.
    """
    rows = zip(examples["instruction"], examples["statement"], examples["output"])
    # Must add EOS_TOKEN, otherwise your generation will go on forever!
    texts = [
        alpaca_prompt.format(instruction, statement, output) + EOS_TOKEN
        for instruction, statement, output in rows
    ]
    return { "text" : texts, }

from datasets import load_dataset
dataset = filtered_dataset_train.map(formatting_prompts_func, batched = True,)

Now, load the SFTTrainer from Hugging Face and set training arguments.

In [None]:
from trl import SFTTrainer
from transformers import TrainingArguments

trainer = SFTTrainer(
    model = model,
    tokenizer = tokenizer,
    train_dataset = dataset,
    dataset_text_field = "text",
    max_seq_length = max_seq_length,
    dataset_num_proc = 2,
    packing = False, # Can make training 5x faster for short sequences.
    args = TrainingArguments(
        per_device_train_batch_size = 2,
        gradient_accumulation_steps = 4,
        warmup_steps = 5,
        num_train_epochs = 1,
        #max_steps = 120, # could be used if we don't want full epoch runs
        learning_rate = 2e-4,
        fp16 = not torch.cuda.is_bf16_supported(),
        bf16 = torch.cuda.is_bf16_supported(),
        logging_steps = 1,
        optim = "adamw_8bit",
        weight_decay = 0.01,
        lr_scheduler_type = "linear",
        seed = 3407,
        output_dir = "outputs",
    ),
)

Run the training.

In [None]:
trainer_stats = trainer.train()

Now we do inference and save the predictions.

Note that for base models, we start from here and just do predictions.

In [None]:
import json

# Initialize an empty list to store the results
results = []

# Specify the file path where you want to save the results
file_path = "/content/drive/My Drive/results_liar_gemma7b_finetuned.json"  # Change the path as needed

# Define a function to save results to a file
def save_results(results, file_path):
    """Write ``results`` to ``file_path`` as JSON without risking a truncated file.

    The data is first written to a temporary sibling file and then moved over
    ``file_path``.  Opening the destination directly in ``'w'`` mode empties it
    before the new content is written, so an interruption at that moment would
    destroy the previously saved predictions.

    Args:
        results: JSON-serialisable object to store.
        file_path: Destination path of the JSON file.
    """
    import os

    temp_path = f"{file_path}.tmp"
    with open(temp_path, 'w') as f:
        json.dump(results, f)
    os.replace(temp_path, file_path)

# Define the Alpaca prompt
# (the string literal must start on the same line as the assignment; a bare
# `alpaca_prompt =` is a SyntaxError)
alpaca_prompt = """Below is an instruction that describes a task,
paired with an input that provides further context.
Write a response that appropriately completes the request.

### Instruction:
{}

### Input:
{}

### Response:
{}"""
FastLanguageModel.for_inference(model) # Enable native 2x faster inference

# Same instruction text that add_columns attaches to the training examples, so the
# fine-tuned model is queried with the prompt it was trained on.
inference_instruction = "Analyze the following statement and decide if it is true or false."

for example in filtered_dataset_test:
    prompt = alpaca_prompt.format(
        inference_instruction, # instruction
        f"{example['statement']}", # input
        "", # output - leave this blank for generation!
    )
    inputs = tokenizer([prompt], return_tensors = "pt").to("cuda")

    outputs = model.generate(**inputs, max_new_tokens = 128, use_cache = True)
    generated_text = tokenizer.batch_decode(outputs)

    print(generated_text)
    # Append the results to the list
    results.append({'statement': example['statement'], 'generated_text': generated_text, 'label': example['label']})

    # Periodically save the results to the file
    if len(results) % 10 == 0:  # Save every 10 predictions
        save_results(results, file_path)
        print("Result saved")

# Save the final results to the file
save_results(results, file_path)

Accuracy calculation.

In [None]:
import json

# Specify the file path from where you want to load the results
# (same literal path the inference cell writes its predictions to)
file_path = "/content/drive/My Drive/results_liar_gemma7b_finetuned.json"  # Change the path as needed

# Read previously saved results back from a JSON file
def load_results(file_path):
    """Return the object parsed from the JSON file at ``file_path``."""
    with open(file_path, 'r') as source:
        return json.load(source)

# Parse the saved predictions from file_path for the metric cells below.
loaded_results = load_results(file_path)

In [None]:
# Marker that separates the prompt from the model's answer in the decoded text
_RESPONSE_MARKER = "\n\n### Response:\n"


def _response_lines(generated_text):
    """Return the lines of the text after the first response marker.

    An empty list is returned when the marker is absent, so that prompt text
    (whose instruction itself contains "true or false") is never parsed as
    the model's answer.
    """
    _, marker, response_part = generated_text.partition(_RESPONSE_MARKER)
    if not marker:
        return []
    return response_part.split('\n')


def _decision_from_lines(response_lines):
    """Return "true" / "false" from the first line mentioning either word.

    Returns None when no line mentions either word, or when the first line
    that does mentions both (ambiguous).
    """
    for line in response_lines:
        lowered = line.lower()
        has_true = "true" in lowered
        has_false = "false" in lowered
        if has_true and has_false:
            return None
        if has_true:
            return "true"
        if has_false:
            return "false"
    return None


# function to calculate accuracy and extract the predicted and actual labels
def calculate_accuracy(results):
    """Score free-text generations against the reference labels.

    Args:
        results: List of dicts with a 'generated_text' entry (a list whose
            first element is the decoded generation) and a 'label' entry.

    Returns:
        Tuple ``(accuracy, prediction_labels, actual_labels)``: accuracy as a
        percentage (0.0 for an empty ``results``), plus the predicted and
        actual labels in input order. Predicted labels are 3 for "true" and
        0 for "false".
    """
    correct_predictions = 0
    prediction_labels = []
    actual_labels = []

    # parsing is needed because the generated answers are not always in the same format
    for entry in results:
        response_lines = _response_lines(entry['generated_text'][0])
        print(response_lines)

        # if no clear True or False is found, default to "false" because we cannot be sure
        generated_decision = _decision_from_lines(response_lines) or "false"

        # convert the generated decision to label (0 for False and 3 for True)
        generated_label = 3 if generated_decision == 'true' else 0
        actual_label = entry['label']

        # keep both labels for the precision/recall computation done later
        prediction_labels.append(generated_label)
        actual_labels.append(actual_label)

        if generated_label == actual_label:
            correct_predictions += 1

    total_predictions = len(results)
    if total_predictions == 0:
        # nothing to score: avoid dividing by zero
        return 0.0, prediction_labels, actual_labels

    accuracy = correct_predictions / total_predictions * 100
    return accuracy, prediction_labels, actual_labels

# output
# Score the loaded predictions; the two label lists are reused by the
# precision/recall/F1 call further down.
accuracy, prediction_labels, actual_labels = calculate_accuracy(loaded_results)
print("Accuracy:", accuracy, "%")

In [None]:
# precision, recall and F1 computed from paired prediction / actual labels
def custom_precision_recall_f1(predicted, actual, true_label=3, false_label=0):
    """Return ``(precision, recall, f1_score)`` averaged over both classes.

    Precision and recall are each computed once with ``true_label`` as the
    positive class and once with ``false_label`` as the positive class, then
    averaged. F1 is the harmonic mean of those two averaged values. Any ratio
    with a zero denominator counts as 0.
    """
    def _ratio(numerator, denominator):
        return numerator / denominator if denominator > 0 else 0

    # Tally the four confusion-matrix cells in one pass over the label pairs
    true_positives = true_negatives = false_positives = false_negatives = 0
    for guess, truth in zip(predicted, actual):
        if guess == true_label and truth == true_label:
            true_positives += 1
        if guess == false_label and truth == false_label:
            true_negatives += 1
        if guess == true_label and truth == false_label:
            false_positives += 1
        if guess == false_label and truth == true_label:
            false_negatives += 1

    # Per-class precision, then the mean of the two
    precision_r = _ratio(true_positives, true_positives + false_positives)
    precision_f = _ratio(true_negatives, true_negatives + false_negatives)
    precision = (precision_r + precision_f) / 2

    # Per-class recall, then the mean of the two
    recall_r = _ratio(true_positives, true_positives + false_negatives)
    recall_f = _ratio(true_negatives, true_negatives + false_positives)
    recall = (recall_r + recall_f) / 2

    f1_score = _ratio(2 * (precision * recall), precision + recall)

    return precision, recall, f1_score

In [None]:
# Compute precision, recall, and f1. This is a bare expression, so the returned
# tuple is only displayed when it is the last statement of a notebook cell.
custom_precision_recall_f1(prediction_labels, actual_labels)