In [None]:
!pip install -q unsloth trl peft accelerate bitsandbytes

In [None]:
import json
import os
import re
import torch
import transformers
from datasets import load_dataset
from datasets import Dataset
from google.colab import userdata, files
from unsloth import FastLanguageModel
from unsloth.chat_templates import get_chat_template
from trl import SFTTrainer
from transformers import TrainingArguments

# Tensors built for generation are moved to this device.
device = "cuda"

# Secret named HF_TOKEN, read from the Colab user-data store.
# NOTE(review): hf_token is not passed to any call visible in this notebook.
hf_token = userdata.get("HF_TOKEN")

# Local JSON file backing each dataset split.
data_files = dict(
    train="train_2000.json",
    validation="validation.json",
    test="test.json",
)

dataset = load_dataset("json", data_files=data_files)
train_ds, val_ds, test_ds = (
    dataset[split] for split in ("train", "validation", "test")
)


In [None]:
# Sequence-length cap, reused later when the SFT trainer is built.
max_seq_length = 2048
# None leaves the choice of dtype to Unsloth.
dtype = None

model_name = "unsloth/Qwen2.5-Math-1.5B"

# Load the base model with 4-bit loading enabled, together with its tokenizer.
model, tokenizer = FastLanguageModel.from_pretrained(
    model_name=model_name,
    max_seq_length=max_seq_length,
    dtype=dtype,
    load_in_4bit=True,
)

# Attach the "qwen25" chat template so apply_chat_template can render messages.
tokenizer = get_chat_template(tokenizer, chat_template="qwen25")


In [None]:
def format_prompt(example):
    """Render one dataset row as a single chat-formatted training string.

    Args:
        example: a mapping with a "question" and an "answer" entry.

    Returns:
        A dict ``{"text": ...}`` holding the system / user / assistant
        conversation rendered through the tokenizer's chat template.
    """
    q = example["question"]
    a = example["answer"]

    messages = [
        {
            "role": "system",
            # Must be the exact system prompt that prompt_model() sends at
            # evaluation time; otherwise the model is fine-tuned under one
            # prompt and scored under a different one.
            "content": "Please reason step by step, and put your final answer within \\boxed{}.",
        },
        {
            "role": "user",
            "content": q,
        },
        {
            "role": "assistant",
            "content": a,
        },
    ]

    # add_generation_prompt=False: the assistant turn is already present, so
    # no open assistant header should be appended after it.
    text = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=False,
    )

    return {"text": text}

# Add the rendered "text" column to the train and validation splits.
train_sft = train_ds.map(format_prompt)
val_sft   = val_ds.map(format_prompt)


In [None]:
def prompt_model(prompt):
  """Generate the model's reply to a single question.

  Args:
    prompt: the question text, sent as the user turn.

  Returns:
    The decoded completion (prompt tokens and special tokens removed).
  """
  chat = [
    {"role": "system", "content": "Please reason step by step, and put your final answer within \\boxed{}."},
    {"role": "user", "content": prompt},
  ]

  # Render the chat and leave an open assistant turn for the model to fill.
  rendered = tokenizer.apply_chat_template(
    chat,
    tokenize=False,
    add_generation_prompt=True,
  )

  model_inputs = tokenizer([rendered], return_tensors="pt").to(device)
  output_batch = model.generate(**model_inputs, max_new_tokens=512)

  # generate() returns prompt + completion; keep only the newly generated part.
  completions = []
  for prompt_ids, full_ids in zip(model_inputs.input_ids, output_batch):
    completions.append(full_ids[len(prompt_ids):])

  decoded = tokenizer.batch_decode(completions, skip_special_tokens=True)
  return decoded[0]

In [None]:
def extractAnswer(response):
    """Return the content of the last non-empty ``boxed{...}`` in ``response``.

    Braces are matched by depth, so nested LaTeX such as
    ``\\boxed{\\frac{1}{2}}`` is returned whole (``\\frac{1}{2}``) instead of
    being cut at the first closing brace.

    Args:
        response: text to search (a model completion or a reference solution).

    Returns:
        The stripped content of the last ``boxed{...}`` whose content is not
        blank, or the string "Invalid Response Format" when there is none.
        A ``boxed{`` whose braces never balance is ignored.
    """
    candidates = []
    for opener in re.finditer(r"boxed\s*\{", response):
        start = opener.end()
        depth = 1
        pos = start
        # Walk forward until the brace opened by "boxed{" is closed again.
        while pos < len(response) and depth > 0:
            ch = response[pos]
            if ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
            pos += 1
        if depth == 0:
            # pos is one past the matching "}", so exclude that brace.
            candidates.append(response[start:pos - 1])

    for content in reversed(candidates):
        content = content.strip()
        if content:
            return content
    return "Invalid Response Format"

def calculateAccuracy(predicted,labeled):
    """Fraction of positions where ``predicted`` equals ``labeled``.

    Args:
        predicted: sequence of predicted answers.
        labeled: sequence of reference answers, aligned with ``predicted``.

    Returns:
        A float in [0, 1]; 0.0 when both sequences are empty.

    Raises:
        ValueError: if the two sequences differ in length, since the
            positional comparison would then be meaningless.
    """
    if len(predicted) != len(labeled):
        raise ValueError(
            f"length mismatch: {len(predicted)} predictions vs {len(labeled)} labels"
        )
    if not predicted:
        # Nothing was evaluated; avoid dividing by zero.
        return 0.0
    correct = sum(1 for guess, gold in zip(predicted, labeled) if guess == gold)
    return correct / len(predicted)

