<a target="_blank" href="https://colab.research.google.com/github/autoharness/CarTool-Instruct/blob/main/fine_tuning/Fine_Tuning_Car_Tool_Instruct_with_Hugging_Face.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

# Fine-tune for CarTool-Instruct

This notebook provides a workflow for fine-tuning large language models on the CarTool-Instruct dataset to enhance function-calling capabilities.

## Environment Setup

In [None]:
# Install PyTorch (unpinned) plus pinned versions of the Hugging Face training stack.
# NOTE(review): peft==0.7.0 looks considerably older than the transformers/trl pins
# on the same line — confirm this combination is compatible before relying on it.
%pip install torch
%pip install -U transformers==4.57.1 trl==0.25.1 datasets==4.4.1 peft==0.7.0

Before running the next cell, create a new Colab secret in the left toolbar: set the 'Name' to `HF_TOKEN`, paste your Hugging Face access token as the 'Value', and toggle 'Notebook access' on.

In [None]:
from google.colab import userdata
from huggingface_hub import login

# Authenticate against the Hugging Face Hub with the token stored in the
# `HF_TOKEN` Colab secret.
hf_token = userdata.get('HF_TOKEN')
login(token=hf_token)

In [None]:
import json
import re
import pandas as pd
import gc
import os
import torch
from abc import ABC, abstractmethod
from huggingface_hub import hf_hub_download
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
from transformers.pipelines.pt_utils import KeyDataset
from datasets import load_dataset, concatenate_datasets
from trl import SFTConfig, SFTTrainer
from peft import LoraConfig, PeftModel

# --- Run configuration -------------------------------------------------------
# True  -> train LoRA adapters (Low-Rank Adaptation) and merge them afterwards.
# False -> full fine-tuning of every model weight.
USE_LORA = False
MODEL_NAME = "google/functiongemma-270m-it"
# Upper bound on the number of training rows kept after the split filter.
MAX_TRAINING_SAMPLES = 9509
# Output folder name encodes model, training mode and sample cap.
# Single quotes are used inside the replacement fields because reusing the
# enclosing double quote there is only valid syntax on Python 3.12+.
OUTPUT_DIR = f"/content/{MODEL_NAME.replace('/', '_')}_{'lora' if USE_LORA else 'fft'}_{MAX_TRAINING_SAMPLES}"
# Downloads (or reuses the cached copy of) the dataset file and returns its local path.
DATA_FILE = hf_hub_download(repo_id="autoharness/CarTool-Instruct", filename="dataset.jsonl", repo_type="dataset")
# Optional path to a Jinja chat template that overrides the tokenizer's own one.
CHAT_TEMPLATE_PATH = None # "/content/chat_template_for_qwen.jinja"

## Training

In [None]:
# Load the base checkpoint and its tokenizer; device placement and dtype are
# left to the library ("auto").
base_model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    device_map="auto",
    dtype="auto")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

if CHAT_TEMPLATE_PATH and os.path.exists(CHAT_TEMPLATE_PATH):
    print(f"Loading custom chat template from: {CHAT_TEMPLATE_PATH}")
    with open(CHAT_TEMPLATE_PATH, 'r', encoding='utf-8') as f:
        custom_template = f.read()
    # Surrounding whitespace is stripped before the template is installed.
    tokenizer.chat_template = custom_template.strip()
else:
    if CHAT_TEMPLATE_PATH:
        # A path was configured but the file is missing: say so explicitly
        # instead of silently training with a different template.
        print(f"WARNING: custom chat template not found at: {CHAT_TEMPLATE_PATH}")
    print("Using default model chat template.")

print(f"Device: {base_model.device}")
print(f"DType:  {base_model.dtype}")
print(f"Attention implementation: {base_model.config._attn_implementation}")

In [None]:
def apply_format(sample, tokenizer):
    """
    Turn one raw dataset row into a prompt/completion pair.

    Args:
        sample (dict): Row whose "text" field is a JSON document with the
            keys "messages", "tools" and "metadata".
        tokenizer: Object exposing `apply_chat_template`.

    Returns:
        dict: {"prompt", "completion", "split"} where "prompt" is the rendered
        conversation without its last message (generation prompt appended),
        "completion" is the remainder of the fully rendered conversation after
        the first len(prompt) characters, and "split" is the record's
        "metadata" value passed through unchanged.
    """
    record = json.loads(sample["text"])
    conversation = record["messages"]
    tool_specs = record["tools"]

    def render(messages, with_generation_prompt):
        # Both renderings share the same tools and always return text, not ids.
        return tokenizer.apply_chat_template(
            messages,
            tools=tool_specs,
            tokenize=False,
            add_generation_prompt=with_generation_prompt,
        )

    full_text = render(conversation, False)
    prompt_text = render(conversation[:-1], True)

    # The completion is obtained by slicing, i.e. it assumes the prompt
    # rendering is a prefix of the full rendering.
    completion_text = full_text[len(prompt_text) :]

    return {
        "prompt": prompt_text,
        "completion": completion_text,
        "split": record["metadata"],
    }

