In [4]:
from trl import SFTTrainer, DataCollatorForCompletionOnlyLM
from transformers import (AutoTokenizer,
                          AutoModelForCausalLM,
                          BitsAndBytesConfig,
                          TrainingArguments,
                          pipeline)
import torch
from peft import LoraConfig
import huggingface_hub
import pandas as pd
import os
import wandb
import numpy as np
from torch.utils.data import Dataset
import logging
from tqdm import tqdm
from sklearn.metrics import (accuracy_score,
                             classification_report,
                             confusion_matrix)

In [None]:
!pip install 'accelerate>=0.26.0'
!pip install -U bitsandbytes
!pip install trl
!pip install datasets
!pip install transformers
!pip install peft
!pip install torch
!pip install huggingface-hub
!pip install wandb

In [2]:
def create_test_prompted_text(
    dataset,
    label_name,
):
    """Build one inference prompt per row of the test set.

    The wording and section markers mirror the prompt used for fine-tuning in
    this notebook (same sentences, ``Interviewer:`` speaker tag, and the
    ``### Label:`` response marker), minus the gold label, so the model is
    queried with the same layout it was trained on.

    Args:
        dataset: pandas DataFrame with the columns ``interview_question``,
            ``interview_answer``, ``question`` and ``label_name``.
        label_name: name of the column holding the gold labels; its distinct
            non-null values are listed in the prompt as the allowed categories.

    Returns:
        A list of prompt strings, one per row, in row order.
    """
    # Missing labels are dropped so the join cannot fail on a NaN entry.
    classes_names = ', '.join(
        str(name) for name in dataset[label_name].dropna().unique()
    )

    # Iterate over the three text columns directly instead of `iterrows`,
    # which would build a Series for every row.
    rows = zip(dataset['interview_question'],
               dataset['interview_answer'],
               dataset['question'])

    return [
        f"You will be given a part of an interview. "
        f"Classify the response to the selected question "
        f"into one of the following categories: {classes_names}"
        f". \n\n ### Part of the interview ### \nInterviewer:"
        f" {interview_question} \nResponse:"
        f" {interview_answer} \n\n### Selected Question ###\n"
        f"{question} \n\n### Label:"
        for interview_question, interview_answer, question in rows
    ]


def predict(test, categories, model, tokenizer, max_new_tokens=16):
    """Generate a label for every prompt and map it onto a known category.

    Args:
        test: pandas DataFrame with a ``text`` column holding the prompts.
        categories: iterable of allowed category names.
        model: causal language model handed to the text-generation pipeline.
        tokenizer: tokenizer matching ``model``.
        max_new_tokens: upper bound on generated tokens per prompt. A label is
            only a few tokens long, so a small bound keeps evaluation fast and
            stops the model from rambling into other category names.

    Returns:
        A list with one entry per prompt: the matched category, or the string
        ``"none"`` when no category name occurs in the generated text.
    """
    # Set logging level to ERROR to suppress INFO messages
    logging.basicConfig(level=logging.ERROR)

    pipe = pipeline(task="text-generation",
                    model=model,
                    tokenizer=tokenizer,
                    max_new_tokens=max_new_tokens,
                    # Greedy decoding: evaluation must be reproducible.
                    do_sample=False,
                    # Return only the completion, so the prompt (which lists
                    # every category name) can never be mistaken for the answer.
                    return_full_text=False)

    y_pred = []
    for prompt in tqdm(test["text"]):
        result = pipe(prompt)
        answer = result[0]['generated_text'].strip()
        lowered = answer.lower()

        # Collect every category mentioned in the completion, then keep the
        # one mentioned first (longest name wins a tie), so the result does
        # not depend on the order of `categories`.
        matches = []
        for category in categories:
            name = str(category).lower()
            position = lowered.find(name)
            if position != -1:
                matches.append((position, -len(name), category))

        if matches:
            print("Right label:" + lowered)
            y_pred.append(min(matches, key=lambda match: match[:2])[2])
        else:
            print("Wrong label:" + lowered)
            y_pred.append("none")

    return y_pred


