
# Discover Cost-Efficient AI Customer Service Agents with NVIDIA Data Flywheel Blueprint
[![ Click here to deploy.](https://brev-assets.s3.us-west-1.amazonaws.com/nv-lb-dark.svg)](https://brev.nvidia.com/launchable/deploy?launchableID=env-2wggjBvDlVp4pLQD8ytZySh5m8W)

In this notebook, you will learn how to use the Data Flywheel Foundational Blueprint to continuously discover and promote more cost-efficient agents for an [AI virtual customer service assistant](https://build.nvidia.com/nvidia/ai-virtual-assistant-for-customer-service).

### Data Flywheel Blueprint

![dfw_arch](../docs/images/data-flywheel-blueprint.png)


### AI Virtual Assistant for Customer Service

The primary customer service agent in the AI Virtual Assistant uses tool calling to route user queries to specialized assistants, including: 

- Product Q&A
- Order status verification
- Returns processing
- Small talk and casual engagement

These interactions generate logs and tool-calling data that you can use as both evaluation benchmarks and training data. In this tutorial, you'll use this information to drive the flywheel process, fine-tuning smaller LLMs (such as `meta/llama-3.2-1B-instruct`, `meta/llama-3.2-3B-instruct`, and `meta/llama-3.1-8B-instruct`) to match the accuracy of the currently deployed model (`meta/llama-3.3-70B-instruct`).



## Interfacing with the Blueprint

The following diagram illustrates how admin tools and applications interact with the Flywheel Blueprint, which orchestrates logging, processing, and model management to enable continuous optimization.

![Arch](./arch.png)

### Contents 

0. [Data Flywheel Setup](#0)
1. [Load Sample Data](#1)
2. [Create a Flywheel Job](#2)
3. [Monitor Job Status](#3)
4. [Optional: Show Continuous Improvement](#4)

<a id="0"></a>
## Data Flywheel Setup

### General Setup

In general, you can start the Data Flywheel service by following the instructions provided in the notebook [README.md](https://github.com/NVIDIA-AI-Blueprints/data-flywheel/blob/main/notebooks/README.md).

### Brev Launchable Setup

If you’re running this notebook from a [Brev Launchable](https://brev.nvidia.com/launchable/deploy/now?launchableID=env-2wggjBvDlVp4pLQD8ytZySh5m8W), please complete the following setup steps before proceeding:

**Step 1**: Set NGC API key following the instructions at [Generating NGC API Keys](https://docs.nvidia.com/ngc/gpu-cloud/ngc-private-registry-user-guide/index.html#generating-api-key).

In [None]:
from dotenv import find_dotenv, load_dotenv
load_dotenv(find_dotenv())

In [None]:
import os
if not os.environ.get('NGC_API_KEY'):
    os.environ['NGC_API_KEY'] = '<your_ngc_api_key>'
else:
    print("NGC_API_KEY already set")

In [None]:
import os
if not os.environ.get('WANDB_API_KEY'):
    os.environ['WANDB_API_KEY'] = '<your_wandb_api_key>'
else:
    print("WANDB_API_KEY already set")
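Because the cells above fall back to placeholder strings when a key is not set, it can be worth confirming that neither key is still a placeholder before deploying. The following is a minimal sketch, not part of the blueprint: `find_missing_keys` is a hypothetical helper, and it assumes placeholders look like `<...>`, as in the cells above.

```python
import os

def find_missing_keys(names, env=None):
    """Return the names whose values are unset, empty, or still a <placeholder>."""
    env = os.environ if env is None else env
    missing = []
    for name in names:
        value = (env.get(name) or "").strip()
        # Treat "<your_..._key>" style strings as not configured (assumed convention)
        if not value or (value.startswith("<") and value.endswith(">")):
            missing.append(name)
    return missing

# Example with an explicit mapping instead of the real environment
example_env = {"NGC_API_KEY": "<your_ngc_api_key>", "WANDB_API_KEY": "abc123"}
print(find_missing_keys(["NGC_API_KEY", "WANDB_API_KEY"], env=example_env))
```

Calling `find_missing_keys(["NGC_API_KEY", "WANDB_API_KEY"])` without `env` checks the real environment of the notebook process.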

**Step 2**: Clone the data flywheel repo and fetch data files.

In [4]:
# %%bash
# git clone https://github.com/NVIDIA-AI-Blueprints/data-flywheel.git
# cd data-flywheel
# sudo apt-get update && sudo apt-get install -y git-lfs
# git lfs install
# git-lfs pull

**Step 3**: Set up paths and install Python dependencies for the notebook.

In [None]:
import sys
from pathlib import Path

notebook_dir = Path.cwd()
# project_root = notebook_dir / "data-flywheel"
# data_dir = project_root / "data"
project_root = notebook_dir.parent
data_dir = Path(project_root, "data")
sys.path.insert(0, str(project_root))
os.chdir(project_root)
print(f"Working directory changed to: {Path.cwd()}")

# user_site = Path.home() / ".local" / "lib" / f"python{sys.version_info.major}.{sys.version_info.minor}" / "site-packages"
# if str(user_site) not in sys.path:
#     sys.path.append(str(user_site))
#     print(f"Added user site-packages to sys.path: {user_site}")

# %pip install --user elasticsearch==8.17.2 pydantic-settings>=2.9.1 pandas>=2.2.3 matplotlib==3.10.3

**Step 4**: Update `config/config.yaml` to use a remote LLM as the judge. By default, the Data Flywheel Blueprint deploys `Llama-3.3-70B-instruct` locally as the LLM judge, which requires 4 GPUs. For the launchable, we instead use the remote `Llama-3.3-70B-instruct` NIM hosted on [build.nvidia.com](https://build.nvidia.com/meta/llama-3_3-70b-instruct) as the judge.

By default, only `Llama-3.2-1B-instruct` is used in the flywheel, but you can uncomment other models in the YAML file to include them in the run. You can also change other config settings, such as the data split and training hyperparameters, as desired.



In [6]:
import re

config_path = project_root / "config" / "config.yaml"
with open(config_path, "r") as f:
    original_yaml = f.read()

llm_judge_config_block = """llm_judge_config:
  type: "remote"
  url: "https://integrate.api.nvidia.com/v1/chat/completions"
  model_id: "meta/llama-3.3-70b-instruct"
  api_key_env: "NGC_API_KEY"
"""
updated_yaml = re.sub(
    r"llm_judge_config:.*?(?=\n\w|\Z)",  # stops at next top-level key
    llm_judge_config_block,
    original_yaml,
    flags=re.DOTALL
)

with open(config_path, "w") as f:
    f.write(updated_yaml)
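To see what the regular expression in the cell above does, it can help to run it on a small string first. The sketch below uses an invented toy config (the `first_section` and `last_section` keys and the `local` settings are made up for illustration and are not the real contents of `config/config.yaml`); only the pattern and the replacement block are taken from the cell above.

```python
import re

# Toy config, invented for illustration only
sample_yaml = """first_section:
  some_key: "value"
llm_judge_config:
  type: "local"
  some_local_setting: 4
last_section:
  other_key: "value"
"""

remote_block = """llm_judge_config:
  type: "remote"
  url: "https://integrate.api.nvidia.com/v1/chat/completions"
  model_id: "meta/llama-3.3-70b-instruct"
  api_key_env: "NGC_API_KEY"
"""

# Lazy match from the key up to (not including) the newline before the next
# line that starts with a word character, or up to the end of the string
pattern = r"llm_judge_config:.*?(?=\n\w|\Z)"
updated = re.sub(pattern, remote_block, sample_yaml, flags=re.DOTALL)
print(updated)
```

Only the `llm_judge_config` section is replaced; the sections before and after it are left untouched. Note that the lookahead only stops at a line beginning with a word character, so a top-level comment line starting with `#` would be swallowed into the match.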

**Step 5**: Start the Data Flywheel service. This involves first deploying the NeMo Microservices Platform and then bringing up the Data Flywheel service via Docker Compose. This step takes some time.

In [None]:
import os

# Remove all running containers
os.system("docker rm -f $(docker ps -q)")

# Delete a previously downloaded Helm chart archive, if present
# (the archive is a file, so use os.remove rather than os.rmdir)
if os.path.exists("nemo-microservices-helm-chart-25.4.0.tgz"):
    os.remove("nemo-microservices-helm-chart-25.4.0.tgz")

# Delete the existing minikube cluster
os.system("minikube delete")


In [None]:
%%bash
set -e

log() {
  echo -e "\033[1;32m[INFO]\033[0m $1"
}

echo "$NGC_API_KEY" | docker login nvcr.io -u '$oauthtoken' --password-stdin
chmod +x scripts/deploy-nmp.sh scripts/run.sh

log "Starting Nemo Microservices Platform (NMP) deployment..."
./scripts/deploy-nmp.sh >> flywheel_deploy.log 2>&1
log "NMP deployed successfully!"

log "Starting data flywheel service..."
./scripts/run.sh >> flywheel_deploy.log 2>&1
log "Data flywheel service started successfully!"

---

<a id="1"></a>
## Step 1: Load Sample Data


First, we need to import required libraries and configure pandas display options for better readability in notebook outputs.

In [9]:
import sys
from pathlib import Path
import requests
import time
from datetime import datetime
import json
import pandas as pd
from IPython.display import display, clear_output

pd.set_option('display.max_columns', None)  # Show all columns
pd.set_option('display.width', None)        # Width of the display in characters
pd.set_option('display.max_colwidth', None)  # Show full content of each cell

Use the provided sample dataset from AI Virtual Assistant (`aiva`) (`data/aiva_primary_assistant_dataset.jsonl`) to simulate real user logs captured while an agentic customer service agent application is running. Each data point has the following schema:

| Field        | Type               | Description                                                         |
|--------------|--------------------|---------------------------------------------------------------------|
| `timestamp`  | `int` (epoch secs) | Time the request was issued                                         |
| `workload_id`| `str`              | Stable identifier for the logical task / route / agent node         |
| `client_id`  | `str`              | Identifier of the application or deployment that generated traffic  |
| `request`    | `dict`             | Exact [`openai.ChatCompletion.create`](https://platform.openai.com/docs/api-reference/chat/create) payload received by the model |
| `response`   | `dict`             | Exact `ChatCompletion` response returned by the model               |

The `request` uses the OpenAI `ChatCompletions` request format and contains the following attributes:

- `model` includes the Model ID used to generate the response.
- `messages` includes a `system` message as well as a `user` query.
- `tools` includes the list of functions the LLM can choose from, along with their parameters and descriptions.
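The schema above can be checked programmatically before loading data. The following is a minimal sketch under stated assumptions: `check_record` is a hypothetical helper (not part of the blueprint), and the example record is invented for illustration rather than copied from the dataset.

```python
REQUIRED_FIELDS = {
    "timestamp": int,
    "workload_id": str,
    "client_id": str,
    "request": dict,
    "response": dict,
}

def check_record(record):
    """Return a list of problems found in one log record (empty if none)."""
    problems = []
    for field, expected_type in REQUIRED_FIELDS.items():
        if field not in record:
            problems.append(f"missing field: {field}")
        elif not isinstance(record[field], expected_type):
            problems.append(f"{field} should be {expected_type.__name__}")
    # The request is expected to carry at least a model ID and messages
    request = record.get("request")
    if isinstance(request, dict):
        for key in ("model", "messages"):
            if key not in request:
                problems.append(f"request is missing: {key}")
    return problems

# Hypothetical record, invented for illustration
example = {
    "timestamp": 1715854074,
    "workload_id": "primary_assistant",
    "client_id": "aiva-1",
    "request": {
        "model": "meta/llama-3.3-70b-instruct",
        "messages": [
            {"role": "system", "content": "Route the query to an assistant."},
            {"role": "user", "content": "Where is my order?"},
        ],
        "tools": [],
    },
    "response": {"choices": []},
}
print(check_record(example))
print(check_record({"timestamp": "now"}))
```

The first call prints an empty list; the second reports the wrongly typed `timestamp` and the four missing fields.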

In [None]:
DATA_PATH = data_dir / "aiva_primary_assistant_dataset.jsonl"

!head -n1 {DATA_PATH} | jq

The data points generated by AI Virtual Assistant in response to user queries are considered **ground truth**. 

Ground truth data points are used to **evaluate** and **customize** more efficient models that can perform similarly to the current model. This customization process is analogous to a student-teacher distillation setup, where synthetic data generated from the teacher model is used to fine-tune a student model.

Next, we'll load the data into Elasticsearch using a helper method `load_data_to_elasticsearch`, making it accessible to the Flywheel Orchestrator.

In [None]:
from src.scripts.load_test_data_es import load_data_to_elasticsearch

load_data_to_elasticsearch(file_path=DATA_PATH)

In [12]:
from src.scripts.load_test_data_weave import load_data_to_weave

# load_data_to_weave(file_path=DATA_PATH)
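Before creating a job, it can be useful to confirm how many records each `client_id` contributes for a given workload. The sketch below is illustrative only: `count_by_client` is a hypothetical helper, the sample lines are invented, and whether the sample file contains records for more than one client is an assumption you would need to verify against the real data.

```python
import json
from collections import Counter

def count_by_client(lines, workload_id=None):
    """Count JSONL records per client_id, optionally filtered by workload_id."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        record = json.loads(line)
        if workload_id is None or record.get("workload_id") == workload_id:
            counts[record.get("client_id")] += 1
    return counts

# Invented sample lines standing in for the real JSONL file
sample_lines = [
    '{"workload_id": "primary_assistant", "client_id": "aiva-1"}',
    '{"workload_id": "primary_assistant", "client_id": "aiva-1"}',
    '{"workload_id": "primary_assistant", "client_id": "aiva-2"}',
    '{"workload_id": "other_workload", "client_id": "aiva-1"}',
]
print(dict(count_by_client(sample_lines, workload_id="primary_assistant")))
```

With the real file, you could pass an open file handle instead of the list, for example `with open(DATA_PATH) as f: count_by_client(f, "primary_assistant")`.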


---

<a id="2"></a>
## Step 2: Create a Flywheel Job

Initiate a Flywheel job by sending a POST request to the `/api/jobs` endpoint. This triggers the workflow asynchronously.

In production environments, you can automate this process to run at scheduled intervals, in response to specific events, or on demand.

For this tutorial, we will target the primary customer service agent by setting `workload_id` to "primary_assistant", and we will set `client_id` to "aiva-1", which has 300 data points.

In [None]:
# Flywheel Orchestrator URL
API_BASE_URL = "http://0.0.0.0:8000"

response = requests.post(
    f"{API_BASE_URL}/api/jobs",
    json={"workload_id": "primary_assistant", "client_id": "aiva-1"}
)

response.raise_for_status()
job_id = response.json()["id"]

print(f"Created job with ID: {job_id}")

---

<a id="3"></a>
## Step 3: Monitor Job Status

Submit a GET request to `/api/jobs/{job_id}` to retrieve the current status.

In [14]:
def get_job_status(job_id):
    """Get the current status of a job."""
    response = requests.get(f"{API_BASE_URL}/api/jobs/{job_id}")
    response.raise_for_status()
    return response.json()

In [None]:
get_job_status(job_id)

To simplify the process, you can define utility functions that:

- Periodically retrieve the job status
- Format the output into a table

This makes it easier to compare and analyze the results.

In [16]:
# Polling interval (in seconds) for monitoring flywheel job
POLL_INTERVAL = 5

In [17]:
import matplotlib.pyplot as plt
import pandas as pd
import time
from datetime import datetime
from IPython.display import clear_output

def format_runtime(seconds):
    """Format runtime in seconds to a human-readable string."""
    if seconds is None:
        return "-"
    minutes, seconds = divmod(seconds, 60)
    if minutes > 0:
        return f"{int(minutes)}m {int(seconds)}s"
    return f"{int(seconds)}s"

def extract_main_score(score_str):
    try:
        first_score = score_str.split(";")[0]
        value = first_score.split(":")[1].strip()
        return float(value)
    except Exception:
        return 0

def create_results_table(job_data):
    """Create a pandas DataFrame from job data."""
    rows = []
    for nim in job_data["nims"]:
        model_name = nim["model_name"]
        for eval in nim["evaluations"]:
            score_str = "; ".join(f"{k}: {v}" for k, v in eval["scores"].items() if k != "function_name_and_args_accuracy")
            main_score = extract_main_score(score_str)
            rows.append({
                "Model": model_name,
                "Eval Type": eval["eval_type"].upper(),
                "Score": main_score,
                "Percent Done": eval["progress"],
                "Runtime": format_runtime(eval["runtime_seconds"]),
                "Status": "Completed" if eval["finished_at"] else "Running",
                "Started": datetime.fromisoformat(eval["started_at"]).strftime("%H:%M:%S"),
                "Finished": datetime.fromisoformat(eval["finished_at"]).strftime("%H:%M:%S") if eval["finished_at"] else "-"
            })
    if not rows:
        return pd.DataFrame(columns=["Model", "Eval Type", "Score", "Percent Done", "Runtime", "Status", "Started", "Finished"])
    
    df = pd.DataFrame(rows)

    return df.sort_values(["Model", "Eval Type"])

def create_customization_table(job_data):
    """Create a pandas DataFrame from customization data."""
    customizations = []
    for nim in job_data["nims"]:
        model_name = nim["model_name"]
        for custom in nim["customizations"]:
            customizations.append({
                "Model": model_name,
                "Started": datetime.fromisoformat(custom["started_at"]).strftime("%H:%M:%S"),
                "Epochs Completed": custom["epochs_completed"],
                "Steps Completed": custom["steps_completed"],
                "Finished": datetime.fromisoformat(custom["finished_at"]).strftime("%H:%M:%S") if custom["finished_at"] else "-",
                "Status": "Completed" if custom["finished_at"] else "Running",
                "Runtime": format_runtime(custom["runtime_seconds"]),
                "Percent Done": custom["progress"],
            })
   
    if not customizations:
        # Return an empty table with the same columns as the populated one
        return pd.DataFrame(columns=["Model", "Started", "Epochs Completed", "Steps Completed", "Finished", "Status", "Runtime", "Percent Done"])
    customizations = pd.DataFrame(customizations)
    return customizations.sort_values(["Model"])

def monitor_job(job_id):
    """Monitor a job and display its progress in a table."""
    print(f"Monitoring job {job_id}...")
    print("Press Ctrl+C to stop monitoring")
    
    while True:
        try:
            clear_output(wait=True)

            fig, ax = plt.subplots(figsize=(10, 6))
            job_data = get_job_status(job_id)
            results_df = create_results_table(job_data)
            customizations_df = create_customization_table(job_data)
            clear_output(wait=True)
            print(f"Job Status: {job_data['status']}")
            print(f"Total Records: {job_data['num_records']}")
            print(f"Last Updated: {datetime.now().strftime('%H:%M:%S')}")
            print("\nResults:")
            display(results_df)
            print("\nCustomizations:")
            display(customizations_df)
            display(job_data)

            # Plot 1: Evaluation Scores
            ax.set_title("Evaluation Results", fontsize=14)
            if not results_df.empty:
                pivot_df = results_df.pivot(index="Model", columns="Eval Type", values="Score").fillna(0)
                pivot_df.plot(kind='bar', ax=ax)
                ax.set_ylabel("Eval Metrics")
                ax.set_ylim(0, 1)
                ax.legend(title="Eval Type")
                ax.grid(axis='y', linestyle='--', alpha=0.7)
            else:
                ax.text(0.5, 0.5, "No Evaluation Data", ha='center', va='center')

            plt.tight_layout()
            plt.show()                        
            time.sleep(POLL_INTERVAL)
            
        except KeyboardInterrupt:
            print("\nMonitoring stopped by user")
            break
        except Exception as e:
            print(f"\nError: {str(e)}")
            break

In [None]:
# Start monitoring the job
monitor_job(job_id)
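The monitoring loop above runs until it is interrupted. For unattended use, a bounded polling helper may be preferable. The following is a rough sketch under stated assumptions: `wait_for_job` and `all_work_finished` are hypothetical helpers, and treating a job as done once every listed evaluation and customization has a `finished_at` value is a guess. The job-level `status` field is probably a more reliable signal, but its possible values are not shown in this notebook.

```python
import time

def all_work_finished(job_data):
    """True when every listed evaluation and customization has a finished_at value."""
    items = []
    for nim in job_data.get("nims", []):
        items.extend(nim.get("evaluations", []))
        items.extend(nim.get("customizations", []))
    return bool(items) and all(item.get("finished_at") for item in items)

def wait_for_job(fetch_status, timeout_s=3600, interval_s=5, sleep=time.sleep):
    """Poll fetch_status() until all work is finished or timeout_s elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        job_data = fetch_status()
        if all_work_finished(job_data):
            return job_data
        if time.monotonic() >= deadline:
            raise TimeoutError("job did not finish before the timeout")
        sleep(interval_s)

# Simulated status responses, invented for illustration
responses = iter([
    {"nims": [{"evaluations": [{"finished_at": None}], "customizations": []}]},
    {"nims": [{"evaluations": [{"finished_at": "2025-01-01T00:00:05"}], "customizations": []}]},
])
result = wait_for_job(lambda: next(responses), sleep=lambda seconds: None)
print(all_work_finished(result))
```

Against the running service, the call would look like `wait_for_job(lambda: get_job_status(job_id), interval_s=POLL_INTERVAL)`.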

You’ve now successfully completed a Flywheel run and can review the evaluation results to decide whether to promote the model. However, with only 300 data points, the customized `Llama-3.2-1B-instruct` is likely still limited in performance.

That said, the Data Flywheel operates as a self-reinforcing cycle: models continue to improve as more user interaction logs are collected. Below, we demonstrate how model performance improves incrementally with additional data.


![aiva1](./img/300dp.png)

**Flywheel run results at 300 data points**

![aiva2](./img/500dp.png)

**Flywheel run results at 500 data points**

![aiva3](./img/1000dp.png)

**Flywheel run results at 1,000 data points**

With the improvement results demonstrated, you can now move on to Step 4 to run the Flywheel with additional data yourself.

<a id="4"></a>
## Step 4: Show Continuous Improvement (Optional)

To extend the flywheel run with additional data, we’ll launch a new job using `client_id` set to "aiva-2", which includes **500** data points, to evaluate the impact of increased data volume on performance.

In [None]:
# response = requests.post(
#     f"{API_BASE_URL}/api/jobs",
#     json={"workload_id": "primary_assistant", "client_id": "aiva-2"}
# )

# response.raise_for_status()
# job_id = response.json()["id"]

# print(f"Created job with ID: {job_id}")

In [None]:
# get_job_status(job_id)

In [None]:
# monitor_job(job_id)

You should see some improvement in the customized model compared to the last run.

Assuming we have now collected even more data points, let's kick off another flywheel run by setting `client_id` to "aiva-3" which includes **1,000** records.

In [None]:
# response = requests.post(
#     f"{API_BASE_URL}/api/jobs",
#     json={"workload_id": "primary_assistant", "client_id": "aiva-3"}
# )

# response.raise_for_status()
# job_id = response.json()["id"]

# print(f"Created job with ID: {job_id}")

In [None]:
# monitor_job(job_id)

After the run with 1,000 data points, the customized model’s score should approach 1.0. This indicates that the `Llama-3.2-1B-instruct` model achieves accuracy comparable to the much larger `Llama-3.3-70B-instruct` base model deployed in AI Virtual Assistant, while significantly reducing latency and compute usage thanks to its smaller size.
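One way to turn the evaluation results into a promotion decision is to list the finished evaluations whose main score clears a threshold. The sketch below is illustrative, not the blueprint's own promotion logic: `promotion_candidates` is a hypothetical helper, the 0.9 threshold is arbitrary, and the example job data (including the `accuracy` metric name, the eval type names, and the scores) is invented. It reuses only the field names that appear in the monitoring code earlier in this notebook.

```python
def main_score(scores):
    """First score in the dict, skipping function_name_and_args_accuracy,
    mirroring how the results table above picks its displayed score."""
    for name, value in scores.items():
        if name != "function_name_and_args_accuracy":
            return float(value)
    return None

def promotion_candidates(job_data, threshold=0.9):
    """Return (model, eval_type, score) tuples at or above threshold, best first."""
    candidates = []
    for nim in job_data["nims"]:
        for evaluation in nim["evaluations"]:
            score = main_score(evaluation["scores"])
            if evaluation["finished_at"] and score is not None and score >= threshold:
                candidates.append((nim["model_name"], evaluation["eval_type"], score))
    return sorted(candidates, key=lambda item: item[2], reverse=True)

# Invented job data for illustration; real metric and eval type names may differ
example_job = {
    "nims": [
        {
            "model_name": "meta/llama-3.2-1b-instruct",
            "evaluations": [
                {"eval_type": "base", "scores": {"accuracy": 0.42},
                 "finished_at": "2025-01-01T00:10:00"},
                {"eval_type": "customized", "scores": {"accuracy": 0.95},
                 "finished_at": "2025-01-01T00:40:00"},
            ],
        }
    ]
}
print(promotion_candidates(example_job))
```

In this invented example, only the customized evaluation clears the threshold. In practice, you would pass the output of `get_job_status(job_id)` and choose a threshold that reflects how close to the deployed model's accuracy a smaller model needs to be.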