In [None]:
# Fixed seed so the shuffled order — and therefore the training subset kept
# when MAX_TRAINING_SAMPLES truncates it — is identical on every run.
SHUFFLE_SEED = 42

# Each line of the data file becomes one row with a single "text" column.
dataset = load_dataset("text", data_files=DATA_FILE, encoding="utf-8")[
    "train"
].shuffle(seed=SHUFFLE_SEED)
processed_dataset = dataset.map(lambda x: apply_format(x, tokenizer))

# Rows are routed by the "split" value produced by apply_format.
train_dataset = processed_dataset.filter(lambda example: example["split"] == "train")
train_dataset = train_dataset.select(range(min(len(train_dataset), MAX_TRAINING_SAMPLES)))
eval_dataset = processed_dataset.filter(lambda example: example["split"] == "eval")

print(f"Training samples: {len(train_dataset)}, Eval samples: {len(eval_dataset)}")

combined_subset = concatenate_datasets([train_dataset, eval_dataset])
if len(combined_subset) == 0:
    # max() below would otherwise fail with an unhelpful "empty sequence" error.
    raise ValueError("No rows with split 'train' or 'eval' were found in the dataset.")

# The longest example is picked by character count; only that one example is
# tokenized, and 100 tokens of headroom are added for the sequence length limit.
longest_example = max(
    combined_subset,
    key=lambda example: len(example["prompt"] + example["completion"]),
)
longest_example_token_count = len(
    tokenizer.tokenize(longest_example["prompt"] + longest_example["completion"])
)
max_token_count = longest_example_token_count + 100

print(f"The longest example length is {len(longest_example['prompt'] + longest_example['completion'])} with {longest_example_token_count} tokens.")


In [None]:
# Choose between LoRA adapters and full fine-tuning. The two modes differ in
# the banner printed, the learning rate and whether a PEFT config is built.
print(
    ">>> CONFIGURING FOR LoRA TRAINING"
    if USE_LORA
    else ">>> CONFIGURING FOR FULL FINE-TUNING"
)
learning_rate = 2e-4 if USE_LORA else 1e-5
peft_config = (
    LoraConfig(
        task_type="CAUSAL_LM",
        r=8,
        lora_alpha=16,
        lora_dropout=0.05,
        bias="none",
        # Attention projections followed by the MLP projections.
        target_modules=[
            "q_proj", "k_proj", "v_proj", "o_proj",
            "gate_proj", "up_proj", "down_proj"
        ],
    )
    if USE_LORA
    else None
)

args = SFTConfig(
    output_dir=OUTPUT_DIR,
    # Schedule: 2 epochs, 1 sample per device step, gradients accumulated
    # over 32 steps before each optimizer update.
    num_train_epochs=2,
    per_device_train_batch_size=1,
    gradient_accumulation_steps=32,
    learning_rate=learning_rate,
    lr_scheduler_type="cosine",
    optim="adamw_torch_fused",
    # Sequence handling: length cap derived from the longest example, no packing.
    max_length=max_token_count,
    packing=False,
    completion_only_loss=True,
    # Memory / precision.
    gradient_checkpointing=True,
    bf16=True,
    # Logging only; evaluation and checkpointing during training are disabled.
    logging_strategy="steps",
    logging_steps=50,
    eval_strategy="no",
    save_strategy="no",
    report_to="none",
)
# Keep the model config's padding token in sync with the tokenizer's.
base_model.config.pad_token_id = tokenizer.pad_token_id

In [None]:
# Run supervised fine-tuning, then persist the result (full model or adapter)
# together with the tokenizer.
trainer = SFTTrainer(
    model=base_model,
    args=args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    peft_config=peft_config,
    processing_class=tokenizer,
)
trainer.train()

trainer.save_model(OUTPUT_DIR)
tokenizer.save_pretrained(OUTPUT_DIR)

print(f"Training finished. Model/Adapter saved to {OUTPUT_DIR}")

