In [None]:
import warnings
warnings.filterwarnings("ignore")

import os
import torch
import numpy as np
import pandas as pd

from datasets import load_dataset, DatasetDict

In [None]:
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    TrainingArguments,
    Trainer,
    DataCollatorForLanguageModeling,
    logging    
)

In [None]:
from peft import (
    LoraConfig,
    get_peft_model,
    TaskType
)

In [None]:
## Select the compute device: CUDA when PyTorch reports it available, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

In [None]:
# Hugging Face Hub repository id of the base checkpoint. The published id uses a dot in the
# version ("Llama-3.2-1B"); the hyphenated "Llama-3-2-1B" spelling does not resolve on the Hub.
model_name = "meta-llama/Llama-3.2-1B"
# Directory that receives training checkpoints and the final saved LoRA adapter.
output_dir = "./llama-3.2-banking77-finetuned"

In [None]:
# Load the `legacy-datasets/banking77` dataset through `datasets.load_dataset` and print
# its split summary. Later cells read the "train" and "test" splits and the "text"/"label" columns.
dataset = load_dataset("legacy-datasets/banking77")
print(f"Dataset loaded: {dataset}")

In [None]:
# Hold the 50 validation examples out of the TRAIN split instead of sampling them from
# the test split. The validation set drives checkpoint selection
# (`load_best_model_at_end=True`), so drawing it from test data would leak test examples
# into model selection and inflate the final test metrics.
train_val_split = dataset["train"].train_test_split(test_size=50, seed=42)
train_sampled = train_val_split["train"]
val_sampled = train_val_split["test"]

# Full, untouched test split; used only for the final evaluation after training.
test_dataset = dataset["test"]

In [None]:
# Rebind `dataset` to the two splits used during fine-tuning. The "test" key here holds the
# small validation subset (`val_sampled`); the full test split remains in `test_dataset`.
# NOTE(review): because `dataset` is overwritten, re-running the earlier split-selection
# cell without reloading the dataset would read these reduced splits instead of the originals.
dataset = DatasetDict({
    "train": train_sampled,
    "test": val_sampled
})

In [None]:
# Show the split names and sizes of the rebuilt train/validation DatasetDict.
print(f"Dataset with train and val data: {dataset}")

In [None]:
## Map every integer label id (0-76) to the text target "Category <id>"

label_description = {label_id: f"Category {label_id}" for label_id in range(77)}

In [None]:
## Load environment variables from a local .env file before any Hub access.
## Presumably this supplies the Hugging Face access token needed for the model
## download -- confirm the variable name expected by the Hub client.
from dotenv import load_dotenv
load_dotenv()

In [None]:
# Load the tokenizer that belongs to the base checkpoint.
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Reuse the end-of-sequence token as the padding token.
tokenizer.pad_token = tokenizer.eos_token
# Append padding after the real tokens.
tokenizer.padding_side = "right"

In [None]:
def preprocess_function(examples):
    """Turn a batch of raw examples into tokenized instruction prompts.

    Args:
        examples: Batch mapping with parallel "text" and "label" sequences;
            each label id is looked up in `label_description`.

    Returns:
        The tokenizer output for the batch (padded/truncated to 128 tokens)
        with an added "labels" entry that is a shallow copy of "input_ids".
    """
    formatted_inputs = [
        f"### Instruction: Classify the following bank customer query into the appropriate category.\n\n### Input: {text}\n\n### Response: {label_description[label_id]}"
        for text, label_id in zip(examples["text"], examples["label"])
    ]

    # Fixed-length encoding; plain Python lists are returned (return_tensors=None).
    encoded = tokenizer(
        formatted_inputs,
        padding="max_length",
        truncation=True,
        max_length=128,
        return_tensors=None,
    )

    # Causal LM objective: the targets are the input ids themselves.
    encoded["labels"] = encoded["input_ids"].copy()
    return encoded

In [None]:
# Apply `preprocess_function` to every split in batches and drop the raw
# "text" and "label" columns so only the tokenizer outputs and "labels" remain.

tokenized_dataset = dataset.map(
    preprocess_function,
    batched=True,
    remove_columns=["text", "label"]
)