def model_assesment(hf_dataset, savefile_name):
    """Run the current model over a dataset, score it, and save the results.

    For every example the boxed answer is extracted from both the reference
    solution and the model's generation; accuracy is the exact-match rate
    between the two extracted strings.

    Args:
        hf_dataset: iterable of examples with "question" and "answer" entries;
            must support ``len()``.
        savefile_name: output path without extension; ".json" is appended.

    Returns:
        None. Writes ``{savefile_name}.json`` and prints progress and accuracy.
    """
    total = len(hf_dataset)
    responses, labeled_answers, predicted_answers = [], [], []

    for idx, example in enumerate(hf_dataset, start=1):
        question = example["question"]
        reference = example["answer"]

        # Ground truth: boxed answer inside the reference solution.
        gold = extractAnswer(reference)
        labeled_answers.append(gold)

        # Model output for this question.
        generation = prompt_model(question)
        responses.append(generation)

        # Boxed answer inside the model output.
        guess = extractAnswer(generation)
        predicted_answers.append(guess)

        print("\n")
        print("Predicted answer: ", guess)
        print("actual answer: ", gold)
        print(f"progress: {idx}/{total}")
        print("\n")

    accuracy = calculateAccuracy(predicted_answers, labeled_answers)

    out_path = f"{savefile_name}.json"
    with open(out_path, "w") as f:
        json.dump(
            {
                "responses": responses,
                "predicted": predicted_answers,
                "labeled": labeled_answers,
                "accuracy": accuracy,
            },
            f,
        )

    print(f"Saved results to {out_path}")
    print("Accuracy:", accuracy)

In [None]:
# Score the model on the test split before any LoRA fine-tuning is applied.
model_assesment(hf_dataset=test_ds, savefile_name="test_baseline")

In [None]:
# Score the model on the validation split before any LoRA fine-tuning is applied.
model_assesment(hf_dataset=val_ds, savefile_name="validation_baseline")

In [None]:
# Wrap the base model with LoRA adapters.
model = FastLanguageModel.get_peft_model(
    model,
    # Adapter size and scaling.
    r=32,
    lora_alpha=64,
    lora_dropout=0.1,
    bias="none",
    # Modules that receive adapters: the q/k/v/o projections and the
    # gate/up/down projections.
    target_modules=[
        "q_proj",
        "k_proj",
        "v_proj",
        "o_proj",
        "gate_proj",
        "up_proj",
        "down_proj",
    ],
    use_gradient_checkpointing="unsloth",
    random_state=3407,
    # Plain LoRA: no rank-stabilized scaling and no LoftQ configuration.
    use_rslora=False,
    loftq_config=None,
)


In [None]:
training_args = TrainingArguments(
    # Batching: 2 samples per device x 4 accumulation steps = effective batch of 8.
    per_device_train_batch_size=2,
    gradient_accumulation_steps=4,
    # Schedule and optimizer.
    num_train_epochs=2,
    learning_rate=2e-4,
    warmup_steps=10,
    lr_scheduler_type="linear",
    optim="adamw_8bit",
    weight_decay=0.01,
    # Precision: bf16 when the GPU reports support for it, fp16 otherwise.
    bf16=torch.cuda.is_bf16_supported(),
    fp16=not torch.cuda.is_bf16_supported(),
    # Logging and checkpointing.
    logging_steps=25,
    report_to="none",
    output_dir="outputs",
    save_strategy="epoch",
    save_total_limit=2,
    # Miscellaneous.
    seed=3407,
    dataloader_pin_memory=False,
)

# Supervised fine-tuning on the pre-rendered "text" column.
trainer = SFTTrainer(
    model=model,
    tokenizer=tokenizer,
    args=training_args,
    train_dataset=train_sft,
    eval_dataset=val_sft,
    dataset_text_field="text",
    max_seq_length=max_seq_length,
)


In [None]:
# Train the model: run the SFTTrainer built in the previous cell and keep the
# value returned by trainer.train() in trainer_stats.
trainer_stats = trainer.train()

In [None]:
# Validation: score the model again after training; results go to a separate file.
model_assesment(hf_dataset=val_ds, savefile_name="validation_finetuned")

In [None]:
# Test: score the model again after training; results go to a separate file.
model_assesment(hf_dataset=test_ds, savefile_name="test_finetuned")

In [None]:
# Export the model and tokenizer in GGUF format to the "gguf_model" directory,
# using the "q4_k_m" quantization method.
model.save_pretrained_gguf("gguf_model", tokenizer, quantization_method="q4_k_m")

In [None]:
# Pick the exported GGUF file and hand it to the Colab download helper.
# os.listdir() returns entries in arbitrary order, so sort for a deterministic
# choice; the export directory may hold more than one .gguf file.
gguf_files = sorted(f for f in os.listdir("gguf_model") if f.endswith(".gguf"))
if gguf_files:
    # Prefer the file whose name carries the quantization method requested at
    # export time ("q4_k_m"); fall back to the first file otherwise.
    quantized = [f for f in gguf_files if "q4_k_m" in f.lower()]
    gguf_file = os.path.join("gguf_model", (quantized or gguf_files)[0])
    print(f"Downloading: {gguf_file}")
    files.download(gguf_file)
else:
    # Make the empty case visible instead of silently doing nothing.
    print("No .gguf file found in gguf_model")