In [None]:
!pip install torch pandas tqdm transformers datasets trl peft bitsandbytes scikit-learn openpyxl

In [2]:
import numpy as np
import pandas as pd
import os
from tqdm import tqdm
import bitsandbytes as bnb
import torch
import torch.nn as nn
import transformers
from datasets import Dataset
from peft import LoraConfig, PeftConfig
from trl import SFTTrainer
from trl import setup_chat_format
from transformers import (AutoModelForCausalLM, 
                          AutoTokenizer, 
                          BitsAndBytesConfig, 
                          TrainingArguments, 
                          pipeline, 
                          logging)
from sklearn.metrics import (accuracy_score, 
                             classification_report, 
                             confusion_matrix)
from sklearn.model_selection import train_test_split

In [None]:
# Fetch the dataset and bind its three splits in one step
from datasets import load_dataset

dataset = load_dataset('Dataset_Name')

# The splits are looked up under the keys 'train', 'valid' and 'test'
train_data, valid_data, test_data = (
    dataset[split_name] for split_name in ('train', 'valid', 'test')
)

In [None]:
train_data

In [28]:
train_data['query'][0]

'Analyze the sentiment of this statement extracted from a financial news article. Provide your answer as either negative, positive, or neutral.\nText: The five-storey , eco-efficient building will have a gross floor area of about 15,000 sq m. It will also include apartments .\nAnswer:'

In [29]:
def generate_prompt(example):
    """Build a training prompt: the query, one space, then the answer.

    Leading and trailing whitespace of the combined string is removed.
    """
    query, answer = example['query'], example['answer']
    return "{} {}".format(query, answer).strip()

def generate_test_prompt(example):
    """Build an inference prompt: the query alone, with surrounding whitespace removed."""
    return format(example['query']).strip()

In [30]:
# Convert each split to a pandas DataFrame (train, test, valid in that order)
train_df, test_df, val_df = (
    split.to_pandas() for split in (train_data, test_data, valid_data)
)

In [41]:
# Reference labels for the test split, read from the 'answer' column of test_df.
# evaluate() compares these against predictions made from test_data, which is
# rebuilt from test_df further down, so the row order of test_df must not change.
y_true=test_df['answer']

In [31]:
# Add a 'prompt' column to every frame: train/val rows include the answer,
# test rows contain the query only.
train_df['prompt'], test_df['prompt'], val_df['prompt'] = (
    frame.apply(builder, axis=1)
    for frame, builder in (
        (train_df, generate_prompt),
        (test_df, generate_test_prompt),
        (val_df, generate_prompt),
    )
)

In [32]:
# Rebuild the Dataset objects so that each one carries only the 'prompt' column
train_data, valid_data, test_data = (
    Dataset.from_pandas(frame[['prompt']]) for frame in (train_df, val_df, test_df)
)

In [33]:
train_data['prompt'][0]

'Analyze the sentiment of this statement extracted from a financial news article. Provide your answer as either negative, positive, or neutral.\nText: The five-storey , eco-efficient building will have a gross floor area of about 15,000 sq m. It will also include apartments .\nAnswer: neutral'

In [34]:
train_data

Dataset({
    features: ['prompt'],
    num_rows: 3100
})

In [None]:
# Identifier of the base checkpoint passed to from_pretrained (placeholder value).
base_model_name = "Model_Name"

# Quantization settings handed to from_pretrained: 4-bit loading with the "nf4"
# quant type, double quantization off, compute dtype given as "float16".
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=False,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype="float16",
)

# Load the causal LM with the quantization config above; device placement is
# delegated to device_map="auto".
model = AutoModelForCausalLM.from_pretrained(
    base_model_name,
    device_map="auto",
    torch_dtype="float16",
    quantization_config=bnb_config, 
)

# Turn the cache flag off on the model config. This same `model` object is also
# used for generation in predict().
# NOTE(review): presumably set for training with gradient checkpointing - confirm.
model.config.use_cache = False
# NOTE(review): pretraining_tp looks like a Llama-style config field; confirm it is
# meaningful for the checkpoint actually used.
model.config.pretraining_tp = 1

