Based on https://colab.research.google.com/drive/1lN6hPQveB_mHSnTOYifygFcrO8C1bxq4?usp=sharing by Unsloth

Install packages to be able to run the code.

In [None]:
#!pip install "unsloth[colab-new] @ git+https://github.com/unslothai/unsloth.git"
#!pip install --no-deps xformers trl peft accelerate bitsandbytes

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments, BitsAndBytesConfig, AutoConfig
import torch.nn.functional as F
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
from peft import LoraConfig, prepare_model_for_kbit_training, get_peft_model
from unsloth import FastLanguageModel
from unsloth import is_bfloat16_supported

# Maximum sequence length; passed to the model loader and to the SFT trainer below.
max_seq_length = 512
# Forwarded to FastLanguageModel.from_pretrained.
# NOTE(review): presumably None lets Unsloth choose the dtype itself -- confirm.
dtype = None 
# Forwarded to FastLanguageModel.from_pretrained as its load_in_4bit flag.
load_in_4bit = True 

In [None]:
from unsloth import FastLanguageModel
import torch

# Load the Phi-3 mini instruct checkpoint and its tokenizer through Unsloth,
# using the sequence length, dtype and 4-bit settings defined above.
model, tokenizer = FastLanguageModel.from_pretrained(
    model_name = "unsloth/Phi-3-mini-4k-instruct",
    max_seq_length = max_seq_length,
    dtype = dtype,
    load_in_4bit = load_in_4bit,
)

In [None]:
# Attach LoRA adapters to the loaded model; `model` is rebound to the result.
model = FastLanguageModel.get_peft_model(
    model,
    r = 64, # LoRA rank. Choose any number > 0 ! Suggested 8, 16, 32, 64, 128
    # Names of the modules that receive adapters (note that lm_head is included).
    target_modules = ["q_proj", "k_proj", "v_proj", "o_proj",
                      "gate_proj", "up_proj", "down_proj", "lm_head"],
    lora_alpha = 16, # NOTE(review): alpha is smaller than r -- confirm this scaling is intended
    lora_dropout = 0, # No dropout on the adapter layers
    bias = "none",    # Do not add bias terms to the adapters
    use_gradient_checkpointing = "unsloth", # Selects the "unsloth" gradient-checkpointing mode
    random_state = 3407, # Seed for adapter initialisation; same value as the trainer seed below
    use_rslora = False,  # Rank-stabilised LoRA is switched off
    loftq_config = {}, # Empty LoftQ configuration
)

In [None]:
# Report how many parameters are trainable now that the adapters are attached.
model.print_trainable_parameters()

In [None]:
from unsloth.chat_templates import get_chat_template

# Switch the tokenizer to the ChatML chat template; every later
# apply_chat_template call in this file goes through this tokenizer.
tokenizer = get_chat_template(
    tokenizer,
    chat_template = "chatml", # Supports zephyr, chatml, mistral, llama, alpaca, vicuna, vicuna_old, unsloth
)

def formatting_prompts_func(examples):
    """Render every conversation of a batch into one chat-template string.

    Args:
        examples: Batch mapping whose "conversations" entry holds one list
            of chat messages per example.

    Returns:
        A dict with a single "text" key containing the rendered strings in
        input order. Nothing is tokenised and no generation prompt is added.
    """
    rendered = []
    for conversation in examples["conversations"]:
        rendered.append(
            tokenizer.apply_chat_template(
                conversation,
                tokenize=False,
                add_generation_prompt=False,
            )
        )
    return {"text": rendered}

In [None]:
# Set up our dataset
import pandas as pd
# Load the data
data = pd.read_csv("../data/interim/ready_for_model.csv")
# Keep only the joke text and its score class, renamed to "text" and "label"
data = data[['joke_new', 'score_class']]
data = data.rename(columns = {'joke_new': 'text', 'score_class': 'label'})

# Subsample to test the code (30% of the rows, fixed seed)
data = data.sample(frac=0.3, random_state=42)

# Store the text as object dtype and the labels as integers
data["text"] = data["text"].astype("object")
data["label"] = data["label"].astype(int)

