In [None]:
from datasets import Dataset, load_dataset
import evaluate
from transformers import (GPT2Tokenizer,
                          AutoTokenizer,
                          GPT2LMHeadModel,
                          AutoModelForSequenceClassification,
                          AutoModelForCausalLM,
                          TrainingArguments,
                          Trainer,
                          BitsAndBytesConfig,
                          pipeline,
                          logging)
from transformers import logging as hf_logging

import torch.nn.functional as F

from peft import prepare_model_for_kbit_training
from peft import LoraConfig, get_peft_model

import bitsandbytes as bnb

import pandas as pd
import numpy as np
import torch

from tqdm.auto import tqdm
from tqdm import tqdm

# from trl import SFTTrainer
# from trl import setup_chat_format

# import bitsandbytes
import os
import string
import json
import re
import time
import random
import warnings
import logging
from collections import defaultdict
from typing import List
from pathlib import Path

from sklearn.model_selection import train_test_split
from sklearn.metrics import (accuracy_score,
                             f1_score,
                             classification_report,
                             confusion_matrix,
                             auc)

from huggingface_hub import login
from arabert.preprocess import ArabertPreprocessor

import matplotlib as mpl
import matplotlib.pyplot as plt
from matplotlib.colors import Normalize, rgb2hex
from matplotlib.ticker import PercentFormatter 
from IPython.display import HTML
from pylab import rcParams
import seaborn as sns

print(f"pytorch version {torch.__version__}")

In [None]:
def random_seed(seed_value, use_cuda):
    """Seed every random number generator used in the notebook.

    Args:
        seed_value: Integer seed applied to Python's `random`, NumPy and PyTorch.
        use_cuda: When true, the CUDA generators are seeded as well and cuDNN is
            switched to deterministic mode with benchmarking disabled.
    """
    # Host-side generators.
    random.seed(seed_value)
    np.random.seed(seed_value)
    torch.manual_seed(seed_value)

    if not use_cuda:
        return

    # Device-side generators (current device, then all devices).
    torch.cuda.manual_seed(seed_value)
    torch.cuda.manual_seed_all(seed_value)

    # Trade cuDNN autotuning for run-to-run repeatability.
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False
        
def compute_metrics(eval_pred):
    """Turn raw model scores into class ids and score them.

    Args:
        eval_pred: A pair `(predictions, labels)`; the predictions are reduced
            with an arg-max over axis 1.

    Returns:
        Whatever `metric.compute` returns for the predicted ids and the labels.
    """
    # NOTE(review): `metric` is not defined in the visible notebook; it presumably
    # comes from `evaluate.load(...)` -- confirm before wiring this into a Trainer.
    scores, references = eval_pred
    predicted_ids = np.argmax(scores, axis=1)
    return metric.compute(predictions=predicted_ids, references=references)

