### MCP Server Usage (SSE, streamable_http, and stdio)

This notebook showcases how to use the NVIDIA RAG MCP server via MCP transports instead of REST APIs. It covers:
- Launching the server (SSE, streamable_http, and stdio)
- Connecting with the MCP Python client
- Listing tools
- Calling all MCP tools: `generate`, `search`, and `get_summary`

You can execute each cell in sequence to test the MCP server APIs.


#### 1. Install Dependencies

Purpose:
Install the libraries needed to run the MCP server and client locally in this notebook environment.

- Ensure your environment has:
  - `mcp`, `anyio`, `httpx`, `httpx-sse`, `uvicorn`
- If using Workbench/docker, these may already be installed.
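
To see which of these packages are already present before installing anything, a minimal sketch along the following lines may help. The helper name `installed_versions` is hypothetical; the package names are the ones listed above.

```python
from importlib import metadata

# Distribution names from the dependency list above.
REQUIRED = ["mcp", "anyio", "httpx", "httpx-sse", "uvicorn"]


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if missing."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


for name, version in installed_versions(REQUIRED).items():
    print(f"{name}: {version or 'NOT INSTALLED'}")
```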


In [None]:
# Optional installation (uncomment to run)
# %pip install -qq mcp anyio httpx httpx-sse uvicorn


#### 2. Setup Base Configuration

Purpose:
Capture API keys and environment variables (e.g., NVCF) that the server and client will rely on.

The cell below reads these values from the environment and selects one key for the demos, preferring `NVCF_API_KEY` and falling back to `NVIDIA_API_KEY`.
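
Before launching anything, it can be useful to check which variables are unset and to log keys only in masked form. The sketch below is illustrative: `missing_vars` and `mask` are hypothetical helpers, and which variables are actually mandatory depends on your deployment.

```python
import os

# Variable names taken from this notebook's configuration; treating all of
# them as required is an assumption made for illustration.
NVCF_VARS = ["NVCF_API_KEY", "NVCF_ORG_ID", "NVCF_TEAM_ID", "NVCF_REGION", "NVCF_BASE_URL"]


def missing_vars(names, env=None):
    """Return the names that are unset or empty in the given mapping."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


def mask(secret, visible=4):
    """Show only the last few characters of a secret for safe logging."""
    if not secret:
        return "<none>"
    if len(secret) <= visible:
        return "*" * len(secret)
    return "*" * (len(secret) - visible) + secret[-visible:]


print("Missing NVCF variables:", missing_vars(NVCF_VARS))
```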


In [1]:
import os, subprocess, sys, time, atexit
from pathlib import Path

# Configure keys here for convenience (OPTIONAL: you can also rely on your shell env)
# If using NVCF, prefer NVCF_API_KEY and related env vars
NVCF_API_KEY = os.environ.get("NVCF_API_KEY", "")
NVIDIA_API_KEY = os.environ.get("NVIDIA_API_KEY", "")
NVCF_ORG_ID = os.environ.get("NVCF_ORG_ID", "")
NVCF_TEAM_ID = os.environ.get("NVCF_TEAM_ID", "")
NVCF_REGION = os.environ.get("NVCF_REGION", "")
NVCF_BASE_URL = os.environ.get("NVCF_BASE_URL", "")

# Choose one usable key for demos (falls back from NVCF to NVIDIA)
API_KEY = NVCF_API_KEY or NVIDIA_API_KEY

print("Using key type:", "NVCF" if NVCF_API_KEY else ("NVIDIA" if NVIDIA_API_KEY else "<none>"))



Using key type: <none>


## Launch MCP Server (SSE)

Purpose:
Start the MCP server locally over SSE so that the client can connect and call tools.

This launches the MCP server on `http://127.0.0.1:8000`.
- You can pass the API key via `--api-key` or rely on env variables.
- The server also accepts `Authorization: Bearer ...` or `x-api-key: ...` headers for SSE requests.
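
The two header styles mentioned above can be sketched as a small helper. `auth_headers` is a hypothetical function, not part of the MCP client; it only builds the header dictionary you would pass to whatever HTTP or MCP client you use.

```python
def auth_headers(api_key, style="bearer"):
    """Build request headers for one of the two header styles named above."""
    if not api_key:
        return {}  # no key configured: send no auth header
    if style == "bearer":
        return {"Authorization": f"Bearer {api_key}"}
    if style == "x-api-key":
        return {"x-api-key": api_key}
    raise ValueError(f"unknown header style: {style!r}")
```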


In [None]:
# Kill any process listening on port 8000 to avoid port conflicts
import subprocess

PORT = 8000
print(f"Killing any process listening on port {PORT} before starting the SSE server in the next cell")

# Try fuser first (common on Linux)
try:
    subprocess.run(["fuser", "-k", f"{PORT}/tcp"], check=False)
except FileNotFoundError:
    print("'fuser' not found, skipping fuser-based cleanup.")
except Exception as e:
    print(f"Error while running fuser: {e}")

In [8]:
# Start SSE server in the background
sse_proc = None
try:
    env = dict(os.environ)
    # Suppress server-side INFO logs
    env["LOGLEVEL"] = "ERROR"
    if API_KEY:
        env["NVCF_API_KEY"] = API_KEY
        env.setdefault("NVIDIA_API_KEY", API_KEY)
    if NVCF_ORG_ID: env["NVCF_ORG_ID"] = NVCF_ORG_ID
    if NVCF_TEAM_ID: env["NVCF_TEAM_ID"] = NVCF_TEAM_ID
    if NVCF_REGION: env["NVCF_REGION"] = NVCF_REGION
    if NVCF_BASE_URL: env["NVCF_BASE_URL"] = NVCF_BASE_URL

    cmd = [
        sys.executable,
        "-m",
        "nvidia_rag.utils.mcp.mcp_server",
        "--transport",
        "sse",
        "--host",
        "127.0.0.1",
        "--port",
        "8000",
    ]
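    # Also pass explicit --api-key if available
    if API_KEY:
        cmd += ["--api-key", API_KEY]

    # Mask the key so it is not echoed into the notebook output
    print("Launching:", " ".join("***" if API_KEY and arg == API_KEY else arg for arg in cmd))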
    sse_proc = subprocess.Popen(cmd, env=env, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    atexit.register(lambda: sse_proc and sse_proc.poll() is None and sse_proc.terminate())
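    time.sleep(2.0)
    # Output is discarded, so check explicitly that the server did not exit at startup
    if sse_proc.poll() is None:
        print("SSE server PID:", sse_proc.pid)
    else:
        print("SSE server exited early with code:", sse_proc.returncode)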
except Exception as e:
    print("Failed to start SSE server:", e)



Launching: /home/niyati/anaconda3/bin/python -m nvidia_rag.utils.mcp.mcp_server --transport sse --host 127.0.0.1 --port 8000
SSE server PID: 1887615


## Connect with MCP Client (SSE), List Tools, and Call MCP Tools

Purpose:
Verify connectivity by listing available tools and invoking the MCP tools (`generate`, `search`, `get_summary`) using the SSE transport.

This uses the `mcp_client.py` CLI to connect over SSE, list tools, and invoke the RAG tools.

**Note:** Ensure the SSE server started in the previous cell is still running before executing this cell.
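
One way to confirm the server is accepting connections, rather than relying on a fixed sleep, is to poll the TCP port. This is a minimal sketch with a hypothetical helper `wait_for_port`; a successful connection only shows that something is listening, not that the MCP endpoints are fully ready.

```python
import socket
import time


def wait_for_port(host, port, timeout=10.0, interval=0.2):
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            time.sleep(interval)  # not listening yet; retry until the deadline
    return False
```

For the server in this notebook, the call would be `wait_for_port("127.0.0.1", 8000)`.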


In [10]:
import json
import subprocess

# SSE connection configuration
SSE_URL = "http://127.0.0.1:8000"

print("="*80)
print("Listing available tools...")
print("="*80)
subprocess.run([
    sys.executable,
    "-m",
    "nvidia_rag.utils.mcp.mcp_client",
    "list",
    "--transport=sse",
    f"--url={SSE_URL}",
], stderr=subprocess.DEVNULL)

print("\n" + "="*80)
print("Calling 'generate' tool...")
print("="*80)
generate_args = json.dumps({
    "messages": [{"role": "user", "content": "Hello from SSE demo"}],
    "collection_name": "my_collection",
})
subprocess.run([
    sys.executable,
    "-m",
    "nvidia_rag.utils.mcp.mcp_client",
    "call",
    "--transport=sse",
    f"--url={SSE_URL}",
    "--tool=generate",
    f"--json-args={generate_args}",
], stderr=subprocess.DEVNULL)

print("\n" + "="*80)
print("Calling 'search' tool...")
print("="*80)
search_args = json.dumps({
    "query": "Tell me about Robert Frost's poems",
    "collection_name": "my_collection",
    "reranker_top_k": 2,
    "vdb_top_k": 5,
    "enable_query_rewriting": False,
    "enable_reranker": True,
})
subprocess.run([
    sys.executable,
    "-m",
    "nvidia_rag.utils.mcp.mcp_client",
    "call",
    "--transport=sse",
    f"--url={SSE_URL}",
    "--tool=search",
    f"--json-args={search_args}",
], stderr=subprocess.DEVNULL)

print("\n" + "="*80)
print("Calling 'get_summary' tool...")
print("="*80)
summary_args = json.dumps({
    "collection_name": "my_collection",
    "file_name": "document_name.pdf",
    "blocking": False,
    "timeout": 60,
})
subprocess.run([
    sys.executable,
    "-m",
    "nvidia_rag.utils.mcp.mcp_client",
    "call",
    "--transport=sse",
    f"--url={SSE_URL}",
    "--tool=get_summary",
    f"--json-args={summary_args}",
], stderr=subprocess.DEVNULL)



Listing available tools...
generate: Generate an answer using NVIDIA RAG (optionally with knowledge base). Provide chat messages and optional generation parameters.
search: Search the vector database and return citations for a given query.
get_summary: Retrieve the pre-generated summary for a document from a collection. Set blocking=true to wait up to timeout seconds for summary generation.

Calling 'generate' tool...
{
  "meta": null,
  "content": [
    {
      "type": "text",
      "text": "data: {\"id\":\"f44dcb6f-ba40-4793-bf69-fb5a1a1e413c\",\"choices\":[{\"index\":0,\"message\":{\"role\":\"assistant\",\"content\":\"Collection my_collection does not exist. Ensure a collection is created using POST /collection endpoint first and documents are uploaded using POST /document endpoint\"},\"delta\":{\"role\":null,\"content\":\"Collection my_collection does not exist. Ensure a collection is created using POST /collection endpoint first and documents are uploaded using POST /document endp

CompletedProcess(args=['/home/niyati/anaconda3/bin/python', '-m', 'nvidia_rag.utils.mcp.mcp_client', 'call', '--transport=sse', '--url=http://127.0.0.1:8000', '--tool=get_summary', '--json-args={"collection_name": "my_collection", "file_name": "document_name.pdf", "blocking": false, "timeout": 60}'], returncode=0)
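
In the `generate` output above, the payload arrives as a `data: {...}` string inside the `text` field of the first content item. A minimal sketch for pulling out the assistant message follows. It assumes you have already parsed the CLI output into a dict, that each event sits on its own `data:` line, and that events follow the `choices[].message.content` shape visible in the (truncated) output; `extract_messages` is a hypothetical helper.

```python
import json


def extract_messages(tool_result):
    """Collect assistant message contents from `data: {...}` lines in a tool result."""
    messages = []
    for item in tool_result.get("content", []):
        if item.get("type") != "text":
            continue
        for line in item.get("text", "").splitlines():
            if not line.startswith("data:"):
                continue
            payload = line[len("data:"):].strip()
            try:
                event = json.loads(payload)
            except json.JSONDecodeError:
                continue  # skip partial or non-JSON lines
            if not isinstance(event, dict):
                continue
            for choice in event.get("choices", []):
                content = (choice.get("message") or {}).get("content")
                if content:
                    messages.append(content)
    return messages
```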

## Launch MCP Server (streamable_http)

Purpose:
Start the MCP server locally using the **streamable_http** transport so that the client can connect and call tools.

This launches the MCP server with FastMCP's streamable-http support:
- Uses your configured API key (NVCF or NVIDIA) from the environment.
- Runs `nvidia_rag.utils.mcp.mcp_server` with `--transport streamable_http`.

**Note:** Run this cell once before executing the subsequent streamable_http client cells that list and call MCP tools.



In [None]:
# Kill any process listening on port 8000 to avoid port conflicts
import subprocess

PORT = 8000
print(f"Killing any process listening on port {PORT} before starting the streamable_http server in the next cell")

# Try fuser first (common on Linux)
try:
    subprocess.run(["fuser", "-k", f"{PORT}/tcp"], check=False)
except FileNotFoundError:
    print("'fuser' not found, skipping fuser-based cleanup.")
except Exception as e:
    print(f"Error while running fuser: {e}")

In [16]:
# Start streamable_http MCP server in the background
stream_proc = None
try:
    env = dict(os.environ)
    env["LOGLEVEL"] = "ERROR"
    if API_KEY:
        env["NVCF_API_KEY"] = API_KEY
        env.setdefault("NVIDIA_API_KEY", API_KEY)
    if NVCF_ORG_ID: env["NVCF_ORG_ID"] = NVCF_ORG_ID
    if NVCF_TEAM_ID: env["NVCF_TEAM_ID"] = NVCF_TEAM_ID
    if NVCF_REGION: env["NVCF_REGION"] = NVCF_REGION
    if NVCF_BASE_URL: env["NVCF_BASE_URL"] = NVCF_BASE_URL

    cmd = [
        sys.executable,
        "-m",
        "nvidia_rag.utils.mcp.mcp_server",
        "--transport",
        "streamable_http",
    ]
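    if API_KEY:
        cmd += ["--api-key", API_KEY]

    # Mask the key so it is not echoed into the notebook output
    print("Launching streamable_http server:", " ".join("***" if API_KEY and arg == API_KEY else arg for arg in cmd))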
    stream_proc = subprocess.Popen(cmd, env=env, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    atexit.register(lambda: stream_proc and stream_proc.poll() is None and stream_proc.terminate())
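    time.sleep(2.0)
    # Output is discarded, so check explicitly that the server did not exit at startup
    if stream_proc.poll() is None:
        print("streamable_http server PID:", stream_proc.pid)
    else:
        print("streamable_http server exited early with code:", stream_proc.returncode)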
except Exception as e:
    print("Failed to start streamable_http server:", e)



Launching streamable_http server: /home/niyati/anaconda3/bin/python -m nvidia_rag.utils.mcp.mcp_server --transport streamable_http
streamable_http server PID: 1907814


## Connect with MCP Client (streamable_http), List Tools, and Call MCP Tools

Purpose:
Verify connectivity by listing available tools and invoking the MCP tools (`generate`, `search`, `get_summary`) using the streamable_http transport.

This uses the `mcp_client.py` CLI to connect over streamable_http, list tools, and invoke the RAG tools.

**Note:** Ensure the streamable_http server is running from the cell above before executing this cell.
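
The client invocations in this notebook differ only in the action, tool name, and JSON payload, so the argument list can be assembled by one helper. The sketch below uses only the CLI flags that appear in this notebook (`--transport`, `--url`, `--tool`, `--json-args`, `--header`); `build_client_args` itself is a hypothetical convenience function.

```python
import json
import sys

CLIENT_MODULE = "nvidia_rag.utils.mcp.mcp_client"


def build_client_args(action, transport, url, tool=None, payload=None, api_key=None):
    """Assemble the argv list for the mcp_client CLI using the flags shown in this notebook."""
    args = [sys.executable, "-m", CLIENT_MODULE, action, "--transport", transport, "--url", url]
    if tool is not None:
        args += ["--tool", tool]
    if payload is not None:
        args += ["--json-args", json.dumps(payload)]
    if api_key:
        args += ["--header", f"Authorization: Bearer {api_key}"]
    return args
```

The resulting list can be passed directly to `subprocess.run`, which avoids repeating the same flags for each tool call.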



In [17]:
import json
import subprocess

# Streamable HTTP connection configuration
STREAM_URL = "http://127.0.0.1:8000"  # base URL; client normalizes to /mcp

common_args = [sys.executable, "-m", "nvidia_rag.utils.mcp.mcp_client"]

print("="*80)
print("[streamable_http] Listing available tools...")
print("="*80)
list_args = common_args + [
    "list",
    "--transport",
    "streamable_http",
    "--url",
    STREAM_URL,
]
if API_KEY:
    list_args += ["--header", f"Authorization: Bearer {API_KEY}"]
subprocess.run(list_args, stderr=subprocess.DEVNULL)

print("\n" + "="*80)
print("[streamable_http] Calling 'generate' tool...")
print("="*80)
generate_args_payload = json.dumps({
    "messages": [{"role": "user", "content": "Hello from streamable_http demo"}],
    "collection_name": "my_collection",
})
call_generate_args = common_args + [
    "call",
    "--transport",
    "streamable_http",
    "--url",
    STREAM_URL,
    "--tool",
    "generate",
    "--json-args",
    generate_args_payload,
]
if API_KEY:
    call_generate_args += ["--header", f"Authorization: Bearer {API_KEY}"]
subprocess.run(call_generate_args, stderr=subprocess.DEVNULL)

print("\n" + "="*80)
print("[streamable_http] Calling 'search' tool...")
print("="*80)
search_args_payload = json.dumps({
    "query": "Tell me about Robert Frost's poems",
    "collection_name": "my_collection",
    "reranker_top_k": 2,
    "vdb_top_k": 5,
    "enable_query_rewriting": False,
    "enable_reranker": True,
})
call_search_args = common_args + [
    "call",
    "--transport",
    "streamable_http",
    "--url",
    STREAM_URL,
    "--tool",
    "search",
    "--json-args",
    search_args_payload,
]
if API_KEY:
    call_search_args += ["--header", f"Authorization: Bearer {API_KEY}"]
subprocess.run(call_search_args, stderr=subprocess.DEVNULL)

print("\n" + "="*80)
print("[streamable_http] Calling 'get_summary' tool...")
print("="*80)
summary_args_payload = json.dumps({
    "collection_name": "my_collection",
    "file_name": "document_name.pdf",
    "blocking": False,
    "timeout": 60,
})
call_summary_args = common_args + [
    "call",
    "--transport",
    "streamable_http",
    "--url",
    STREAM_URL,
    "--tool",
    "get_summary",
    "--json-args",
    summary_args_payload,
]
if API_KEY:
    call_summary_args += ["--header", f"Authorization: Bearer {API_KEY}"]
subprocess.run(call_summary_args, stderr=subprocess.DEVNULL)



[streamable_http] Listing available tools...
generate: Generate an answer using NVIDIA RAG (optionally with knowledge base). Provide chat messages and optional generation parameters.
search: Search the vector database and return citations for a given query.
get_summary: Retrieve the pre-generated summary for a document from a collection. Set blocking=true to wait up to timeout seconds for summary generation.

[streamable_http] Calling 'generate' tool...
{
  "meta": null,
  "content": [
    {
      "type": "text",
      "text": "data: {\"id\":\"4d45f450-1f6a-4b9c-bb2e-983dea8d65b8\",\"choices\":[{\"index\":0,\"message\":{\"role\":\"assistant\",\"content\":\"Collection my_collection does not exist. Ensure a collection is created using POST /collection endpoint first and documents are uploaded using POST /document endpoint\"},\"delta\":{\"role\":null,\"content\":\"Collection my_collection does not exist. Ensure a collection is created using POST /collection endpoint first and documents ar

CompletedProcess(args=['/home/niyati/anaconda3/bin/python', '-m', 'nvidia_rag.utils.mcp.mcp_client', 'call', '--transport', 'streamable_http', '--url', 'http://127.0.0.1:8000', '--tool', 'get_summary', '--json-args', '{"collection_name": "my_collection", "file_name": "document_name.pdf", "blocking": false, "timeout": 60}'], returncode=0)

## Launch Server via Client (stdio), List Tools, and Call MCP Tools

Purpose:
Demonstrate launching the server via stdio transport, listing tools, and calling the MCP tools (`generate`, `search`, `get_summary`), forwarding the API key to the child server process.

This uses the `mcp_client.py` CLI to launch the server via stdio and invoke the RAG tools. The API key is forwarded via server arguments.
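
Because the server arguments are passed as a single string, a key containing spaces or shell metacharacters could be split incorrectly. A hedged sketch using `shlex.join` is shown below. It assumes the client splits the `--args` value with shell-style rules, which this notebook does not confirm; `build_stdio_args` is a hypothetical helper.

```python
import shlex


def build_stdio_args(module, api_key=None):
    """Return a single shell-quoted string of server arguments."""
    parts = ["-m", module]
    if api_key:
        parts += ["--api-key", api_key]
    # shlex.join quotes each part so that shlex.split recovers the original list
    return shlex.join(parts)
```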


In [None]:
import json
import subprocess

# Configure stdio command and arguments
STDIO_CMD = sys.executable
STDIO_ARGS = "-m nvidia_rag.utils.mcp.mcp_server"
if API_KEY:
    STDIO_ARGS += f" --api-key {API_KEY}"

print("="*80)
print("Listing available tools via stdio...")
print("="*80)
subprocess.run([sys.executable, "-m", "nvidia_rag.utils.mcp.mcp_client", "list", "--transport=stdio", f"--command={STDIO_CMD}", f"--args={STDIO_ARGS}"], stderr=subprocess.DEVNULL)

print("\n" + "="*80)
print("Calling 'generate' tool via stdio...")
print("="*80)
generate_args = json.dumps({
    "messages": [{"role": "user", "content": "Hello from stdio demo"}],
    "collection_name": "my_collection",
})
subprocess.run([sys.executable, "-m", "nvidia_rag.utils.mcp.mcp_client", "call", "--transport=stdio", f"--command={STDIO_CMD}", f"--args={STDIO_ARGS}", "--tool=generate", f"--json-args={generate_args}"], stderr=subprocess.DEVNULL)

print("\n" + "="*80)
print("Calling 'search' tool via stdio...")
print("="*80)
search_args = json.dumps({
    "query": "Tell me about Robert Frost's poems",
    "collection_name": "my_collection",
    "reranker_top_k": 2,
    "vdb_top_k": 5,
    "enable_query_rewriting": False,
    "enable_reranker": True,
})
subprocess.run([sys.executable, "-m", "nvidia_rag.utils.mcp.mcp_client", "call", "--transport=stdio", f"--command={STDIO_CMD}", f"--args={STDIO_ARGS}", "--tool=search", f"--json-args={search_args}"], stderr=subprocess.DEVNULL)

print("\n" + "="*80)
print("Calling 'get_summary' tool via stdio...")
print("="*80)
summary_args = json.dumps({
    "collection_name": "my_collection",
    "file_name": "document_name.pdf",
    "blocking": False,
    "timeout": 60,
})
subprocess.run([sys.executable, "-m", "nvidia_rag.utils.mcp.mcp_client", "call", "--transport=stdio", f"--command={STDIO_CMD}", f"--args={STDIO_ARGS}", "--tool=get_summary", f"--json-args={summary_args}"], stderr=subprocess.DEVNULL)



Listing available tools via stdio...
generate: Generate an answer using NVIDIA RAG (optionally with knowledge base). Provide chat messages and optional generation parameters.
search: Search the vector database and return citations for a given query.
get_summary: Retrieve the pre-generated summary for a document from a collection. Set blocking=true to wait up to timeout seconds for summary generation.

Calling 'generate' tool via stdio...
{
  "meta": null,
  "content": [
    {
      "type": "text",
      "text": "data: {\"id\":\"82284346-d3af-450b-bf6f-fff2537f5034\",\"choices\":[{\"index\":0,\"message\":{\"role\":\"assistant\",\"content\":\"Collection my_collection does not exist. Ensure a collection is created using POST /collection endpoint first and documents are uploaded using POST /document endpoint\"},\"delta\":{\"role\":null,\"content\":\"Collection my_collection does not exist. Ensure a collection is created using POST /collection endpoint first and documents are uploaded using

## Cleanup & Troubleshooting

Purpose:
Wrap up the session, stop background processes, and provide guidance for common errors (401/404) and environment/version mismatches.

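- To stop the SSE or streamable_http server started above, restart the kernel or call `terminate()` on `sse_proc` or `stream_proc`. Both are also registered with `atexit`, so they are terminated when the interpreter exits.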
- If you see 401 Unauthorized:
  - Ensure your API key is valid and has access to the configured model endpoint.
  - For NVCF deployments, set `NVCF_API_KEY`, and usually `NVCF_ORG_ID`, `NVCF_TEAM_ID`, `NVCF_REGION`, `NVCF_BASE_URL`.
  - In stdio mode, forward env with `--env VAR=...` or use the server `--api-key` flag.
- If SSE returns 404, ensure you're connecting to the base URL (the client probes standard SSE endpoints).
- If imports or transports fail unexpectedly, check that the installed versions of `mcp`, `anyio`, and `uvicorn` are compatible with each other and with your environment's constraints.
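
Stopping a background server, as described in the first bullet, can be sketched as follows. `stop_process` is a hypothetical helper that terminates a `subprocess.Popen` child and escalates to `kill()` if it does not exit within the timeout.

```python
import subprocess


def stop_process(proc, timeout=5.0):
    """Terminate a child process, escalating to kill if it does not exit in time."""
    if proc is None or proc.poll() is not None:
        return  # never started or already exited
    proc.terminate()
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()
```

For the servers launched in this notebook, the calls would be `stop_process(sse_proc)` and `stop_process(stream_proc)`.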