# JSON schema that is interpolated into the system prompt of every
# conversation; it describes the single "rating" field the model must return.
schema = {
    "type": "object",
    "properties": {
        "rating": {
            "type": "number",
            "minimum": 0,
            "maximum": 4,
            # The wording must agree with minimum/maximum above and with the
            # 0-to-4 scale named in the user prompt.
            "description": "The rating of the joke, from 0 to 4.",
        }
    },
}

# Serialise every integer label as the JSON answer the model should produce
data["label"] = data["label"].apply(lambda rating: f'{{"rating": {rating}}}')


def _build_training_conversation(joke, label):
    """Return the system, user and assistant messages for one labelled joke."""
    system_message = {
        "role": "system",
        "content": f"You are a joke evaluator that answers in JSON. Here's the json schema you must adhere to:\n{schema}",
    }
    user_message = {
        "role": "user",
        "content": f""" Your task is to evaluate jokes based on their funniness on a scale from 0 to 4, where 0 represents the least funny and 4 represents the most funny. Consider the humor, originality, and overall impact of the joke when making your assessment: \n "{joke}" """,
    }
    assistant_message = {"role": "assistant", "content": f"{label}"}
    return [system_message, user_message, assistant_message]


# One conversation per row, with the serialised label as the assistant turn
data["conversations"] = [
    _build_training_conversation(joke, label)
    for joke, label in zip(data["text"], data["label"])
]


# Split the data: 80% for training, then halve the remaining 20% into test
# and validation sets. Both splits are stratified on the label and seeded.
train, test = train_test_split(data, test_size=0.2, random_state=42, shuffle= True, stratify=data["label"])
test, val = train_test_split(test, test_size=0.5, random_state=42, stratify=test["label"])

In [None]:
from datasets import Dataset

# Wrap the train and validation frames as Hugging Face datasets
train_dataset = Dataset.from_pandas(train)
val_dataset = Dataset.from_pandas(val)

# Format the prompts: adds a "text" column with the rendered conversations
train_dataset = train_dataset.map(formatting_prompts_func, batched=True)
# NOTE(review): val_dataset is created but never formatted or handed to the
# trainer in the visible code.
#val_dataset = val_dataset.map(formatting_prompts_func, batched=True)


In [None]:
# Inspect the first formatted training example
train_dataset[0]

In [None]:
# Pad sequences on the right-hand side
tokenizer.padding_side = 'right'

In [None]:
from trl import SFTTrainer
from transformers import TrainingArguments

# Supervised fine-tuning on the rendered "text" column of the training set.
# No evaluation dataset is passed.
trainer = SFTTrainer(
    model = model,
    tokenizer = tokenizer,
    train_dataset = train_dataset,
    dataset_text_field = "text",
    max_seq_length = max_seq_length,
    dataset_num_proc = 16,
    packing = False, # Can make training 5x faster for short sequences.
    args = TrainingArguments(
        # 16 samples per step x 4 accumulation steps = 64 samples per update
        per_device_train_batch_size = 16,
        gradient_accumulation_steps = 4,
        num_train_epochs = 1,
        max_grad_norm=0.3,
        learning_rate = 2e-4,
        # Use bf16 only where the GPU supports it and fall back to fp16
        # otherwise; hard-coding bf16 fails on GPUs without bf16 support.
        fp16 = not is_bfloat16_supported(),
        bf16 = is_bfloat16_supported(),
        logging_steps = 1,
        optim = "adamw_8bit",
        weight_decay = 0.001,
        lr_scheduler_type = "constant",
        seed = 3407,
        output_dir = "outputs",
    ),
)

In [None]:
# Run fine-tuning; the value returned by train() is kept in trainer_stats
trainer_stats = trainer.train()

### Save the fine-tuned model (GGUF export for llama.cpp is optional)

In [None]:
# Write the fine-tuned model to disk; the inference cell below reloads this path
model.save_pretrained("./models/lora_model_json_2") 

In [None]:
# NOTE(review): "lora_model_full" is not written anywhere in this file (the
# save cell uses "./models/lora_model_json_2") -- confirm this path exists.
model, tokenizer = FastLanguageModel.from_pretrained("lora_model_full")
# Optional GGUF export, currently disabled:
#model.save_pretrained_gguf("Phi-3-mini-4k-instruct-humor-full-clf-gguf", tokenizer, quantization_method = "q4_k_m") # Will download Llama.cpp if not installed

