In [1]:
# Install required Python packages
%pip install \
  huggingface_hub \
  "transformers>=4.36.0" \
  peft \
  datasets \
  trl \
  jsonschema \
  litellm \
  "jinja2>=3.1.0" \
  "torch>=2.0.0" \
  openai \
  jupyterlab \
  requests 

Note: you may need to restart the kernel to use updated packages.


In [2]:
%pip install --upgrade git+https://github.com/meta-llama/llama-stack-client-python.git@main

Collecting git+https://github.com/meta-llama/llama-stack-client-python.git@main
  Cloning https://github.com/meta-llama/llama-stack-client-python.git (to revision main) to /private/var/folders/n9/jvwmjx1j6vn5njz069y5lcn40000gn/T/pip-req-build-jru5edi4
  Running command git clone --filter=blob:none --quiet https://github.com/meta-llama/llama-stack-client-python.git /private/var/folders/n9/jvwmjx1j6vn5njz069y5lcn40000gn/T/pip-req-build-jru5edi4
  Resolved https://github.com/meta-llama/llama-stack-client-python.git to commit 254c646645024c4d51df9ae33a6f0992471a3e1a
  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h  Preparing metadata (pyproject.toml) ... [?25ldone
Note: you may need to restart the kernel to use updated packages.


In [3]:
import os
import sys
import json
import random
import requests
from pprint import pprint
from typing import Any, Dict, List, Union
from time import sleep, time

import numpy as np
import torch
from datasets import load_dataset
from huggingface_hub import HfApi
from openai import OpenAI

  from .autonotebook import tqdm as notebook_tqdm


In [4]:
# Add parent directory to path to import config
sys.path.append(os.path.join(os.getcwd(), '..'))
from config import *

# LlamaStack Server endpoint
# LLAMASTACK_URL = "http://localhost:8321"

print("Configuration:")
print(f"LlamaStack Server: {LLAMASTACK_URL}")
print(f"Data Store: {NDS_URL}")
print(f"Entity Store: {ENTITY_STORE_URL}")
print(f"NIM: {NIM_URL}")
print(f"Namespace: {NMS_NAMESPACE}")
print(f"Base Model: {BASE_MODEL}")

Configuration:
LlamaStack Server: http://localhost:8321
Data Store: http://localhost:8001
Entity Store: http://localhost:8002
NIM: http://localhost:8006
Namespace: xlam-tutorial-ns
Base Model: meta/llama-3.2-1b-instruct


In [5]:
from llama_stack_client import LlamaStackClient
client = LlamaStackClient(base_url=LLAMASTACK_URL)
client._version

'0.4.0-alpha.1'

In [6]:
SEED = 1234
LIMIT_TOOL_PROPERTIES = 8  # WAR for NIM bug with large tool properties

torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
np.random.seed(SEED)
random.seed(SEED)

In [7]:
# Processed data will be stored here
DATA_ROOT = os.path.join(os.getcwd(), "data")
CUSTOMIZATION_DATA_ROOT = os.path.join(DATA_ROOT, "customization")
VALIDATION_DATA_ROOT = os.path.join(DATA_ROOT, "validation")
EVALUATION_DATA_ROOT = os.path.join(DATA_ROOT, "evaluation")

os.makedirs(DATA_ROOT, exist_ok=True)
os.makedirs(CUSTOMIZATION_DATA_ROOT, exist_ok=True)
os.makedirs(VALIDATION_DATA_ROOT, exist_ok=True)
os.makedirs(EVALUATION_DATA_ROOT, exist_ok=True)

print(f"Data directories created at: {DATA_ROOT}")

Data directories created at: /Users/hacohen/Desktop/repos/install-NeMo-on-OpenShift/llamastack/e2e-test/server/data


In [8]:
from config import HF_TOKEN

os.environ["HF_TOKEN"] = HF_TOKEN
os.environ["HF_ENDPOINT"] = "https://huggingface.co"

In [9]:
# Download from Hugging Face
dataset = load_dataset("Salesforce/xlam-function-calling-60k")

# Inspect a sample
example = dataset['train'][0]
pprint(example)

{'answers': '[{"name": "live_giveaways_by_type", "arguments": {"type": '
            '"beta"}}, {"name": "live_giveaways_by_type", "arguments": '
            '{"type": "game"}}]',
 'id': 0,
 'query': 'Where can I find live giveaways for beta access and games?',
 'tools': '[{"name": "live_giveaways_by_type", "description": "Retrieve live '
          'giveaways from the GamerPower API based on the specified type.", '
          '"parameters": {"type": {"description": "The type of giveaways to '
          'retrieve (e.g., game, loot, beta).", "type": "str", "default": '
          '"game"}}}]'}


In [10]:
def normalize_type(param_type: str) -> str:
    """
    Normalize Python type hints to OpenAI function spec types.
    """
    param_type = param_type.strip()

    if "," in param_type and "default" in param_type:
        param_type = param_type.split(",")[0].strip()

    if param_type.startswith("default="):
        return "string"

    param_type = param_type.replace(", optional", "").strip()

    if param_type.startswith("Callable"):
        return "string"
    if param_type.startswith("Tuple"):
        return "array"
    if param_type.startswith("List["):
        return "array"
    if param_type.startswith("Set") or param_type == "set":
        return "array"

    type_mapping: Dict[str, str] = {
        "str": "string",
        "int": "integer",
        "float": "number",
        "bool": "boolean",
        "list": "array",
        "dict": "object",
        "List": "array",
        "Dict": "object",
        "set": "array",
        "Set": "array"
    }

    if param_type in type_mapping:
        return type_mapping[param_type]
    else:
        print(f"Unknown type: {param_type}")
        return "string"


def convert_tools_to_openai_spec(tools: Union[str, List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    if isinstance(tools, str):
        try:
            tools = json.loads(tools)
        except json.JSONDecodeError as e:
            print(f"Failed to parse tools string as JSON: {e}")
            return []

    if not isinstance(tools, list):
        print(f"Expected tools to be a list, but got {type(tools)}")
        return []

    openai_tools: List[Dict[str, Any]] = []
    for tool in tools:
        if not isinstance(tool, dict):
            print(f"Expected tool to be a dictionary, but got {type(tool)}")
            continue

        if not isinstance(tool.get("parameters"), dict):
            print(f"Expected 'parameters' to be a dictionary for tool: {tool}")
            continue

        normalized_parameters: Dict[str, Dict[str, Any]] = {}
        for param_name, param_info in tool["parameters"].items():
            if not isinstance(param_info, dict):
                print(f"Expected parameter info to be a dictionary for: {param_name}")
                continue

            param_dict = {
                "description": param_info.get("description", ""),
                "type": normalize_type(param_info.get("type", "")),
            }

            default_value = param_info.get("default")
            if default_value is not None and default_value != "":
                param_dict["default"] = default_value

            normalized_parameters[param_name] = param_dict

        openai_tool = {
            "type": "function",
            "function": {
                "name": tool["name"],
                "description": tool["description"],
                "parameters": {"type": "object", "properties": normalized_parameters},
            },
        }
        openai_tools.append(openai_tool)
    return openai_tools


def save_jsonl(filename, data):
    """Write a list of json objects to a .jsonl file"""
    with open(filename, "w") as f:
        for entry in data:
            f.write(json.dumps(entry) + "\n")


def convert_tool_calls(xlam_tools):
    """Convert XLAM tool format to OpenAI's tool schema."""
    tools = []
    for tool in json.loads(xlam_tools):
        tools.append({"type": "function", "function": {"name": tool["name"], "arguments": tool.get("arguments", {})}})
    return tools


def convert_example(example, dataset_type='single'):
    """Convert an XLAM dataset example to OpenAI format."""
    obj = {"messages": []}

    obj["messages"].append({"role": "user", "content": example["query"]})

    if example.get("tools"):
        obj["tools"] = convert_tools_to_openai_spec(example["tools"])

    assistant_message = {"role": "assistant", "content": ""}
    if example.get("answers"):
        tool_calls = convert_tool_calls(example["answers"])
        
        if dataset_type == "single":
            if len(tool_calls) == 1:
                assistant_message["tool_calls"] = tool_calls
            else:
                return None
        else:
            assistant_message["tool_calls"] = tool_calls
                
    obj["messages"].append(assistant_message)

    return obj


def convert_example_eval(entry):
    """Convert a single entry to the evaluator format"""
    # WAR for NIM bug with too many tool properties
    for tool in entry["tools"]:
        if len(tool["function"]["parameters"]["properties"]) > LIMIT_TOOL_PROPERTIES:
            return None
    
    new_entry = {
        "messages": [],
        "tools": entry["tools"],
        "tool_calls": []
    }
    
    for msg in entry["messages"]:
        if msg["role"] == "assistant" and "tool_calls" in msg:
            new_entry["tool_calls"] = msg["tool_calls"]
        else:
            new_entry["messages"].append(msg)
    
    return new_entry


def convert_dataset_eval(data):
    """Convert the entire dataset for evaluation."""
    return [result for entry in data if (result := convert_example_eval(entry)) is not None]


def read_jsonl(file_path):
    """Reads a JSON Lines file and yields parsed JSON objects"""
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            line = line.strip()
            if not line:
                continue
            try:
                yield json.loads(line)
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON: {e}")
                continue

print("Data transformation functions loaded successfully")

Data transformation functions loaded successfully


In [11]:
# Test conversion on the example
converted_example = convert_example(example)
print("Converted example:")
pprint(converted_example)

Converted example:
None


In [12]:
# Convert all examples
all_examples = []
with open(os.path.join(DATA_ROOT, "xlam_openai_format.jsonl"), "w") as f:
    for example in dataset["train"]:
        converted = convert_example(example)
        if converted is not None:
            all_examples.append(converted)
            f.write(json.dumps(converted) + "\n")

print(f"Converted {len(all_examples)} examples")

Converted 28461 examples


In [13]:
# Configure dataset size
NUM_EXAMPLES = 5000

assert NUM_EXAMPLES <= len(all_examples), \
    f"{NUM_EXAMPLES} exceeds the total number of available ({len(all_examples)}) data points"

# Randomly sample and split
sampled_examples = random.sample(all_examples, NUM_EXAMPLES)

train_size = int(0.7 * len(sampled_examples))
val_size = int(0.15 * len(sampled_examples))

train_data = sampled_examples[:train_size]
val_data = sampled_examples[train_size : train_size + val_size]
test_data = sampled_examples[train_size + val_size :]

# Save splits
save_jsonl(os.path.join(CUSTOMIZATION_DATA_ROOT, "training.jsonl"), train_data)
save_jsonl(os.path.join(VALIDATION_DATA_ROOT, "validation.jsonl"), val_data)

# Convert test data for evaluation
test_data_eval = convert_dataset_eval(test_data)
save_jsonl(os.path.join(EVALUATION_DATA_ROOT, "xlam-test-single.jsonl"), test_data_eval)

print(f"Dataset split complete:")
print(f"  Training: {len(train_data)} examples")
print(f"  Validation: {len(val_data)} examples")
print(f"  Test: {len(test_data_eval)} examples")

Dataset split complete:
  Training: 3500 examples
  Validation: 750 examples
  Test: 713 examples


In [14]:
def create_namespaces(entity_host, ds_host, namespace):
    # Create namespace in Entity Store
    entity_store_url = f"{entity_host}/v1/namespaces"
    resp = requests.post(entity_store_url, json={"id": namespace})
    assert resp.status_code in (200, 201, 409, 422), \
        f"Unexpected response from Entity Store: {resp.status_code}"
    print(f"Entity Store: {resp.status_code}")

    # Create namespace in Data Store
    nds_url = f"{ds_host}/v1/datastore/namespaces"
    resp = requests.post(nds_url, data={"namespace": namespace})
    assert resp.status_code in (200, 201, 409, 422), \
        f"Unexpected response from Data Store: {resp.status_code}"
    print(f"Data Store: {resp.status_code}")

create_namespaces(entity_host=ENTITY_STORE_URL, ds_host=NDS_URL, namespace=NMS_NAMESPACE)

Entity Store: 200
Data Store: 201


In [15]:
# Verify namespaces
res = requests.get(f"{NDS_URL}/v1/datastore/namespaces/{NMS_NAMESPACE}")
print(f"Data Store: {res.status_code}")
print(json.dumps(res.json(), indent=2))

res = requests.get(f"{ENTITY_STORE_URL}/v1/namespaces/{NMS_NAMESPACE}")
print(f"\nEntity Store: {res.status_code}")
print(json.dumps(res.json(), indent=2))

Data Store: 201
{
  "namespace": "xlam-tutorial-ns",
  "created_at": "2025-11-19T09:42:59Z",
  "updated_at": "2025-11-19T09:42:59Z"
}

Entity Store: 200
{
  "id": "xlam-tutorial-ns",
  "created_at": "2025-11-19T09:42:59.402383",
  "updated_at": "2025-11-19T09:42:59.402385",
  "description": null,
  "project": null,
  "custom_fields": {},
  "ownership": null
}


In [16]:
repo_id = f"{NMS_NAMESPACE}/{DATASET_NAME}"
print(f"Repository ID: {repo_id}")

hf_api = HfApi(endpoint=f"{NDS_URL}/v1/hf", token="")

# Create repo
hf_api.create_repo(
    repo_id=repo_id,
    repo_type='dataset',
)

print(f"Dataset repository created: {repo_id}")

Repository ID: xlam-tutorial-ns/xlam-ft-dataset
Dataset repository created: xlam-tutorial-ns/xlam-ft-dataset


In [17]:
# Upload dataset files
train_fp = f"{CUSTOMIZATION_DATA_ROOT}/training.jsonl"
val_fp = f"{VALIDATION_DATA_ROOT}/validation.jsonl"
test_fp = f"{EVALUATION_DATA_ROOT}/xlam-test-single.jsonl"

hf_api.upload_file(
    path_or_fileobj=train_fp,
    path_in_repo="training/training.jsonl",
    repo_id=repo_id,
    repo_type='dataset',
)

hf_api.upload_file(
    path_or_fileobj=val_fp,
    path_in_repo="validation/validation.jsonl",
    repo_id=repo_id,
    repo_type='dataset',
)

hf_api.upload_file(
    path_or_fileobj=test_fp,
    path_in_repo="testing/xlam-test-single.jsonl",
    repo_id=repo_id,
    repo_type='dataset',
)

print("Dataset files uploaded successfully")

training.jsonl: 100%|██████████| 6.06M/6.06M [00:04<00:00, 1.32MB/s]
validation.jsonl: 100%|██████████| 1.30M/1.30M [00:00<00:00, 7.37MB/s]
xlam-test-single.jsonl: 100%|██████████| 1.19M/1.19M [00:00<00:00, 6.81MB/s]


Dataset files uploaded successfully


In [18]:
# Register dataset via LlamaStack
# Note: We register with localfs provider, but also need Entity Store for customizer
# So we do both: LlamaStack registration + Entity Store registration

# First: Register via LlamaStack client (for LlamaStack tracking)
try:
    response = client.beta.datasets.register(
        dataset_id=f"{NMS_NAMESPACE}/{DATASET_NAME}",
        purpose="post-training/messages",
        source={
            "type": "uri",
            "uri": f"hf://datasets/{NMS_NAMESPACE}/{DATASET_NAME}"
        },
        metadata={
            "description": "Tool calling xLAM dataset in OpenAI ChatCompletions format",
            "project": "tool_calling",
            "provider_id": "nvidia"  # Hint for nvidia provider
        }
    )
    print("✅ Dataset registered via LlamaStack client!")
    print(f"   ID: {response.identifier}")
    print(f"   Provider: {response.provider_id}")
except Exception as e:
    if '409' in str(e) or 'already exists' in str(e).lower():
        print("⚠️ Dataset already exists in LlamaStack - continuing...")
    else:
        print(f"Warning: {e}")

# Second: ALSO register in Entity Store (required for nvidia customizer)
# The customizer queries Entity Store directly, not LlamaStack
import requests
response = requests.post(
    f"{ENTITY_STORE_URL}/v1/datasets",
    json={
        "name": DATASET_NAME,
        "namespace": NMS_NAMESPACE,
        "description": "Tool calling xLAM dataset in OpenAI ChatCompletions format",
        "files_url": f"hf://datasets/{repo_id}",
        "project": "tool_calling",
    },
)

if response.status_code in (200, 201):
    print("\n✅ Dataset also registered in Entity Store (for customizer)!")
elif response.status_code == 409:
    print("\n⚠️ Dataset already exists in Entity Store - continuing...")
else:
    print(f"\n⚠️ Entity Store registration failed: {response.status_code}")
    print(f"   This may cause fine-tuning to fail.")

print(f"\n✅ Dataset ready for fine-tuning: {NMS_NAMESPACE}/{DATASET_NAME}")

  response = client.beta.datasets.register(
INFO:httpx:HTTP Request: POST http://localhost:8321/v1beta/datasets "HTTP/1.1 500 Internal Server Error"



✅ Dataset also registered in Entity Store (for customizer)!

✅ Dataset ready for fine-tuning: xlam-tutorial-ns/xlam-ft-dataset


# Part III: Fine-Tuning via LlamaStack

Train a LoRA adapter using NeMo Customizer through the LlamaStack post-training API.

## Create Fine-Tuning Job

In [19]:
# Create unique job ID
import uuid
from time import time
unique_suffix = int(time())
job_uuid = f"xlam-ft-{unique_suffix}"
print(f"Creating fine-tuning job: {job_uuid}")

# Submit fine-tuning job via LlamaStack client
ft_job = client.alpha.post_training.supervised_fine_tune(
    job_uuid=job_uuid,
    model=f"{BASE_MODEL}@v1.0.0+A100",
    training_config={
        "n_epochs": 1,
        "data_config": {
            "batch_size": 8,
            "dataset_id": DATASET_NAME,
            "shuffle": True,
            "data_format": "instruct"
        },
        "optimizer_config": {
            "optimizer_type": "adamw",
            "lr": 0.0001,
            "weight_decay": 0.01,
            "num_warmup_steps": 100
        }
    },
    hyperparam_search_config={},
    logger_config={},
    algorithm_config={
        "type": "LoRA",
        "rank": 32,
        "alpha": 16,
        "lora_attn_modules": [],
        "apply_lora_to_mlp": True,
        "apply_lora_to_output": False,
        "use_dora": False,
        "quantize_base": False
    },
    checkpoint_dir=""
)

print("✓ Fine-tuning job created successfully via LlamaStack client!")
print(f"Job UUID: {ft_job.job_uuid}")

# Store for later use - USE THE ACTUAL OUTPUT MODEL FROM RESPONSE
JOB_ID = ft_job.job_uuid

# Check if response has output_model field
if hasattr(ft_job, 'output_model') and ft_job.output_model:
    CUSTOMIZED_MODEL = ft_job.output_model
    print(f"Output Model: {CUSTOMIZED_MODEL}")
else:
    # Fallback to constructed name
    CUSTOMIZED_MODEL = f"{NMS_NAMESPACE}/llama-3.2-1b-xlam@{job_uuid}"
    print(f"Output Model (constructed): {CUSTOMIZED_MODEL}")
    print(f"⚠️ No output_model in response - using constructed name")

Creating fine-tuning job: xlam-ft-1763545400


INFO:httpx:HTTP Request: POST http://localhost:8321/v1alpha/post-training/supervised-fine-tune "HTTP/1.1 200 OK"


✓ Fine-tuning job created successfully via LlamaStack client!
Job UUID: cust-SicAnj7TGgfbjBGCnKMEW1
Output Model: default/test-example-model@v1


## Monitor Fine-Tuning Progress

In [20]:
def wait_customization_job(job_uuid: str, polling_interval: int = 30, timeout: int = 3600):
    """
    Monitor a fine-tuning job until completion using LlamaStack client.
    """
    start_time = time()
    print(f"Monitoring fine-tuning job: {job_uuid}")
    
    while True:
        elapsed = time() - start_time
        if elapsed > timeout:
            raise RuntimeError(f"Job {job_uuid} exceeded timeout of {timeout}s")
        
        # Get job status via LlamaStack client
        jobs_response = client.alpha.post_training.job.list()
        
        # Handle both list and object with .data attribute
        if isinstance(jobs_response, list):
            jobs_list = jobs_response
        elif hasattr(jobs_response, 'data'):
            jobs_list = jobs_response.data
        else:
            jobs_list = [jobs_response]
        
        # Find our job
        job_data = next((j for j in jobs_list if j.job_uuid == job_uuid), None)
        
        if not job_data:
            print(f"Warning: Job {job_uuid} not found")
            sleep(polling_interval)
            continue
        
        job_status = job_data.status
        
        # Print progress
        if hasattr(job_data, 'status_details') and job_data.status_details:
            details = job_data.status_details
            steps = details.get('steps_completed', 0)
            total_steps = details.get('steps_per_epoch', 1)
            epochs = details.get('epochs_completed', 0)
            print(f"[{elapsed:.0f}s] Status: {job_status} | Epoch: {epochs} | Steps: {steps}/{total_steps}")
        else:
            print(f"[{elapsed:.0f}s] Status: {job_status}")
        
        # Check if job is complete
        if job_status not in ["scheduled", "in_progress", "created", "running"]:
            print(f"\n✓ Job completed with status: {job_status}")
            return job_status
        
        sleep(polling_interval)

# Start monitoring
final_status = wait_customization_job(JOB_ID)

Monitoring fine-tuning job: cust-SicAnj7TGgfbjBGCnKMEW1


INFO:httpx:HTTP Request: GET http://localhost:8321/v1alpha/post-training/jobs "HTTP/1.1 200 OK"


[0s] Status: in_progress | Epoch: 0 | Steps: 0/1


INFO:httpx:HTTP Request: GET http://localhost:8321/v1alpha/post-training/jobs "HTTP/1.1 200 OK"


[30s] Status: in_progress | Epoch: 0 | Steps: 0/1


INFO:httpx:HTTP Request: GET http://localhost:8321/v1alpha/post-training/jobs "HTTP/1.1 200 OK"


[61s] Status: in_progress | Epoch: 0 | Steps: 0/1


INFO:httpx:HTTP Request: GET http://localhost:8321/v1alpha/post-training/jobs "HTTP/1.1 200 OK"


[91s] Status: in_progress | Epoch: 0 | Steps: 0/1


INFO:httpx:HTTP Request: GET http://localhost:8321/v1alpha/post-training/jobs "HTTP/1.1 200 OK"


[122s] Status: in_progress | Epoch: 0 | Steps: 0/1


INFO:httpx:HTTP Request: GET http://localhost:8321/v1alpha/post-training/jobs "HTTP/1.1 200 OK"


[152s] Status: in_progress | Epoch: 0 | Steps: 0/1


INFO:httpx:HTTP Request: GET http://localhost:8321/v1alpha/post-training/jobs "HTTP/1.1 200 OK"


[183s] Status: in_progress | Epoch: 0 | Steps: 0/438


INFO:httpx:HTTP Request: GET http://localhost:8321/v1alpha/post-training/jobs "HTTP/1.1 200 OK"


[213s] Status: in_progress | Epoch: 0 | Steps: 0/438


INFO:httpx:HTTP Request: GET http://localhost:8321/v1alpha/post-training/jobs "HTTP/1.1 200 OK"


[244s] Status: in_progress | Epoch: 0 | Steps: 100/438


INFO:httpx:HTTP Request: GET http://localhost:8321/v1alpha/post-training/jobs "HTTP/1.1 200 OK"


[274s] Status: in_progress | Epoch: 0 | Steps: 100/438


INFO:httpx:HTTP Request: GET http://localhost:8321/v1alpha/post-training/jobs "HTTP/1.1 200 OK"


[305s] Status: in_progress | Epoch: 0 | Steps: 100/438


INFO:httpx:HTTP Request: GET http://localhost:8321/v1alpha/post-training/jobs "HTTP/1.1 200 OK"


[335s] Status: in_progress | Epoch: 0 | Steps: 200/438


INFO:httpx:HTTP Request: GET http://localhost:8321/v1alpha/post-training/jobs "HTTP/1.1 200 OK"


[366s] Status: in_progress | Epoch: 0 | Steps: 200/438


INFO:httpx:HTTP Request: GET http://localhost:8321/v1alpha/post-training/jobs "HTTP/1.1 200 OK"


[396s] Status: in_progress | Epoch: 0 | Steps: 200/438


INFO:httpx:HTTP Request: GET http://localhost:8321/v1alpha/post-training/jobs "HTTP/1.1 200 OK"


[427s] Status: in_progress | Epoch: 0 | Steps: 300/438


INFO:httpx:HTTP Request: GET http://localhost:8321/v1alpha/post-training/jobs "HTTP/1.1 200 OK"


[457s] Status: in_progress | Epoch: 0 | Steps: 300/438


INFO:httpx:HTTP Request: GET http://localhost:8321/v1alpha/post-training/jobs "HTTP/1.1 200 OK"


[488s] Status: in_progress | Epoch: 0 | Steps: 300/438


INFO:httpx:HTTP Request: GET http://localhost:8321/v1alpha/post-training/jobs "HTTP/1.1 200 OK"


[518s] Status: in_progress | Epoch: 0 | Steps: 400/438


INFO:httpx:HTTP Request: GET http://localhost:8321/v1alpha/post-training/jobs "HTTP/1.1 200 OK"


[548s] Status: in_progress | Epoch: 0 | Steps: 400/438


INFO:httpx:HTTP Request: GET http://localhost:8321/v1alpha/post-training/jobs "HTTP/1.1 200 OK"


[579s] Status: in_progress | Epoch: 1 | Steps: 438/438


INFO:httpx:HTTP Request: GET http://localhost:8321/v1alpha/post-training/jobs "HTTP/1.1 200 OK"


[609s] Status: in_progress | Epoch: 1 | Steps: 438/438


INFO:httpx:HTTP Request: GET http://localhost:8321/v1alpha/post-training/jobs "HTTP/1.1 200 OK"


[640s] Status: completed | Epoch: 1 | Steps: 438/438

✓ Job completed with status: completed


## Verify Customized Model

In [21]:
# List all available models via LlamaStack
models_response = client.models.list()

# Handle both list and object responses
if isinstance(models_response, list):
    models_list = models_response
elif hasattr(models_response, 'data'):
    models_list = models_response.data
else:
    models_list = [models_response]

print("Available models:")
found_custom_model = False
for model in models_list:
    print(f"  - {model.identifier}")
    # Check for customized model with or without nvidia/ prefix
    if (CUSTOMIZED_MODEL in str(model.identifier) or 
        f"nvidia/{CUSTOMIZED_MODEL}" in str(model.identifier)):
        print(f"    ✓ Found our customized model!")
        found_custom_model = True

if not found_custom_model:
    print(f"\n⚠️ Customized model '{CUSTOMIZED_MODEL}' not yet in model list.")
    print(f"   This is normal - the model may still be loading or may not appear in the list.")
    print(f"   The nvidia provider can serve models not in the registry.")
    print(f"\n   Let's test if it works for inference...")

INFO:httpx:HTTP Request: GET http://localhost:8321/v1/models "HTTP/1.1 200 OK"


Available models:
  - nvidia/nv-rerank-qa-mistral-4b:1
  - nvidia/nvidia/nv-rerankqa-mistral-4b-v3
  - nvidia/nvidia/llama-3.2-nv-rerankqa-1b-v2

⚠️ Customized model 'default/test-example-model@v1' not yet in model list.
   This is normal - the model may still be loading or may not appear in the list.
   The nvidia provider can serve models not in the registry.

   Let's test if it works for inference...


In [22]:
# Check if model is registered in Entity Store
import requests

print(f"Checking Entity Store for model: {CUSTOMIZED_MODEL}\n")

# List all models in Entity Store
response = requests.get(f"{ENTITY_STORE_URL}/v1/models")
if response.status_code == 200:
    models_data = response.json()
    
    # Handle different response formats
    if isinstance(models_data, dict) and 'data' in models_data:
        models = models_data['data']
    elif isinstance(models_data, list):
        models = models_data
    else:
        models = [models_data]
    
    print(f"Found {len(models)} models in Entity Store")
    
    # Look for our model
    found = False
    for model in models:
        # Handle both string IDs and object responses
        if isinstance(model, str):
            model_id = model
            # Check if our model name is in the ID
            if CUSTOMIZED_MODEL in model_id or JOB_ID in model_id:
                print(f"\n✅ Found customized model in Entity Store!")
                print(f"   Model ID: {model_id}")
                found = True
                
                # Get detailed info
                detail_resp = requests.get(f"{ENTITY_STORE_URL}/v1/models/{model_id}")
                if detail_resp.status_code == 200:
                    print(f"\n   Model details:")
                    print(json.dumps(detail_resp.json(), indent=2))
                break
        elif isinstance(model, dict):
            model_name = model.get('name', model.get('id', ''))
            model_ns = model.get('namespace', '')
            full_name = f"{model_ns}/{model_name}" if model_ns else model_name
            
            if CUSTOMIZED_MODEL in full_name or JOB_ID in str(model):
                print(f"\n✅ Found customized model in Entity Store!")
                print(f"   Name: {model_name}")
                print(f"   Namespace: {model_ns}")
                print(f"   Status: {model.get('artifact', {}).get('status')}")
                print(f"\n   Model details:")
                print(json.dumps(model, indent=2))
                found = True
                break
    
    if not found:
        print(f"\n❌ Customized model NOT found in Entity Store!")
        print(f"\n   Looking for: {CUSTOMIZED_MODEL}")
        print(f"   Or job ID: {JOB_ID}")
        print(f"\n   Available models:")
        for idx, model in enumerate(models[:20]):  # Show first 20
            if isinstance(model, str):
                print(f"     {idx+1}. {model}")
            else:
                model_id = model.get('name', model.get('id', 'unknown'))
                print(f"     {idx+1}. {model_id}")
        
        print(f"\n   ⚠️ The Customizer may not have registered the model yet.")
        print(f"   Check the customization job status and logs.")
else:
    print(f"❌ Failed to query Entity Store: {response.status_code}")
    print(f"   Response: {response.text}")

Checking Entity Store for model: default/test-example-model@v1

Found 1 models in Entity Store

✅ Found customized model in Entity Store!
   Name: test-example-model@v1
   Namespace: default
   Status: upload_completed

   Model details:
{
  "created_at": "2025-11-19T09:43:21.838315",
  "updated_at": "2025-11-19T09:43:21.838318",
  "name": "test-example-model@v1",
  "namespace": "default",
  "description": "None",
  "spec": {
    "num_parameters": 1000000000,
    "context_size": 4096,
    "num_virtual_tokens": 0,
    "is_chat": true
  },
  "artifact": {
    "gpu_arch": "Ampere",
    "precision": "bf16-mixed",
    "tensor_parallelism": 1,
    "backend_engine": "nemo",
    "status": "upload_completed",
    "files_url": "hf://default/test-example-model@v1"
  },
  "base_model": "meta/llama-3.2-1b-instruct",
  "peft": {
    "finetuning_type": "lora"
  },
  "schema_version": "1.0",
  "project": "llamastack-project",
  "custom_fields": {}
}


In [None]:
# FIX: Update CUSTOMIZED_MODEL to use the actual output model
if our_job and hasattr(our_job, 'output_model'):
    CUSTOMIZED_MODEL = our_job.output_model
    print(f"✅ Updated CUSTOMIZED_MODEL to actual output: {CUSTOMIZED_MODEL}")
else:
    print(f"⚠️ Could not get output_model from job")
    print(f"   Current CUSTOMIZED_MODEL: {CUSTOMIZED_MODEL}")

In [23]:
CUSTOMIZED_MODEL

'default/test-example-model@v1'

# Part IV: Model Evaluation via LlamaStack

Evaluate both the base model and fine-tuned model using NeMo Evaluator through LlamaStack.

## Register Benchmark Configuration

### Register Base Model in Entity Store

**Required for Evaluation**: The NeMo Evaluator fetches model information from Entity Store.
We need to register the base model before running evaluations.

In [24]:
# Register base model in Entity Store (required for evaluator)
import requests

print(f"Registering base model in Entity Store: {BASE_MODEL}\n")

response = requests.post(
    f"{ENTITY_STORE_URL}/v1/models",
    json={
        "name": BASE_MODEL.replace('/', '-'),  # Entity Store doesn't allow '/' in names
        "namespace": "default",
        "description": "Base Llama 3.2 1B Instruct model",
        "project": "tool_calling",
        "spec": {
            "num_parameters": 1000000000,
            "context_size": 4096,
            "num_virtual_tokens": 0,
            "is_chat": True
        },
        "artifact": {
            "gpu_arch": "Ampere",
            "precision": "bf16-mixed",
            "tensor_parallelism": 1,
            "backend_engine": "nemo",
            "status": "upload_completed",
            "files_url": f"nim://{BASE_MODEL}"
        }
    }
)

if response.status_code in (200, 201):
    print("✅ Base model registered in Entity Store!")
    print(json.dumps(response.json(), indent=2))
elif response.status_code == 409:
    print("⚠️ Base model already exists in Entity Store - continuing...")
else:
    print(f"❌ Failed to register base model: {response.status_code}")
    print(f"Response: {response.text}")
    print("\n⚠️ Evaluation may fail without base model registration")

Registering base model in Entity Store: meta/llama-3.2-1b-instruct

✅ Base model registered in Entity Store!
{
  "created_at": "2025-11-19T09:55:16.562102",
  "updated_at": "2025-11-19T09:55:16.562104",
  "name": "meta-llama-3.2-1b-instruct",
  "namespace": "default",
  "description": "Base Llama 3.2 1B Instruct model",
  "spec": {
    "num_parameters": 1000000000,
    "context_size": 4096,
    "num_virtual_tokens": 0,
    "is_chat": true
  },
  "artifact": {
    "gpu_arch": "Ampere",
    "precision": "bf16-mixed",
    "tensor_parallelism": 1,
    "backend_engine": "nemo",
    "status": "upload_completed",
    "files_url": "nim://meta/llama-3.2-1b-instruct"
  },
  "base_model": null,
  "api_endpoint": null,
  "peft": null,
  "prompt": null,
  "guardrails": null,
  "schema_version": "1.0",
  "project": "tool_calling",
  "custom_fields": {},
  "ownership": null
}


In [25]:
# Create benchmark for tool calling evaluation
benchmark_id = "xlam-tool-calling-eval"

benchmark_metadata = {
    "type": "custom",
    "params": {"parallelism": 8},
    "tasks": {
        "tool-calling-accuracy": {
            "type": "chat-completion",
            "params": {
                "template": {
                    "messages": "{{ item.messages | tojson}}",
                    "tools": "{{ item.tools | tojson }}",
                    "tool_choice": "auto"
                }
            },
            "dataset": {
                "files_url": f"hf://datasets/{NMS_NAMESPACE}/{DATASET_NAME}/testing/xlam-test-single.jsonl",
                "limit": 50
            },
            "metrics": {
                "tool-calling-accuracy": {
                    "type": "tool-calling",
                    "params": {"tool_calls_ground_truth": "{{ item.tool_calls | tojson }}"}
                }
            }
        }
    }
}

# Register benchmark via LlamaStack
benchmark = client.alpha.benchmarks.register(
    benchmark_id=benchmark_id,
    dataset_id=f"{NMS_NAMESPACE}/{DATASET_NAME}",
    scoring_functions=[],
    metadata=benchmark_metadata
)

print(f"✓ Benchmark registered: {benchmark_id}")

  benchmark = client.alpha.benchmarks.register(
INFO:httpx:HTTP Request: POST http://localhost:8321/v1alpha/eval/benchmarks "HTTP/1.1 200 OK"


✓ Benchmark registered: xlam-tool-calling-eval


In [40]:
def wait_eval_job(benchmark_id: str, job_id: str, polling_interval: int = 10, timeout: int = 600):
    """
    Monitor an evaluation job until completion.
    """
    start_time = time()
    print(f"Monitoring evaluation job: {job_id}\n")
    
    while True:
        elapsed = time() - start_time
        if elapsed > timeout:
            raise RuntimeError(f"Evaluation {job_id} exceeded timeout of {timeout}s")
        
        # Get job status via LlamaStack
        job_status = client.alpha.eval.jobs.status(
            benchmark_id=benchmark_id,
            job_id=job_id
        )
        
        status = job_status.status
        print(f"[{elapsed:.0f}s] Status: {status}")
        
        if status not in ["scheduled", "in_progress"]:
            print(f"\n✓ Evaluation completed with status: {status}")
            
            # Retrieve results
            results = client.alpha.eval.jobs.retrieve(
                benchmark_id=benchmark_id,
                job_id=job_id
            )
            return results
        
        sleep(polling_interval)


## Evaluate Fine-Tuned Model

In [28]:
# Run evaluation on fine-tuned model
print(f"Starting evaluation of fine-tuned model: {CUSTOMIZED_MODEL}")

ft_eval = client.alpha.eval.run_eval(
    benchmark_id=benchmark_id,
    benchmark_config={
        "eval_candidate": {
            "type": "model",
            "model": CUSTOMIZED_MODEL,
            "sampling_params": {
                "temperature": 0.1,
                "top_p": 0.7,
                "max_tokens": 512
            }
        }
    }
)

ft_eval_job_id = ft_eval.job_id
print(f"✓ Fine-tuned model evaluation started: {ft_eval_job_id}")

Starting evaluation of fine-tuned model: default/test-example-model@v1


INFO:httpx:HTTP Request: POST http://localhost:8321/v1alpha/eval/benchmarks/xlam-tool-calling-eval/jobs "HTTP/1.1 200 OK"


✓ Fine-tuned model evaluation started: eval-9x4GYzFoYhvYEG1rEzok7g


In [29]:
# Wait for fine-tuned model evaluation
ft_results = wait_eval_job(benchmark_id, ft_eval_job_id)

INFO:httpx:HTTP Request: GET http://localhost:8321/v1alpha/eval/benchmarks/xlam-tool-calling-eval/jobs/eval-9x4GYzFoYhvYEG1rEzok7g "HTTP/1.1 200 OK"


Monitoring evaluation job: eval-9x4GYzFoYhvYEG1rEzok7g

[0s] Status: in_progress


INFO:httpx:HTTP Request: GET http://localhost:8321/v1alpha/eval/benchmarks/xlam-tool-calling-eval/jobs/eval-9x4GYzFoYhvYEG1rEzok7g "HTTP/1.1 200 OK"


[10s] Status: in_progress


INFO:httpx:HTTP Request: GET http://localhost:8321/v1alpha/eval/benchmarks/xlam-tool-calling-eval/jobs/eval-9x4GYzFoYhvYEG1rEzok7g "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: GET http://localhost:8321/v1alpha/eval/benchmarks/xlam-tool-calling-eval/jobs/eval-9x4GYzFoYhvYEG1rEzok7g/result "HTTP/1.1 200 OK"


[21s] Status: completed

✓ Evaluation completed with status: completed


In [30]:
# Extract and display fine-tuned model metrics
# The results are in .scores, not .metrics
ft_scores = ft_results.scores[benchmark_id]
ft_aggregated = ft_scores.aggregated_results

print("\n=== Fine-Tuned Model Results ===")
print(json.dumps(ft_aggregated, indent=2))

# Extract accuracy metrics
task_results = ft_aggregated['tasks']['tool-calling-accuracy']
metrics = task_results['metrics']['tool-calling-accuracy']['scores']

ft_fn_accuracy = metrics['function_name_accuracy']['value']
ft_fn_args_accuracy = metrics['function_name_and_args_accuracy']['value']

print(f"\nFine-Tuned Model Performance:")
print(f"  Function Name Accuracy: {ft_fn_accuracy:.1%}")
print(f"  Function + Args Accuracy: {ft_fn_args_accuracy:.1%}")


=== Fine-Tuned Model Results ===
{
  "created_at": "2025-11-19T09:55:51.921981",
  "updated_at": "2025-11-19T09:55:51.921982",
  "id": "evaluation_result-9USmfdMrzcyMd3HuwbMHX8",
  "job": "eval-9x4GYzFoYhvYEG1rEzok7g",
  "tasks": {
    "tool-calling-accuracy": {
      "metrics": {
        "tool-calling-accuracy": {
          "scores": {
            "function_name_accuracy": {
              "value": 0.96,
              "stats": {
                "count": 50,
                "sum": 48.0,
                "mean": 0.96
              }
            },
            "function_name_and_args_accuracy": {
              "value": 0.66,
              "stats": {
                "count": 50,
                "sum": 33.0,
                "mean": 0.66
              }
            }
          }
        }
      }
    }
  },
  "groups": {},
  "namespace": "default",
  "custom_fields": {}
}

Fine-Tuned Model Performance:
  Function Name Accuracy: 96.0%
  Function + Args Accuracy: 66.0%


# Part V: Inference & Safety via LlamaStack

Test the fine-tuned model with tool calling and apply safety guardrails.

## Sample Inference with Tool Calling

In [37]:
# Load test samples
test_samples = list(read_jsonl(test_fp))
print(f"Loaded {len(test_samples)} test samples")

# Pick a random sample
test_sample = random.choice(test_samples)
print("\nTest query:")
print(f"  {test_sample['messages'][0]['content']}")

print("\nAvailable tools:")
for tool in test_sample['tools']:
    print(f"  - {tool['function']['name']}: {tool['function']['description'][:60]}...")

Loaded 713 test samples

Test query:
  List the latest sold artworks from the 'Art Blocks' gallery, sorted by date sold.

Available tools:
  - expiry_date_enquiry: Queries the expiry date of a vehicle's license registration ...
  - latest_sold: Fetches a list of recently sold artworks, with optional filt...
  - search: Search for individuals by their last and optionally first na...


In [38]:
# Run inference on fine-tuned model via LlamaStack
response = client.chat.completions.create(
    model=f"nvidia/{CUSTOMIZED_MODEL}",
    messages=test_sample["messages"],
    tools=test_sample["tools"],
    tool_choice="auto",
    temperature=0.1,
    top_p=0.7,
    max_tokens=512,
    stream=False
)

print("\n=== Model Prediction ===")
if response.choices[0].message.tool_calls:
    for tc in response.choices[0].message.tool_calls:
        print(f"Tool: {tc.function.name}")
        print(f"Arguments: {tc.function.arguments}")
else:
    print("No tool calls generated")

print("\n=== Ground Truth ===")
for tc in test_sample['tool_calls']:
    print(f"Tool: {tc['function']['name']}")
    print(f"Arguments: {tc['function']['arguments']}")

INFO:httpx:HTTP Request: POST http://localhost:8321/v1/chat/completions "HTTP/1.1 200 OK"



=== Model Prediction ===
Tool: latest_sold
Arguments: {"gallery": "Art Blocks", "sort": "date_sold"}

=== Ground Truth ===
Tool: latest_sold
Arguments: {'gallery': 'Art Blocks', 'sort': 'date_sold'}
