# Instruction Finetuning using QLoRA

This notebook shows how to perform instruction fine-tuning with QLoRA, a parameter-efficient fine-tuning (PEFT) method. The task is supervised fine-tuning (SFT) of Qwen2.5-Coder-7B-Instruct for function calling.

In [None]:
import os
# Set the Weights & Biases project name via the environment before any trainer
# is created ("wandb" is one of the `report_to` targets used later on).
os.environ["WANDB_PROJECT"]="codellama_instruct_finetuning"

from enum import Enum
from functools import partial
import pandas as pd
import torch
import json

from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments, BitsAndBytesConfig, set_seed
from datasets import load_dataset
from trl import SFTTrainer
from peft import get_peft_model, LoraConfig, TaskType

# Single notebook-wide seed, applied through transformers' `set_seed` helper
# so that repeated runs start from the same random state.
seed = 42
set_seed(seed)

## Data preprocessing

In [None]:
# Base checkpoint to fine-tune and the function-calling dataset on the Hub.
model_name = "Qwen/Qwen2.5-Coder-7B-Instruct"
dataset_name = "heegyu/glaive-function-calling-v2-formatted"

# ChatML-style Jinja template: every message is rendered as
# "<|im_start|>{role}\n{content}<|im_end|>\n"; an assistant header is appended
# after the last message only when `add_generation_prompt` is truthy.
template = (
    "{% for message in messages %}\n"
    "{{'<|im_start|>' + message['role'] + '\n' + message['content'] + '<|im_end|>' + '\n'}}"
    "{% if loop.last and add_generation_prompt %}{{'<|im_start|>assistant\n' }}{% endif %}"
    "{% endfor %}"
)

# This tokenizer is only used to render text with the template above.
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.chat_template = template

def preprocess(samples, chat_tokenizer=None):
    """Render a batch of raw samples into chat-templated training strings.

    Args:
        samples: Batch dict with parallel ``"system_message"``,
            ``"function_description"`` and ``"conversations"`` sequences
            (the shape produced by ``Dataset.map(..., batched=True)``).
        chat_tokenizer: Optional object exposing
            ``apply_chat_template(conversation, tokenize=False)``. When omitted,
            the notebook-level ``tokenizer`` is used, as before.

    Returns:
        ``{"content": [...]}`` holding one rendered string per input sample.
    """
    active_tokenizer = tokenizer if chat_tokenizer is None else chat_tokenizer

    rendered = []
    for system_prompt, function_desc, conversation in zip(
        samples["system_message"],
        samples["function_description"],
        samples["conversations"],
    ):
        raw_functions = f"[{function_desc}]"
        try:
            # Pretty-print with sorted keys so the schema is rendered consistently.
            function_desc_formatted = json.dumps(
                json.loads(raw_functions), indent=2, sort_keys=True
            )
        except ValueError:
            # Not valid JSON (json.JSONDecodeError is a ValueError): keep the raw
            # text instead of dropping the sample. Other exceptions propagate.
            function_desc_formatted = raw_functions

        system_message = {
            "role": "system",
            "content": f"{system_prompt}\nfunctions: {function_desc_formatted}",
        }
        # Build a new list rather than inserting into the caller's conversation.
        full_conversation = [system_message, *conversation]
        rendered.append(
            active_tokenizer.apply_chat_template(full_conversation, tokenize=False)
        )
    return {"content": rendered}

# Download the raw function-calling conversations from the Hub.
raw_dataset = load_dataset(dataset_name)

# Render every sample into one chat-formatted string; the source columns are
# dropped so that only the "content" column produced by `preprocess` remains.
formatted_dataset = raw_dataset.map(
    preprocess,
    batched=True,
    remove_columns=raw_dataset["train"].column_names,
)

# Hold out 10% of the train split for evaluation. The seed is passed explicitly
# so the split does not depend on the ambient global RNG state.
dataset = formatted_dataset["train"].train_test_split(test_size=0.1, seed=seed)
print(dataset)
print(dataset["train"][0])

In [None]:
from datasets import DatasetDict

# Rename the rendered column from "content" to "text" (presumably the column
# name SFTTrainer reads by default — confirm against the installed TRL version).
# Guarded so that re-running this cell is a no-op instead of an error.
if "content" in dataset["train"].column_names:
    dataset = dataset.rename_columns({"content": "text"})

# Verify the change
print(dataset)

In [None]:
# Number of examples in the training split.
print(dataset["train"].num_rows)

## Create the PEFT model

In [None]:
# LoRA adapter settings for causal language modelling. Adapters are attached to
# the attention and MLP projections as well as `embed_tokens` and `lm_head`.
peft_config = LoraConfig(
    task_type=TaskType.CAUSAL_LM,
    r=8,
    lora_alpha=16,
    lora_dropout=0.1,
    target_modules=[
        "gate_proj",
        "q_proj",
        "lm_head",
        "o_proj",
        "k_proj",
        "embed_tokens",
        "down_proj",
        "up_proj",
        "v_proj",
    ],
)

In [None]:
# 4-bit NF4 quantization with double quantization; compute runs in bfloat16.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
    bnb_4bit_compute_dtype=torch.bfloat16,
)