### Test model
It's faster to do this in the llama.cpp notebook. 

In [None]:
# Reload the model saved at ./models/lora_model_json_2 with the same
# sequence-length, dtype and 4-bit settings that were used for training.
model, tokenizer = FastLanguageModel.from_pretrained(
        model_name = "./models/lora_model_json_2", 
        max_seq_length = max_seq_length,
        dtype = dtype,
        load_in_4bit = load_in_4bit,
    )
# Switch the model to Unsloth's inference mode before generating
FastLanguageModel.for_inference(model)

In [None]:
# Set up the test dataset
from datasets import Dataset
def _build_test_conversation(joke):
    """Return the system and user messages for one joke, without an answer."""
    system_message = {
        "role": "system",
        "content": f"You are a joke evaluator that answers in JSON. Here's the json schema you must adhere to:\n{schema}",
    }
    user_message = {
        "role": "user",
        "content": f""" Your task is to evaluate jokes based on their funniness on a scale from 0 to 4, where 0 represents the least funny and 4 represents the most funny. Consider the humor, originality, and overall impact of the joke when making your assessment: \n "{joke}" """,
    }
    return [system_message, user_message]


# Set up test prompt format: the assistant turn is left for the model to write
test["conversations"] = [_build_test_conversation(joke) for joke in test["text"]]

test_dataset = Dataset.from_pandas(test)

# Format for generations:
def formatting_prompts_func_gen(examples):
    """Tokenise every conversation of a batch as a generation prompt.

    Args:
        examples: Batch mapping whose "conversations" entry holds one list
            of chat messages per example.

    Returns:
        A dict with a single "text" key holding, per conversation, the
        result of ``apply_chat_template`` called with tokenisation enabled,
        the generation prompt appended and PyTorch tensors requested.
    """
    tokenised = []
    for conversation in examples["conversations"]:
        prompt_ids = tokenizer.apply_chat_template(
            conversation,
            tokenize=True,
            add_generation_prompt=True,
            return_tensors="pt",
        )
        tokenised.append(prompt_ids)
    return {"text": tokenised}

# Format the prompts: "text" now holds the tokenised generation prompt
test_dataset = test_dataset.map(formatting_prompts_func_gen, batched=True)

# Switch the model to Unsloth's inference mode (the loading cell calls this too)
FastLanguageModel.for_inference(model)
# Evaluate the model
def evaluate(model, test_dataset, max_length=512, device="cuda"):
    """Generate and decode one completion per tokenised test prompt.

    Args:
        model: Object exposing ``generate``.
        test_dataset: Iterable of examples whose "text" entry holds the
            prompt token ids (as produced by formatting_prompts_func_gen).
        max_length: Value forwarded to ``model.generate`` as ``max_length``.
        device: Device the prompt tensor is moved to before generation.

    Returns:
        A list with one decoded string per example, in dataset order. The
        module-level ``tokenizer`` decodes with special tokens skipped.

    NOTE: a later cell defines another function named ``evaluate``; once that
    cell has run, this generation helper is no longer reachable by name.
    """
    preds = []
    # Iterate over the examples directly instead of indexing by position.
    for example in test_dataset:
        inputs = torch.tensor(example["text"]).to(device)
        outputs = model.generate(inputs, max_length=max_length)
        preds.append(tokenizer.decode(outputs[0], skip_special_tokens=True))
    return preds

# Get the predictions for every test prompt
# NOTE(review): nothing is timed here despite the earlier "time the evaluation" comment
preds = evaluate(model, test_dataset)

In [None]:
# Display the raw decoded generations
preds

In [None]:
import re
from sklearn.metrics import accuracy_score

# Pattern capturing the rest of the line that follows the assistant marker
_ANSWER_PATTERN = re.compile(r'<\|im_start\|>assistant\n(.*)')


def _extract_answer(pred):
    """Return the text after the assistant marker, or "" if the marker is absent.

    A generation without the marker becomes an empty answer, so it is scored
    as wrong instead of raising AttributeError on a failed match.
    """
    found = _ANSWER_PATTERN.search(pred)
    return found.group(1) if found else ""