def evaluation_report(y_true, y_pred, labels, run=None):
    """Print accuracy, per-class metrics and a confusion matrix; optionally log them.

    Args:
        y_true: iterable of gold label names.
        y_pred: iterable of predicted label names, same length as ``y_true``.
            Values that are not in ``labels`` (for example an "unparsed"
            placeholder) are treated as belonging to no class.
        labels: list of the class names; a class's position in this list is
            its integer id in every report below.
        run: optional Weights & Biases run. When given, the same metrics are
            sent through ``run.log``.

    Returns:
        None. Results are printed and, if ``run`` is set, logged.
    """
    mapping = {label: idx for idx, label in enumerate(labels)}

    def encode(values):
        # Names outside `labels` become -1 so they can be told apart later.
        return np.array([mapping.get(value, -1) for value in values],
                        dtype=int)

    y_true_mapped = encode(y_true)
    y_pred_mapped = encode(y_pred)

    # Always collected; only sent anywhere when `run` is provided.
    wandb_log_dict = {}

    # Calculate accuracy
    accuracy = accuracy_score(y_true=y_true_mapped, y_pred=y_pred_mapped)
    print(f'Accuracy: {accuracy:.2f}')
    wandb_log_dict["Accuracy"] = accuracy

    # Per-class accuracy over the classes that occur in the gold labels.
    for class_id in np.unique(y_true_mapped):
        in_class = y_true_mapped == class_id
        label_accuracy = accuracy_score(y_true_mapped[in_class],
                                        y_pred_mapped[in_class])
        # -1 marks a gold label missing from `labels`; indexing `labels` with
        # it would silently report the LAST class name instead.
        class_name = labels[class_id] if class_id >= 0 else "<not in labels>"
        print(f'Accuracy for label {class_name}: {label_accuracy:.2f}')
        wandb_log_dict[f"Accuracy for label {class_name}"] = label_accuracy

    # The text report is split on whitespace below, so class names must not
    # contain spaces.
    unsplit_labels = [label.replace(" ", "_") for label in labels]

    # Generate classification report
    class_report = classification_report(y_true=y_true_mapped,
                                         y_pred=y_pred_mapped,
                                         target_names=unsplit_labels,
                                         labels=list(range(len(labels))))
    print('\nClassification Report:')
    print(class_report)

    report_columns = ["Class", "Precision", "Recall", "F1-score", "Support"]
    # Skip the two leading lines of the text report and keep one row per class.
    report_lines = class_report.splitlines()
    report_table = [line.split()
                    for line in report_lines[2:(len(labels) + 2)]]

    # Generate confusion matrix
    conf_matrix = confusion_matrix(y_true=y_true_mapped,
                                   y_pred=y_pred_mapped,
                                   labels=list(range(len(labels))))
    print('\nConfusion Matrix:')
    print(conf_matrix)

    if run:
        wandb_log_dict["Classification Report"] = wandb.Table(
            data=report_table,
            columns=report_columns)

        # The W&B plot looks ids up in `class_names`, so drop every sample
        # whose prediction OR gold label has no id in `labels`.
        known = (y_pred_mapped != -1) & (y_true_mapped != -1)
        wandb_log_dict["Confusion Matix"] = wandb.plot.confusion_matrix(
            y_true=y_true_mapped[known],
            preds=y_pred_mapped[known],
            class_names=labels
        )
        run.log(wandb_log_dict)


def evaluate(base_model_name, fine_tuned_model_path, label_name, run=None):
    """Score a (fine-tuned) causal LM on ``preprocessed_data/test_set.csv``.

    Loads the 4-bit quantized base model, attaches the LoRA adapter stored in
    ``fine_tuned_model_path`` when one is present, builds a prompt for every
    test row, generates a label for each and prints/logs the metrics.

    Args:
        base_model_name: Hugging Face model id or path of the base model.
        fine_tuned_model_path: directory expected to contain a PEFT adapter
            (``adapter_config.json`` plus weights). If no adapter is found
            there, the plain base model is evaluated and a warning is logged.
        label_name: column of the test CSV holding the gold labels.
        run: optional Weights & Biases run to log the metrics to.
    """
    # None selects the library's default cache. The previous empty string
    # appears to resolve to the current working directory, which would
    # download the weights again instead of reusing the shared cache
    # (NOTE(review): confirm against the installed huggingface_hub version).
    cache_dir = None

    model = AutoModelForCausalLM.from_pretrained(
        base_model_name,
        return_dict=True,
        low_cpu_mem_usage=True,
        quantization_config=BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.bfloat16
        ),
        # torch_dtype=torch.float16,
        device_map="auto",
        trust_remote_code=True,
        offload_folder="offload/",
        cache_dir=cache_dir
    )

    # Attach the fine-tuned weights; without this step the function would
    # score the untouched base model whatever path it is given.
    adapter_config = (os.path.join(fine_tuned_model_path, "adapter_config.json")
                      if fine_tuned_model_path else None)
    if adapter_config and os.path.isfile(adapter_config):
        model.load_adapter(fine_tuned_model_path)
    else:
        logging.warning(
            "No PEFT adapter found in %r; evaluating the base model %r.",
            fine_tuned_model_path, base_model_name)
    model.eval()

    tokenizer = AutoTokenizer.from_pretrained(base_model_name,
                                              cache_dir=cache_dir)
    # Reuse EOS for padding. Registering a brand-new '[PAD]' token would
    # enlarge the vocabulary without resizing the embedding matrix, leaving
    # the pad id outside the model's embedding range.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    # Get test set data
    test_df = pd.read_csv('preprocessed_data/test_set.csv')[[
        'question',
        'interview_question',
        'interview_answer',
        label_name
    ]]

    test_texts = create_test_prompted_text(test_df, label_name)
    dataset = pd.DataFrame(test_texts, columns=['text'])

    labels = list(test_df[label_name].unique())

    y_pred = predict(dataset, labels, model, tokenizer)
    y_true = test_df[label_name]
    evaluation_report(y_true, y_pred, labels, run)