In [None]:
class ChatmlSpecialTokens(str, Enum):
    """Special tokens used by the ChatML-style chat format in this notebook.

    Members also subclass ``str``, so each one compares equal to its literal
    token text.
    """

    # Role headers that open a conversation turn.
    user = "<|im_start|>user"
    assistant = "<|im_start|>assistant"
    system = "<|im_start|>system"
    function_call = "<|im_start|>function-call"
    function_response = "<|im_start|>function-response"

    # End-of-turn, beginning-of-sequence and padding tokens.
    eos_token = "<|im_end|>"
    bos_token = "<s>"
    pad_token = "<pad>"

    @classmethod
    def list(cls):
        """Return all token strings as a plain list, in declaration order."""
        return [token.value for token in cls.__members__.values()]

# Re-create the tokenizer for training, this time registering the ChatML role
# headers and the pad/bos/eos tokens defined in `ChatmlSpecialTokens`.
tokenizer = AutoTokenizer.from_pretrained(
    model_name,
    trust_remote_code=True,
    additional_special_tokens=ChatmlSpecialTokens.list(),
    eos_token=ChatmlSpecialTokens.eos_token.value,
    bos_token=ChatmlSpecialTokens.bos_token.value,
    pad_token=ChatmlSpecialTokens.pad_token.value,
)
# The freshly loaded tokenizer needs the custom chat template re-attached.
tokenizer.chat_template = template

# Load the base model in 4-bit. `torch_dtype` is set explicitly so that the
# non-quantized modules use bfloat16, matching `bnb_4bit_compute_dtype` and the
# bf16 training setting, rather than whatever the library defaults to.
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=bnb_config,
    device_map="auto",
    attn_implementation="flash_attention_2",
    torch_dtype=torch.bfloat16,
)
# Grow the embedding matrix to cover the added special tokens; the new size is
# rounded up to a multiple of 8.
model.resize_token_embeddings(len(tokenizer), pad_to_multiple_of=8)


In [None]:
# Show the loaded model's repr as the cell output for a quick visual check.
model

## Training

In [None]:
# --- Hyperparameters ---------------------------------------------------------
output_dir = "Qwen2.5-Coder-7B_function_calling_instruct"

# Batch sizing: 2 samples per device, gradients accumulated over 4 steps.
per_device_train_batch_size = 2
per_device_eval_batch_size = 2
gradient_accumulation_steps = 4

# Optimisation schedule.
learning_rate = 5e-4
lr_scheduler_type = "cosine"
warmup_ratio = 0.1
max_grad_norm = 1.0
num_train_epochs = 1

logging_steps = 5

# NOTE(review): defined here but not passed to the TrainingArguments call in
# this cell — confirm whether a sequence-length cap is still intended.
max_seq_length = 2048

# --- Trainer configuration ---------------------------------------------------
training_arguments = TrainingArguments(
    # Output and Hub publishing
    output_dir=output_dir,
    push_to_hub=True,
    hub_private_repo=True,
    # Batching
    per_device_train_batch_size=per_device_train_batch_size,
    per_device_eval_batch_size=per_device_eval_batch_size,
    gradient_accumulation_steps=gradient_accumulation_steps,
    # Optimisation
    num_train_epochs=num_train_epochs,
    learning_rate=learning_rate,
    lr_scheduler_type=lr_scheduler_type,
    warmup_ratio=warmup_ratio,
    weight_decay=0.1,
    max_grad_norm=max_grad_norm,
    bf16=True,
    # Memory: recompute activations in the backward pass
    gradient_checkpointing=True,
    gradient_checkpointing_kwargs={"use_reentrant": False},
    # Checkpointing, evaluation and logging
    save_strategy="no",
    evaluation_strategy="epoch",
    logging_steps=logging_steps,
    report_to=["tensorboard", "wandb"],
)


In [None]:
# Subset sizes for a quick run, clamped to the split size so that a smaller
# dataset cannot make `select` fail with an out-of-range index.
train_subset_size = min(5000, len(dataset["train"]))
test_subset_size = min(500, len(dataset["test"]))

# Shuffle with the notebook-wide seed, then keep the first N rows of each split.
train_subset = dataset["train"].shuffle(seed=seed).select(range(train_subset_size))
test_subset = dataset["test"].shuffle(seed=seed).select(range(test_subset_size))

# Create a new DatasetDict with the subsets
subset_dataset = DatasetDict({
    "train": train_subset,
    "test": test_subset,
})

In [None]:
# Build the supervised fine-tuning trainer on the sampled subsets. The LoRA
# settings are handed over via `peft_config`. Packing, `dataset_text_field`,
# `max_seq_length` and `dataset_kwargs` are not passed, so the trainer's own
# defaults apply for them.
trainer = SFTTrainer(
    model=model,
    tokenizer=tokenizer,
    peft_config=peft_config,
    args=training_arguments,
    train_dataset=subset_dataset["train"],
    eval_dataset=subset_dataset["test"],
)

