In [None]:
# Install required Python packages for this notebook environment
%pip install \
  huggingface_hub \
  "transformers>=4.36.0" \
  peft \
  datasets \
  trl \
  jsonschema \
  litellm \
  "jinja2>=3.1.0" \
  "torch>=2.0.0" \
  openai \
  jupyterlab \
  "llama-stack>=0.2.17" \
  "sqlalchemy" \
  "greenlet" \
  "faiss-cpu" \
  "aiosqlite" \
  "opentelemetry-sdk" \
  "opentelemetry-api" \
  "opentelemetry-exporter-otlp" \
  "mcp" 


In [None]:
import os
import json
import random
from pprint import pprint
from typing import Any, Dict, List, Union

import numpy as np
import torch
from datasets import load_dataset
import requests

In [None]:
SEED = 1234

# Limits to at most N tool properties
LIMIT_TOOL_PROPERTIES = 8

torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
np.random.seed(SEED)
random.seed(SEED)

In [None]:
# Processed data will be stored here
DATA_ROOT = os.path.join(os.getcwd(), "sample_data")
CUSTOMIZATION_DATA_ROOT = os.path.join(DATA_ROOT, "customization")
VALIDATION_DATA_ROOT = os.path.join(DATA_ROOT, "validation")
EVALUATION_DATA_ROOT = os.path.join(DATA_ROOT, "evaluation")

os.makedirs(DATA_ROOT, exist_ok=True)
os.makedirs(CUSTOMIZATION_DATA_ROOT, exist_ok=True)
os.makedirs(VALIDATION_DATA_ROOT, exist_ok=True)
os.makedirs(EVALUATION_DATA_ROOT, exist_ok=True)

In [None]:
from config import HF_TOKEN

os.environ["HF_TOKEN"] = HF_TOKEN
os.environ["HF_ENDPOINT"] = "https://huggingface.co"

In [None]:
# Download from Hugging Face
dataset = load_dataset("Salesforce/xlam-function-calling-60k")

# Inspect a sample
example = dataset['train'][0]
pprint(example)

In [None]:
def normalize_type(param_type: str) -> str:
    """
    Normalize Python type hints and parameter definitions to OpenAI function spec types.

    Args:
        param_type: Type string that could include default values or complex types

    Returns:
        Normalized type string according to OpenAI function spec
    """
    # Remove whitespace
    param_type = param_type.strip()

    # Handle types with default values (e.g. "str, default='London'")
    if "," in param_type and "default" in param_type:
        param_type = param_type.split(",")[0].strip()

    # Handle types with just default values (e.g. "default='London'")
    if param_type.startswith("default="):
        return "string"  # Default to string if only default value is given

    # Remove ", optional" suffix if present
    param_type = param_type.replace(", optional", "").strip()

    # Handle complex types
    if param_type.startswith("Callable"):
        return "string"  # Represent callable as string in JSON schema
    if param_type.startswith("Tuple"):
        return "array"  # Represent tuple as array in JSON schema
    if param_type.startswith("List["):
        return "array"
    if param_type.startswith("Set") or param_type == "set":
        return "array"  # Represent set as array in JSON schema

    # Map common type variations to OpenAI spec types
    type_mapping: Dict[str, str] = {
        "str": "string",
        "int": "integer",
        "float": "number",
        "bool": "boolean",
        "list": "array",
        "dict": "object",
        "List": "array",
        "Dict": "object",
        "set": "array",
        "Set": "array"
    }

    if param_type in type_mapping:
        return type_mapping[param_type]
    else:
        print(f"Unknown type: {param_type}")
        return "string"  # Default to string for unknown types