def format_duration(total_time):
    """Render a duration given in seconds as hours, minutes and seconds.

    Args:
        total_time: Elapsed time in seconds (int or float). Fractions of a
            second are truncated; negative values are treated as zero.

    Returns:
        A string of the form "H hours, M minutes, S seconds". Hours are not
        capped at 24, so multi-day durations are reported in full.
    """
    # Plain integer arithmetic: the previous version relied on `timedelta`,
    # which is never imported in this notebook, and on `.seconds`, which
    # silently discards whole days.
    whole_seconds = max(int(total_time), 0)
    hours, remainder = divmod(whole_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return "{} hours, {} minutes, {} seconds".format(hours, minutes, seconds)

In [None]:
 # Set the seed
SEED = 42
random_seed(seed_value=SEED, use_cuda=torch.cuda.is_available())

# Pick the first GPU when CUDA is usable, otherwise stay on the CPU
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print("Device:", device)

In [None]:
# Hugging Face checkpoint id. Note that the training cell further down reassigns
# `model_id` to the 3B Instruct checkpoint before anything is loaded.
model_id = "meta-llama/Llama-3.2-1B-Instruct"

# Dataset

In [None]:
class HardDataset:
    """Load, explore, subsample and split a tab-separated review dataset.

    The TSV file must provide `rating` and `review` columns. After
    `load_data()` the frame exposes `label` (0/1) and `text`.

    Attributes are documented inline in `__init__`.
    """

    def __init__(self, data_path, seed=SEED):
        # Location of the TSV file read by `load_data`.
        self.data_path = data_path
        # Seed used for sampling and splitting. The argument is now honoured;
        # previously the global SEED was stored regardless of what was passed.
        self.seed = seed
        # Populated by `load_data`.
        self.df = None

    def load_data(self):
        """
        Load the data from a TSV file and retain only the 'rating' and 'review' columns.
        Also, transform the 'rating' to a binary label (positive/negative).

        Returns:
            The loaded DataFrame with columns `label` and `text`.
        """
        # Load the dataset
        raw_df = pd.read_csv(self.data_path, delimiter='\t')

        # Keep only 'rating' and 'review' columns. The explicit copy makes the
        # column assignment below operate on an independent frame instead of a
        # slice (avoids pandas' SettingWithCopyWarning).
        self.df = raw_df[['rating', 'review']].copy()

        # Code rating: negative (0) if rating < 3, positive (1) otherwise.
        # NOTE(review): a rating of exactly 3 is therefore labelled positive;
        # presumably the file contains no neutral ratings -- confirm.
        self.df['rating'] = self.df['rating'].apply(lambda x: 0 if x < 3 else 1)

        # Rename columns for consistency with standard text classification format
        self.df.columns = ['label', 'text']
        print(f"Initial dataset length: {len(self.df)}")
        return self.df

    def analyze_data(self, max_words=100, max_samples=7000):
        """
        Plot label and length statistics, drop long reviews, subsample each
        class and split the result into train/validation/test frames.

        Args:
            max_words: Reviews with this many space-separated words or more
                are removed (default 100).
            max_samples: Upper bound on the number of rows kept per label
                (default 7000).

        Returns:
            A tuple `(df_sampled, train_df, val_df, test_df)`; the split is
            80% / 10% / 10% of `df_sampled`.

        Raises:
            ValueError: If `load_data()` has not been called yet.
        """
        if self.df is None:
            raise ValueError("Data not loaded. Please call 'load_data()' first.")

        # Show label distribution
        label_counts = self.df['label'].value_counts()
        print("\nLabel Distribution:")
        print(label_counts)

        # Visualize label distribution
        plt.figure(figsize=(6, 4))
        sns.barplot(x=label_counts.index, y=label_counts.values, palette="viridis")
        plt.title("Label Distribution")
        plt.xlabel("Label")
        plt.ylabel("Count")
        plt.xticks([0, 1], ['Negative', 'Positive'])
        plt.show()

        # Word count per review (split on single spaces)
        self.df["word_count"] = self.df.text.apply(lambda x: len(x.split(" ")))

        # Histogram normalised so that the bars sum to 100%
        plt.hist(
            self.df.word_count, weights=np.ones(len(self.df.word_count)) / len(self.df.word_count), bins=30
        )
        plt.gca().yaxis.set_major_formatter(PercentFormatter(1))
        plt.xlabel("Words")
        plt.ylabel("Percentage")
        plt.ylim(0, 1)
        plt.show()

        print(self.df.shape)
        # Drop overly long reviews; copy so later column writes (e.g. a second
        # call to this method) do not target a slice of the previous frame.
        self.df = self.df[self.df.word_count < max_words].copy()
        print(self.df.shape)

        # Cap the number of samples per category to reduce resources. The
        # instance seed makes the draw reproducible on its own, independent of
        # the global NumPy random state.
        df_sampled = self.df.groupby("label")[["text", "label"]].apply(
            lambda x: x.sample(n=min(len(x), max_samples), random_state=self.seed)
        )
        df_sampled = df_sampled.reset_index(drop=True)

        df_sampled['text_length'] = df_sampled['text'].apply(len)
        # Visualize text length distribution
        plt.figure(figsize=(10, 6))
        sns.histplot(df_sampled['text_length'], bins=30, kde=True, color="blue")
        plt.title("Text Length Distribution")
        plt.xlabel("Text Length (characters)")
        plt.ylabel("Frequency")
        plt.show()

        # Visualize boxplot of text lengths for each label
        plt.figure(figsize=(10, 6))
        sns.boxplot(x="label", y="text_length", data=df_sampled, palette="coolwarm")
        plt.title("Text Length Distribution by Label")
        plt.xlabel("Label")
        plt.ylabel("Text Length (characters)")
        plt.xticks([0, 1], ['Negative', 'Positive'])
        plt.show()

        # The helper column was only needed for the plots
        df_sampled = df_sampled.drop('text_length', axis=1)

        # 80% train, then the remaining 20% is halved into validation and test
        train_df, temp_df = train_test_split(
            df_sampled, test_size=0.2, random_state=self.seed
        )

        val_df, test_df = train_test_split(
            temp_df, test_size=0.5, random_state=self.seed
        )

        print(f"Train shape: {train_df.shape}")
        print(f"Validation shape: {val_df.shape}")
        print(f"Test shape: {test_df.shape}")

        return df_sampled, train_df, val_df, test_df

In [None]:
# Build the dataset wrapper around the Kaggle-hosted TSV file
hardDataset = HardDataset(
    data_path="/kaggle/input/sa-hard-arabic/balanced-reviews-utf8.tsv"
)

# Full labelled frame (columns: label, text)
df = hardDataset.load_data()

# Plots + length filter + per-class subsample + train/validation/test split
(df_sampled, train_df, val_df, test_df) = hardDataset.analyze_data()

In [None]:
def create_prompt(text: str, label: str = None):
    """Build the classification prompt for one review.

    Args:
        text: The review to classify; inserted between `<text>` tags.
        label: The gold answer appended after "Answer: " (training prompts).
            When omitted, the prompt ends right after "Answer:" so it can be
            used for inference.

    Returns:
        The formatted prompt string.
    """
    # Formulate the text with a specific instruction for classification
    prompt = """
You will be given an Arabic hotel review. Your task is to classify it as one of the labels in the list: positive, negative. Output the label only, and nothing else.
<text>
{text}
</text>
Answer: {label}
""".strip()
    if label is None:
        # Without this branch the literal string "None" was rendered as the
        # answer. The trailing space is removed so the prompt ends at "Answer:".
        return prompt.format(text=text, label="").rstrip()
    return prompt.format(text=text, label=label)

# Convert a labelled DataFrame into prompt/answer records
def create_dataset(df):
    """Build one record per DataFrame row.

    Args:
        df: DataFrame whose rows expose `text` and `label`.

    Returns:
        A list of dicts with the keys "input" (the prompt produced by
        `create_prompt`, including the label) and "output" (the label).
    """
    return [
        {
            "input": create_prompt(record.text, record.label),
            "output": record.label,
        }
        for _, record in tqdm(df.iterrows())
    ]

In [None]:
# The prompt asks the model to answer with the words "positive"/"negative",
# so the numeric labels produced by HardDataset.load_data (0 = negative,
# 1 = positive) are converted before the prompts are built.
LABEL_NAMES = {0: "negative", 1: "positive"}


def _with_label_names(frame):
    """Return a copy of `frame` whose `label` column holds the class names.

    Values that are not keys of LABEL_NAMES (e.g. already-converted strings)
    are left untouched.
    """
    return frame.assign(
        label=frame["label"].map(lambda value: LABEL_NAMES.get(value, value))
    )


# `train_df`, `val_df` and `test_df` are the frames returned by
# hardDataset.analyze_data(); the previously referenced `df_train` / `df_dev`
# are not defined anywhere in the notebook.
train_rows = create_dataset(_with_label_names(train_df))
Path("mednli_train_data.json").write_text(json.dumps(train_rows))

dev_rows = create_dataset(_with_label_names(val_df))
Path("mednli_dev_data.json").write_text(json.dumps(dev_rows))
print(dev_rows[0]["input"], "\n")

# Held-out rows for the inference cells. The prompt must stop at "Answer:"
# (no gold label), otherwise the model would simply read the answer.
test_named = _with_label_names(test_df)
test_rows = [
    {"input": create_prompt(text, "").rstrip(), "output": label}
    for text, label in zip(test_named["text"], test_named["label"])
]

# Model Loading and Fine-tuning

In [None]:
# --- Configuration Functions ---
def setup_tokenizer(model_id: str):
    """Load the tokenizer of `model_id` and configure its padding.

    The end-of-sequence token doubles as the padding token and padding is
    applied on the right. The vocabulary size is printed as a side effect.

    Args:
        model_id: Checkpoint identifier passed to `AutoTokenizer.from_pretrained`.

    Returns:
        The configured tokenizer.
    """
    tok = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
    print("Vocabulary size:", len(tok))

    # Padding setup: right-side padding, EOS reused as the pad token.
    tok.padding_side = "right"
    tok.pad_token = tok.eos_token
    return tok

def setup_quantization():
    """Create the 4-bit bitsandbytes quantization settings.

    Returns:
        A `BitsAndBytesConfig` with NF4 quantization, double quantization and
        bfloat16 as the compute dtype.
    """
    quantization_kwargs = dict(
        # Quantize the base model weights to 4 bits to reduce memory usage.
        load_in_4bit=True,
        # "nf4" selects the NormalFloat4 quantization type.
        bnb_4bit_quant_type="nf4",
        # Nested (double) quantization for a further memory reduction.
        bnb_4bit_use_double_quant=True,
        # Computations run in bfloat16.
        bnb_4bit_compute_dtype=torch.bfloat16,
    )
    return BitsAndBytesConfig(**quantization_kwargs)

def setup_model(model_id: str, nf4_config):
    """Load a quantized causal language model and prepare it for k-bit training.

    Args:
        model_id: Checkpoint identifier passed to `from_pretrained`.
        nf4_config: Quantization configuration (see `setup_quantization`).

    Returns:
        The model after `prepare_model_for_kbit_training`.
    """
    quantized_model = AutoModelForCausalLM.from_pretrained(
        model_id, quantization_config=nf4_config
    )

    # The generation cache is switched off and gradient checkpointing is
    # switched on before the model is handed to PEFT's preparation helper.
    quantized_model.config.use_cache = False
    quantized_model.gradient_checkpointing_enable()

    return prepare_model_for_kbit_training(quantized_model)

def setup_lora(modules):
    """Create the LoRA adapter configuration.

    Args:
        modules: Names of the modules that receive LoRA adapters (see
            `find_all_linear_names`).

    Returns:
        A `LoraConfig` for causal language modelling.
    """
    return LoraConfig(
        # Adapter rank (the author also noted 8 as an alternative).
        r=64,
        # Scaling factor (alternative noted: 32); decrease if overfitting is
        # observed or the dataset is small.
        lora_alpha=16,
        # Which modules are adapted; model specific.
        target_modules=modules,
        # Adapter dropout (alternative noted: 0.05); increase (e.g. 0.2-0.3)
        # for small or noisy datasets.
        lora_dropout=0.1,
        bias="none",
        task_type="CAUSAL_LM",
    )

def print_trainable_parameters(model):
    """Print how many of the model's parameters require gradients.

    Args:
        model: Any object exposing `named_parameters()`.

    The output line reports the trainable count, the total count and the
    trainable share as a percentage with two decimals.
    """
    # (element count, requires_grad) for every parameter tensor
    sizes = [(tensor.numel(), tensor.requires_grad) for _, tensor in model.named_parameters()]
    all_param = sum(count for count, _ in sizes)
    trainable_params = sum(count for count, needs_grad in sizes if needs_grad)
    print(f"Trainable params: {trainable_params} || Total params: {all_param} || Trainable%: {100 * trainable_params / all_param:.2f}")

# --- Main Training Function ---

def train_model(model, task: str, modules, train_dataset, val_dataset):
    """Handles the entire training process.

    Wraps `model` with LoRA adapters, builds an `SFTTrainer` and runs training.

    Args:
        model: The (quantized) base model returned by `setup_model`.
        task: Short task tag used in the W&B project name and the output dir.
        modules: Names of the modules that receive LoRA adapters.
        train_dataset: Dataset with an "input" text column used for training.
        val_dataset: Dataset with an "input" text column used for evaluation.

    Returns:
        The `SFTTrainer` after training (previously nothing was returned, so
        existing callers that ignore the result are unaffected).

    Notes:
        Relies on the notebook-level `tokenizer` and `device` variables and on
        the sibling helpers `setup_lora` and `print_trainable_parameters`.
    """
    # Imported locally because the notebook's top-level `trl` import is
    # commented out, which left SFTTrainer / SFTConfig undefined.
    from trl import SFTConfig, SFTTrainer

    os.environ["WANDB_PROJECT"] = f"llama32_3B_{task}_3EPs"
    os.environ["WANDB_LOG_MODEL"] = "checkpoint"

    # Build the adapter configuration from the `modules` argument instead of
    # depending on a global `lora_config` being defined by another cell.
    lora_config = setup_lora(modules)

    # Apply LoRA
    model.enable_input_require_grads()
    model = get_peft_model(model, lora_config)
    print_trainable_parameters(model)
    model = model.to(device)

    # Create Trainer instance
    trainer = SFTTrainer(
        model=model,
        tokenizer=tokenizer,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        args = SFTConfig(
            dataset_text_field ="input",
            # max_seq_length=512,
            output_dir=f"llama32_3B_{task}",
            eval_strategy="epoch",
            save_strategy="epoch",
            # eval_steps = 0.2
            learning_rate=2e-4, # 2e-5 learning rate, based on QLoRA paper use 2e-4
            per_device_train_batch_size=4, #
            per_device_eval_batch_size=4,
            gradient_accumulation_steps=1,            # number of steps before performing a backward/update pass
            # gradient_checkpointing=True,              # use gradient checkpointing to save memory
            optim="paged_adamw_32bit",
            # NOTE(review): fp16 mixed precision is combined with a bfloat16
            # quantization compute dtype (see setup_quantization) -- confirm
            # this is intended for the target GPU.
            fp16=True,
            bf16=False,
            num_train_epochs=3,
            weight_decay=0.001, # Lower (e.g., 0.001) if underfitting or noisy datasets.
            max_grad_norm=0.3,                        # max gradient norm based on QLoRA paper
            max_steps=-1,
            # group_by_length=False,
            lr_scheduler_type="cosine",               # use cosine learning rate scheduler
            warmup_ratio=0.03,                        # warmup ratio based on QLoRA paper
            logging_dir="./logs",
            logging_steps=50, # try 1
            report_to="wandb",
            push_to_hub=True,
            packing=False,
        ),
        # `peft_config` is intentionally not passed: the model was already
        # wrapped by get_peft_model above, so the adapters are created once.
        # dataset_kwargs={
        #     "add_special_tokens": False,
        #     "append_concat_token": False
        # }
    )

    # Train the model
    start_time = time.time()
    trainer.train()
    end_time = time.time()

    total_time = end_time - start_time
    print(f"Training completed in {total_time:.2f} seconds")
    return trainer

In [None]:
# Building the model
def find_all_linear_names(model):
    """Collect the names of all 4-bit linear layers as LoRA targets.

    Args:
        model: Model whose `named_modules()` are scanned for
            `bnb.nn.Linear4bit` instances.

    Returns:
        A list with the last component of each matching module's dotted name,
        without duplicates and without 'lm_head'.
    """
    linear_4bit = bnb.nn.Linear4bit
    leaf_names = {
        qualified_name.split('.')[-1]
        for qualified_name, module in model.named_modules()
        if isinstance(module, linear_4bit)
    }
    # needed for 16 bit
    leaf_names.discard('lm_head')
    return list(leaf_names)

In [None]:
if __name__ == "__main__":
    login(token="your_hf") # write token
    !wandb login your_wandb_token
    model_id = "meta-llama/Llama-3.2-3B-Instruct"
    task = "ASA" # Arabic Sentiment Analysis

    train_dataset = Dataset.from_list(train_rows)
    dev_dataset = Dataset.from_list(dev_rows)

    tokenizer = setup_tokenizer(model_id)
    nf4_config = setup_quantization()
    model = setup_model(model_id, nf4_config)
    model = model.to(device)
    modules = find_all_linear_names(model)
    lora_config = setup_lora(modules)
    # train_model(p_model, task, formatting_prompts_func_sst2, modules)
    train_model(model, task, modules, train_dataset, dev_dataset)

# Inference

In [None]:
# One instance
%%time
tokenizer.pad_token = tokenizer.eos_token
inp_test = test_rows[0]["input"]
messages = [{"role": "user", "content": inp_test}]
content = messages[0]["content"]
inputs = tokenizer(content, return_tensors="pt", padding=True, truncation=True).to(device)
outputs = model.generate(**inputs, max_new_tokens=1, temperature=0.000001, pad_token_id=tokenizer.eos_token_id)
generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
print(generated_text)
parts = [part.strip() for part in generated_text.split("Answer:") if part.strip()]


regex = r"^\W+|\W+$"
prediction = re.sub(regex, "", parts[-1])
print(prediction)

In [None]:
%%time
tokenizer.pad_token = tokenizer.eos_token
predictions = []
true_values = []
for row in tqdm(test_rows):
    messages = [{"role": "user", "content": row["input"]}]
    content = messages[0]["content"]
    inputs = tokenizer(content, return_tensors="pt", padding=True, truncation=True).to(device)
    outputs = model.generate(**inputs, max_new_tokens=1, temperature=0.000001, pad_token_id=tokenizer.eos_token_id)
    generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
    parts = [part.strip() for part in generated_text.split("Answer:") if part.strip()]
    predictions.append(parts[-1])
    true_values.append(row["output"])

regex = r"^\W+|\W+$"
predictions = [re.sub(regex, "", p) for p in predictions]
len(true_values), len(predictions)
pd.Series(predictions).value_counts()

In [None]:
# The model may generate strings that are not valid class names; report how
# many there are and keep only the valid rows for the per-class metrics.
class_names = ['positive', 'negative']
eval_df = pd.DataFrame({"label": true_values, "prediction": predictions})
is_valid_prediction = eval_df["prediction"].isin(class_names)
print(f"Predictions outside {class_names}: {int((~is_valid_prediction).sum())} of {len(eval_df)}")
eval_df = eval_df[is_valid_prediction]

In [None]:
# Accuracy over every test row (the unfiltered lists are used here)
print(accuracy_score(y_true=true_values, y_pred=predictions))
# Per-class report restricted to the rows kept in `eval_df`
print(classification_report(y_true=eval_df["label"], y_pred=eval_df["prediction"]))

In [None]:
# Confusion matrix with rows/columns ordered as in `class_names`
cm = confusion_matrix(eval_df["label"], eval_df["prediction"], labels=class_names)

# Annotated heatmap of the raw counts
plt.figure(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=class_names,
    yticklabels=class_names,
)
plt.title('Confusion Matrix')
plt.ylabel('True Labels')
plt.xlabel('Predicted Labels')
plt.show()