# VLM Finetuning Microservices Workflow with TAO 

[NVIDIA TAO](https://docs.nvidia.com/tao/tao-toolkit/text/overview.html) is a framework for customizing and optimizing vision-related models to achieve higher accuracy and better performance. The TAO 6.25.10 release introduces vision-language model (VLM) support in the Finetuning Microservices (FTMS). This lets you finetune pre-trained VLMs such as **Cosmos Reason** on video/image-text data at scale.

This notebook walks through the steps to **finetune [Cosmos Reason](https://huggingface.co/nvidia/Cosmos-Reason1-7B) with [TAO FTMS](https://docs.nvidia.com/tao/tao-toolkit/text/tao_toolkit_api/api_overview.html)** and shows how to achieve higher accuracy with **AutoML**.


For details on example fine-tuning use cases, please check out our two fine-tuning cookbooks for Cosmos Reason: [Reason for Visual Q&A in ITS](https://nvidia-cosmos.github.io/cosmos-cookbook/recipes/post_training/reason1/intelligent-transportation/post_training.html) and [Reason for Warehouse Safety](https://nvidia-cosmos.github.io/cosmos-cookbook/recipes/post_training/reason1/spatial-ai-warehouse/post_training.html). 

![Finetuning Workflow](../example_images/finetuning_workflow.png)




### Table of contents

1. [TAO FTMS Prerequisites](#head-1)
1. [Dataset Preparation for VLM Fine-tuning](#head-3)
1. [Experiments](#head-4)
1. [AutoML Configuration](#head-4-4) 
1. [Launch Fine-Tuning](#head-5)
1. [Model Evaluate](#head-6)
1. [Model Quantization](#head-7)
1. [Inference and Inference Microservice](#head-8)
1. [Finish Experiment and Cleanup](#head-9)
1. [Model Deployment](#head-10)

## 1. Prerequisites <a class="anchor" id="head-1"></a>

### 1.1 TAO API service

The TAO API is a cloud service for end-to-end model development. With a few calls you can import cloud datasets, pull pretrained models and default specs from the NVIDIA GPU Cloud (NGC) registry, train, evaluate, optimize, and export models for edge/cloud deployment, all on GPU-powered, multi-node clusters.

To get started with TAO APIs:

**Hardware and Software Minimum Requirements:**

- Minimum 8x A100 GPUs with at least 80 GiB GPU memory.
- OS: Ubuntu 22.04+
- Drivers: 570+
- CUDA: 12.8+
- Python: 3.12+

**Setup TAO APIs**

- Follow steps 1-8 of the [TAO API deployment steps](https://docs.nvidia.com/tao/tao-toolkit/text/tao_toolkit_api/api_setup.html#deployment-steps)
- After completing these steps, you will have a server running Kubernetes (K8s) and the TAO APIs

### 1.2 Set Required Parameters

Before running this notebook, ensure you have the following information:

1. **Host URL:** The host URL is the external access point to a Kubernetes service, constructed from the node's IP address and the service's exposed NodePort. Example: `http://<ip_address>:<port_number>`
1. **Finetuning mode:** Full SFT or PEFT LoRA
1. **NGC Key:** Your NGC (NVIDIA GPU Cloud) API key.
1. **Huggingface token:** Huggingface token obtained from [here](https://huggingface.co/settings/tokens).
1. **NGC Organization Name:** The name of your NGC organization.
1. **Cloud Storage Details:** Set your cloud storage details (e.g., bucket name, region).
1. **Datasets Path:** The path of datasets relative to the cloud storage bucket.

Replace the **FIXME** placeholders in the code cells below with the appropriate values.

In [None]:
import json
import requests
import time
from IPython.display import clear_output

In [None]:
%store -r model_name
%store -r finetuning_mode
%store -r automl_algorithm
%store -r automl_max_recommendations
%store -r base_url
%store -r headers
%store -r workspace_id
%store -r train_dataset_id
%store -r eval_dataset_id
%store -r experiment_id
%store -r job_map

In [None]:
model_name = "cosmos-rl"
finetuning_mode = "lora" # FIXME2. lora or full
%store model_name
%store finetuning_mode

#### Configure AutoML Parameters

[AutoML documentation](https://docs.nvidia.com/tao/tao-toolkit/text/automl/automl.html#getting-started)


In [None]:
# AutoML Configuration
automl_algorithm = "bayesian"
automl_max_recommendations = 5  # Number of AutoML experiments to run
%store automl_algorithm
%store automl_max_recommendations

#### 1.2.2 Set API service's host information

The steps in 1.1 install a K8s server and the TAO APIs. Once they are installed, you need a `host_url` to call the APIs running on the host.

On the host machine, obtain the node IP address and port number as follows:
- **ip_address**: `hostname -i`
- **port_number**: `kubectl get service ingress-nginx-controller -o jsonpath='{.spec.ports[0].nodePort}'`
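As a minimal sketch, the two values above can be combined into the host URL with a small helper. The function name `build_host_url` and the `http` default scheme are assumptions made for illustration; they are not part of the TAO API.

```python
def build_host_url(ip_address: str, port_number: int, scheme: str = "http") -> str:
    """Join the scheme, node IP address, and NodePort into a host URL."""
    port = int(port_number)
    if not 1 <= port <= 65535:
        raise ValueError(f"port out of range: {port_number}")
    # Strip stray whitespace that often comes along when copying shell output.
    return f"{scheme}://{ip_address.strip()}:{port}"


print(build_host_url("10.137.149.22", 32080))
```

The result can be assigned to `host_url` in the next cell instead of editing the placeholder string by hand.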

In [None]:
# Port number is 32080 for K8s deployment and 8090 for docker-compose deployment
# IP address is the IP address of the host machine for K8s deployment and localhost for docker-compose deployment
host_url = "http://<ip_address>:<port_number>" # FIXME1. eg: http://10.137.149.22:32080

#### 1.2.3 Set NGC Personal key for authentication and NGC org to access API services

- **ngc_key**: [How to access NGC key](https://docs.nvidia.com/ai-enterprise/deployment/spark-rapids-accelerator/latest/appendix-ngc.html#ngc-api-key)
- **ngc_org_name**: [How to access NGC org Name](https://docs.nvidia.com/ngc/gpu-cloud/ngc-user-guide/index.html#accessing-ngc-org)

In [None]:
ngc_key = "<ngc_personal_key>" # FIXME2. Make sure to add NGC Personal key
hf_token = "<huggingface_token>" # FIXME3. Add your Huggingface token - needed so that Huggingface doesn't rate limit you.
ngc_org_name = "nvstaging" # FIXME4. Add your NGC ORG

## 2. Login to the host <a class="anchor" id="head-2"></a>
The JWT (JSON Web Token) is a secure authentication mechanism used by the TAO Finetuning Microservices. When you authenticate with your NGC credentials, the API returns this token, which is then used for all subsequent API calls. This token has a limited lifetime and represents your authenticated session.

The following cell verifies that you can reach the service and generates a JWT.

In [None]:
# Validate NGC_PERSONAL_KEY
data = json.dumps({"ngc_org_name": ngc_org_name,
                   "ngc_key": ngc_key})
response = requests.post(f"{host_url}/api/v1/login", data=data)
token = response.json()["token"]
print("JWT",token)

# Set base URL
base_url = f"{host_url}/api/v1/orgs/{ngc_org_name}"
print("API Calls will be forwarded to",base_url)

headers = {"Authorization": f"Bearer {token}"}
%store base_url
%store headers
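The login cell above assumes the request succeeds and that the response contains a `token` field. A more defensive variant might check both before building the headers. This is a sketch only: `extract_token` and `auth_headers` are hypothetical helper names, and the accepted status codes (200, 201) are an assumption borrowed from the assertions used later in this notebook.

```python
def extract_token(status_code: int, payload: dict) -> str:
    """Return the JWT from a login response, or raise a readable error."""
    if status_code not in (200, 201):
        raise RuntimeError(f"Login failed with HTTP {status_code}: {payload}")
    token = payload.get("token")
    if not token:
        raise RuntimeError("Login response did not contain a 'token' field")
    return token


def auth_headers(token: str) -> dict:
    """Build the Authorization header used by all subsequent API calls."""
    return {"Authorization": f"Bearer {token}"}
```

In the cell above this would be used as `token = extract_token(response.status_code, response.json())` followed by `headers = auth_headers(token)`.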

### 2.1 Create cloud workspace
This creates a workspace that links your TAO Finetuning Microservices session to your cloud storage. The API will use these credentials to:

- Pull datasets from your bucket
- Store training results and checkpoints
- Upload evaluation results

If you want to have different workspaces for datasets and experiments, duplicate the workspace creation part and adjust the metadata accordingly.

In [None]:
# FIXME5 Cloud bucket details to access datasets and store experiment results
cloud_metadata = {
    "name": "tao_workspace",
    "cloud_type": "aws",
    "cloud_specific_details": {
        "cloud_region": "us-west-1",
        "cloud_bucket_name": "",
        "access_key": "",
        "secret_key": "",
    }
}
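Because the bucket name and keys above are left empty on purpose, it can help to confirm they were filled in before calling the workspace endpoint. The following is a small sketch of such a check; `find_empty_fields` is a hypothetical helper, not part of the TAO API.

```python
def find_empty_fields(metadata: dict, prefix: str = "") -> list[str]:
    """Return dotted paths of all values that are still empty strings or None."""
    empty = []
    for key, value in metadata.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested sections such as cloud_specific_details.
            empty.extend(find_empty_fields(value, path))
        elif value in ("", None):
            empty.append(path)
    return empty
```

For example, `find_empty_fields(cloud_metadata)` returns a list such as `cloud_specific_details.access_key` entries for anything that still needs a value, and an empty list once everything is set.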

In [None]:
data = json.dumps(cloud_metadata)

endpoint = f"{base_url}/workspaces"

response = requests.post(endpoint,data=data,headers=headers)

print(response)
print(json.dumps(response.json(), indent=4))

workspace_id = response.json()["id"]
%store workspace_id

## 3. Dataset Preparation <a class="anchor" id="head-3"></a>

For Cosmos-RL finetuning, we expect the directory tree to follow this structure:

```
<any folder in cloud bucket>/
├── images.tar.gz
└── annotations.tar.gz
```
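One way to produce the two archives is Python's standard `tarfile` module, sketched below. The helper name `make_archive` is hypothetical, and the internal layout of each archive (here, a single top-level folder named after the source directory) is an assumption; check the pre-processing scripts referenced later in this section for the layout the service actually expects.

```python
import tarfile
from pathlib import Path


def make_archive(source_dir, archive_path) -> Path:
    """Pack source_dir into a gzip-compressed tar archive at archive_path."""
    source = Path(source_dir)
    if not source.is_dir():
        raise NotADirectoryError(source)
    archive = Path(archive_path)
    archive.parent.mkdir(parents=True, exist_ok=True)
    with tarfile.open(archive, "w:gz") as tar:
        # arcname keeps paths inside the archive relative to the folder name.
        tar.add(source, arcname=source.name)
    return archive


# Example (paths are placeholders):
# make_archive("processed/images", "upload/images.tar.gz")
# make_archive("processed/annotations", "upload/annotations.tar.gz")
```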

### Data Preparation

For this experiment, we use the [Physical AI Spatial Intelligence Warehouse dataset](https://huggingface.co/datasets/nvidia/PhysicalAI-Spatial-Intelligence-Warehouse/tree/main). It is a fully synthetic warehouse dataset with 95K images and around 500K annotations: Q&A pairs with related meta information in LLaVA format for VLM training. Tasks include distance, counting, multiple-choice grounding, and spatial relation reasoning.

The example below shows the RGB frame, depth map, annotated regions, the corresponding question, and sample answers.
The distribution of question types demonstrates the diversity of reasoning skills required across tasks.

<img src="assets/data_overview.png" width="960"/>

<br>

#### Sample JSON Entry

Specifically, the annotation contains several additional attributes compared to the general [LLaVA format](https://github.com/haotian-liu/LLaVA/blob/main/docs/Finetune_Custom_Data.md):

- **normalized_answer**: used for quantitative evaluation, with accuracy and error metrics computed between the ground-truth and predicted answers.
- **freeform_answer**: the original answer from 'gpt'.
- **rle**: the per-object masks in COCO run-length encoding (pycocotools) format.
- **category**: the question category. The categories are left_right, multi_choice_question (mcq), distance, and count.

Here's an example of the annotation format:

```json
{
    "id": "9d17ba0ab1df403db91877fe220e4658",
    "image": "000190.png",
    "conversations": [
      {
        "from": "human",
        "value": "<image>\nCould you measure the distance between the pallet <mask> and the pallet <mask>?"
      },
      {
        "from": "gpt",
        "value": "The pallet [Region 0] is 6.36 meters from the pallet [Region 1]."
      }
    ],
    "rle": [
      {
      "size": [
          1080,
          1920
      ],
      "counts": "bngl081MYQ19010ON2jMDmROa0ol01_RO2^m0`0PRODkm0o0bQOUO[n0U2N2M3N2N2N3L3N2N1N1WO_L]SO"
      },
      {
      "size": [
          1080,
          1920
      ],
      "counts": "^PmU1j1no000000000000000000001O0000000000001O0000000000001O0000000000001O0000000000"
      }
    ],
    "category": "distance",
    "normalized_answer": "6.36",
    "freeform_answer": "The pallet [Region 0] is 6.36 meters from the pallet [Region 1]."
}
```
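A quick structural check over each annotation entry can catch formatting problems before the data is uploaded. The sketch below is based only on the sample entry above: the rule that the number of `<mask>` placeholders equals the number of `rle` entries is inferred from that sample rather than from a published specification, and `check_entry` is a hypothetical helper name.

```python
REQUIRED_KEYS = {
    "id", "image", "conversations", "rle",
    "category", "normalized_answer", "freeform_answer",
}


def check_entry(entry: dict) -> list[str]:
    """Return a list of human-readable problems found in one annotation."""
    missing = REQUIRED_KEYS - entry.keys()
    if missing:
        return [f"missing keys: {sorted(missing)}"]
    problems = []
    turns = entry["conversations"]
    if not turns or turns[0].get("from") != "human":
        problems.append("first conversation turn should come from 'human'")
    # Assumption: every <mask> placeholder corresponds to one RLE mask.
    n_masks = sum(
        t.get("value", "").count("<mask>") for t in turns if t.get("from") == "human"
    )
    if n_masks != len(entry["rle"]):
        problems.append(f"{n_masks} <mask> placeholders but {len(entry['rle'])} rle masks")
    for i, mask in enumerate(entry["rle"]):
        size = mask.get("size")
        if not (isinstance(size, list) and len(size) == 2):
            problems.append(f"rle[{i}].size should be [height, width]")
        if not isinstance(mask.get("counts"), str):
            problems.append(f"rle[{i}].counts should be a string")
    return problems
```

An empty list means the entry passed all checks; anything else names the field to inspect.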


Follow the data pre-processing scripts [here](https://github.com/nvidia-cosmos/cosmos-cookbook/blob/main/docs/recipes/post_training/reason1/spatial-ai-warehouse/post_training.md#data-preprocessing), then transfer the processed data to your cloud storage for both the train and evaluation datasets.

In [None]:
#FIXME6: Set paths relative to cloud bucket
train_dataset_path =  "/data/cosmos_rl_warehouse_train" # example train data is at workspace_dir/data/
eval_dataset_path = "/data/cosmos_rl_warehouse_eval"

### 3.2 Set dataset formats

The dataset format parameters define how your data is structured:
- `ds_type = "vlm"`: Indicates this is a Vision-Language Model dataset
- `ds_format = "llava"`: Uses the LLaVA format expected by Cosmos-RL

A VLM dataset in this format typically includes:
- Video files or image sequences
- Text annotations/captions

In [None]:
ds_type = "vlm"
ds_format = "llava"

### 3.3 Create and pull train dataset

Add the training dataset metadata to the *datasets* API.

In [None]:
# Create train dataset
train_dataset_metadata = {"type": ds_type,
                          "format": ds_format,
                          "workspace":workspace_id,
                          "cloud_file_path": train_dataset_path,
                          "use_for": ["training"]
                          }

data = json.dumps(train_dataset_metadata)

endpoint = f"{base_url}/datasets"

response = requests.post(endpoint,data=data,headers=headers)
print(response)
print(json.dumps(response.json(), indent=4))

train_dataset_id = response.json()["id"]
%store train_dataset_id

The cell below polls the download progress until the dataset pull is complete.

In [None]:
# Check progress
endpoint = f"{base_url}/datasets/{train_dataset_id}"

while True:
    clear_output(wait=True)
    response = requests.get(endpoint, headers=headers)

    print(response)
    print(json.dumps(response.json(), indent=4))
    if response.json().get("status") == "invalid_pull":
        raise ValueError("Dataset pull failed")
    if response.json().get("status") == "pull_complete":
        break
    time.sleep(5)
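The polling loop above runs until the pull either completes or fails, so a stalled pull would keep it running indefinitely. A variant with a timeout could look like the sketch below. The status strings `pull_complete` and `invalid_pull` come from the cell above; the helper name `wait_for_status`, its parameters, and the one-hour default timeout are assumptions for illustration.

```python
import time


def wait_for_status(fetch_status, done=("pull_complete",), failed=("invalid_pull",),
                    timeout_s=3600, interval_s=5,
                    sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_status() until it reports success, failure, or a timeout."""
    deadline = clock() + timeout_s
    while True:
        status = fetch_status()
        if status in failed:
            raise ValueError(f"Dataset pull failed with status {status!r}")
        if status in done:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"Still {status!r} after {timeout_s} seconds")
        sleep(interval_s)
```

With the names from this notebook, the call would be `wait_for_status(lambda: requests.get(endpoint, headers=headers).json().get("status"))`. The `sleep` and `clock` parameters exist only so the loop can be exercised without real waiting.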

### 3.4 Create and pull evaluation dataset

Similar to the training data, add the evaluation dataset metadata.

In [None]:
# Create evaluation dataset
eval_dataset_metadata = {"type": ds_type,
                          "format": ds_format,
                          "workspace":workspace_id,
                          "cloud_file_path": eval_dataset_path,
                          "use_for": ["evaluation"]
                          }

data = json.dumps(eval_dataset_metadata)

endpoint = f"{base_url}/datasets"

response = requests.post(endpoint,data=data,headers=headers)
print(response)
print(json.dumps(response.json(), indent=4))

eval_dataset_id = response.json()["id"]
%store eval_dataset_id

The cell below polls the download progress until the dataset pull is complete.

In [None]:
# Check progress
endpoint = f"{base_url}/datasets/{eval_dataset_id}"

while True:
    clear_output(wait=True)
    response = requests.get(endpoint, headers=headers)

    print(response)
    print(json.dumps(response.json(), indent=4))
    if response.json().get("status") == "invalid_pull":
        raise ValueError("Dataset pull failed")
    if response.json().get("status") == "pull_complete":
        break
    time.sleep(5)

### 3.5 List the created datasets <a class="anchor" id="head-7"></a>

In [None]:
endpoint = f"{base_url}/datasets"

response = requests.get(endpoint, headers=headers)
print(response)

datasets = response.json()["datasets"]

# print(json.dumps(response.json(), indent=4)) ## Uncomment for verbose list output
print("id\t\t\t\t\t type\t\t\t format\t\t name")
for rsp in datasets:
    print(rsp["id"],"\t",rsp["type"],"\t",rsp["format"],"\t\t",rsp["name"])

## 4. Experiments <a class="anchor" id="head-4"></a>

In the TAO finetuning microservices, experiments are used for workflow management with the following key features:
- **Workflow Chaining:** Chain multiple model actions together with defined dependencies
- **Dependency Management:** Create structured workflows with clear dependencies between actions
- **Metadata Configuration:** Each experiment can incorporate various metadata:
    - Docker environment variables
    - Cloud workspace assignment for storing model action results
    - Pretrained model to be used in the workflow
    - Datasets that are to be used in the workflow

### 4.1 Create experiment for VLM workflow

Define the experiment arguments:

- `network_arch`
- `workspace` (the workspace ID)

In [None]:
data = json.dumps({"network_arch":model_name,
                   "workspace": workspace_id})

endpoint = f"{base_url}/experiments"

response = requests.post(endpoint,data=data,headers=headers)
print(response)
print(json.dumps(response.json(), indent=4))

experiment_id = response.json()["id"]
%store experiment_id

### 4.2 List experiments

Validate that the experiment is created

In [None]:
endpoint = f"{base_url}/experiments"
params = {"network_arch": "cosmos-rl"}
response = requests.get(endpoint, params=params, headers=headers)

print(response)
# print(json.dumps(response.json(), indent=4)) ## Uncomment for verbose list output
print("name\t\t\t     id\t\t\t     network architecture")
for rsp in response.json()["experiments"]:
    print(rsp["name"], rsp["id"], rsp["network_arch"])


### 4.3 Assign train, eval datasets

Set dataset configuration for training and evaluation.

- Set the Docker environment variable: the HF token is used to pull the evaluation dataset from Hugging Face.
- Define `train_datasets` and `eval_dataset` from the train and eval dataset IDs created above (see sections 3.3 and 3.4).
- Patch the experiment with `dataset_information`.

In [None]:
docker_env_vars = {"HF_TOKEN": hf_token}
dataset_information = {"train_datasets":[train_dataset_id],
                       "eval_dataset": eval_dataset_id,
                       "calibration_dataset": eval_dataset_id,
                       "docker_env_vars": docker_env_vars
                       }
data = json.dumps(dataset_information)

endpoint = f"{base_url}/experiments/{experiment_id}"

response = requests.patch(endpoint, data=data, headers=headers)

print(response)
print(json.dumps(response.json(), indent=4))

### 4.4 Update Experiment with AutoML Parameters <a class="anchor" id="head-4-4"></a>

#### 4.4.1 View hyperparameters that are enabled for AutoML by default <a class="anchor" id="head-14"></a>

In [None]:
# Get default spec schema
endpoint = f"{base_url}/experiments/{experiment_id}/specs/train/schema"

while True:
    response = requests.get(endpoint, headers=headers)
    if response.status_code == 404:
        if "Base spec file download state is " in response.json()["error_desc"]:
            print("Base experiment spec file is being downloaded")
            time.sleep(2)
            continue
        else:
            break
    else:
        break

assert response.status_code in (200, 201)
assert "automl_default_parameters" in response.json().keys()
automl_params = response.json()["automl_default_parameters"]
print(json.dumps(automl_params, sort_keys=True, indent=4))

#### AutoML Parameters Configuration

The `automl_params` list retrieved above contains the **default AutoML hyperparameters** that have been carefully chosen for this model architecture.

**Key Points:**

- ✅ **Recommended**: Use the default parameters as-is; they are handpicked and validated for optimal performance
- 🔧 **Customizable**: You can add, modify, or remove parameters based on your specific requirements
- ⚠️ **Important**: Any modifications to this list will directly affect the AutoML experiment behavior

1. **`custom.vision.fps`**
   - Video sampling rate in frames per second for vision-language models. Higher FPS captures more temporal information but increases memory usage.
   - Valid range: 1-3

1. **`train.epoch`**
   - Total number of training epochs (complete passes through the dataset).
   - Valid range: 10-20

1. **`train.optm_lr`**
   - Peak learning rate for optimizer. Actual LR follows warmup and cosine decay schedule.
   - Valid range: 0-inf
   - Default: 1e-06

1. **`policy.lora.r`**
   - LoRA rank (dimension of low-rank decomposition matrices). Higher rank = more expressive but more parameters to train.
   - Lower values (4-8) for efficient fine-tuning, higher values (16-64) for better performance on complex tasks.
   - Valid range: 1-256
   - Default: 8

1. **`policy.lora.lora_alpha`**
   - LoRA scaling factor that controls the magnitude of LoRA updates. Typically set equal to or double the rank.
   - Formula: LoRA scaling = lora_alpha / r (or lora_alpha / sqrt(r) if use_rslora=True)
   - Valid range: 1-256
   - Default: 8.0

1. **`policy.lora.lora_dropout`**
   - Dropout probability applied to LoRA layers for regularization. Helps prevent overfitting.
   - 0.0 = no dropout (recommended for small datasets), 0.05-0.1 for larger datasets
   - Valid range: 0.0-0.5
   - Default: 0.0

1. **`train.optm_decay_type`**
    - Type of learning rate decay schedule after warmup phase completes.
    - **linear**: Linear decay from peak LR to min LR (weight: 0.1)
    - **sqrt**: Square root decay, slower initial decay (weight: 0.1)
    - **cosine**: Cosine annealing, smooth decay (weight: 0.4, recommended)
    - **none**: No decay, constant LR after warmup (weight: 0.4)
    - Valid options: linear, sqrt, cosine, none
    - Default: linear
    - **Recommendation**: Use 'cosine' (40% weight) or 'none' (40% weight) for most tasks
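To make the relationship between `policy.lora.r` and `policy.lora.lora_alpha` concrete, the ratio given in the formula above can be computed directly. This is a worked illustration of the standard LoRA scaling rule, not code from the TAO service; `lora_scaling` is a hypothetical helper name.

```python
import math


def lora_scaling(lora_alpha: float, r: int, use_rslora: bool = False) -> float:
    """Factor by which the low-rank update is multiplied before being added."""
    if r <= 0:
        raise ValueError("r must be positive")
    # Rank-stabilized LoRA divides by sqrt(r) instead of r.
    return lora_alpha / math.sqrt(r) if use_rslora else lora_alpha / r


for alpha, rank in [(8.0, 8), (16.0, 16), (32.0, 16)]:
    print(f"alpha={alpha}, r={rank}: scaling={lora_scaling(alpha, rank)}")
```

Keeping `lora_alpha` equal to `r` holds the scaling at 1, while doubling `lora_alpha` relative to `r` doubles the magnitude of the LoRA update, which is why the two parameters are usually tuned together.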

#### Next Steps

The final `automl_params` configuration (after any modifications) will be used to launch the AutoML training experiments in the subsequent cells.

1. **Review Available Parameters** (next cell)
   - Lists all trainable parameters you can add to `automl_params`
   - Helps you discover additional hyperparameters to tune

2. **Get Parameter Details** (optional)
   - Use the `:get_automl_param_details` endpoint to inspect specific parameters
   - View valid ranges, data types, and default values before tuning

3. **Update Parameter Ranges** (optional)
   - Use the `:update_automl_param_ranges` endpoint to customize search spaces
   - Narrow or expand ranges based on your domain knowledge

4. **Launch AutoML Experiment** (upcoming cells)
   - The AutoML system will:
     - Sample hyperparameter combinations from the defined ranges
     - Train multiple models in parallel
     - Track performance metrics for each configuration
     - Identify the best performing model

5. **Monitor AutoML Progress**
   - Use the `:automl_details` endpoint to track experiment status
   - View results, configurations, and performance for each trial
   - Identify which hyperparameters impact performance most

In [None]:
import copy
def flatten_dict(nested_dict, parent_key='', sep='.'):
    items = []
    for key, value in nested_dict.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            items.extend(flatten_dict(value, new_key, sep=sep).items())
        else:
            items.append((new_key, value))
    return dict(items)

default_train_spec = copy.deepcopy(response.json()["default"])
param = flatten_dict(default_train_spec)
for k, v in param.items():
    print(k)

#### Get AutoML Parameter Details

Retrieve detailed information about specific parameters that can be tuned by AutoML. This endpoint returns the parameter's data type, default values, valid ranges, and constraints.

##### Request Parameters

- **parameters**: Comma-separated list of parameter paths to query (e.g., `"train.optm_betas,train.epoch"`)

##### Response Structure

For each parameter, you'll receive:
- **parameter**: The full config path
- **value_type**: Data type (e.g., `list_2`, `int`, `float`, `str`)
- **default**: Default configuration including:
  - `default_value`: The default value(s)
  - `valid_min`: Minimum allowed value(s)
  - `valid_max`: Maximum allowed value(s)
  - `valid_options`: List of allowed discrete values (if applicable)
  - `math_cond`: Mathematical constraints
  - `depends_on`: Parameters this depends on
- **custom**: Any custom overrides applied

##### Example Output

```json
{
    "parameter": "train.optm_betas",
    "value_type": "list_2",
    "default": {
        "default_value": [0.9, 0.999],
        "valid_min": [0.8, 0.9],
        "valid_max": [0.95, 0.999]
    }
}
```

This information helps you understand valid ranges before setting up AutoML experiments.
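As a sketch of how this response could be used, the helper below checks whether a candidate search range stays inside the default bounds of a parameter. It relies only on the `default.valid_min` and `default.valid_max` fields shown in the example output; `within_default_bounds` is a hypothetical name, and the check is informational, since the service may accept wider ranges.

```python
def _as_list(value):
    """Treat scalars and lists uniformly so list_2 parameters work too."""
    return list(value) if isinstance(value, (list, tuple)) else [value]


def within_default_bounds(details: dict, new_min, new_max) -> bool:
    """True if [new_min, new_max] lies inside the default valid range."""
    default = details["default"]
    lows, highs = _as_list(default["valid_min"]), _as_list(default["valid_max"])
    new_lows, new_highs = _as_list(new_min), _as_list(new_max)
    if not (len(lows) == len(highs) == len(new_lows) == len(new_highs)):
        return False
    return all(
        lo <= n_lo <= n_hi <= hi
        for lo, hi, n_lo, n_hi in zip(lows, highs, new_lows, new_highs)
    )
```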

In [None]:
endpoint = f"{base_url}/experiments/{experiment_id}:get_automl_param_details"

params = {"parameters": "train.optm_decay_type"}

response = requests.get(endpoint, headers=headers, params=params)
print(response)
print(json.dumps(response.json(), indent=4))

#### Update AutoML Parameter Ranges

Define the hyperparameter search space for AutoML optimization. This allows you to customize which parameters AutoML will tune and their allowed ranges.

##### Configurable Parameters

The `parameter_ranges` array allows you to specify:
- **parameter**: The config path (e.g., `train.optm_betas`, `train.epoch`)
- **valid_min**: Minimum value(s) for the parameter
- **valid_max**: Maximum value(s) for the parameter

**Note:** Array parameters like `optm_betas` require array values for min/max, while scalar parameters use single values.

In [None]:
automl_custom_range = [
    {
        "parameter": "train.optm_lr",
        "valid_min": 5e-06,
        "valid_max": 2e-04
    },
    {
        "parameter": "train.epoch",
        "valid_min": 1,
        "valid_max": 3
    },
    {
        "parameter": "train.optm_betas",
        "valid_min": [0.9, 0.995],
        "valid_max": [0.95, 0.999]
    },
    {
        "parameter": "train.optm_decay_type",
        "valid_options": ["cosine", "none"],
        "option_weights": [0.7, 0.3]
    },
    {
        "parameter": "policy.lora.r",
        "valid_min": 4,
        "valid_max": 64
    },
    {
        "parameter": "policy.lora.lora_alpha",
        "valid_min": 128,
        "valid_max": 1024
    },
    {
        "parameter": "policy.lora.lora_dropout",
        "valid_min": 0.03,
        "valid_max": 0.1
    },
    {
        "parameter": "custom.vision.fps",
        "valid_min": 1,
        "valid_max": 2
    }
]
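Before sending the ranges to the service, a local sanity check can catch simple mistakes such as a minimum above its maximum or mismatched list lengths. The sketch below is not part of the TAO API: `check_range_entry` is a hypothetical helper, and the rule that `option_weights` should sum to 1 is an assumption based on the weights used in this notebook.

```python
import math


def _as_list(value):
    return list(value) if isinstance(value, (list, tuple)) else [value]


def check_range_entry(entry: dict) -> list[str]:
    """Return a list of problems found in one parameter_ranges entry."""
    problems = []
    if not entry.get("parameter"):
        problems.append("missing 'parameter'")
    if "valid_options" in entry:
        weights = entry.get("option_weights")
        if weights is not None:
            if len(weights) != len(entry["valid_options"]):
                problems.append("option_weights and valid_options differ in length")
            elif not math.isclose(sum(weights), 1.0, abs_tol=1e-6):
                problems.append("option_weights do not sum to 1")
        return problems
    if "valid_min" not in entry or "valid_max" not in entry:
        problems.append("needs valid_min/valid_max or valid_options")
        return problems
    lows, highs = _as_list(entry["valid_min"]), _as_list(entry["valid_max"])
    if len(lows) != len(highs):
        problems.append("valid_min and valid_max differ in length")
    elif any(lo > hi for lo, hi in zip(lows, highs)):
        problems.append("valid_min exceeds valid_max")
    return problems
```

Running `[p for e in automl_custom_range for p in check_range_entry(e)]` should return an empty list for a well-formed search space.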

In [None]:
endpoint = f"{base_url}/experiments/{experiment_id}:update_automl_param_ranges"

data = {"parameter_ranges": automl_custom_range}

response = requests.patch(endpoint, headers=headers, json=data)
print(response)
print(json.dumps(response.json(), indent=4))

##### Update the experiment metadata with the AutoML parameters to run experiments on <a class="anchor" id="head-14"></a>

In [None]:
automl_information = {
    "automl_enabled": False,  # Set to True to run the AutoML search instead of a single training job
    "automl_algorithm": automl_algorithm,
    "automl_max_recommendations": 2,
    "automl_hyperparameters": str(automl_params)
}
data = json.dumps({"metric":"kpi", "automl_settings": automl_information})

endpoint = f"{base_url}/experiments/{experiment_id}"

response = requests.patch(endpoint, headers=headers, data=data)
assert response.status_code in (200, 201)

print(json.dumps(response.json(), sort_keys=True, indent=4))

## 5. Launch Fine-tuning <a class="anchor" id="head-5"></a>

For all **Actions**:
1. Get default spec schema and derive the default values
2. Modify defaults if needed
3. Post spec dictionary to the service
4. Run model action
5. Monitor job using retrieve
6. Download results using job download endpoint (if needed)

**Note:** Here, *Actions* refers to the TAO APIs for *train/eval/infer/...*

In [None]:
job_map = {}

### 5.1 Get default spec schema

List all the configuration options available for finetuning:

In [None]:
# Get default spec schema
endpoint = f"{base_url}/experiments/{experiment_id}/specs/train/schema"

while True:
    response = requests.get(endpoint, headers=headers)
    if response.status_code == 404:
        if "Base spec file download state is " in response.json()["error_desc"]:
            print("Base experiment spec file is being downloaded")
            time.sleep(2)
            continue
        else:
            break
    else:
        break

print(response)
#print(json.dumps(response.json(), indent=4)) ## Uncomment for verbose schema 
train_specs = response.json()["default"]
print(json.dumps(train_specs, sort_keys=True, indent=4))

### 5.2 Customize train model configuration

Override any configuration values as needed, for example the training batch size or the number of GPUs.
- `dp_shard_size` is the number of GPUs to be used for training
- For OOM issues
  - Try reducing the following
    - `dataloader_num_workers`
    - `dataloader_prefetch_factor`
    - `mini_batch`
    - `total_pixels`
    - `fps`
    - `train_batch_per_replica`
    - `model_max_length`
  - Disable
    - `enable_dataset_cache`
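When several values need changing, a helper that applies dotted-path overrides to a copy of the spec can keep the edits in one place. This is a sketch of one possible approach; `apply_overrides` is a hypothetical helper, and the dotted keys follow the same convention as the AutoML parameter names used earlier (for example `train.epoch`).

```python
import copy


def apply_overrides(specs: dict, overrides: dict) -> dict:
    """Return a copy of specs with dotted-path keys set to new values."""
    updated = copy.deepcopy(specs)
    for dotted_key, value in overrides.items():
        *parents, leaf = dotted_key.split(".")
        node = updated
        for part in parents:
            # Create intermediate sections that do not exist yet.
            node = node.setdefault(part, {})
        node[leaf] = value
    return updated
```

For example, `apply_overrides(train_specs, {"train.epoch": 10, "custom.vision.fps": 1.0})` leaves `train_specs` untouched and returns a modified copy, which makes it easy to compare the default and customized specs.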
  

In [None]:
train_specs["train"]["epoch"] = 10
train_specs["train"]["ckpt"]["save_freq_in_epoch"] = train_specs["train"]["epoch"]
train_specs["validation"]["freq_in_epoch"] = train_specs["train"]["epoch"]
train_specs["validation"]["batch_size"] = 1

train_specs["train"]["ckpt"]["save_mode"] = "sync"

train_specs["train"]["train_policy"] = {
    "dataset":{
        "name":"sdg",
        "test_size":1,
    },
    "type":"sft",
    "enable_dataset_cache":True,
    "dataloader_num_workers":8,
    "dataloader_prefetch_factor":8,
    "conversation_column_name":"conversations",
    "mini_batch":1,
}

if "max_pixels" in train_specs["custom"]["vision"]:
    del train_specs["custom"]["vision"]["max_pixels"]
train_specs["custom"]["vision"]["total_pixels"] = 3136000
train_specs["custom"]["vision"]["fps"] = 1.0

train_specs["policy"]["parallelism"]["dp_shard_size"] = 2

train_specs["train"]["train_batch_per_replica"] = 32
train_specs["policy"]["model_max_length"] = 8192
train_specs["custom"]["system_prompt"] = "Answer the questions."

if finetuning_mode != "lora" and "lora" in train_specs["policy"]:
    del train_specs["policy"]["lora"]
elif finetuning_mode == "lora":
    train_specs["policy"]["lora"]["lora_alpha"] = 16
    train_specs["policy"]["lora"]["r"] = 16
    train_specs["policy"]["lora"]["lora_dropout"] = 0.05

print(json.dumps(train_specs, sort_keys=True, indent=4))

### 5.3 Run *train* Action 

Run *train* action with the configurations defined above

In [None]:
action = "train"
train_request_body = {
    "parent_job_id":None,
    "action":action,
    "specs":train_specs}
data = json.dumps(train_request_body)

endpoint = f"{base_url}/experiments/{experiment_id}/jobs"

response = requests.post(endpoint, data=data, headers=headers)

print(response)
print(json.dumps(response.json(), indent=4))

job_map["train"] = response.json()
print(job_map)
%store job_map

### 5.4 Monitor Job Status

The cell below will continuously monitor your training job and display real-time progress updates. This monitoring loop will automatically refresh until the job completes, fails, or is manually stopped.

#### Expected Training Times

- **Baseline**: Each experiment running for **1 epoch** takes approximately **55 minutes on 8x A100 GPUs**
- **Scaling**: Training time scales linearly with the number of epochs
  - SFT
    - 5 epochs ≈ 4.5 hours
    - 10 epochs ≈ 9 hours
    - 20 epochs ≈ 18 hours
  - LoRA takes around 65% of the SFT training time
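The figures above amount to simple arithmetic, which the sketch below reproduces so you can estimate other epoch counts. The constants come from the list above; `estimate_hours` is a hypothetical helper, and real run times will vary with dataset size and hardware.

```python
MINUTES_PER_EPOCH_SFT = 55   # baseline on 8x A100 GPUs, from the list above
LORA_TIME_FRACTION = 0.65    # LoRA takes around 65% of the SFT time


def estimate_hours(epochs: int, mode: str = "full") -> float:
    """Rough training time in hours, assuming linear scaling with epochs."""
    minutes = epochs * MINUTES_PER_EPOCH_SFT
    if mode == "lora":
        minutes *= LORA_TIME_FRACTION
    return minutes / 60


for n in (5, 10, 20):
    print(f"{n} epochs: SFT {estimate_hours(n):.1f} h, LoRA {estimate_hours(n, 'lora'):.1f} h")
```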

#### Job Logs

This cell prints the logs of the currently running AutoML experiment in a loop until all AutoML experiments are completed.

The logs are preceded by the AutoML brain info, such as how many epochs remain across the whole AutoML run, the current experiment ID, and the ETA for the AutoML run to complete.

- **Individual Experiment Log**:
  - Unique `job_id` for each AutoML experiment
  - Current hyperparameter configuration being tested
  - Per-experiment metrics and status
  
- **AutoML Brain Summary**:
  - Number of experiments remaining in the AutoML search
  - Estimated Time to Completion (ETA)
  - Current best metric value across all experiments
  - Recommendation progress and performance trends

<div class="alert alert-block alert-success">
    <b>Note:</b> To stop the training job at any time, refer to the instructions in the "To Stop the finetuning Job" section below.
</div>


In [None]:
job_id = job_map["train"]
job_metadata_endpoint = f"{base_url}/experiments/{experiment_id}/jobs/{job_id}"

while True:
    clear_output(wait=True)
    job_metadata_response = requests.get(job_metadata_endpoint, headers=headers)
    if "error_desc" in job_metadata_response.json().keys() and job_metadata_response.json()["error_desc"] in ("Job trying to retrieve not found", "No AutoML run found"):
        print("Job is being created")
        time.sleep(5)
        continue

    automl_experiment_index = None
    automl_brain_info = job_metadata_response.json().get("job_details", {}).get(job_id, {}).get("automl_brain_info")

    if automl_brain_info is not None:
        for metric in automl_brain_info:
            print(f'{metric["metric"]}: {metric["value"]}')
            if metric["metric"] == "Current experiment id":
                automl_experiment_index = int(metric["value"]) - 1

    params = {}
    if automl_experiment_index is not None and automl_experiment_index >= 0:
        params = {"automl_experiment_index":automl_experiment_index}
    
    job_logs_endpoint = f"{base_url}/experiments/{experiment_id}/jobs/{job_id}/logs"
    job_logs_response = requests.get(job_logs_endpoint, headers=headers, params=params)
    print("\nLogs of job: \n",job_logs_response.text)

    if job_metadata_response.json().get("status") in ["Done","Error", "Canceled", "Paused"] or job_metadata_response.status_code not in (200,201):
        print(json.dumps(job_metadata_response.json(), sort_keys=True, indent=4))
        break
    time.sleep(15)
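The step in the loop above that turns the AutoML brain info into a zero-based experiment index can be isolated into a small function, which makes it easier to reason about when the field is missing or malformed. The metric name `Current experiment id` is taken from the cell above; `current_experiment_index` is a hypothetical helper name.

```python
def current_experiment_index(brain_info):
    """Return the zero-based index of the running AutoML experiment, or None."""
    for item in brain_info or []:
        if item.get("metric") == "Current experiment id":
            try:
                # The service reports a one-based id; the logs endpoint
                # in the cell above is queried with a zero-based index.
                return int(item["value"]) - 1
            except (KeyError, TypeError, ValueError):
                return None
    return None
```

A return value of `None` corresponds to the non-AutoML case, where the logs request is sent without the `automl_experiment_index` parameter.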

#### Fetch the logs of an individual AutoML experiment by index (one-time)
 - Use index -1 for the AutoML brain logs
 - For non-AutoML jobs, omit the params field

In [None]:
# job_id = job_map["train"]
# params = {"automl_experiment_index":-1}
# job_logs_endpoint = f"{base_url}/experiments/{experiment_id}/jobs/{job_id}/logs"
# job_logs_response = requests.get(job_logs_endpoint, headers=headers, params=params)
# print("\nLogs of job: \n",job_logs_response.text)

#### You can fetch job metadata one time below
  - Unique `job_id` for each AutoML experiment
  - Per-experiment metrics and status

In [None]:
# job_id = job_map["train"]
# job_metadata_endpoint = f"{base_url}/experiments/{experiment_id}/jobs/{job_id}"
# job_metadata_response = requests.get(job_metadata_endpoint, headers=headers)
# print(json.dumps(job_metadata_response.json(), sort_keys=True, indent=4))

### To Stop the finetuning Job
1. Manually stop the job-monitoring code cell above (the cell that polls the job status and logs)
2. Uncomment the snippet in the next cell and run it

In [None]:
# job_id = job_map["train"]
# endpoint = f"{base_url}/experiments/{experiment_id}/jobs/{job_id}:pause"

# data = {"graceful":False}

# response = requests.post(endpoint, headers=headers, json=data)

# print(response)
# print(json.dumps(response.json(), indent=4))

### Resume Finetuning job

Uncomment the snippet below to resume a finetuning job that has been stopped, then rerun the code cell in step **5.3 Monitor job status**.

In [None]:
# job_id = job_map["train"]
# endpoint = f"{base_url}/experiments/{experiment_id}/jobs/{job_id}:resume"

# data = json.dumps({"parent_job_id":None, "specs":train_specs})
# response = requests.post(endpoint, data=data, headers=headers)

# print(response)
# print(json.dumps(response.json(), indent=4))

## 6. Evaluate <a class="anchor" id="head-6"></a>

Once the model is finetuned, run evaluation. The checkpoint used for evaluation is picked according to the predefined checkpoint selection method.

In [None]:
# Get default spec schema
endpoint = f"{base_url}/experiments/{experiment_id}/specs/evaluate/schema"

while True:
    response = requests.get(endpoint, headers=headers)
    if response.status_code == 404:
        if "Base spec file download state is " in response.json()["error_desc"]:
            print("Base experiment spec file is being downloaded")
            time.sleep(2)
            continue
        else:
            break
    else:
        break

print(response)
#print(json.dumps(response.json(), indent=4)) ## Uncomment for verbose schema
evaluate_specs = response.json()["default"]["evaluate"]
print(json.dumps(evaluate_specs, sort_keys=True, indent=4))
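
The same schema-polling loop appears for the evaluate, quantize, and inference actions, so it can be wrapped in one function with a bounded number of retries. The sketch below assumes only what the cell above shows: a 404 whose `error_desc` contains "Base spec file download state is " means the spec is still downloading, and the defaults live under `["default"][action]`. The name `fetch_default_specs`, its parameters, and the choice to raise on other failures are assumptions made for illustration.

```python
import time
from typing import Callable


def fetch_default_specs(http_get: Callable, endpoint: str, action: str,
                        max_attempts: int = 30, delay_s: float = 2.0) -> dict:
    """Poll a spec-schema endpoint until the base spec file is available.

    Hypothetical helper. http_get is any callable that takes a URL and returns
    an object with .status_code and .json(), such as a thin wrapper around
    requests.get that also passes the auth headers.
    """
    for _ in range(max_attempts):
        response = http_get(endpoint)
        body = response.json()
        still_downloading = (
            response.status_code == 404
            and "Base spec file download state is " in str(body.get("error_desc", ""))
        )
        if still_downloading:
            time.sleep(delay_s)
            continue
        if response.status_code not in (200, 201):
            # Unlike the original loop, fail loudly instead of falling through.
            raise RuntimeError(f"Schema request failed ({response.status_code}): {body}")
        return body["default"][action]
    raise TimeoutError(f"Spec schema for '{action}' not ready after {max_attempts} attempts")


# Usage inside this notebook (requests, headers, and endpoint come from earlier cells):
# evaluate_specs = fetch_default_specs(
#     lambda url: requests.get(url, headers=headers), endpoint, "evaluate")
```

Passing the HTTP call in as a function keeps the helper independent of the session setup and makes it easy to exercise without a running service.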

In [None]:
evaluate_specs["vision"]["fps"] = 1.0
evaluate_specs["vision"]["total_pixels"] = 3136000
if finetuning_mode == "lora":
    evaluate_specs["model"]["enable_lora"] = True
    evaluate_specs["model"]["base_model_path"] = "hf_model://nvidia/Cosmos-Reason1-7B"
print(json.dumps(evaluate_specs, sort_keys=True, indent=4))

### 6.1 Run *evaluate* action

In [None]:
parent = job_map["train"]
action = "evaluate"
data = json.dumps({"parent_job_id":parent,"action":action,"specs":evaluate_specs})

endpoint = f"{base_url}/experiments/{experiment_id}/jobs"

response = requests.post(endpoint, data=data, headers=headers)

print(response)
print(json.dumps(response.json(), indent=4))

job_map["evaluate"] = response.json()
print(job_map)
%store job_map

#### Monitor job status by repeatedly running this cell. Stop the cell when you are done

In [None]:
job_id = job_map["evaluate"]
job_metadata_endpoint = f"{base_url}/experiments/{experiment_id}/jobs/{job_id}"
job_logs_endpoint = f"{base_url}/experiments/{experiment_id}/jobs/{job_id}/logs"

while True:
    clear_output(wait=True)
    job_metadata_response = requests.get(job_metadata_endpoint, headers=headers)
    
    job_logs_response = requests.get(job_logs_endpoint, headers=headers)
    print(job_logs_response.text)
    
    if job_metadata_response.json().get("status") in ["Done","Error", "Canceled", "Paused"] or job_metadata_response.status_code not in (200,201):
        print(json.dumps(job_metadata_response.json(), indent=4))
        break
    time.sleep(15)
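
The monitoring loop above runs until the job reaches a terminal state, which can take a long time if the job stalls. A bounded variant is sketched below. It reuses the terminal states and the 200/201 check from the cell above; the function name `wait_for_job`, the `fetch_status` callable, and the timeout behavior are assumptions introduced for this example.

```python
import time
from typing import Callable, Tuple

# Terminal states taken from the monitoring cell above.
TERMINAL_STATES = ("Done", "Error", "Canceled", "Paused")


def wait_for_job(fetch_status: Callable[[], Tuple[int, dict]],
                 poll_s: float = 15.0, timeout_s: float = 3600.0) -> dict:
    """Poll until the job is terminal, the request fails, or the timeout expires.

    Hypothetical helper. fetch_status returns (http_status_code, metadata_dict).
    """
    deadline = time.monotonic() + timeout_s
    while True:
        status_code, metadata = fetch_status()
        if status_code not in (200, 201) or metadata.get("status") in TERMINAL_STATES:
            return metadata
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job still '{metadata.get('status')}' after {timeout_s} s")
        time.sleep(poll_s)


# Usage inside this notebook (requests, headers, and the endpoint come from earlier cells):
# def fetch_status():
#     r = requests.get(job_metadata_endpoint, headers=headers)
#     return r.status_code, r.json()
# final_metadata = wait_for_job(fetch_status)
```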

## 7. Quantization <a class="anchor" id="head-7"></a>

- Run FP8 quantization of the LLM modules of the finetuned Cosmos Reason model created in Step 5

### 7.1 Customize quantization action specs

In [None]:
# Get default spec schema
endpoint = f"{base_url}/experiments/{experiment_id}/specs/quantize/schema"

while True:
    response = requests.get(endpoint, headers=headers)
    if response.status_code == 404:
        if "Base spec file download state is " in response.json()["error_desc"]:
            print("Base experiment spec file is being downloaded")
            time.sleep(2)
            continue
        else:
            break
    else:
        break

print(response)
#print(json.dumps(response.json(), indent=4)) ## Uncomment for verbose schema
quantize_specs = response.json()["default"]["quantize"]
print(json.dumps(quantize_specs, sort_keys=True, indent=4))

In [None]:
# Apply changes to the specs dictionary if necessary
if finetuning_mode == "lora":
    quantize_specs["model"]["enable_lora"] = True
    quantize_specs["model"]["base_model_path"] = "hf_model://nvidia/Cosmos-Reason1-7B"
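
The evaluate and quantize specs receive the same two LoRA fields, so the override can be expressed once as a function that returns a modified copy and leaves the fetched defaults untouched. This is a sketch under the assumption that both specs keep these fields under a `model` key, as the cells above do; `with_lora_overrides` is a hypothetical name, not a TAO function.

```python
import copy


def with_lora_overrides(specs: dict, finetuning_mode: str,
                        base_model_path: str = "hf_model://nvidia/Cosmos-Reason1-7B") -> dict:
    """Return a copy of specs with the LoRA fields set when finetuning_mode is 'lora'.

    Hypothetical helper: the field names match the assignments in the cell above.
    """
    updated = copy.deepcopy(specs)
    if finetuning_mode == "lora":
        model = updated.setdefault("model", {})
        model["enable_lora"] = True
        model["base_model_path"] = base_model_path
    return updated


# Usage inside this notebook:
# quantize_specs = with_lora_overrides(quantize_specs, finetuning_mode)
```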

### 7.2 Run *quantize* action

In [None]:
parent = job_map["train"]
action = "quantize"
data = json.dumps({"parent_job_id":parent,"action":action,"specs":quantize_specs})

endpoint = f"{base_url}/experiments/{experiment_id}/jobs"

response = requests.post(endpoint, data=data, headers=headers)

print(response)
print(json.dumps(response.json(), indent=4))

job_map["quantize"] = response.json()
print(job_map)
%store job_map

#### Monitor job status by repeatedly running this cell. Stop the cell when you are done checking

In [None]:
job_id = job_map['quantize']
job_metadata_endpoint = f"{base_url}/experiments/{experiment_id}/jobs/{job_id}"
job_logs_endpoint = f"{base_url}/experiments/{experiment_id}/jobs/{job_id}/logs"

while True:
    clear_output(wait=True)
    job_metadata_response = requests.get(job_metadata_endpoint, headers=headers)

    job_logs_response = requests.get(job_logs_endpoint, headers=headers)
    print(job_logs_response.text)

    if job_metadata_response.json().get("status") in ["Done","Error", "Canceled", "Paused"] or job_metadata_response.status_code not in (200,201):
        print(json.dumps(job_metadata_response.json(), indent=4))
        break
    time.sleep(15)

## 8. Inference <a class="anchor" id="head-8"></a>

- Run inference on a video or image input using the quantized model created in Step 7

### 8.1 Inference as a one-time Job <a class="anchor" id="head-8-1"></a>
To run inference once on a single set of inputs, use Section 8.1. To try several prompts, inputs, or settings, skip to the inference microservice in Section 8.2.

#### 8.1.1 Customize inference action specs

In [None]:
# Get default spec schema
endpoint = f"{base_url}/experiments/{experiment_id}/specs/inference/schema"

while True:
    response = requests.get(endpoint, headers=headers)
    if response.status_code == 404:
        if "Base spec file download state is " in response.json()["error_desc"]:
            print("Base experiment spec file is being downloaded")
            time.sleep(2)
            continue
        else:
            break
    else:
        break

print(response)
#print(json.dumps(response.json(), indent=4)) ## Uncomment for verbose schema
inference_specs = response.json()["default"]["inference"]
print(json.dumps(inference_specs, sort_keys=True, indent=4))

In [None]:
# Apply changes to the specs dictionary if necessary
inference_specs["prompt"] = "When does something happen in the video?"
inference_specs["media"] = "aws://nvcf-storage-handling/data/vlm_inference/videos/test_video.mp4" # Format: cloud_type://bucket_name/video_file_path
# # Uncomment this if parent job is train instead of quantize
# if finetuning_mode == "lora":
#     inference_specs["enable_lora"] = True
#     inference_specs["base_model_path"] = "hf_model://nvidia/Cosmos-Reason1-7B"
print(json.dumps(inference_specs, sort_keys=True, indent=4))
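
The `media` value follows the `cloud_type://bucket_name/video_file_path` format noted in the comment above, and a malformed value is easier to catch before the job is submitted. The check below is a local sanity test only, built on the standard library; `parse_media_uri` is a hypothetical helper, and the service may apply additional rules that this sketch does not know about.

```python
from urllib.parse import urlsplit


def parse_media_uri(uri: str) -> dict:
    """Split 'cloud_type://bucket_name/path/to/file' into its three parts.

    Hypothetical helper for a local format check; it does not contact any storage.
    """
    parts = urlsplit(uri)
    path = parts.path.lstrip("/")
    if not parts.scheme or not parts.netloc or not path:
        raise ValueError(f"Expected cloud_type://bucket_name/file_path, got: {uri!r}")
    return {"cloud_type": parts.scheme, "bucket": parts.netloc, "path": path}


print(parse_media_uri("aws://nvcf-storage-handling/data/vlm_inference/videos/test_video.mp4"))
```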

#### 8.1.2 Run *inference* Action

In [None]:
parent = job_map["quantize"]
action = "inference"
data = json.dumps({"parent_job_id":parent,"action":action,"specs":inference_specs})

endpoint = f"{base_url}/experiments/{experiment_id}/jobs"

response = requests.post(endpoint, data=data, headers=headers)

print(response)
print(json.dumps(response.json(), indent=4))

job_map["inference"] = response.json()
print(job_map)
%store job_map

#### Monitor job status by repeatedly running this cell. Stop the cell when you are done checking

In [None]:
job_id = job_map['inference']
job_metadata_endpoint = f"{base_url}/experiments/{experiment_id}/jobs/{job_id}"
job_logs_endpoint = f"{base_url}/experiments/{experiment_id}/jobs/{job_id}/logs"

while True:
    clear_output(wait=True)
    job_metadata_response = requests.get(job_metadata_endpoint, headers=headers)

    job_logs_response = requests.get(job_logs_endpoint, headers=headers)
    print(job_logs_response.text)

    if job_metadata_response.json().get("status") in ["Done","Error", "Canceled", "Paused"] or job_metadata_response.status_code not in (200,201):
        print(json.dumps(job_metadata_response.json(), indent=4))
        break
    time.sleep(15)

### 8.2 Inference Microservice <a class="anchor" id="head-8-2"></a>


#### Start inference microservice

In [None]:
endpoint = f"{base_url}/experiments/{experiment_id}/inference_microservice/start"
data = {
    "parent_id": job_map["quantize"],
    # "model_path": "" # By default, we use the finetuned model. If you want to use any other model other than the finetuned model, you can specify the model path in the cloud here
    # Uncomment the below if parent job is train instead of quantize
    # "enable_lora": True,
    # "base_model_path": "hf_model://nvidia/Cosmos-Reason1-7B"
}

response = requests.post(endpoint, json=data, headers=headers)
job_map["inference_microservice"] = response.json()["job_id"]

print(response)
print(json.dumps(response.json(), indent=4))
%store job_map

#### Get the status of the inference microservice and wait until the model is loaded before running inference

In [None]:
job_id = job_map['inference_microservice']
endpoint = f"{base_url}/experiments/{experiment_id}/jobs/{job_id}/inference_microservice/status"

while True:
    clear_output(wait=True)
    response = requests.get(endpoint,headers=headers)
    if response.json().get("status") in ["Done","Error", "Canceled", "Paused"] or response.status_code not in (200,201):
        print("Inference microservice is not running (it stopped or reported an error)")
        break
    if response.json().get("model_loaded") == True:
        print("Inference microservice is ready")
        break
    print(response)
    print(json.dumps(response.json(), indent=4))
    time.sleep(15)
print(response)
print(json.dumps(response.json(), indent=4))
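
The status loop above distinguishes three outcomes: the microservice has stopped or the request failed, the model is loaded, or the model is still loading. The sketch below names those outcomes explicitly so the decision can be reused. It relies only on the `status` and `model_loaded` fields read in the cell above; `microservice_state` and its return labels are hypothetical.

```python
# States that end the wait, taken from the status cell above.
STOPPED_STATES = ("Done", "Error", "Canceled", "Paused")


def microservice_state(status_code: int, payload: dict) -> str:
    """Classify one status response as 'stopped', 'ready', or 'loading'.

    Hypothetical helper: the labels are chosen for this sketch only.
    """
    if status_code not in (200, 201) or payload.get("status") in STOPPED_STATES:
        return "stopped"
    if payload.get("model_loaded") is True:
        return "ready"
    return "loading"


# Usage inside this notebook:
# state = microservice_state(response.status_code, response.json())
```

Only the `ready` state means inference requests can be sent; `stopped` means the microservice has to be started again.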

#### 8.2.1 Customize inference action specs

In [None]:
# Get default spec schema
endpoint = f"{base_url}/experiments/{experiment_id}/specs/inference/schema"

while True:
    response = requests.get(endpoint, headers=headers)
    if response.status_code == 404:
        if "Base spec file download state is " in response.json()["error_desc"]:
            print("Base experiment spec file is being downloaded")
            time.sleep(2)
            continue
        else:
            break
    else:
        break

print(response)
#print(json.dumps(response.json(), indent=4)) ## Uncomment for verbose schema
inference_specs = response.json()["default"]["inference"]
print(json.dumps(inference_specs, sort_keys=True, indent=4))

In [None]:
# Apply changes to the specs dictionary if necessary
inference_specs["prompt"] = "When does something happen in the video?"
inference_specs["media"] = "aws://nvcf-storage-handling/data/vlm_inference/videos/test_video.mp4" # Format: cloud_type://bucket_name/video_file_path
print(json.dumps(inference_specs, sort_keys=True, indent=4))

#### 8.2.2 Run inference using *inference-microservice*

In [None]:
job_id = job_map['inference_microservice']
endpoint = f"{base_url}/experiments/{experiment_id}/jobs/{job_id}/inference_microservice/inference"

response = requests.post(endpoint, json=inference_specs, headers=headers)

print(response)
print(json.dumps(response.json(), indent=4))

#### 8.2.3 Stop the microservice after all your inference trials are completed

In [None]:
job_id = job_map['inference_microservice']
endpoint = f"{base_url}/experiments/{experiment_id}/jobs/{job_id}/inference_microservice/stop"

response = requests.post(endpoint, headers=headers)

print(response)
print(json.dumps(response.json(), indent=4))

## 9. Finish Experiment and Cleanup <a class="anchor" id="head-9"></a>

In [None]:
endpoint = f"{base_url}/experiments/{experiment_id}"

response = requests.delete(endpoint,headers=headers)

print(response)
print(json.dumps(response.json(), indent=4))

### 9.1 Delete dataset
#### Train dataset

In [None]:
endpoint = f"{base_url}/datasets/{train_dataset_id}"

response = requests.delete(endpoint,headers=headers)

print(response)
print(json.dumps(response.json(), indent=4))

#### Validation dataset

In [None]:
endpoint = f"{base_url}/datasets/{eval_dataset_id}"

response = requests.delete(endpoint,headers=headers)

print(response)
print(json.dumps(response.json(), indent=4))
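
The three delete calls in this section can be driven from a single list of endpoints. The sketch below only builds that list, in the same order as the cells above (experiment first, then datasets), using the URL patterns shown there; `cleanup_endpoints` is a hypothetical helper and the IDs in the example are placeholders.

```python
from typing import Iterable, List


def cleanup_endpoints(base_url: str, experiment_id: str, dataset_ids: Iterable[str]) -> List[str]:
    """List the DELETE endpoints in the order used above: experiment, then datasets.

    Hypothetical helper: it only formats URLs and sends no requests.
    """
    urls = [f"{base_url}/experiments/{experiment_id}"]
    urls += [f"{base_url}/datasets/{dataset_id}" for dataset_id in dataset_ids]
    return urls


# Placeholder values for illustration only.
for url in cleanup_endpoints("https://example.invalid/api", "exp-1", ["train-ds", "eval-ds"]):
    print("DELETE", url)

# Usage inside this notebook:
# for url in cleanup_endpoints(base_url, experiment_id, [train_dataset_id, eval_dataset_id]):
#     print(requests.delete(url, headers=headers))
```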

## 10. Model Deployment <a class="anchor" id="head-10"></a>

To deploy a post-trained checkpoint, refer to the [Model Deployment section in the Cosmos Cookbook](https://nvidia-cosmos.github.io/cosmos-cookbook/recipes/post_training/reason1/intelligent-transportation/post_training.html#model-deployment). It has instructions for deploying with NIM and the NVIDIA VSS blueprint.