def convert_tools_to_openai_spec(tools: Union[str, List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    # If tools is a string, try to parse it as JSON
    if isinstance(tools, str):
        try:
            tools = json.loads(tools)
        except json.JSONDecodeError as e:
            print(f"Failed to parse tools string as JSON: {e}")
            return []

    # Ensure tools is a list
    if not isinstance(tools, list):
        print(f"Expected tools to be a list, but got {type(tools)}")
        return []

    openai_tools: List[Dict[str, Any]] = []
    for tool in tools:
        # Check if tool is a dictionary
        if not isinstance(tool, dict):
            print(f"Expected tool to be a dictionary, but got {type(tool)}")
            continue

        # Check if 'parameters' is a dictionary
        if not isinstance(tool.get("parameters"), dict):
            print(f"Expected 'parameters' to be a dictionary, but got {type(tool.get('parameters'))} for tool: {tool}")
            continue



        normalized_parameters: Dict[str, Dict[str, Any]] = {}
        for param_name, param_info in tool["parameters"].items():
            if not isinstance(param_info, dict):
                print(
                    f"Expected parameter info to be a dictionary, but got {type(param_info)} for parameter: {param_name}"
                )
                continue

            # Create parameter info without default first
            param_dict = {
                "description": param_info.get("description", ""),
                "type": normalize_type(param_info.get("type", "")),
            }

            # Only add default if it exists, is not None, and is not an empty string
            default_value = param_info.get("default")
            if default_value is not None and default_value != "":
                param_dict["default"] = default_value

            normalized_parameters[param_name] = param_dict

        openai_tool = {
            "type": "function",
            "function": {
                "name": tool["name"],
                "description": tool["description"],
                "parameters": {"type": "object", "properties": normalized_parameters},
            },
        }
        openai_tools.append(openai_tool)
    return openai_tools


def save_jsonl(filename, data):
    """Write a list of json objects to a .jsonl file"""
    with open(filename, "w") as f:
        for entry in data:
            f.write(json.dumps(entry) + "\n")


def convert_tool_calls(xlam_tools):
    """Convert XLAM tool format to OpenAI's tool schema."""
    tools = []
    for tool in json.loads(xlam_tools):
        tools.append({"type": "function", "function": {"name": tool["name"], "arguments": tool.get("arguments", {})}})
    return tools


def convert_example(example, dataset_type='single'):
    """Convert an XLAM dataset example to OpenAI format."""
    obj = {"messages": []}

    # User message
    obj["messages"].append({"role": "user", "content": example["query"]})

    # Tools
    if example.get("tools"):
        obj["tools"] = convert_tools_to_openai_spec(example["tools"])

    # Assistant message
    assistant_message = {"role": "assistant", "content": ""}
    if example.get("answers"):
        tool_calls = convert_tool_calls(example["answers"])

        if dataset_type == "single":
            # Only include examples with a single tool call
            if len(tool_calls) == 1:
                assistant_message["tool_calls"] = tool_calls
            else:
                return None
        else:
            # For other dataset types, include all tool calls
            assistant_message["tool_calls"] = tool_calls

    obj["messages"].append(assistant_message)

    return obj

In [None]:
convert_example(example)

In [None]:
all_examples = []
with open(os.path.join(DATA_ROOT, "xlam_openai_format.jsonl"), "w") as f:
    for example in dataset["train"]:
        converted = convert_example(example)
        if converted is not None:
            all_examples.append(converted)
            f.write(json.dumps(converted) + "\n")


In [None]:
NUM_EXAMPLES = 5000

assert NUM_EXAMPLES <= len(all_examples), f"{NUM_EXAMPLES} exceeds the total number of available ({len(all_examples)}) data points"

In [None]:
sampled_examples = random.sample(all_examples, NUM_EXAMPLES)

# Split into 70% training, 15% validation, 15% testing
train_size = int(0.7 * len(sampled_examples))
val_size = int(0.15 * len(sampled_examples))

train_data = sampled_examples[:train_size]
val_data = sampled_examples[train_size : train_size + val_size]
test_data = sampled_examples[train_size + val_size :]

# Save the training and validation splits. We will use test split in the next section
save_jsonl(os.path.join(CUSTOMIZATION_DATA_ROOT, "training.jsonl"), train_data)
save_jsonl(os.path.join(VALIDATION_DATA_ROOT,"validation.jsonl"), val_data)

In [None]:
def convert_example_eval(entry):
    """Convert a single entry in the dataset to the evaluator format"""

    # Note: This is a WAR for a known bug with tool calling in NIM
    for tool in entry["tools"]:
        if len(tool["function"]["parameters"]["properties"]) > LIMIT_TOOL_PROPERTIES:
            return None

    new_entry = {
        "messages": [],
        "tools": entry["tools"],
        "tool_calls": []
    }

    for msg in entry["messages"]:
        if msg["role"] == "assistant" and "tool_calls" in msg:
            new_entry["tool_calls"] = msg["tool_calls"]
        else:
            new_entry["messages"].append(msg)

    return new_entry

def convert_dataset_eval(data):
    """Convert the entire dataset for evaluation by restructuring the data format."""
    return [result for entry in data if (result := convert_example_eval(entry)) is not None]

In [None]:
test_data_eval = convert_dataset_eval(test_data)
save_jsonl(os.path.join(EVALUATION_DATA_ROOT, "xlam-test-single.jsonl"), test_data_eval)

In [None]:
from config import *

In [None]:
# Metadata associated with Datasets and Customization Jobs
os.environ["NVIDIA_DATASET_NAMESPACE"] = NMS_NAMESPACE
os.environ["NVIDIA_PROJECT_ID"] = PROJECT_ID

## Inference env vars
os.environ["NVIDIA_BASE_URL"] = NIM_URL

# Data Store env vars
os.environ["NVIDIA_DATASETS_URL"] = ENTITY_STORE_URL

## Customizer env vars
os.environ["NVIDIA_CUSTOMIZER_URL"] = CUSTOMIZER_URL
os.environ["NVIDIA_OUTPUT_MODEL_DIR"] = CUSTOMIZED_MODEL_DIR

# Evaluator env vars
os.environ["NVIDIA_EVALUATOR_URL"] = EVALUATOR_URL

# Guardrails env vars
os.environ["GUARDRAILS_SERVICE_URL"] = GUARDRAILS_URL

In [None]:
train_fp = f"{CUSTOMIZATION_DATA_ROOT}/training.jsonl"
assert os.path.exists(train_fp), f"The training data at '{train_fp}' does not exist. Please ensure that the data was prepared successfully."

val_fp = f"{VALIDATION_DATA_ROOT}/validation.jsonl"
assert os.path.exists(val_fp), f"The validation data at '{val_fp}' does not exist. Please ensure that the data was prepared successfully."

test_fp = f"{EVALUATION_DATA_ROOT}/xlam-test-single.jsonl"
assert os.path.exists(test_fp), f"The test data at '{test_fp}' does not exist. Please ensure that the data was prepared successfully."


In [None]:
def create_namespaces(entity_host, ds_host, namespace):
    # Create namespace in Entity Store
    entity_store_url = f"{entity_host}/v1/namespaces"
    resp = requests.post(entity_store_url, json={"id": namespace})
    assert resp.status_code in (200, 201, 409, 422), \
        f"Unexpected response from Entity Store during namespace creation: {resp.status_code}"
    print(resp)

    # Create namespace in Data Store
    nds_url = f"{ds_host}/v1/datastore/namespaces"
    resp = requests.post(nds_url, data={"namespace": namespace})
    assert resp.status_code in (200, 201, 409, 422), \
        f"Unexpected response from Data Store during namespace creation: {resp.status_code}"
    print(resp)

In [None]:
create_namespaces(entity_host=ENTITY_STORE_URL, ds_host=DATA_STORE_URL, namespace=NMS_NAMESPACE)

In [None]:
res = requests.get(f"{DATA_STORE_URL}/v1/datastore/namespaces/{NMS_NAMESPACE}")
print(f"Data Store Status Code: {res.status_code}\nResponse JSON: {json.dumps(res.json(), indent=2)}")

# Verify Namespace in Entity Store
res = requests.get(f"{ENTITY_STORE_URL}/v1/namespaces/{NMS_NAMESPACE}")
print(f"Entity Store Status Code: {res.status_code}\nResponse JSON: {json.dumps(res.json(), indent=2)}")

In [None]:
repo_id = f"{NMS_NAMESPACE}/{DATASET_NAME}"
print(repo_id)

In [None]:
from huggingface_hub import HfApi

hf_api = HfApi(endpoint=f"{DATA_STORE_URL}/v1/hf", token="")

# Create repo
hf_api.create_repo(
    repo_id=repo_id,
    repo_type='dataset',
)

In [None]:
hf_api.upload_file(path_or_fileobj=train_fp,
    path_in_repo="training/training.jsonl",
    repo_id=repo_id,
    repo_type='dataset',
)

hf_api.upload_file(path_or_fileobj=val_fp,
    path_in_repo="validation/validation.jsonl",
    repo_id=repo_id,
    repo_type='dataset',
)

hf_api.upload_file(path_or_fileobj=test_fp,
    path_in_repo="testing/xlam-test-single.jsonl",
    repo_id=repo_id,
    repo_type='dataset',
)