# ▶️ Configure GPUs

In [None]:
import os
# Expose only GPUs 0, 1 and 2 to CUDA for this session (alternative: "0" for a single GPU).
os.environ.update(CUDA_VISIBLE_DEVICES="0,1,2")

# ▶️ Load and Preprocess DBPedia dataset

In [None]:
from datasets import load_dataset

# Load the DBPedia-14 dataset by its Hugging Face Hub identifier.
dataset = load_dataset(path="fancyzhx/dbpedia_14")

# Class names taken from the train split's 'label' feature (indexed by label ID).
CLASS_LABELS = dataset["train"].features["label"].names

## Reduce dataset size via sampling
Let's take the first **n** samples from each class.

In [None]:
import numpy as np

def get_n_samples_per_class(dataset, n, shuffle = False, seed=0):
    """
        Given a dataset, obtain up to n samples from each class
        and return a smaller dataset containing all the samples.

        Args:
            dataset (Dataset): The dataset to sample. Must have a 'label' column.
            n (int): How many samples from each class to extract. A class with
                fewer than n samples contributes all of its samples.
            shuffle (bool): If False, the first n rows of each class in the label-sorted
                dataset are taken and the result stays ordered by class. If True, the
                dataset is shuffled before the per-class selection and the result is
                shuffled again. NOTE: Dataset.shuffle() hangs indefinitely on Nix.
            seed (int): Seed passed to Dataset.shuffle(); only used when shuffle is True.

        Returns:
            sample (Dataset): The sampled dataset.
    """
    ds_sorted = dataset.sort('label')

    # After sorting by label, every class occupies one contiguous run of rows:
    # class_starts holds the first row of each run, class_counts its length.
    _, class_starts, class_counts = np.unique(ds_sorted['label'], return_index=True, return_counts=True)

    # Clamp to the class size so the indices never spill into the next class
    # (or past the end of the dataset) when a class has fewer than n rows.
    selected_indices = [
        row
        for start, count in zip(class_starts.tolist(), class_counts.tolist())
        for row in range(start, start + min(n, count))
    ]

    if shuffle:
        # Shuffling before the sort randomises WHICH rows of each class get picked.
        # The per-class row ranges are unchanged because the label counts are the same.
        source = dataset.shuffle(seed=seed).sort('label') # Dataset.shuffle() hangs indefinitely on Nix - No idea why.
    else:
        source = ds_sorted  # reuse the dataset sorted above instead of sorting again

    sample = source.select(selected_indices)

    if shuffle:
        # Shuffle again so the final result is not grouped by class.
        sample = sample.shuffle(seed=seed) # Dataset.shuffle() hangs indefinitely on Nix - No idea why.
    return sample

def sample_dataset(dataset, ratio = None, size = None, samples_per_class = None, seed=0):
    """
        Build a smaller, class-balanced dataset from a larger one.

        The target size can be expressed in three ways. When several are
        given, samples_per_class takes precedence, then size, then ratio.

        Args:
            dataset (Dataset): The dataset to sample.
            ratio (float, optional): Fraction of the dataset to keep; clamped to [0, 1].
            size (int, optional): Total number of samples wanted; converted to a
                fraction of dataset.num_rows and clamped like ratio.
            samples_per_class (int, optional): Number of samples to take from each class.
            seed (int, optional): Random seed. Currently accepted but not used:
                the sampling call below does not enable shuffling.

        Returns:
            sampled_dataset (Dataset): The smaller dataset.

        Raises:
            ValueError: If none of ratio, size and samples_per_class is given.
    """
    nothing_specified = all(option is None for option in (ratio, size, samples_per_class))
    if nothing_specified:
        raise ValueError("Either ratio, size, or samples_per_class must be given.")

    # An explicit per-class count needs no further computation.
    if samples_per_class is not None:
        return get_n_samples_per_class(dataset, samples_per_class)

    # An absolute size is expressed as a fraction of the full dataset.
    fraction = ratio if size is None else size / dataset.num_rows
    fraction = min(max(fraction, 0), 1)

    # Rows per class if the classes were perfectly balanced, scaled by the fraction.
    class_count = len(dataset.features['label'].names)
    rows_per_class = dataset.num_rows // class_count
    return get_n_samples_per_class(dataset, int(rows_per_class * fraction))