In [None]:
# Show the columns and row counts of the tokenized train/validation splits.
print(f"Tokenized Dataset with Train and Val data: {tokenized_dataset}")

In [None]:
## Load the base causal language model named by `model_name` for fine-tuning

model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.float16,  # load the weights in half precision
    device_map="auto",          # let the library decide device placement
    low_cpu_mem_usage=True,
    use_cache=False             # key/value caching is switched off for training
)

In [None]:
# Configure LoRA: low-rank adapters on the attention query/value projections only,
# set up for a causal language modelling task.
lora_config = LoraConfig(
    r=4,                    # rank dimension
    lora_alpha=8,           # scaling factor
    target_modules=["q_proj", "v_proj"],  # attention layers to fine-tune
    lora_dropout=0.05,      # dropout applied inside the adapter layers
    bias="none",            # bias terms are not trained
    task_type=TaskType.CAUSAL_LM
)

In [None]:
import torch
# Release cached, currently unused GPU memory before the adapter is attached.
torch.cuda.empty_cache()
# Let cuDNN benchmark and pick kernels, and allow non-deterministic algorithms;
# with these settings runs are not guaranteed to be bit-for-bit reproducible.
torch.backends.cudnn.benchmark = True
torch.backends.cudnn.deterministic = False

In [None]:
# Wrap the base model with the LoRA adapters described by `lora_config`.
# NOTE(review): `model` is rebound to the wrapped model, so re-running this cell
# without reloading the base model would wrap an already-wrapped model.
model = get_peft_model(model, lora_config)
print(f"Model loaded with LoRA configuration: {model}")

In [None]:
# Print trainable parameters
def print_trainable_parameters(model):
    """Print how many parameters require gradients and their share of the total.

    Args:
        model: Object exposing `named_parameters()`, yielding (name, parameter)
            pairs whose parameters provide `numel()` and `requires_grad`.
    """
    # One (element count, is trainable) pair per parameter tensor.
    sizes = [(param.numel(), param.requires_grad) for _, param in model.named_parameters()]
    all_params = sum(count for count, _ in sizes)
    trainable_params = sum(count for count, needs_grad in sizes if needs_grad)
    print(f"Trainable params: {trainable_params} ({100 * trainable_params / all_params:.2f}% of all params)")

# Report the trainable-parameter count of the LoRA-wrapped model.
print_trainable_parameters(model)

In [None]:
# Data collator that assembles tokenized examples into PyTorch batches for the Trainer.
data_collator = DataCollatorForLanguageModeling(
    tokenizer=tokenizer,
    mlm=False,  # we're doing causal language modeling not masked language modeling
    return_tensors="pt"
)

In [None]:
# Training arguments
training_args = TrainingArguments(
    output_dir=output_dir,
    num_train_epochs=10,
    per_device_train_batch_size=2,
    per_device_eval_batch_size=2,
    gradient_accumulation_steps=8,   # 2 x 8 = 16 examples per optimizer step per device
    warmup_steps=100,
    weight_decay=0.01,
    learning_rate=2e-4,
    fp16=True,                       # mixed-precision training
    logging_steps=200,
    evaluation_strategy="epoch",     # evaluate once per epoch
    save_strategy="epoch",           # checkpoint once per epoch (same cadence as evaluation)
    load_best_model_at_end=True,     # restore the best checkpoint when training finishes
    report_to="none"                 # no external experiment tracker
)

In [None]:
# Initialize trainer. The "test" key of `tokenized_dataset` holds the small
# validation subset built earlier, not the full test split.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset["train"],
    eval_dataset=tokenized_dataset["test"],
    data_collator=data_collator
)

In [None]:
import torch
# Release cached, currently unused GPU memory right before training starts.
torch.cuda.empty_cache()

In [None]:
# Train the model with the arguments and datasets configured on `trainer`.
trainer.train()

In [None]:
## Save the trained model to `output_dir`; the evaluation section reloads it from that path
trainer.save_model(output_dir)

#### Load the Saved model and run evaluation

In [None]:
# Hub repository id of the base checkpoint the adapter was trained on. The published id
# uses a dot in the version ("Llama-3.2-1B"); the hyphenated spelling does not resolve.
base_model_name = "meta-llama/Llama-3.2-1B"
# Directory containing the LoRA adapter saved at the end of training.
lora_adapter_path = "./llama-3.2-banking77-finetuned"