if USE_LORA:
    print("\n>>> STARTING LORA MERGE PROCESS...")

    # Clean up memory before reloading the model
    del trainer
    del base_model
    gc.collect()
    torch.cuda.empty_cache()

    print(f">>> Reloading base model: {MODEL_NAME}")
    # `dtype` matches the spelling used for the initial model load;
    # `torch_dtype` is the older, deprecated name for the same argument.
    base_model_reload = AutoModelForCausalLM.from_pretrained(
        MODEL_NAME,
        device_map="auto",
        dtype="auto"
    )

    # Load the adapter saved in OUTPUT_DIR on top of the fresh base model
    model_to_merge = PeftModel.from_pretrained(base_model_reload, OUTPUT_DIR)

    # Merge and Unload
    print(">>> Merging...")
    merged_model = model_to_merge.merge_and_unload()

    # Save Merged Model (and tokenizer) in a "merged" subfolder
    merged_output_dir = os.path.join(OUTPUT_DIR, "merged")
    print(f">>> Saving merged model to: {merged_output_dir}")
    merged_model.save_pretrained(merged_output_dir)
    tokenizer.save_pretrained(merged_output_dir)

    print(">>> Merge Complete.")

In [None]:
from google.colab import files
from pathlib import Path

zip_filename = f"{Path(OUTPUT_DIR).name}.zip"

!zip -r -q {zip_filename} {OUTPUT_DIR}

files.download(zip_filename)

## Evaluation

In [None]:
def infer_value_type(value_str):
    """
    Convert a raw argument string into a Python value.

    Rules, applied in order after trimming surrounding whitespace:
      1. Text wrapped in `<escape>` markers is returned as a string with the
         markers removed (no further conversion).
      2. The literals "true", "false" and "null" become True, False and None.
      3. Text accepted by `int()` becomes an int, otherwise text accepted by
         `float()` becomes a float.
      4. Anything else is returned as the trimmed string.
    """
    escape_tag = "<escape>"
    literal_values = {"true": True, "false": False, "null": None}

    text = value_str.strip()

    if text.startswith(escape_tag) and text.endswith(escape_tag):
        # Drop the leading and trailing marker and keep the content verbatim.
        return text[len(escape_tag):-len(escape_tag)]

    if text in literal_values:
        return literal_values[text]

    # Try the narrower numeric type first.
    for convert in (int, float):
        try:
            return convert(text)
        except ValueError:
            continue

    return text

def extract_function_call_json(model_output):
    """
    Extract function calls from model output that encodes them as JSON.

    The JSON is taken from a ```tool_code fenced block when one is present.
    Otherwise the span from the first "[" or "{" (whichever comes first) to
    the last matching closing bracket is used.

    Args:
        model_output (str): The model output string.

    Returns:
        list: One {"function": {"name": ..., "arguments": {...}}} dict per
        well-formed call. Returns an empty list when no JSON is found, the JSON
        cannot be decoded, or it contains no usable call. Entries that are not
        JSON objects, lack a "name", or whose "arguments" cannot be interpreted
        as an object are skipped, so "arguments" is always a dict.
    """
    results = []

    # Try to extract JSON from code blocks
    json_pattern = r"```tool_code\s*(.*?)\s*```"
    match = re.search(json_pattern, model_output, re.DOTALL)

    json_str = ""
    if match:
        json_str = match.group(1).strip()
    else:
        # Fallback: look for outer brackets
        start_array = model_output.find("[")
        start_obj = model_output.find("{")

        if start_array != -1 and (start_obj == -1 or start_array < start_obj):
            end_idx = model_output.rfind("]")
            if end_idx != -1:
                json_str = model_output[start_array : end_idx + 1]
        elif start_obj != -1:
            end_idx = model_output.rfind("}")
            if end_idx != -1:
                json_str = model_output[start_obj : end_idx + 1]

    if not json_str:
        return results

    try:
        data = json.loads(json_str)
    except json.JSONDecodeError:
        return results

    if isinstance(data, dict):
        data = [data]
    if not isinstance(data, list):
        return results

    for item in data:
        # Free text such as "[1, 2, 3]" decodes to non-object items; calling
        # .get() on those would raise, so they are skipped.
        if not isinstance(item, dict):
            continue

        func_name = item.get("name")
        if not func_name:
            continue

        params = item.get("arguments", {})
        if isinstance(params, str):
            # Arguments delivered as a JSON-encoded string are decoded here.
            try:
                params = json.loads(params)
            except json.JSONDecodeError:
                pass
        if params is None:
            params = {}
        if not isinstance(params, dict):
            # Malformed call: consumers iterate arguments with .items().
            continue

        results.append({"function": {"name": func_name, "arguments": params}})

    return results