In [None]:
# Run fine-tuning, then save the result through the trainer.
# NOTE(review): with push_to_hub=True in the training arguments, save_model()
# presumably also uploads to the Hub — confirm credentials are configured.
trainer.train()
trainer.save_model()

In [None]:
# Shell escape: print the current GPU status after training.
!nvidia-smi

## Loading the trained model and getting the predictions of the trained model

In [None]:
from peft import PeftModel, PeftConfig
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from datasets import load_dataset
import torch

# Same 4-bit NF4 / double-quantization setup as in the training section.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16,
    bnb_4bit_use_double_quant=True,
)

peft_model_id = "badribn/Qwen2.5-Coder-7B_function_calling_instruct"
device = "cuda"

# The adapter config names the base checkpoint the adapter belongs to.
config = PeftConfig.from_pretrained(peft_model_id)

# `torch_dtype` is set explicitly so the non-quantized modules use bfloat16,
# matching the quantization compute dtype, instead of a library default.
model = AutoModelForCausalLM.from_pretrained(
    config.base_model_name_or_path,
    quantization_config=bnb_config,
    device_map="auto",
    attn_implementation="flash_attention_2",
    torch_dtype=torch.bfloat16,
)

# Load the tokenizer from the adapter repo and resize the base embeddings to
# its vocabulary size before the adapter weights are attached.
tokenizer = AutoTokenizer.from_pretrained(peft_model_id)
model.resize_token_embeddings(len(tokenizer), pad_to_multiple_of=8)
model = PeftModel.from_pretrained(model, peft_model_id)

# Switch to inference mode (the returned model is shown as the cell output).
model.eval()

In [None]:
system_prompt = """You are a helpful assistant with access to the following functions. You do a reasoning step before acting. Use the functions if required -
functions: [{
    "name": "get_current_location",
    "description": "Returns the current location. ONLY use this if the user has not provided an explicit location in the query.",
    "parameters": {}
},
{
    "name": "search",
    "description": "A search engine. 
    Useful for when you need to answer questions and provide information about real-time updates, current events and latest NEWS.
    Useful to answer general knowledge questions and provide information about people, places, companies, facts, historical events, or other subjects.
    Input should be a search query."
    "parameters": {
        "type": "object",
        "properties": {
            "search_query": {
                "type": "array",
                "items": {
                    "type": "string"
                },
                "description": "The search query"
            }
        },
        "required": [
            "search_query"
        ]
    }
},
{
    "name": "code_analysis",
    "description": "Useful when the user query can be solved by writing Python code. 
    This function generates high quality Python code and runs it to solve the user query and provide the output.
    Useful when user asks queries that can be solved with Python code. 
    Useful for sorting, generating graphs for data visualization and analysis, solving arthmetic and logical questions, data wrangling and data chrunching tasks for csv files etc.",
    "parameters": {
        "type": "object",
        "properties": {
            "text_prompt": {
                "type": "string",
                "description": "The description of the problem to be solved by writing python code."
            }
        },
        "required": [
            "text_prompt"
        ]
    }
},
{
    "name": "analyze_image",
    "description": "Analyze the contents of an image",
    "parameters": {
        "type": "object",
        "properties": {
            "image_url": {
                "type": "string",
                "description": "The URL of the image"
            }
        },
        "required": [
            "image_url"
        ]
    }
},
{
    "name": "generate_image",
    "description": "generate image based on the given description. ",
    "parameters": {
        "type": "object",
        "properties": {
            "text_prompt": {
                "type": "string",
                "description": "The description of the image to be generated"
            }
        },
        "required": [
            "text_prompt"
        ]
    }
}]"""

# Sample user queries to try (swap one into the "user" message below):
#   - Can you tell me what's in the image www.example.com/myimage.jpg?
#   - Please give me the recipe for kadhai paneer.
#   - Where am I?
#   - Generate an image of Indian festival Sankranthi where children are flying colourful kites on their terrace.
#   - What is the latest news of earthquakes in Japan?
#   - Sort the array [1,7,5,6].
#   - I want to know about tour packages from India to Maldives.
messages = [
    {"role": "system", "content": system_prompt},
    {
        "role": "user",
        "content": "I want to know about tour packages from India to Maldives.",
    },
]

# Render the conversation to text. `add_generation_prompt` is not passed here,
# so no assistant header is requested from the template.
text = tokenizer.apply_chat_template(messages, tokenize=False)

# Tokenize without adding extra special tokens and move every tensor to the GPU.
inputs = {
    name: tensor.to("cuda")
    for name, tensor in tokenizer(
        text, return_tensors="pt", add_special_tokens=False
    ).items()
}

# Sample up to 128 new tokens under bfloat16 autocast; generation stops early
# at the tokenizer's EOS token.
with torch.autocast(device_type="cuda", dtype=torch.bfloat16):
    outputs = model.generate(
        **inputs,
        max_new_tokens=128,
        eos_token_id=tokenizer.eos_token_id,
        do_sample=True,
        temperature=0.2,
        top_p=0.95,
        repetition_penalty=1.0,
    )

# Decode the whole sequence (prompt plus continuation), special tokens included.
print(tokenizer.decode(outputs[0]))