In [None]:
# Reload a fresh copy of the base model to attach the saved adapter to.
# NOTE(review): `use_cache=False` mirrors the training-time load; confirm whether
# disabling the key/value cache is intended for generation, where it may slow decoding.
base_model = AutoModelForCausalLM.from_pretrained(
    base_model_name,
    torch_dtype=torch.float16,
    device_map="auto",
    low_cpu_mem_usage=True,
    use_cache=False
)

In [None]:
from peft import PeftModel
# Attach the adapter stored at `lora_adapter_path` to the freshly loaded base model.
# This rebinds `model`, replacing the in-memory training model.
model = PeftModel.from_pretrained(base_model, lora_adapter_path)

In [None]:
# Switch the model to evaluation mode (disables dropout, including the LoRA dropout).
model.eval()

In [None]:
def predict_category(text):
    """Generate the model's category prediction for a single customer query.

    Args:
        text: The raw customer query to classify.

    Returns:
        The stripped text that follows the first "### Response:" marker in the
        decoded generation (up to a second marker, if the model emits one). If
        the marker is absent, the whole decoded output is returned unchanged.
    """
    # The prompt must match the training prompt built in `preprocess_function`
    # character for character (no space before the blank line), otherwise the
    # model is queried with a format it was never fine-tuned on.
    prompt = f"### Instruction: Classify the following bank customer query into the appropriate category.\n\n### Input: {text}\n\n### Response:"

    # Tokenize input
    inputs = tokenizer(prompt, return_tensors="pt", padding=True, truncation=True).to(device)

    with torch.no_grad():
        # Greedy decoding keeps the evaluation deterministic. A `temperature`
        # value only has an effect when sampling is enabled, so it is replaced
        # by an explicit `do_sample=False`.
        outputs = model.generate(
            input_ids=inputs["input_ids"],
            attention_mask=inputs["attention_mask"],
            max_new_tokens=20,
            pad_token_id=tokenizer.eos_token_id,
            do_sample=False,
        )

    # Decode the full sequence (prompt + continuation)
    decoded_output = tokenizer.decode(outputs[0], skip_special_tokens=True)

    # Extract the response part (after "### Response:"); an explicit length
    # check replaces the previous bare `except`, which could hide unrelated errors.
    parts = decoded_output.split("### Response:")
    if len(parts) > 1:
        return parts[1].strip()
    return decoded_output  # Fallback if formatting is unexpected


In [None]:
# Running tallies for the exact-match check performed in the evaluation loop
correct = total = 0

# Per-example records (query, gold label text, raw model output), filled by the loop
text_list, actual_category_list, predicted_category_list = [], [], []

In [None]:
# Score every example of the full test split, one generation call per example.
# The three lists are appended to in place, so re-running this cell without
# re-initialising them duplicates the records.
for i, example in enumerate(test_dataset):
    text = example["text"]
    text_list.append(text)

    # Gold label rendered in the same "Category <id>" form used for training
    actual_category = label_description[example["label"]]
    actual_category_list.append(actual_category)

    # Raw text produced by the fine-tuned model
    predicted_category = predict_category(text)
    predicted_category_list.append(predicted_category)

    # Exact match after trimming surrounding whitespace
    correct += int(predicted_category.strip() == actual_category.strip())
    total += 1

In [None]:
import pandas as pd
# One row per evaluated example: the query, its gold label text and the raw model output.
df = pd.DataFrame({"Text":text_list, "Actual_Category":actual_category_list, "Predicted_Category":predicted_category_list})

In [None]:
# Preview the first rows of the raw prediction table.
df.head()

In [None]:
import re

def extract_category(text):
    """Extract the first "Category <number>" label from a model response.

    Only the digits are captured, so whatever follows the number (a colon,
    spaces, newlines, further generated text) never leaks into the result.

    Args:
        text: Raw model response. Non-string values (e.g. None or NaN from a
            reloaded CSV) are treated as "no prediction".

    Returns:
        "Category <digits>" for the first match, or None when the response is
        not a string or contains no category label.
    """
    if not isinstance(text, str):
        return None

    # One or more digits after "Category" and at least one whitespace character.
    match = re.search(r'Category\s+(\d+)', text)
    if match is None:
        return None  # Return None if no match
    return f"Category {match.group(1)}"