# Extract the predictions
preds_clean = [_extract_answer(pred) for pred in preds]

print(test["label"][0:5])
print(preds_clean[0:5])
# Compute the accuracy (exact string match against the JSON-formatted labels)
accuracy_score(test["label"], preds_clean)


In [None]:
# Put the cleaned predictions in a one-column frame
preds_clean_df = pd.DataFrame(preds_clean, columns = ["preds"])

# Show how often each distinct prediction occurs
preds_clean_df["preds"].value_counts()

In [None]:
# Count how often each label occurs in the test set
from collections import Counter

Counter(test_dataset["label"])

In [None]:
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
import numpy as np

def evaluate(y_true, y_pred):
    """Print accuracy, per-label accuracy, a classification report and a confusion matrix.

    Labels and predictions may each be given as integer ratings (0-4), as the
    descriptive names in ``mapping``, or as JSON answers such as
    ``'{"rating": 3}'``. Everything is normalised to integer ratings before
    scoring, so the string labels used in this file reach the confusion
    matrix as real classes.

    Args:
        y_true: Sequence of reference labels.
        y_pred: Sequence of predicted labels, aligned with ``y_true``.

    Returns:
        None. All results are printed.

    Raises:
        ValueError: If a reference label cannot be interpreted as a rating.

    NOTE: this definition rebinds the name ``evaluate`` that an earlier cell
    uses for the generation helper.
    """
    import json  # local import keeps this cell self-contained

    mapping = {0: "Not funny at all", 1: "Not funny", 2: "Funny", 3: "Very funny", 4: "Hilarious"}
    reverse_mapping = {v: k for k, v in mapping.items()}
    unparsable = -1  # sentinel outside the valid 0-4 range

    def _to_rating(value):
        """Return the 0-4 rating encoded by ``value``, or ``unparsable``."""
        if isinstance(value, str):
            text = value.strip()
            if text in reverse_mapping:
                return reverse_mapping[text]
            try:
                value = json.loads(text)
            except ValueError:
                return unparsable
            if isinstance(value, dict):
                value = value.get("rating")
        if isinstance(value, bool):
            return unparsable
        if isinstance(value, (float, np.floating)):
            if not float(value).is_integer():
                return unparsable
            value = int(value)
        if isinstance(value, (int, np.integer)) and int(value) in mapping:
            return int(value)
        return unparsable

    true_ratings = [_to_rating(value) for value in y_true]
    pred_ratings = [_to_rating(value) for value in y_pred]

    if not true_ratings:
        print('No samples to evaluate.')
        return
    if unparsable in true_ratings:
        raise ValueError('y_true contains labels that are not valid ratings')

    # Calculate accuracy; an unparsable prediction never equals a valid label
    accuracy = accuracy_score(y_true=true_ratings, y_pred=pred_ratings)
    print(f'Accuracy: {accuracy:.3f}')
    n_unparsable = pred_ratings.count(unparsable)
    if n_unparsable:
        print(f'Unparsable predictions: {n_unparsable}')

    # Per-label accuracy, gathered in a single pass over the aligned pairs
    totals = {}
    hits = {}
    for truth, guess in zip(true_ratings, pred_ratings):
        totals[truth] = totals.get(truth, 0) + 1
        hits[truth] = hits.get(truth, 0) + (truth == guess)
    for rating in sorted(totals):
        label_accuracy = hits[rating] / totals[rating]
        print(f'Accuracy for label {mapping[rating]}: {label_accuracy:.3f}')

    # Generate classification report over all five classes
    class_report = classification_report(
        y_true=true_ratings,
        y_pred=pred_ratings,
        labels=list(mapping),
        target_names=list(mapping.values()),
        zero_division=0,
    )
    print('\nClassification Report:')
    print(class_report)

    # Generate confusion matrix; unparsable predictions fall outside `labels`
    # and are therefore not counted in any cell
    conf_matrix = confusion_matrix(y_true=true_ratings, y_pred=pred_ratings, labels=list(mapping))
    print('\nConfusion Matrix:')
    print(conf_matrix)

# Score the cleaned predictions against the test-set labels
evaluate(test_dataset["label"], preds_clean)