tokenizer = AutoTokenizer.from_pretrained(base_model_name)

# Reuse the EOS token id as the padding token id.
tokenizer.pad_token_id = tokenizer.eos_token_id

In [None]:
from trl import SFTConfig, SFTTrainer, DataCollatorForCompletionOnlyLM
def formatting_prompts_func(example):
    """Return the entries of example['prompt'] as a new list, in index order."""
    prompts = example['prompt']
    return [prompts[idx] for idx in range(len(prompts))]


# Marker that separates the query from the completion. The prompts built in this
# notebook contain "Answer:" (no "### " prefix), so the template has to use that
# exact text; with "### Answer:" the marker never occurs in any prompt.
# NOTE(review): with some tokenizers the marker tokenizes differently when it follows
# a newline than when tokenized on its own - confirm the template is found before
# passing this collator to a trainer.
response_template = "Answer:"
collator = DataCollatorForCompletionOnlyLM(response_template, tokenizer=tokenizer)

In [None]:
def _label_from_generation(generated_text):
    """Map raw generated text to one of 'neutral', 'positive' or 'negative'.

    Only the text after the last "Answer:" marker is inspected, case-insensitively.
    """
    answer = generated_text.split("Answer:")[-1].lower()
    if "neutral" in answer:
        return "neutral"
    if "positive" in answer:
        return "positive"
    # Everything else, including output that names no label at all, counts as negative.
    return "negative"


def predict(X_test, model, tokenizer):
    """Generate a sentiment label for every prompt in X_test.

    Args:
        X_test: indexable collection with a "prompt" column; len(X_test) is the
            number of prompts.
        model: model handed to the text-generation pipeline.
        tokenizer: tokenizer handed to the text-generation pipeline.

    Returns:
        A list of label strings, one per prompt, in input order.
    """
    # Read the column and build the pipeline once: neither depends on the loop
    # index, and doing both per sample only adds overhead.
    prompts = X_test["prompt"]
    # NOTE(review): whether temperature has any effect depends on sampling being
    # enabled in the model's generation settings - confirm.
    pipe = pipeline(task="text-generation",
                    model=model,
                    tokenizer=tokenizer,
                    max_new_tokens=7,
                    temperature=0.1,
                    )
    pad_token_id = pipe.tokenizer.eos_token_id

    y_pred = []
    for i in tqdm(range(len(X_test))):
        result = pipe(prompts[i], pad_token_id=pad_token_id)
        y_pred.append(_label_from_generation(result[0]['generated_text']))
    return y_pred

# Predictions on the test prompts with the model as loaded above; in notebook
# order this runs before the training cell.
y_pred = predict(test_data, model, tokenizer)

In [None]:
import numpy as np
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

def evaluate(y_true, y_pred):
    """Print accuracy metrics and a classification report for the predictions.

    Prints, in order: the overall accuracy, one one-vs-rest accuracy line per
    distinct label in y_true (both sequences are binarised against that label
    before scoring), and the classification report.

    Args:
        y_true: iterable of reference labels.
        y_pred: iterable of predicted labels, aligned with y_true.

    Returns:
        None; all results are printed.
    """
    # Overall accuracy
    accuracy = accuracy_score(y_true=y_true, y_pred=y_pred)
    print(f'Overall Accuracy: {accuracy:.3f}')

    # Sort the labels so the per-label lines come out in the same order on every
    # run; iterating a bare set of strings gives a hash-dependent order.
    for label in sorted(set(y_true)):
        true_label = [1 if y == label else 0 for y in y_true]
        pred_label = [1 if y == label else 0 for y in y_pred]
        label_accuracy = accuracy_score(true_label, pred_label)
        print(f'Accuracy for label "{label}": {label_accuracy:.3f}')

    # Classification report
    class_report = classification_report(y_true=y_true, y_pred=y_pred)
    print('\nClassification Report:')
    print(class_report)
    
  
# Score the predictions from the previous cell against the test labels in y_true.
evaluate(y_true, y_pred)