# Create new column with the category label parsed out of each raw prediction;
# rows where no label can be parsed get None. This adds the column to `df` in place.
df['Final_Category'] = df['Predicted_Category'].apply(extract_category)

# Display result
df.head()


In [None]:
# Row-wise exact match between the parsed prediction and the gold label
matches = df["Final_Category"] == df["Actual_Category"]

matching_records = matches.sum()
total_records = len(df)
# Share of matching rows, expressed as a percentage
accuracy = matches.mean() * 100

print(f"Matching records: {matching_records} out of {total_records}")
print(f"Accuracy: {accuracy:.2f}%")

In [None]:
# Persist the per-example predictions (without the index column) for later analysis.
df.to_csv("FinalOutput_01042025.csv", index=False)

In [None]:
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

y_true = df["Actual_Category"]
# Unparseable predictions are stored as None/NaN; give them an explicit string label
# so the metrics see a single label type and count these rows as misclassifications.
y_pred = df["Final_Category"].fillna("Unparsed")

# Fix the label order explicitly so the per-class arrays line up with these names even
# when the predictions contain labels that never occur in the gold column.
category_labels = sorted(y_true.unique())

# Compute accuracy
accuracy = accuracy_score(y_true, y_pred)

# Compute precision, recall, and F1-score for each category
precision, recall, f1, _ = precision_recall_fscore_support(
    y_true, y_pred, labels=category_labels, average=None, zero_division=0
)

# Compute weighted precision and recall. The function always returns four values
# (precision, recall, f-score, support), so all four must be unpacked.
weighted_precision, weighted_recall, _, _ = precision_recall_fscore_support(
    y_true, y_pred, labels=category_labels, average="weighted", zero_division=0
)

# Print results
print(f"Accuracy: {accuracy * 100:.2f}%")
print(f"Precision for each category: {dict(zip(category_labels, precision))}")
print(f"Recall for each category: {dict(zip(category_labels, recall))}")
print(f"Weighted Precision: {weighted_precision:.2f}")
print(f"Weighted Recall: {weighted_recall:.2f}")


In [15]:
# Reload a saved prediction table; the cells below read its "Actual_Category" and
# "Final_Category" columns.
# NOTE(review): this reads "Category.csv", not the "FinalOutput_01042025.csv" written
# earlier in this notebook -- confirm which file is the intended input. It also rebinds `df`.
df = pd.read_csv("Category.csv")

In [16]:
# Distinct gold labels in plain string order (so "Category 10" sorts before "Category 2").
unique_categories = sorted(df["Actual_Category"].unique())

# Accumulates one summary record per category; filled by the loop in the next cell.
result_data = []

In [17]:
# Build one summary record per gold category: sample count, correct count and hit rate.
for category in unique_categories:
    in_category = df["Actual_Category"] == category
    is_correct = df["Actual_Category"] == df["Final_Category"]

    category_count = in_category.sum()
    match_count = (in_category & is_correct).sum()

    # Guard against an empty category before dividing
    if category_count > 0:
        match_percentage = (match_count / category_count) * 100
    else:
        match_percentage = 0

    record = {
        "Category": category,
        "Number_of_Samples": category_count,
        "Match_count": match_count,
        "Match_Percent": round(match_percentage, 2)
    }
    result_data.append(record)


In [18]:
# Turn the per-category records into a table (one row per category).
results_df = pd.DataFrame(result_data)

In [20]:
# Preview the first ten per-category rows.
results_df.head(10)

Unnamed: 0,Category,Number_of_Samples,Match_count,Match_Percent
0,Category 0,40,37,92.5
1,Category 1,40,33,82.5
2,Category 10,40,33,82.5
3,Category 11,40,32,80.0
4,Category 12,40,36,90.0
5,Category 13,40,35,87.5
6,Category 14,40,35,87.5
7,Category 15,40,32,80.0
8,Category 16,40,34,85.0
9,Category 17,40,38,95.0