def extract_function_call_functiongemma(model_output):
    """
    Extract function calls written between `<start_function_call>` and
    `<end_function_call>` markers.

    Each marked span is expected to look like:

    ```
    call:open_map{query:<escape>San Francisco<escape>}
    ```

    Spans that do not start with "call:" or that contain no "{" are ignored.
    Arguments are comma-separated `key:value` pairs; a value is either wrapped
    in `<escape>` markers or runs up to the next comma, and is converted with
    `infer_value_type`.

    Args:
        model_output (str): The model output string.

    Returns:
        list: A list of {"function": {"name": ..., "arguments": {...}}}
        dictionaries, one per recognised call, in order of appearance.
    """
    # DOTALL lets a call (and an escaped value) span several lines.
    call_pattern = r"<start_function_call>(.*?)<end_function_call>"
    arg_pattern = r"(?:^|,)\s*(?P<key>[^:]+):(?P<value><escape>.*?<escape>|[^,]*)"

    parsed_calls = []

    for call_body in re.findall(call_pattern, model_output, re.DOTALL):
        if not call_body.strip().startswith("call:"):
            continue

        # Everything before the first brace is the name part, the rest is args.
        header, brace, remainder = call_body.partition("{")
        if not brace:
            # Malformed call without an opening brace.
            continue

        function_name = header.replace("call:", "").strip()

        # Drop the closing brace of the argument list when it is present.
        args_text = remainder.strip()
        if args_text.endswith("}"):
            args_text = args_text[:-1]

        arguments = {
            found.group("key").strip(): infer_value_type(found.group("value").strip())
            for found in re.finditer(arg_pattern, args_text, re.DOTALL)
        }

        parsed_calls.append(
            {"function": {"name": function_name, "arguments": arguments}}
        )

    return parsed_calls

def get_eval_logs(dataset, pipe, extract_fn, batch_size=32):
    """
    Run the generation pipeline over every prompt and collect one log per sample.

    Args:
        dataset: Indexable dataset whose rows have a "prompt" string and a
            "text" field holding the original JSON record with "messages".
        pipe: Text-generation pipeline; each result is read as
            `output[0]["generated_text"]`.
        extract_fn (callable): Maps the generated text to a list of function
            call dicts.
        batch_size (int): Batch size passed to the pipeline; also the interval
            at which progress is printed. Defaults to 32.

    Returns:
        list: Dicts with the keys "user", "target_fc", "target_text",
        "output_fc" and "output_text".
    """
    logs = []
    total = len(dataset)

    print(f"Starting prediction on {total} samples...")

    # Iterate over the pipeline results
    for i, output in enumerate(pipe(KeyDataset(dataset, "prompt"), batch_size=batch_size)):
        # Fetch the row once and reuse it for both fields.
        row = dataset[i]
        messages = json.loads(row["text"])["messages"]
        # assumes messages[0] is a system/developer turn and messages[1] the
        # user turn — TODO confirm against the dataset schema
        user_message = messages[1]
        # apply_format builds the prompt from messages[:-1], so the message the
        # model is asked to generate is the last one. That is the target here;
        # it equals messages[2] for three-message samples.
        target_message = messages[-1]
        input_prompt = row["prompt"]

        # Extract generated text: the output is sliced by the prompt length, so
        # it is expected to start with the prompt.
        model_output_only = output[0]["generated_text"][len(input_prompt):].strip()

        output_fc = extract_fn(model_output_only)

        logs.append(
            {
                "user": user_message["content"],
                "target_fc": target_message.get("tool_calls", []),
                "target_text": target_message.get("content"),
                "output_fc": output_fc,
                "output_text": model_output_only,
            }
        )

        if (i + 1) % batch_size == 0:
            print(f"Eval process: {(i + 1) * 100.0 / total:.2f}%")

    return logs