In [43]:
def find_all_linear_names(model):
    """List the distinct leaf names of all bnb 4-bit linear layers in the model.

    The leaf name is the last dot-separated component of the module's qualified
    name. 'lm_head' is excluded from the result (original note: needed for 16 bit).
    The order of the returned list is not defined.
    """
    target_cls = bnb.nn.Linear4bit
    leaf_names = {
        qualified_name.split('.')[-1]
        for qualified_name, submodule in model.named_modules()
        if isinstance(submodule, target_cls)
    }
    leaf_names.discard('lm_head')
    return list(leaf_names)
# Collect the LoRA target module names from the loaded model; the bare name on
# the last line displays the resulting list as the cell output.
modules = find_all_linear_names(model)
modules

['o_proj', 'qkv_proj', 'gate_up_proj', 'down_proj']

In [44]:
from transformers import TrainerCallback

class CustomEvalCallback(TrainerCallback):
    """Trainer callback that scores the model on the test prompts at each evaluation.

    Relies on the notebook-level names test_data, tokenizer and y_true.
    """

    def on_evaluate(self, args, state, control, model, **kwargs):
        """Print a banner, then predict on test_data and print the metrics."""
        print("\nEvaluating model...\n")
        evaluate(y_true, predict(test_data, model, tokenizer))

In [None]:
from trl import SFTTrainer, SFTConfig
from transformers import TrainingArguments, DataCollatorForSeq2Seq

output_dir="out"

# LoRA adapter settings; the adapters are attached to the module names collected
# in `modules`.
peft_config = LoraConfig(
    lora_alpha=16,
    lora_dropout=0,
    r=64,
    bias="none",
    task_type="CAUSAL_LM",
    target_modules=modules,
)

training_arguments = SFTConfig(
    output_dir=output_dir,                    # directory where checkpoints are written
    num_train_epochs=10,                      # number of training epochs
    per_device_train_batch_size=4,            # batch size per device during training
    gradient_accumulation_steps=1,            # micro-batches accumulated per optimizer update
    gradient_checkpointing=True,              # use gradient checkpointing to save memory
    optim="paged_adamw_32bit",
    logging_steps=1,                          # log every step
    learning_rate=2e-5,                       # NOTE(review): originally annotated "based on QLoRA paper" - verify the value
    weight_decay=0.001,
    fp16 = False,                             # no fp16 mixed precision
    bf16 = False,                             # no bf16 mixed precision
    max_grad_norm=0.3,                        # max gradient norm based on QLoRA paper
    max_steps=-1,                             # -1: length is governed by num_train_epochs
    warmup_ratio=0.03,                        # warmup ratio based on QLoRA paper
    group_by_length=True,
    lr_scheduler_type="cosine",               # use cosine learning rate scheduler
    # report_to="wandb",                      # report metrics to w&b
    eval_strategy="epoch",                    # run evaluation at the end of each epoch
    # eval_steps is deliberately not set: it only applies when eval_strategy="steps".
    save_strategy="epoch",                    # save model checkpoint at the end of each epoch
    # save_total_limit=3                      # limit the total number of saved checkpoints to avoid storage issues
)



In [None]:
# Build the supervised fine-tuning trainer from the quantized model, the SFT
# arguments, the prompt-only train/validation datasets and the LoRA config.
# Text for each example comes from formatting_prompts_func.
# NOTE(review): no tokenizer is passed (that line is commented out) - confirm which
# tokenizer the trainer ends up using with the installed TRL version.
trainer = SFTTrainer(
    model=model,
    # tokenizer=tokenizer,
    args=training_arguments,
    train_dataset=train_data,
    eval_dataset=valid_data,
    peft_config=peft_config,
    #dataset_text_field="Sentiment",
    formatting_func=formatting_prompts_func,
    # callbacks=[CustomEvalCallback()]   (disabled: would run predict/evaluate on the test set at every evaluation)


)

In [None]:
# Run fine-tuning with the trainer built in the previous cell.
trainer.train()

In [None]:
        # Generate predictions on X_test
y_pred = predict(test_data, model, tokenizer)  # predictions on the test prompts after training

# Score the new predictions against the test labels in y_true
evaluate(y_true, y_pred)