In [None]:
# `Dataset.from_pandas` belongs to Hugging Face `datasets` (installed by this
# notebook and required by trl). The `Dataset` imported at the top of the
# notebook is `torch.utils.data.Dataset`, which has no such constructor, so
# the HF class is imported here under its own name.
from datasets import Dataset as HFDataset

os.environ["WANDB_DISABLED"] = "true"

# Only the first 100 training rows are used (quick-iteration subset).
# The evasion label becomes the training target; the clarity label is dropped.
df = (
    pd.read_csv('preprocessed_data/train_set.csv')
    .iloc[:100]
    .rename(columns={'evasion_label': 'label'})
    .drop(columns=['clarity_label'])
)

dataset = HFDataset.from_pandas(df)

# Define the quantization config
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    # bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16
)

# Define the Lora config
lora_config = LoraConfig(
    r=16,
    lora_alpha=32,
    lora_dropout=0.05,
    # target_modules=["q_proj", "o_proj", "k_proj", "v_proj"],
    task_type="CAUSAL_LM",
)

# The access token is read from the environment (raises KeyError if unset).
huggingface_hub.login(os.environ["HF_KEY"])

# Load the model and tokenizer
model_id = "meta-llama/Llama-3.1-8B-Instruct"
tokenizer = AutoTokenizer.from_pretrained(model_id)
tokenizer.pad_token = tokenizer.eos_token  # Set the pad token to eos token
model = AutoModelForCausalLM.from_pretrained(model_id,
                                             quantization_config=bnb_config,
                                             device_map={"": 0})

# Comma-separated list of the distinct training labels, embedded in every prompt.
classes_names = ', '.join(list(df['label'].unique()))


def format_func(
    dataset
):
    """Render one training example as a single prompt-plus-label string.

    Args:
        dataset: mapping with the keys ``interview_question``,
            ``interview_answer``, ``question`` and ``label``.

    Returns:
        The instruction (including the module-level ``classes_names`` list),
        the interview excerpt, the selected question and, after the
        ``### Label:`` marker, the gold label.
    """
    template = (
        "You will be given a part of an interview. "
        "Classify the response to the selected question "
        "into one of the following categories: {classes}"
        ". \n\n ### Part of the interview ### \nInterviewer:"
        " {interview_question} \nResponse:"
        " {interview_answer} \n\n### Selected Question ###\n"
        "{question} \n\n### Label: {label}"
    )

    return template.format(
        classes=classes_names,
        interview_question=dataset['interview_question'],
        interview_answer=dataset['interview_answer'],
        question=dataset['question'],
        label=dataset['label'],
    )


# Marker that precedes the answer in every formatted example; it is handed to
# the completion-only collator as token ids.
response_template = "### Label:"
# The whole marker is encoded. The former `[2:]` slice is a workaround for
# SentencePiece tokenizers combined with a template that starts with a
# newline; here it would cut the marker down to its trailing token(s), which
# can also occur elsewhere in the interview text.
response_template_ids = tokenizer.encode(response_template,
                                         add_special_tokens=False)
collator = DataCollatorForCompletionOnlyLM(
    tokenizer=tokenizer,
    response_template=response_template_ids)

# Run configuration: defined once and used for both W&B and the trainer.
fine_tuned_model_path = "./llama3.1"
lr = 2e-4
epochs = 5
base_model_name = 'meta-llama/Llama-3.1-8B-Instruct'

# Wandb configuration
run = wandb.init(entity="kontilenia-national-technical-university-of-athens",
                 project='political-speech-clarity',
                 job_type="training",
                 # Track hyperparameters and run metadata
                 config={
                    "learning_rate": lr,
                    "architecture": base_model_name,
                    "dataset": "qevasion_dataset_preproccessed",
                    "epochs": epochs,
                 })

# Create the trainer
trainer = SFTTrainer(
    model=model,
    train_dataset=dataset,
    args=TrainingArguments(
        per_device_train_batch_size=1,
        gradient_accumulation_steps=1,
        warmup_steps=1,
        # Same values that are reported to W&B above.
        num_train_epochs=epochs,
        learning_rate=lr,
        # NOTE(review): the 4-bit compute dtype configured for this model is
        # bfloat16; consider `bf16=True` here instead — confirm on target GPU.
        fp16=True,
        logging_steps=100,
        output_dir="outputs",
        optim="paged_adamw_8bit",
        # `label_names` is intentionally not set: it expects a list of
        # input-dict keys, not the comma-joined class-name string that was
        # passed before.
    ),
    formatting_func=format_func,
    data_collator=collator,
    peft_config=lora_config,
)

trainer.train()

# Save the trained LoRA adapter from the trainer's PEFT-wrapped model;
# the bare `model` object is not the wrapper the trainer optimised.
trainer.model.save_pretrained(fine_tuned_model_path)

# NOTE(review): training above uses `evasion_label` as the target, while
# evaluation scores against `clarity_label` — confirm this is intended.
label_name = "clarity_label"
evaluate(base_model_name,
         fine_tuned_model_path,
         label_name,
         run)

# Close the W&B run so the logged metrics are flushed.
run.finish()