In [1]:
# Google Colab setup
!pip install transformers datasets peft accelerate bitsandbytes scikit-learn -q

# Check GPU availability
import torch
print(f"CUDA available: {torch.cuda.is_available()}")
print(f"GPU: {torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'None'}")

# Authenticate with Hugging Face to use Gemma
from huggingface_hub import login
login()  # You'll need a HF token with access to Gemma

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m59.1/59.1 MB[0m [31m18.8 MB/s[0m eta [36m0:00:00[0m
[?25hCUDA available: True
GPU: Tesla T4


VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [26]:
import pandas as pd
import json
from datasets import load_dataset
from sklearn.model_selection import train_test_split

ds = load_dataset("HuggingFaceH4/MATH-500", split="test")
df = ds.to_pandas()

# Split into train and test
df_train, df_test = train_test_split(df, test_size=0.2, random_state=42)

df_train.head()
# df_test.head()

Unnamed: 0,problem,solution,answer,subject,level,unique_id
249,Find the unique $\textbf{odd}$ integer $t$ suc...,We could find the answer by trial and error --...,17.0,Number Theory,4,test/number_theory/1065.json
433,Convert $\frac{57}{160}$ to a terminating deci...,A terminating decimal can be written in the fo...,0.35625,Number Theory,2,test/number_theory/410.json
19,Let $a$ be a positive real number such that al...,Note that $x = -1$ is always a root of $x^3 + ...,3.0,Intermediate Algebra,3,test/intermediate_algebra/1000.json
322,Let $f(x) = x - 3$ and $q(x) = bx +1$. If $f(...,"We have $q(1) = b\cdot 1 + 1 = b+1$, so $f(q(1...",-1.0,Algebra,3,test/algebra/1936.json
332,"Let $x,$ $y,$ and $z$ be positive real numbers...",We can write $(x + y)(y + z)$ as $xz + y(x + y...,2.0,Intermediate Algebra,4,test/intermediate_algebra/190.json


In [27]:
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch

model_id="google/gemma-3-1b-it"
device="cuda"

tokenizer = AutoTokenizer.from_pretrained(model_id)
tokenizer.pad_token = tokenizer.eos_token   # required for Gemma
tokenizer.padding_side = "right"

model = AutoModelForCausalLM.from_pretrained(
    model_id,
    dtype=torch.float32 if device == "cpu" else torch.bfloat16,
    device_map="auto"
)

model.config.use_cache = False

print(f"dtype: {model.dtype}")

dtype: torch.bfloat16


In [28]:
SYSTEM_PROMPT = {
    "role": "system",
    "content": "You are a helpful math assistant that solves problems step by step."
}

USER_MESSAGES = [
    {
        "role": "user",
        "content": row.problem
    }
    for _, row in df_train.iloc[:5].iterrows()
]

POST_MESSAGE = {
        "role": "assistant",
        "content": "Solution: "
    }


PROMPTS = [
    [SYSTEM_PROMPT, USER_MSG, POST_MESSAGE]
    for USER_MSG in USER_MESSAGES
]

print(json.dumps(PROMPTS[0], indent=4))

tokenized = tokenizer.apply_chat_template(
    PROMPTS,
    continue_final_message=True,
    padding=True,
    return_tensors="pt"
).to(device)

out = model.generate(tokenized, max_new_tokens=14)

decoded = tokenizer.batch_decode(out)
print(decoded)

labels = [(d.split("\nSolution:")[-1]).strip() for d in decoded]
print(labels)

[
    {
        "role": "system",
        "content": "You are a helpful math assistant that solves problems step by step."
    },
    {
        "role": "user",
        "content": "Find the unique $\\textbf{odd}$ integer $t$ such that $0<t<23$ and $t+2$ is the inverse of $t$ modulo $23$."
    },
    {
        "role": "assistant",
        "content": "Solution: "
    }
]
['<bos><start_of_turn>user\nYou are a helpful math assistant that solves problems step by step.\n\nFind the unique $\\textbf{odd}$ integer $t$ such that $0<t<23$ and $t+2$ is the inverse of $t$ modulo $23$.<end_of_turn>\n<start_of_turn>model\nSolution:<eos><eos><eos><eos><eos><eos><eos><eos>preprocessing:\nLet $t$ be an odd integer such that $', '<bos><start_of_turn>user\nYou are a helpful math assistant that solves problems step by step.\n\nConvert $\\frac{57}{160}$ to a terminating decimal.<end_of_turn>\n<start_of_turn>model\nSolution:<eos><eos><eos><eos><eos><eos><eos><eos><eos><eos><eos><eos><eos><eos><eos><eos><eos><

In [29]:
def generate_training_sample(conversation, target_response, max_length=512):
    """
    Generate a single training sample with proper label masking.

    For causal LM fine-tuning, we want:
    - input_ids: the full sequence (prompt + response)
    - labels: same as input_ids, but with -100 for tokens we don't want to compute loss on (the prompt)

    Args:
        conversation: list of message dicts with role/content (the prompt)
        target_response: the expected model output
        max_length: maximum sequence length

    Returns:
        dict with input_ids, attention_mask, labels
    """
    prompt_text = tokenizer.apply_chat_template(
        conversation,
        continue_final_message=True,
        tokenize=False,
        add_generation_prompt=False
    )

    full_text = prompt_text + target_response + tokenizer.eos_token

    full_encoding = tokenizer(
        full_text,
        return_tensors="pt",
        padding="max_length",
        truncation=True,
        max_length=max_length,
        add_special_tokens=False  # chat_template already added special tokens
    )

    # Tokenize just the prompt to find where labels should start
    prompt_encoding = tokenizer(
        prompt_text,
        return_tensors="pt",
        add_special_tokens=False
    )
    prompt_length = prompt_encoding["input_ids"].shape[1]

    # Create labels: -100 for prompt tokens (ignore in loss), actual tokens for response
    labels = full_encoding["input_ids"].clone()
    labels[0, :prompt_length] = -100  # Mask the prompt
    labels[labels == tokenizer.pad_token_id] = -100  # Also mask padding

    return {
        "input_ids": full_encoding["input_ids"],
        "attention_mask": full_encoding["attention_mask"],
        "labels": labels
    }


In [30]:
test_conversation = [
    {"role": "system", "content": "You are a helpful math assistant that solves problems step by step."},
    {"role": "user", "content": df_train.iloc[0].problem},
    {"role": "assistant", "content": "Solution: "}
]
test_response = df_train.iloc[0].solution

sample = generate_training_sample(test_conversation, test_response)
print(f"Input IDs shape: {sample['input_ids'].shape}")
print(f"Labels shape: {sample['labels'].shape}")
print(f"Non-masked label tokens: {(sample['labels'] != -100).sum().item()}")

Input IDs shape: torch.Size([1, 512])
Labels shape: torch.Size([1, 512])
Non-masked label tokens: 238


In [31]:
from torch.utils.data import Dataset, DataLoader

class MathDataset(Dataset):
    def __init__(self, dataframe, tokenizer, max_length=512):
        self.data = dataframe
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data.iloc[idx]

        conversation = [
            {"role": "system", "content": "You are a helpful math assistant that solves problems step by step."},
            {"role": "user", "content": row.problem},
            {"role": "assistant", "content": "Solution: "}
        ]

        target = row.solution

        sample = generate_training_sample(conversation, target, self.max_length)

        return {
            "input_ids": sample["input_ids"].squeeze(0),
            "attention_mask": sample["attention_mask"].squeeze(0),
            "labels": sample["labels"].squeeze(0)
        }

# Create dataset
dataset = MathDataset(df_train, tokenizer, max_length=512)
print(f"Dataset size: {len(dataset)}")

# Test one sample
test_item = dataset[0]
print(f"Sample input_ids shape: {test_item['input_ids'].shape}")

Dataset size: 400
Sample input_ids shape: torch.Size([512])


In [32]:
from peft import LoraConfig, get_peft_model, TaskType

# Configure LoRA for Gemma
lora_config = LoraConfig(
    task_type=TaskType.CAUSAL_LM,
    r=8,  # Rank of the update matrices (lower = fewer params, higher = more capacity)
    lora_alpha=16,  # Scaling factor
    lora_dropout=0.1,  # Dropout for LoRA layers
    target_modules=["q_proj", "k_proj", "v_proj", "o_proj"],  # Attention projection layers
    bias="none",
)

peft_model = get_peft_model(model, lora_config)
peft_model.gradient_checkpointing_enable(gradient_checkpointing_kwargs={"use_reentrant": False})
peft_model.print_trainable_parameters()


trainable params: 1,490,944 || all params: 1,001,376,896 || trainable%: 0.1489


In [33]:
# Training configuration
from torch.optim import AdamW
from tqdm import tqdm

# Hyperparameters
BATCH_SIZE = 1
LEARNING_RATE = 2e-4
NUM_EPOCHS = 3
GRADIENT_ACCUMULATION_STEPS = 8  # Effective batch size = BATCH_SIZE * GRADIENT_ACCUMULATION_STEPS

train_loader = DataLoader(
    dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    drop_last=True
)

optimizer = AdamW(
    filter(lambda p: p.requires_grad, peft_model.parameters()),
    lr=LEARNING_RATE,
    weight_decay=0.01
)

print(f"Training samples: {len(dataset)}")
print(f"Batch size: {BATCH_SIZE}")
print(f"Steps per epoch: {len(train_loader)}")
print(f"Total training steps: {len(train_loader) * NUM_EPOCHS}")


Training samples: 400
Batch size: 1
Steps per epoch: 400
Total training steps: 1200


In [34]:
import gc
gc.collect()
torch.cuda.empty_cache()

# Training loop
peft_model.train()

for epoch in range(NUM_EPOCHS):
    total_loss = 0
    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch + 1}/{NUM_EPOCHS}")

    for step, batch in enumerate(progress_bar):
        # Move batch to device
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)

        # Forward pass
        outputs = peft_model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            labels=labels
        )

        loss = outputs.loss / GRADIENT_ACCUMULATION_STEPS
        total_loss += loss.item() * GRADIENT_ACCUMULATION_STEPS

        # Backward pass
        loss.backward()

        # Update weights every GRADIENT_ACCUMULATION_STEPS
        if (step + 1) % GRADIENT_ACCUMULATION_STEPS == 0:
            optimizer.step()
            optimizer.zero_grad()

        progress_bar.set_postfix({"loss": f"{loss.item() * GRADIENT_ACCUMULATION_STEPS:.4f}"})

    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch + 1} - Average Loss: {avg_loss:.4f}")

print("Training complete!")


Epoch 1/3: 100%|██████████| 400/400 [08:59<00:00,  1.35s/it, loss=0.8034]


Epoch 1 - Average Loss: nan


Epoch 2/3: 100%|██████████| 400/400 [08:58<00:00,  1.35s/it, loss=0.3065]


Epoch 2 - Average Loss: nan


Epoch 3/3: 100%|██████████| 400/400 [08:58<00:00,  1.35s/it, loss=0.4674]

Epoch 3 - Average Loss: nan
Training complete!





In [36]:
# Mount Google Drive and save there
from google.colab import drive
drive.mount('/content/drive')

OUTPUT_DIR = "/content/drive/MyDrive/gemma-math-lora"
peft_model.save_pretrained(OUTPUT_DIR)
tokenizer.save_pretrained(OUTPUT_DIR)
print(f"Model saved to {OUTPUT_DIR}")

Mounted at /content/drive
Model saved to /content/drive/MyDrive/gemma-math-lora


In [37]:
# Test inference with the fine-tuned model
peft_model.eval()

def generate_response(problem, max_new_tokens=256):
    """Generate a response for a given math problem."""
    conversation = [
        {"role": "system", "content": "You are a helpful math assistant that solves problems step by step."},
        {"role": "user", "content": problem},
        {"role": "assistant", "content": "Solution: "}
    ]

    prompt = tokenizer.apply_chat_template(
        conversation,
        continue_final_message=True,
        tokenize=False
    )

    inputs = tokenizer(prompt, return_tensors="pt", add_special_tokens=False).to(device)

    with torch.no_grad():
        outputs = peft_model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            do_sample=False,  # Deterministic for testing
            pad_token_id=tokenizer.pad_token_id
        )

    response = tokenizer.decode(outputs[0], skip_special_tokens=True)
    # Extract just the assistant's response
    return response.split("Solution: ")[-1]

# Test with a sample from the MATH-500 dataset
test_problem = df.iloc[0].problem
print(f"Problem: {test_problem}\n")
print(f"Expected answer: {df.iloc[0].answer}\n")
print(f"Model response:\n{generate_response(test_problem)}")


The following generation flags are not valid and may be ignored: ['top_p', 'top_k']. Set `TRANSFORMERS_VERBOSITY=info` for more details.


Problem: Convert the point $(0,3)$ in rectangular coordinates to polar coordinates.  Enter your answer in the form $(r,\theta),$ where $r > 0$ and $0 \le \theta < 2 \pi.$

Expected answer: \left( 3, \frac{\pi}{2} \right)

Model response:
user
You are a helpful math assistant that solves problems step by step.

Convert the point $(0,3)$ in rectangular coordinates to polar coordinates.  Enter your answer in the form $(r,\theta),$ where $r > 0$ and $0 \le \theta < 2 \pi.$
model
Solution:We have $r = \sqrt{0^2 + 3^2} = \sqrt{9} = 3$ and $\theta = \boxed{\frac{\pi}{2}}.$  Therefore, the point $(0,3)$ is represented by the point $(3,\frac{\pi}{2}).$

Final Answer:The final answer is $\boxed{\left(3,\frac{\pi}{2}\right)}$