def get_scored_data_frame(dataset, pipe, extract_fn):
    """
    Evaluate the pipeline on `dataset` and return a per-sample score table.

    Predictions are gathered with `get_eval_logs`. The returned DataFrame holds
    the user text, the expected and produced function names and arguments, the
    raw expected and produced text, and three boolean columns:
    "correct_names", "correct_arguments" and "correct" (both of the former).
    """

    def call_names(calls):
        # Function names in call order.
        return [call["function"]["name"] for call in calls]

    def call_arguments(calls):
        # Argument dicts rebuilt with keys in sorted order, one per call.
        return [dict(sorted(call["function"]["arguments"].items())) for call in calls]

    logs_df = pd.DataFrame.from_records(get_eval_logs(dataset, pipe, extract_fn))
    expected_calls = logs_df["target_fc"]
    produced_calls = logs_df["output_fc"]

    scored = pd.DataFrame()
    scored["user"] = logs_df["user"]
    scored["target_names"] = expected_calls.apply(call_names)
    scored["output_names"] = produced_calls.apply(call_names)
    scored["target_arguments"] = expected_calls.apply(call_arguments)
    scored["output_arguments"] = produced_calls.apply(call_arguments)

    scored["target_text"] = logs_df["target_text"]
    scored["output_text"] = logs_df["output_text"]

    # A sample is correct only when names and arguments both match.
    names_match = scored["target_names"] == scored["output_names"]
    arguments_match = scored["target_arguments"] == scored["output_arguments"]
    scored["correct_names"] = names_match
    scored["correct_arguments"] = arguments_match
    scored["correct"] = scored["correct_names"] & scored["correct_arguments"]

    return scored

def review(scored):
    """
    Print every sample whose predicted calls differ from the expected ones.

    Adds three boolean columns to `scored` in place ("incorrect_names",
    "incorrect_arguments", "incorrect"), prints the number of incorrect rows
    and then, for each of them, the user prompt with the expected and actual
    names and arguments. Returns None.
    """
    names_differ = scored["target_names"] != scored["output_names"]
    arguments_differ = scored["target_arguments"] != scored["output_arguments"]

    scored["incorrect_names"] = names_differ
    scored["incorrect_arguments"] = arguments_differ
    scored["incorrect"] = scored["incorrect_names"] | scored["incorrect_arguments"]

    failures = scored[scored["incorrect"]]

    print(f"\nTotal Incorrect: {len(failures)}")
    for index, row in failures.iterrows():
        print(
            f"Sample #{index} Prompt: {row['user']}",
            f"Expected: {row['target_names']}, {row['target_arguments']}",
            f"Actual  : {row['output_names']}, {row['output_arguments']}",
            "---------------",
            sep="\n",
        )

In [None]:
# Drop every model-holding name that the training cell may have left behind.
# The LoRA path also creates `model_to_merge` and `base_model_reload`; leaving
# those bound would keep the merged model in memory while the evaluation copy
# is loaded.
for _stale_name in ("trainer", "base_model", "merged_model", "model_to_merge", "base_model_reload"):
    globals().pop(_stale_name, None)
gc.collect()
torch.cuda.empty_cache()

# Evaluate the artefact written by training: the merged model for LoRA runs,
# the output directory itself for full fine-tuning.
if USE_LORA:
    eval_model_path = os.path.join(OUTPUT_DIR, "merged")
    print(f"Loading MERGED LoRA model from: {eval_model_path}")
else:
    eval_model_path = OUTPUT_DIR
    print(f"Loading Full Fine-Tuned model from: {eval_model_path}")

# `dtype` matches the spelling used for the initial model load; `torch_dtype`
# is the older, deprecated name for the same argument.
target_model = AutoModelForCausalLM.from_pretrained(
    eval_model_path,
    device_map="auto",
    dtype="auto"
)
tokenizer = AutoTokenizer.from_pretrained(eval_model_path)

# The output parser is chosen from the model name.
if "functiongemma" in MODEL_NAME:
    current_extractor = extract_function_call_functiongemma
else:
    current_extractor = extract_function_call_json

print(f"Running evaluation using extractor: {current_extractor.__name__}")

# NOTE(review): temperature=0.001 presumably approximates greedy decoding;
# consider do_sample=False if fully deterministic output is intended — confirm.
pipe = pipeline(
    "text-generation",
    model=target_model,
    tokenizer=tokenizer,
    temperature=0.001
)

print(f"Starting evaluation on {len(eval_dataset)} validation samples...")
model_scored = get_scored_data_frame(
    eval_dataset,
    pipe,
    extract_fn=current_extractor
)

# Accuracy is the fraction of samples whose names and arguments both match.
corrected_values = model_scored["correct"]
accuracy = corrected_values.mean()

print(f"\n{'='*30}")
print("EVALUATION RESULTS")
print(f"{'='*30}")
print(f"Model Path: {eval_model_path}")
print(f"Final Accuracy: {accuracy:.2%}")
print(f"{'='*30}")

review(model_scored)