## 🔧 Configure Dataset Sample Size

In [None]:
# Class-balanced subsets of the original splits: 10% of train, 25% of test.
ds = {
    'train': sample_dataset(dataset['train'], ratio=0.1),
    'test': sample_dataset(dataset['test'], ratio=0.25),
}

print(f"Train: {len(ds['train'])} samples.")
print(f"Test: {len(ds['test'])} samples.")

## Format as a supervised fine-tuning dataset

To fine-tune our LLM, we will use the ``trl`` library with
the ``SFTTrainer`` class. To do this, we must format our data
in [conversational](https://huggingface.co/docs/trl/main/en/dataset_formats#conversational) format using chat templates.

In [None]:
from datasets import Value

def preprocess_dbpedia_sample(sample, class_labels):
    """
        Prepare one dbpedia_14 sample for SFTTrainer: replace the label ID
        with the label name ("0" -> "Company") and strip leading/trailing
        whitespace from the article text. The sample is modified in place.

        Args:
            sample (dict): Sample with a 'label' convertible to int and a string 'content'.
            class_labels (list<str>): Label names, indexed by label ID.

        Returns:
            sample (dict): The same sample object, after modification.
    """
    label_id = int(sample['label'])
    sample.update(
        label=class_labels[label_id],
        content=sample['content'].strip(),
    )
    return sample
    
def apply_chat_template(sample):
    """
        Convert a prompt/completion sample to conversational format.

        The 'prompt' becomes the user message and the 'completion' the
        assistant message of a two-turn 'messages' list; both original
        keys are then removed. The sample is modified in place.

        Args:
            sample (dict): Sample containing 'prompt' and 'completion'.

        Returns:
            sample (dict): The same sample object, now holding 'messages'.
    """
    user_turn = {"role":"user", "content":sample['prompt']}
    assistant_turn = {"role":"assistant", "content":sample['completion']}
    sample['messages'] = [user_turn, assistant_turn]

    # The raw fields are redundant once the conversation has been built.
    for consumed_key in ('prompt', 'completion'):
        del sample[consumed_key]
    return sample
    
def process_dbpedia_dataset(dataset, seed=0):
    """
    Turn a dbpedia_14 split into a Hugging Face conversational dataset.
    Format reference: https://huggingface.co/docs/trl/main/en/dataset_formats#conversational

    The article text becomes the user message and the class name the
    assistant message. This is the format the ``SFTTrainer`` class from
    ``trl`` is given for fine-tuning in this notebook.

    Args:
        dataset (Dataset): The dataset to format. Needs 'label', 'content' and 'title' columns.
        seed (int): Random number seed for the final shuffle.

    Returns:
        dataset (Dataset): The shuffled dataset in conversational format.
    """
    # Capture the class names before the label column is cast to string.
    class_names = dataset.features['label'].names

    def substitute_label(sample):
        return preprocess_dbpedia_sample(sample, class_names)

    conversational = (
        dataset
        .cast_column("label", Value(dtype='string'))  # label type: 0 -> "0"
        .map(substitute_label)                        # label value: "0" -> "Company"
        .rename_column("content", "prompt")
        .rename_column("label", "completion")
        .remove_columns(["title"])
        .map(apply_chat_template)                     # prompt/completion -> messages
    )

    return conversational.shuffle(seed=seed)

In [None]:
# Convert both sampled splits to the conversational format.
ds.update({split: process_dbpedia_dataset(ds[split]) for split in ('train', 'test')})

# ▶️ Load Baseline LLM

In [None]:
# Device placement passed as device_map when the baseline model is loaded below.
DEVICE_MAP = "cuda:0"

In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
import torch

# Same quantization configuration as QLoRA
# NOTE(review): the "Load Finetuned LLM" cell later in this file uses
# bnb_4bit_use_double_quant=False and float16 compute instead — confirm the difference is intended.
bnb_config = BitsAndBytesConfig(
    load_in_4bit = True,
    bnb_4bit_quant_type = "nf4", # QLoRA uses 4-bit NormalFloat precision,
    bnb_4bit_use_double_quant = True, # QLoRA uses double quantising,
    bnb_4bit_compute_dtype = torch.float32
)

# Hugging Face Hub identifier of the base (not yet fine-tuned) model.
model_id = "Qwen/Qwen2.5-7B-Instruct"
# Load the base model with the quantization settings above, placed according to DEVICE_MAP.
model = AutoModelForCausalLM.from_pretrained(model_id, quantization_config=bnb_config, device_map=DEVICE_MAP)

# Tokenizer belonging to the same base model.
tokenizer = AutoTokenizer.from_pretrained(model_id)

In [None]:
# How much memory is the model using? Printed in megabytes (MB),
# assuming get_memory_footprint() reports bytes — hence the division by 1e6.
print(model.get_memory_footprint()/1e6)

# ▶️ Finetune LLM

In [None]:
# Name of the fine-tuned adapter: used below as the trainer's output_dir and as the save/load path.
PEFT_MODEL_NAME = "Qwen2.5-FT-DBPedia"

## 🔧 Configure Parameters

In [None]:
rank_dimension = 6 # the rank of the adapter, the lower the fewer parameters you'll need to train. (smaller = more compression)
lora_alpha = 8 # this is the scaling factor for LoRA layers (higher = stronger adaptation)
lora_dropout = 0.05 # dropout probability for LoRA layers (helps prevent overfitting)
# NOTE(review): the SFTConfig cell in this file passes max_seq_length=64 directly,
# so this value does not appear to be used — confirm which one is intended.
max_seq_length = 10
epochs=1 # passed to SFTConfig as num_train_epochs
learning_rate=2e-4 # passed to SFTConfig as learning_rate

## Add LoRA adapters to model

In [None]:
from peft import get_peft_model, LoraConfig, prepare_model_for_kbit_training

# Prepare the quantized base model for training before the adapters are attached.
model = prepare_model_for_kbit_training(model)

# LoRA adapter settings, taken from the parameters configured above.
config = LoraConfig(
    r=rank_dimension,
    lora_alpha=lora_alpha,
    bias="none",           # BEWARE: training biases *modifies* base model's behavior
    lora_dropout=lora_dropout,
    task_type="CAUSAL_LM"
)

# Wrap the base model with the LoRA adapters; `model` now refers to the PEFT model.
model = get_peft_model(model, config)

In [None]:
# How much memory is the model using now that the LoRA adapters are attached?
# Printed in megabytes (MB), assuming get_memory_footprint() reports bytes.
print(model.get_memory_footprint()/1e6)

In [None]:
from trl import SFTConfig, SFTTrainer

sft_config = SFTConfig(
    ## GROUP 1: Memory usage
    # These arguments will squeeze the most out of your GPU's RAM
    # Checkpointing: this saves a LOT of memory
    gradient_checkpointing=True,
    # Set this to avoid exceptions in newer versions of PyTorch
    gradient_checkpointing_kwargs={'use_reentrant': False},
    # Gradient Accumulation / Batch size
    # Actual batch (for updating) is same (1x) as micro-batch size
    gradient_accumulation_steps=1,
    # The initial (micro) batch size to start off with
    per_device_train_batch_size=16,
    # If batch size would cause OOM, halves its size until it works
    auto_find_batch_size=True,

    ## GROUP 2: Dataset-related
    # NOTE(review): the `max_seq_length = 10` set in the "Configure Parameters" cell is
    # not used here. The hard-coded 64 is kept so training does not change silently —
    # confirm which value is intended.
    max_seq_length=64,
    # Dataset
    # packing a dataset means no padding is needed
    packing=True,

    ## GROUP 3: These are typical training parameters
    num_train_epochs=epochs,
    learning_rate=learning_rate,
    # Optimizer
    # Fused PyTorch AdamW (an 8-bit Adam optimizer doesn't help much if you're using LoRA!)
    optim='adamw_torch_fused',
    # Learning rate schedule
    warmup_ratio=0.03,  # Portion of steps for warmup
    # Keep learning rate constant after warmup. The plain "constant" schedule takes no
    # warmup steps, so warmup_ratio would be ignored with it.
    lr_scheduler_type="constant_with_warmup",

    ## GROUP 4: Logging parameters
    logging_steps=10,
    logging_dir='./logs',
    output_dir=PEFT_MODEL_NAME,
    report_to='none'
)

In [None]:
# Supervised fine-tuning trainer: trains the LoRA-wrapped model on the conversational
# train split. No evaluation dataset is passed here.
trainer = SFTTrainer(
    model=model,
    processing_class=tokenizer,
    args=sft_config,
    train_dataset=ds['train'],
)

## Start Finetuning

In [None]:
# Run the fine-tuning, then save the result under PEFT_MODEL_NAME so the
# "Load Finetuned LLM" section can reload it from that path.
trainer.train()
trainer.save_model(PEFT_MODEL_NAME)

# ▶️ Load Finetuned LLM

In [None]:
# Device placement for inference, and the path the fine-tuned adapter was saved to.
DEVICE_MAP = "auto"#"cuda:0"
PEFT_MODEL_NAME = "Qwen2.5-FT-DBPedia"

from peft import PeftConfig, AutoPeftModelForCausalLM
from transformers import AutoTokenizer, BitsAndBytesConfig
import torch

# 4-bit quantization settings used when reloading the model for inference.
# NOTE(review): these differ from the configuration in the "Load Baseline LLM" cell
# (double quantization enabled, float32 compute) — confirm the difference is intended.
bnb_config = BitsAndBytesConfig(
   load_in_4bit=True,
   bnb_4bit_quant_type="nf4",
   bnb_4bit_use_double_quant=False,
   bnb_4bit_compute_dtype=torch.float16
)
# The adapter's saved config records which base model it was trained on.
config = PeftConfig.from_pretrained(PEFT_MODEL_NAME)

# Load the base model's tokenizer, then the adapter model from PEFT_MODEL_NAME.
tokenizer = AutoTokenizer.from_pretrained(config.base_model_name_or_path)
model = AutoPeftModelForCausalLM.from_pretrained(PEFT_MODEL_NAME, device_map=DEVICE_MAP, quantization_config=bnb_config)

# ▶️ LLM helper methods
Conventional methods to generate text with 🤗 Transformers models such as [TextGenerationPipeline](https://huggingface.co/docs/transformers/en/main_classes/pipelines#transformers.TextGenerationPipeline) don't work with LoRA models, so we will use [Daniel Godoy's method](https://github.com/dvgodoy/FineTuningLLMs/blob/main/Chapter6.ipynb) for text generation.

In [None]:
def gen_prompt(tokenizer, sentence):
    """
        Convert the user's query into a formatted prompt string.
        Source: https://github.com/dvgodoy/FineTuningLLMs/blob/main/Chapter6.ipynb

        Args:
            tokenizer: Object providing apply_chat_template().
            sentence (str | list<dict>): Either a plain user query, which is wrapped
                in a single user message, or a ready-made list of chat messages,
                which is passed through unchanged.

        Returns:
            prompt: The output of tokenizer.apply_chat_template() with
                tokenize=False and add_generation_prompt=True.
    """
    # isinstance (rather than an exact type check) also accepts str subclasses.
    if isinstance(sentence, str):
        sentence = [{"role": "user", "content": sentence}]
    prompt = tokenizer.apply_chat_template(
        sentence, tokenize=False, add_generation_prompt=True
    )
    return prompt

def generate(query, model, tokenizer, 
             max_new_tokens=64, 
             skip_special_tokens=True, 
             response_only=True,
             do_sample=True,
             temperature=0.1):
    """
        Generate an LLM response to a user query.
        Source: https://github.com/dvgodoy/FineTuningLLMs/blob/main/Chapter6.ipynb

        Args:
            query (str | list<dict>): A plain user query or a list of chat messages.
            model: The model to generate with; it is switched to eval mode.
            tokenizer: The tokenizer used to format, encode and decode text.
            max_new_tokens (int): Maximum number of tokens to generate.
            skip_special_tokens (bool): Whether special tokens are dropped when decoding.
            response_only (bool): If True, the prompt tokens are removed from the output.
            do_sample (bool): Whether to sample instead of decoding greedily.
            temperature (float): Sampling temperature; only used when do_sample is True.

        Returns:
            output (str): The decoded text of the first (only) generated sequence.
    """
    # Converts user query into a formatted prompt.
    prompt = gen_prompt(tokenizer, query)

    # Tokenizes the formatted prompt
    tokenized_input = tokenizer(prompt,
                                add_special_tokens=False,
                                return_tensors="pt").to(model.device)

    model.eval()

    # temperature is a sampling-only setting, so it is only forwarded when sampling.
    generation_kwargs = {"max_new_tokens": max_new_tokens, "do_sample": do_sample}
    if do_sample:
        generation_kwargs["temperature"] = temperature

    # Generates the response/completion
    generation_output = model.generate(**tokenized_input, **generation_kwargs)

    # If required, removes the tokens belonging to the prompt
    if response_only:
        input_length = tokenized_input['input_ids'].shape[1]
        generation_output = generation_output[:, input_length:]

    # Decodes the tokens back into text
    output = tokenizer.batch_decode(generation_output, 
                                    skip_special_tokens=skip_special_tokens)[0]
    return output

# ▶️ Evaluate LLM

## Text Classification Prompts

In [None]:
from model_prompts import *

In [None]:
def get_classification_prompt(article, prompt):
    """
      For a given article in the Dataset,
      return a LLM prompt in chat template form
      to get its category.

      Args:
          article (Dictionary): Any item of the conversational dataset; the text of its
              first message (article['messages'][0]['content']) is used as the user message.
          prompt (str): A model prompt with article classification instructions.
              If None or empty, no system message is included.

      Returns:
          prompt (list<Dictionary>): The prompt as a [Chat Template](https://huggingface.co/docs/transformers/main/en/chat_templating)
              message list: an optional system message followed by the user message.
    """
    messages = []

    # Only add a system message when there are instructions to give. Otherwise the
    # list would contain a bare ""/None entry, which is not a valid chat message.
    if prompt is not None and prompt != "":
        messages.append({"role": "system", "content": prompt})

    messages.append({"role": "user", "content": article['messages'][0]['content'].strip()})
    return messages

## Evaluation Method

In [None]:
# Class names indexed by class ID, plus one extra fallback entry at the end.
CLASS_LABELS = [
    "Company",
    "EducationalInstitution",
    "Artist",
    "Athlete",
    "OfficeHolder",
    "MeanOfTransportation",
    "Building",
    "NaturalPlace",
    "Village",
    "Animal",
    "Plant",
    "Album",
    "Film",
    "WrittenWork",
    "Unknown",  # Final label is only used if LLM *cannot* predict the correct answer.
]

In [None]:
import random
import re

def _unknown_label_id(class_labels, unknown_label_strategy):
    """Return the fallback class ID used when no class label can be recognised."""
    if unknown_label_strategy == "random":
        return random.randint(0, len(class_labels) - 1)
    if unknown_label_strategy == "last":
        return len(class_labels) - 1
    raise ValueError(
        f'unknown_label_strategy must be "random" or "last", got {unknown_label_strategy!r}'
    )


def get_class_label_from_string(string, class_labels = None, unknown_label_strategy = "last"):
    """
    Extract a class label by name from a string and return its ID.

    Matching is attempted in this order:
      1. The string is exactly one of the class labels.
      2. The string (lower-cased, stripped) is a prefix of a label, e.g. "mean" ->
         "MeanOfTransportation". The first label with that prefix wins.
      3. A label name occurs somewhere inside the string; the last occurrence wins.
    If nothing matches, or the string is empty, the unknown-label strategy decides.

    Args:
    string (str): A string containing the name of one class label.
    class_labels (list<str>): All class label names, indexed by class ID.
                                    Defaults to the module-level CLASS_LABELS.
    unknown_label_strategy (str): What to do if the string does not contain any class labels.
                                    "random": Return a random class label.
                                    "last": Return the last class label in the list. (default)

    Returns:
    class_id (int): The ID of the matching class label.

    Raises:
    ValueError: If no label matches and unknown_label_strategy is not "random" or "last".
    """
    # Resolved at call time so the function always sees the current label list.
    if class_labels is None:
        class_labels = CLASS_LABELS

    # Return a direct match if possible
    if string in class_labels:
        return class_labels.index(string)

    string = string.lower().strip()

    # An empty response carries no information. Without this guard it would be
    # treated as a prefix of every label and be counted as the first class.
    if not string:
        return _unknown_label_id(class_labels, unknown_label_strategy)

    # Match class labels if string is truncated.
    # E.g., "mean" -> "meanoftransportation" -> 5
    # This allows us to match labels even when we don't
    # have enough tokens to write the entire name, so the
    # evaluation can run with a lower max token count.
    for i, label in enumerate(class_labels):
        if label.lower().replace(" ", "").startswith(string):
            return i

    # Concatenate all label names using boolean OR. Labels are escaped so they are
    # matched literally; spaces inside a label match any amount of whitespace.
    pattern = "|".join(
        r"\s*".join(re.escape(part) for part in label.lower().split(" "))
        for label in class_labels
    )

    # Find all instances of label name strings within the base string.
    matches = re.findall(pattern, string)

    # If no class label is found in the LLM text, fall back to the chosen strategy.
    if not matches:
        return _unknown_label_id(class_labels, unknown_label_strategy)

    # Get the last matching label from the string.
    final_match = matches[-1]

    # Remove all capitalisation, non-alphabetic characters, and whitespace
    labels_sanitised = [re.sub("[^a-z]", "", label.lower()) for label in class_labels]
    match_sanitised = re.sub("[^a-z]", "", final_match.lower())

    # Find the matching class ID for the label.
    return labels_sanitised.index(match_sanitised)

In [None]:
import re

def get_first_number_from_string(string):
    """
    Returns the first whole number from a string as an integer.

    Args:
      string (str): The text to search.

    Returns:
      first_number (int): The first run of digits in the string, as an int.

    Raises:
      ValueError: If the string contains no digits.
    """
    # re.search returns None when nothing matches, so the "no number" case is
    # detected reliably (re.findall returns an empty list, never None).
    number_match = re.search(r"\d+", string)
    if number_match is None:
        raise ValueError(f"No number found in string: {string}")
    return int(number_match.group())

def get_category_label(article, model, tokenizer, classification_prompt, extractor_func, max_tokens = 10):
    """
    Predict the category label of one article in the DBPedia dataset.

    Args:
      article (dict): A dataset item in conversational format (with a 'messages' list).
      model: The LLM used to generate the answer.
      tokenizer: The tokenizer belonging to the model.
      classification_prompt (str): Model instructions on how to classify articles.
      extractor_func (func): A method which takes the model's response and returns a classification label as an integer.
      max_tokens (int): Maximum number of new tokens the model may generate.

    Returns:
      output (tuple<int, str>): The category of the article and the raw LLM output.
    """
    chat = get_classification_prompt(article, classification_prompt)
    llm_text = generate(chat, model=model, tokenizer=tokenizer, max_new_tokens=max_tokens)
    return (extractor_func(llm_text), llm_text)

In [None]:
from tqdm.notebook import tqdm

def predict_classes(dataset, model, tokenizer, classification_prompt, extractor_func, max_tokens):
    """
    Predict the label of every article in a DBPedia dataset from its contents.

    Args:
      dataset (Dataset): The conversational dataset to classify.
      model: The LLM used to generate the answers.
      tokenizer: The tokenizer belonging to the model.
      classification_prompt (str): Model instructions on how to classify articles.
      extractor_func (func): A method which takes the model's response and returns a classification label as an integer.
        It is also applied to the reference answers to obtain y_true.
      max_tokens (int): Maximum number of new tokens the model may generate per article.

    Returns:
      results (tuple<list, list, list>):
        y_pred (list<int>): Predicted labels
        y_true (list<int>): Actual labels (groundtruth)
        responses (list<str>): Raw LLM response for each test sample.
    """
    # The last message of each conversation holds the reference answer.
    ground_truth = [
        extractor_func(conversation[-1]['content'])
        for conversation in dataset['messages']
    ]

    predictions, raw_outputs = [], []

    # TODO: This is unoptimized, use a dataset for this.
    for article in tqdm(dataset, "Classifying articles"):
        label_id, llm_text = get_category_label(
            article, model, tokenizer, classification_prompt, extractor_func, max_tokens
        )
        predictions.append(label_id)
        raw_outputs.append(llm_text)

    return predictions, ground_truth, raw_outputs

## 🔧 Configure Evaluation

In [None]:
# Finetuned configuration
# No system prompt, and only a single generated token: the label extractor
# can match a class name from a truncated prefix.
configurations = [
    dict(
        name="Fine-tuned",
        prompt="",
        max_tokens=1,
        extractor_func=get_class_label_from_string,
    ),
]

In [None]:
# Baseline configuration

# configurations = [
#     {"name" : "Zero-shot",
#      "prompt" : PROMPT_ZEROSHOT,
#      "max_tokens" : 10,
#      "extractor_func" : get_first_number_from_string},

#     {"name" : "Chain-of-Thought",
#      "prompt" : PROMPT_COT,
#      "max_tokens" : 100,
#      "extractor_func" : get_class_label_from_string},

#     {"name" : "Meta Prompt",
#      "prompt" : PROMPT_META,
#      "max_tokens" : 100,
#      "extractor_func" : get_class_label_from_string},

#     {"name" : "2-Shot CoT",
#      "prompt" : PROMPT_COT_2SHOT,
#      "max_tokens" : 100,
#      "extractor_func" : get_class_label_from_string},

#     {"name" : "4-Shot CoT",
#      "prompt" : PROMPT_COT_4SHOT,
#      "max_tokens" : 100,
#      "extractor_func" : get_class_label_from_string}
# ]

## Run Evaluation

In [None]:
# Predict all article categories in the dataset

y_true = None

for config in tqdm(configurations, "Testing LLM configurations"):

  # Positional arguments for predict_classes, in the order it expects them.
  args = tuple(config[key] for key in ('prompt', 'extractor_func', 'max_tokens'))

  # Store the three result lists back on the configuration itself.
  config.update(zip(('y_pred', 'y_true', 'responses'),
                    predict_classes(ds['test'], model, tokenizer, *args)))

## Return Evaluation Results

### Save all LLM responses

In [None]:
def save_answers(config, class_labels, incorrect_only = False):
    """
    Save the model's answers on the test split (the global ds['test']) to a CSV file.

    The file is written to output/<configuration name>/ as "answers.csv", or as
    "answers_incorrect.csv" when only the wrong predictions are requested.

    Args:
        config (dict): An evaluated configuration with 'name', 'y_pred', 'y_true' and 'responses'.
        class_labels (list<str>): Class names, indexed by class ID.
        incorrect_only (bool): If True, keep only samples where y_pred differs from y_true.

    Returns:
        answers (DataFrame): The saved table, indexed by position in the test split.
    """
    output_dir = os.path.join("output", config['name'].replace(' ', '_').lower())
    # evaluate_model() normally creates this directory, but don't depend on call order.
    os.makedirs(output_dir, exist_ok=True)

    y_pred = np.array(config['y_pred'])
    y_true = np.array(config['y_true'])

    # Boolean mask selecting the rows to export
    if incorrect_only:
        mask = y_pred != y_true
    else:
        mask = np.full(len(y_pred), True)

    # Positions of the exported rows within the test split
    index = np.flatnonzero(mask)

    # Read article text and reference answer in a single pass over the test split.
    contents, actual_categories = [], []
    for item in ds['test']:
        contents.append(item['messages'][0]['content'])
        actual_categories.append(item['messages'][-1]['content'])

    answers = {
      "Content" : np.array(contents)[mask],

      # Get the class names for y_pred and y_true
      "Predicted Category" : [class_labels[label_id] for label_id in y_pred[mask]],
      "Actual Category" : np.array(actual_categories)[mask],

      # Get LLM raw text output
      "LLM Output" : np.array(config['responses'])[mask]
    }

    answers = pd.DataFrame(answers, index=index)
    file_name = "answers_incorrect.csv" if incorrect_only else "answers.csv"
    csv_path = os.path.join(output_dir, file_name)

    answers.to_csv( csv_path, escapechar='\\' )

    # Built separately so the f-string needs no nested quotes.
    qualifier = "incorrect " if incorrect_only else ""
    print(f"\n{config['name']} {qualifier}answers saved to: {csv_path}")

    return answers

### Accuracy report + Confusion Matrix

In [None]:
from sklearn.metrics import classification_report, ConfusionMatrixDisplay, confusion_matrix
from matplotlib import pyplot as plt
import pandas as pd
import os
import json
import shutil

def evaluate_model(config, class_labels, display_as_percentage = True):
    """
    Evaluate a model configuration and save the results to its output directory.

    The directory output/<configuration name>/ is emptied and recreated, then filled with:
      - output.json: the configuration and its raw predictions,
      - evaluation.csv: the classification report (precision, recall, F1, support),
      - confusion_matrix.png: the confusion matrix plot.

    Args:
        config (dict): An evaluated configuration with 'name', 'prompt', 'max_tokens',
            'y_pred', 'y_true' and 'responses'.
        class_labels (list<str>): Class names, indexed by class ID.
        display_as_percentage (bool): If True, the confusion matrix is normalised per
            true class and shown as percentages; otherwise raw counts are shown.
    """

    output_dir = os.path.join("output", config['name'].replace(' ', '_').lower())
    # Remove any existing output files. ignore_errors keeps the first run,
    # when the directory does not exist yet, from failing.
    shutil.rmtree(output_dir, ignore_errors=True)
    os.makedirs(output_dir, exist_ok=True)

    output = {
        "name": config["name"],
        "prompt" : config["prompt"],
        "max_tokens" : config["max_tokens"],
        "data" : [{
            "y_pred" : config["y_pred"],
            "y_true" : config["y_true"],
            "llm_output" : config["responses"]
        }]
    }

    with open( os.path.join(output_dir, "output.json"), "w" ) as f:
        json.dump(output, f)

    y_true, y_pred, config_name = config['y_true'], config['y_pred'], config['name']

    # Get precision, recall, and F1 score
    classif_report = classification_report(y_true, y_pred, zero_division=0.0, output_dict=True)
    classif_report = pd.DataFrame(classif_report).transpose()

    # Save classification report to output dir
    # NOTE(review): index=False drops the row names (class IDs, averages) from the CSV — confirm this is wanted.
    classif_report.to_csv( os.path.join(output_dir, "evaluation.csv"), escapechar='\\',index=False )

    # The matrix has one row/column per class ID that occurs in y_true or y_pred.
    # Name each of them by its own ID, so the axis labels stay correct even when
    # some classes are missing or only the fallback class is predicted.
    present_ids = np.unique(np.concatenate([np.asarray(y_true), np.asarray(y_pred)]))
    display_labels = [class_labels[class_id] for class_id in present_ids]

    cm = confusion_matrix(y_true=y_true,y_pred=y_pred,labels=present_ids,
                          normalize='true' if display_as_percentage else None)

    disp = ConfusionMatrixDisplay(cm, display_labels=display_labels).plot(
        cmap = plt.cm.Blues,
        xticks_rotation='vertical',
        text_kw={'fontsize': 6},
        values_format='.0%' if display_as_percentage else None
    )

    plt.savefig( os.path.join(output_dir, "confusion_matrix.png"), dpi=200, bbox_inches='tight' )
    print(f"\n{config_name} evaluation results saved to: {output_dir}")
    #plt.show()
    

### Run Evaluation

In [None]:
for config in configurations:
    # evaluate_model() goes first: it recreates the configuration's output directory.
    evaluate_model(config, class_labels=CLASS_LABELS)
    # Export every answer, then only the incorrect ones.
    for incorrect_only in (False, True):
        save_answers(config, class_labels=CLASS_LABELS, incorrect_only = incorrect_only)