# Tool-calling Agent

This notebook was auto-generated by an AI Playground export. In this notebook, you will:
- Author a tool-calling [MLflow's `ResponsesAgent`](https://mlflow.org/docs/latest/api_reference/python_api/mlflow.pyfunc.html#mlflow.pyfunc.ResponsesAgent) that uses the OpenAI client
- Manually test the agent's output
- Evaluate the agent with Mosaic AI Agent Evaluation
- Log and deploy the agent

Run this notebook on serverless compute or on a cluster running Databricks Runtime (DBR) earlier than 17.

**_NOTE:_** This notebook uses the OpenAI SDK, but Mosaic AI Agent Framework is compatible with any agent authoring framework, including LlamaIndex or LangGraph. To learn more, see the [Authoring Agents](https://docs.databricks.com/generative-ai/agent-framework/author-agent) Databricks documentation.



In [0]:
%pip install -U -qqqq backoff databricks-openai uv databricks-agents mlflow-skinny[databricks]
dbutils.library.restartPython()

In [0]:
import mlflow
display(mlflow.__version__)

## Define the agent in code
Below we define our agent code in a single cell, enabling us to easily write it to a local Python file for subsequent logging and deployment using the `%%writefile` magic command.

For more examples of tools to add to your agent, see [docs](https://docs.databricks.com/generative-ai/agent-framework/agent-tool.html).
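The `call_and_run_tools` method in the agent cell drives a loop: ask the LLM, run any tool it requests, feed the tool output back, and stop once the LLM returns a plain assistant message or `max_iter` is reached. The following is a minimal, framework-free sketch of that control flow, assuming simplified message dicts. `fake_llm`, `TOOLS`, and `search_products` are hypothetical stand-ins; they do not call MLflow, OpenAI, or Databricks.

```python
import json
from typing import Any, Callable

Message = dict[str, Any]


def run_tool_loop(
    messages: list[Message],
    llm: Callable[[list[Message]], Message],
    tools: dict[str, Callable[..., Any]],
    max_iter: int = 10,
) -> list[Message]:
    """Alternate between LLM calls and tool execution until the LLM answers."""
    for _ in range(max_iter):
        last = messages[-1]
        if last.get("role") == "assistant":
            # A plain assistant message is the final answer: stop looping.
            return messages
        if last.get("type") == "function_call":
            # The LLM asked for a tool: run it and append its output.
            args = json.loads(last["arguments"])
            result = str(tools[last["name"]](**args))
            messages.append(
                {"type": "function_call_output", "call_id": last["call_id"], "output": result}
            )
        else:
            # Otherwise ask the LLM for the next step (a tool call or an answer).
            messages.append(llm(messages))
    messages.append({"role": "assistant", "content": "Max iterations reached. Stopping."})
    return messages


# Hypothetical stand-ins for the LLM endpoint and the vector search tool.
def fake_llm(messages: list[Message]) -> Message:
    outputs = [m for m in messages if m.get("type") == "function_call_output"]
    if not outputs:
        return {
            "type": "function_call",
            "call_id": "call_1",
            "name": "search_products",
            "arguments": json.dumps({"query": messages[-1]["content"]}),
        }
    return {"role": "assistant", "content": f"Found: {outputs[-1]['output']}"}


TOOLS = {"search_products": lambda query: [f"{query}_table"]}

history = run_tool_loop([{"role": "user", "content": "borrowers"}], fake_llm, TOOLS)
print(history[-1]["content"])  # Found: ['borrowers_table']
```

In the real agent, the LLM step streams chunks through `output_to_responses_items_stream`, and each tool call is traced via `@mlflow.trace`; the sketch keeps only the branching logic.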

In [0]:
%%writefile agent.py
import json
from typing import Any, Callable, Generator, Optional
from uuid import uuid4
import warnings

import backoff
import mlflow
import openai
from databricks.sdk import WorkspaceClient
from databricks_openai import UCFunctionToolkit, VectorSearchRetrieverTool
from mlflow.entities import SpanType
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)
from openai import OpenAI
from pydantic import BaseModel
from unitycatalog.ai.core.base import get_uc_function_client

############################################
# Define your LLM endpoint and system prompt
############################################
# LLM_ENDPOINT_NAME = "databricks-claude-sonnet-4-5"
LLM_ENDPOINT_NAME = "databricks-gpt-5"
INDEX_NAME = "fnma_product_catalog_jcg.default.product_catalog_vector_index" 

SYSTEM_PROMPT = """Role

You are an intelligent assistant designed to help business and data users discover the most relevant data products from a vector search index that contains embeddings of data product metadata, schemas, and descriptions.

Your job is to:
	•	Interpret the user's query.
	•	Search the vector database for relevant data products.
	•	Rank and summarize up to three of the most relevant ones.
	•	Present the results in a structured, business-friendly JSON format.

⸻

Core Objectives
	1.	Understand Intent:
Determine what kind of data the user is seeking (e.g., borrower info, credit risk, transactions).
	2.	Perform Vector Search:
Use the query to find semantically relevant data products.
	3.	Rank Results by Relevance & Quality:
Rank retrieved products using:
	•	Semantic Relevance (40%) — closeness to the query
	•	Data Completeness (25%) — coverage of relevant attributes
	•	Freshness (20%) — based on last update or processing date
	•	Business Value (10%) — usefulness for decision-making
	•	Ownership Transparency (5%) — presence of a known owner/contact
	4.	Summarize Each Data Product:
Provide a concise but meaningful business summary, including:
	•	Description
	•	Key fields
	•	Owner
	•	Freshness
	•	Link to the Unity Catalog for access
	5.	Output in a clean, structured JSON template suitable for both human and system consumption. Only output the JSON response.

⸻

Output Format Template

The response must always follow this structured format:

{
  "query_understanding": "<Short summary of what the user is asking for>",
  "search_query": "<The query text used in vector search>",
  "results": [
    {
      "rank": 1,
      "data_product_name": "<name of the data product>",
      "full_identifier": "<Populate as is from Product_Name column which is a metadata column in the vector search index table. Do not change the value>",
      "table_names": "<Populate as is from table_names column which is a metadata column in the vector search index table. Do not change the value>",
      "description": "<business-friendly explanation of what this data product contains and how it's used. Be concise and limit in max 3 sentences>",
      "completeness_score": "<High | Medium | Low>"
    },
    {
      "rank": 2,
      ...
    },
    {
      "rank": 3,
      ...
    }
  ],
  "recommended_action": "<Business-friendly suggestion for how to use the top data product(s) together or in context>"
}

Style and Tone
	•	For metadata fields, use only the exact values returned by the vector search; do not invent data.
	•	Write in clear, executive-level English suitable for data analysts, product managers, or business stakeholders.
	•	Keep technical details concise and readable.
	•	Use simple explanations for field names and table purposes.
	•	Limit output to no more than three results to avoid cognitive overload.
"""


###############################################################################
## Define tools for your agent, enabling it to retrieve data or take actions
## beyond text generation
## To create and see usage examples of more tools, see
## https://docs.databricks.com/generative-ai/agent-framework/agent-tool.html
###############################################################################
class ToolInfo(BaseModel):
    """
    Class representing a tool for the agent.
    - "name" (str): The name of the tool.
    - "spec" (dict): JSON description of the tool (matches the OpenAI Chat Completions tool format)
    - "exec_fn" (Callable): Function that implements the tool logic
    """

    name: str
    spec: dict
    exec_fn: Callable


def create_tool_info(tool_spec, exec_fn_param: Optional[Callable] = None):
    tool_spec["function"].pop("strict", None)
    tool_name = tool_spec["function"]["name"]
    udf_name = tool_name.replace("__", ".")

    # Define a wrapper that accepts kwargs for the UC tool call,
    # then passes them to the UC tool execution client
    def exec_fn(**kwargs):
        function_result = uc_function_client.execute_function(udf_name, kwargs)
        if function_result.error is not None:
            return function_result.error
        else:
            return function_result.value
    return ToolInfo(name=tool_name, spec=tool_spec, exec_fn=exec_fn_param or exec_fn)


TOOL_INFOS = []

# You can use UDFs in Unity Catalog as agent tools
UC_TOOL_NAMES = []

uc_toolkit = UCFunctionToolkit(function_names=UC_TOOL_NAMES)
uc_function_client = get_uc_function_client()
for tool_spec in uc_toolkit.tools:
    TOOL_INFOS.append(create_tool_info(tool_spec))


# Use Databricks vector search indexes as tools
# See the [Databricks Documentation](https://docs.databricks.com/generative-ai/agent-framework/unstructured-retrieval-tools.html) for details
VECTOR_SEARCH_TOOLS = []
VECTOR_SEARCH_TOOLS.append(
    VectorSearchRetrieverTool(
        index_name=INDEX_NAME,
        columns=["Product_Name", "Description", "table_names", "TAG_NAME", "TAG_VALUE"],
        # The tool description helps the LLM decide when to call this tool
        tool_description="Use this vector search index tool to search and identify relevant schema descriptions, data definitions, and domain context across mortgage, housing, property, servicing, fraud, investor, and credit-risk datasets. It helps the agent quickly retrieve structured knowledge for data discovery, analytics reasoning, and answering domain-specific questions.",
    )
)

for vs_tool in VECTOR_SEARCH_TOOLS:
    TOOL_INFOS.append(create_tool_info(vs_tool.tool, vs_tool.execute))



class ToolCallingAgent(ResponsesAgent):
    """
    Class representing a tool-calling Agent
    """

    def __init__(self, llm_endpoint: str, tools: list[ToolInfo]):
        """Initializes the ToolCallingAgent with tools."""
        self.llm_endpoint = llm_endpoint
        self.workspace_client = WorkspaceClient()
        self.model_serving_client: OpenAI = (
            self.workspace_client.serving_endpoints.get_open_ai_client()
        )
        self._tools_dict = {tool.name: tool for tool in tools}

    def get_tool_specs(self) -> list[dict]:
        """Returns tool specifications in the format OpenAI expects."""
        return [tool_info.spec for tool_info in self._tools_dict.values()]

    @mlflow.trace(span_type=SpanType.TOOL)
    def execute_tool(self, tool_name: str, args: dict) -> Any:
        """Executes the specified tool with the given arguments."""
        return self._tools_dict[tool_name].exec_fn(**args)

    def call_llm(self, messages: list[dict[str, Any]]) -> Generator[dict[str, Any], None, None]:
        """Streams Chat Completions chunks from the LLM endpoint, skipping chunks without choices."""
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", message="PydanticSerializationUnexpectedValue")
            for chunk in self.model_serving_client.chat.completions.create(
                model=self.llm_endpoint,
                messages=self.prep_msgs_for_cc_llm(messages),
                tools=self.get_tool_specs(),
                stream=True,
            ):
                chunk_dict = chunk.to_dict()
                if len(chunk_dict.get("choices", [])) > 0:
                    yield chunk_dict

    def handle_tool_call(
        self,
        tool_call: dict[str, Any],
        messages: list[dict[str, Any]],
    ) -> ResponsesAgentStreamEvent:
        """
        Execute a single tool call, append its output to the running message history, and return a ResponsesAgentStreamEvent containing the tool output.
        """
        args = json.loads(tool_call["arguments"])
        result = str(self.execute_tool(tool_name=tool_call["name"], args=args))

        tool_call_output = self.create_function_call_output_item(tool_call["call_id"], result)
        messages.append(tool_call_output)
        return ResponsesAgentStreamEvent(type="response.output_item.done", item=tool_call_output)

    def call_and_run_tools(
        self,
        messages: list[dict[str, Any]],
        max_iter: int = 10,
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        for _ in range(max_iter):
            last_msg = messages[-1]
            if last_msg.get("role", None) == "assistant":
                return
            elif last_msg.get("type", None) == "function_call":
                yield self.handle_tool_call(last_msg, messages)
            else:
                yield from self.output_to_responses_items_stream(
                    chunks=self.call_llm(messages), aggregator=messages
                )

        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item("Max iterations reached. Stopping.", str(uuid4())),
        )

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        outputs = [
            event.item
            for event in self.predict_stream(request)
            if event.type == "response.output_item.done"
        ]
        return ResponsesAgentResponse(output=outputs, custom_outputs=request.custom_inputs)

    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        messages = self.prep_msgs_for_cc_llm([i.model_dump() for i in request.input])
        if SYSTEM_PROMPT:
            messages.insert(0, {"role": "system", "content": SYSTEM_PROMPT})
        yield from self.call_and_run_tools(messages=messages)


# Enable OpenAI autologging and register the agent object for MLflow models-from-code logging
mlflow.openai.autolog()
AGENT = ToolCallingAgent(llm_endpoint=LLM_ENDPOINT_NAME, tools=TOOL_INFOS)
mlflow.models.set_model(AGENT)
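The system prompt asks the LLM to rank candidates with a weighted rubric (40% semantic relevance, 25% completeness, 20% freshness, 10% business value, 5% ownership). The LLM applies that rubric in natural language; the sketch below is a hypothetical deterministic version of the same arithmetic, which might help when hand-building expected rankings for evaluation. The criterion keys and candidate scores are made up for illustration, and nothing in `agent.py` calls this code.

```python
# Weights from the ranking rubric in SYSTEM_PROMPT (they sum to 1.0).
WEIGHTS = {
    "semantic_relevance": 0.40,
    "data_completeness": 0.25,
    "freshness": 0.20,
    "business_value": 0.10,
    "ownership_transparency": 0.05,
}


def weighted_score(scores: dict[str, float]) -> float:
    """Combine per-criterion scores in [0, 1]; missing criteria count as 0."""
    return sum(weight * scores.get(name, 0.0) for name, weight in WEIGHTS.items())


def top_k(candidates: dict[str, dict[str, float]], k: int = 3) -> list[str]:
    """Return at most k candidate names, highest weighted score first."""
    return sorted(candidates, key=lambda name: weighted_score(candidates[name]), reverse=True)[:k]


# Hypothetical candidates with made-up criterion scores.
candidates = {
    "borrower_profile": {"semantic_relevance": 0.9, "data_completeness": 0.8, "freshness": 0.5,
                         "business_value": 0.7, "ownership_transparency": 1.0},
    "loan_servicing": {"semantic_relevance": 0.6, "data_completeness": 1.0, "freshness": 1.0,
                       "business_value": 0.5, "ownership_transparency": 0.0},
    "fraud_alerts": {"semantic_relevance": 0.3, "data_completeness": 0.5, "freshness": 0.9,
                     "business_value": 0.9, "ownership_transparency": 1.0},
    "credit_risk": {"semantic_relevance": 0.7, "data_completeness": 0.6, "freshness": 0.2,
                    "business_value": 0.8, "ownership_transparency": 0.0},
}
print(top_k(candidates))  # ['borrower_profile', 'loan_servicing', 'fraud_alerts']
```

Here `borrower_profile` scores 0.78 and `loan_servicing` 0.74: strong relevance outweighs `loan_servicing`'s perfect completeness and freshness, which is the intended effect of the 40% relevance weight.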

## Test the agent

Interact with the agent to test its output. Since we manually traced methods within `ResponsesAgent`, you can view the trace for each step the agent takes, with any LLM calls made via the OpenAI SDK automatically traced by autologging.

Replace this placeholder input with an appropriate domain-specific example for your agent.
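Because the system prompt asks for JSON only, it can help to parse the final answer when testing. A minimal sketch, assuming the response has been converted to a dict (for example with `.model_dump()` on the object returned by `AGENT.predict`) and that text answers appear as `{"type": "message", "role": "assistant", "content": [{"type": "output_text", "text": ...}]}` items; check this shape against your MLflow version. `final_answer_json` is a hypothetical helper.

```python
import json
from typing import Any


def final_answer_json(response: dict[str, Any]) -> dict[str, Any]:
    """Parse the JSON answer in the last assistant message of a response dict."""
    for item in reversed(response.get("output", [])):
        if item.get("type") == "message" and item.get("role") == "assistant":
            # Join all output_text parts of the message into one string.
            text = "".join(
                part.get("text", "")
                for part in item.get("content", [])
                if part.get("type") == "output_text"
            )
            return json.loads(text)
    raise ValueError("No assistant message found in response output")


# Hand-built example shaped like the assumed output item format.
example_response = {
    "output": [
        {"type": "function_call_output", "call_id": "call_1", "output": "[]"},
        {
            "type": "message",
            "role": "assistant",
            "id": "msg_1",
            "content": [
                {"type": "output_text", "text": '{"search_query": "borrowers", "results": []}'}
            ],
        },
    ]
}
answer = final_answer_json(example_response)
print(answer["search_query"])  # borrowers
```

If the model wraps its JSON in a Markdown code fence despite the prompt, `json.loads` raises `json.JSONDecodeError`, which is itself a useful signal that the prompt is not being followed.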

In [0]:
import sys

# If needed, uncomment and replace with the directory that contains agent.py
# agent_dir = "/Workspace/Users/joy.garnett@databricks.com/Customer Questions/FNMA/App Setup Github Ready/"  

# if agent_dir not in sys.path:
#     sys.path.append(agent_dir)

from agent import AGENT

AGENT.predict({"input": [{"role": "user", "content": "borrowers"}]})

In [0]:
for chunk in AGENT.predict_stream(
    {"input": [{"role": "user", "content": "borrower data"}]}
):
    print(chunk.model_dump(exclude_none=True))
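The streaming cell prints one event per chunk. To read the answer as a whole, the text deltas can be stitched back together per output item. A minimal sketch, assuming delta events look like `{"type": "response.output_text.delta", "item_id": ..., "delta": ...}` after `model_dump(exclude_none=True)`; `collect_stream_text` is a hypothetical helper and the event list is hand-built.

```python
from typing import Any, Iterable


def collect_stream_text(events: Iterable[dict[str, Any]]) -> dict[str, str]:
    """Concatenate text deltas per output item id from streamed event dicts."""
    texts: dict[str, str] = {}
    for event in events:
        if event.get("type") == "response.output_text.delta":
            item_id = event.get("item_id", "")
            texts[item_id] = texts.get(item_id, "") + event.get("delta", "")
    return texts


# Hypothetical events shaped like chunk.model_dump(exclude_none=True) output.
events = [
    {"type": "response.output_text.delta", "item_id": "msg_1", "delta": '{"results": '},
    {"type": "response.output_text.delta", "item_id": "msg_1", "delta": "[]}"},
    {"type": "response.output_item.done", "item": {"type": "message", "id": "msg_1"}},
]
print(collect_stream_text(events))  # {'msg_1': '{"results": []}'}
```

With the agent, the input would be `(c.model_dump(exclude_none=True) for c in AGENT.predict_stream(...))`.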

### Log the `agent` as an MLflow model
Determine which Databricks resources to specify for automatic authentication passthrough at deployment time.
- **TODO**: If your Unity Catalog Function queries a [vector search index](https://docs.databricks.com/generative-ai/agent-framework/unstructured-retrieval-tools.html) or leverages [external functions](https://docs.databricks.com/generative-ai/agent-framework/external-connection-tools.html), you need to include the dependent vector search index and UC connection objects, respectively, as resources. See [docs](https://docs.databricks.com/generative-ai/agent-framework/log-agent.html#specify-resources-for-automatic-authentication-passthrough) for more details.

Log the agent as code from the `agent.py` file. See [MLflow - Models from Code](https://mlflow.org/docs/latest/models.html#models-from-code).

In [0]:
# Determine Databricks resources to specify for automatic auth passthrough at deployment time
import mlflow
from agent import UC_TOOL_NAMES, VECTOR_SEARCH_TOOLS, LLM_ENDPOINT_NAME
from mlflow.models.resources import DatabricksFunction, DatabricksServingEndpoint
from importlib.metadata import version as pkg_version

resources = [DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT_NAME)]
for tool in VECTOR_SEARCH_TOOLS:
    resources.extend(tool.resources)
for tool_name in UC_TOOL_NAMES:
    # TODO: If the UC function includes dependencies like external connection or vector search, please include them manually.
    # See the TODO in the markdown above for more information.
    resources.append(DatabricksFunction(function_name=tool_name))

input_example = {
    "input": [
        {
            "role": "user",
            "content": "identify data that contains borrowers profiles"
        }
    ]
}

with mlflow.start_run():
    logged_agent_info = mlflow.pyfunc.log_model(
        name="agent",
        python_model="agent.py",
        input_example=input_example,
        pip_requirements=[
            "databricks-openai",
            "backoff",
            # Pin databricks-connect to the version installed in this notebook
            f"databricks-connect=={pkg_version('databricks-connect')}",
        ],
        resources=resources,
    )

## Evaluate the agent with [Agent Evaluation](https://docs.databricks.com/mlflow3/genai/eval-monitor)

You can edit the requests or expected responses in your evaluation dataset and rerun evaluation as you iterate on your agent, using MLflow to track the computed quality metrics.

Evaluate your agent with one of our [predefined LLM scorers](https://docs.databricks.com/mlflow3/genai/eval-monitor/predefined-judge-scorers), or try adding [custom metrics](https://docs.databricks.com/mlflow3/genai/eval-monitor/custom-scorers).
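One candidate custom metric for this agent is format compliance: does the answer match the JSON template in `SYSTEM_PROMPT`? The following is a plain-Python sketch of such a check, assuming you have already extracted the assistant's text from the agent output. To use it as an MLflow custom scorer you would wrap it as described in the custom scorers documentation linked above; that wrapping is not shown here. `follows_output_template` is a hypothetical name.

```python
import json

TOP_LEVEL_KEYS = {"query_understanding", "search_query", "results", "recommended_action"}
RESULT_KEYS = {"rank", "data_product_name", "full_identifier", "table_names",
               "description", "completeness_score"}


def follows_output_template(text: str) -> bool:
    """Check that text is JSON shaped like the SYSTEM_PROMPT output template."""
    try:
        payload = json.loads(text)
    except json.JSONDecodeError:
        return False
    if not isinstance(payload, dict) or not TOP_LEVEL_KEYS.issubset(payload):
        return False
    results = payload["results"]
    # The prompt caps the answer at three ranked results.
    if not isinstance(results, list) or len(results) > 3:
        return False
    for expected_rank, result in enumerate(results, start=1):
        if not isinstance(result, dict) or not RESULT_KEYS.issubset(result):
            return False
        # Ranks must run 1, 2, 3 and completeness must use the allowed labels.
        if result["rank"] != expected_rank:
            return False
        if result["completeness_score"] not in {"High", "Medium", "Low"}:
            return False
    return True


example = {
    "query_understanding": "Borrower profile data",
    "search_query": "borrower profiles",
    "results": [{"rank": 1, "data_product_name": "n", "full_identifier": "f",
                 "table_names": "t", "description": "d", "completeness_score": "High"}],
    "recommended_action": "Start with the top-ranked product.",
}
print(follows_output_template(json.dumps(example)))  # True
```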

In [0]:
import mlflow
from mlflow.genai.scorers import RelevanceToQuery, Safety, RetrievalRelevance, RetrievalGroundedness

# predict_stream in agent.py already prepends SYSTEM_PROMPT, so each evaluation
# input only needs the user turn (a second, outdated system prompt here would
# conflict with the agent's own output template).
eval_dataset = [
    {
        "inputs": {
            "input": [
                {
                    "role": "user",
                    "content": "identify data that contains borrowers profiles"
                }
            ]
        },
        "expected_response": None
    }
]

eval_results = mlflow.genai.evaluate(
    data=eval_dataset,
    predict_fn=lambda input: AGENT.predict({"input": input}),
    scorers=[RelevanceToQuery(), Safety()], # add more scorers here if they're applicable
)

# Review the evaluation results in the MLflow UI (see console output)

## Perform pre-deployment validation of the agent
Before registering and deploying the agent, perform pre-deployment checks with the [mlflow.models.predict()](https://mlflow.org/docs/latest/python_api/mlflow.models.html#mlflow.models.predict) API. See the [documentation](https://docs.databricks.com/machine-learning/model-serving/model-serving-debug.html#validate-inputs) for details.

In [0]:
mlflow.models.predict(
    model_uri=f"runs:/{logged_agent_info.run_id}/agent",
    input_data={"input": [{"role": "user", "content": "borrower"}]},
    env_manager="uv",
)

## Register the model to Unity Catalog

Update the `catalog`, `schema`, and `model_name` below to register the MLflow model to Unity Catalog.
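Unity Catalog models use a three-level `catalog.schema.model` name. A small hypothetical helper can catch typos such as an empty component or a stray dot before `register_model` runs; this sketch simply rejects dotted components so the name splits cleanly into three parts, and does not attempt to enforce every Unity Catalog naming rule.

```python
def uc_model_name(catalog: str, schema: str, model_name: str) -> str:
    """Build a three-level Unity Catalog name, rejecting empty or dotted parts."""
    parts = (catalog, schema, model_name)
    for part in parts:
        # Each level must be a single, non-empty identifier.
        if not part or "." in part:
            raise ValueError(f"Invalid Unity Catalog name component: {part!r}")
    return ".".join(parts)


print(uc_model_name("fnma_product_catalog_jcg", "default", "demo_data_dicovery_rag"))
# fnma_product_catalog_jcg.default.demo_data_dicovery_rag
```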

In [0]:
mlflow.set_registry_uri("databricks-uc")

# TODO: define the catalog, schema, and model name for your UC model
catalog = "fnma_product_catalog_jcg" 
schema = "default" 
model_name = "demo_data_dicovery_rag"
UC_MODEL_NAME = f"{catalog}.{schema}.{model_name}"

# register the model to UC
uc_registered_model_info = mlflow.register_model(
    model_uri=logged_agent_info.model_uri, name=UC_MODEL_NAME
)

## Deploy the agent

Deploying the agent creates or updates a Model Serving endpoint, which can take about 15 minutes.


In [0]:
from databricks import agents
agents.deploy(
    UC_MODEL_NAME,
    uc_registered_model_info.version,
    tags={"endpointSource": "playground"},
    deploy_feedback_model=False,
)

## Creating an endpoint

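Wait for the deployment started in the previous cell to finish (about 15 minutes) before continuing. The next cell creates a serving endpoint named `ENDPOINT_NAME` for the registered model version, or reuses it if an endpoint with that name already exists.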

In [0]:
from mlflow.deployments import get_deploy_client
from mlflow.exceptions import MlflowException
from requests.exceptions import HTTPError

client = get_deploy_client("databricks")

ENDPOINT_NAME = "jcg_fmna_data_endpoint"

endpoint_config = {
    "served_entities": [{
        "entity_name": UC_MODEL_NAME,
        "entity_version": str(uc_registered_model_info.version),
        "workload_size": "Small",
        "scale_to_zero_enabled": True,
    }],
    "traffic_config": {
        "routes": [{
            "served_model_name": f"{model_name}-{uc_registered_model_info.version}",
            "traffic_percentage": 100,
        }]
    },
}

def get_or_create_serving_endpoint():
    try:
        # Try to create the endpoint
        endpoint = client.create_endpoint(
            name=ENDPOINT_NAME,
            config=endpoint_config,
        )
        print(f"Created endpoint: {ENDPOINT_NAME}")
        return endpoint
    except (HTTPError, MlflowException) as e:
        # If it already exists, fall back to get_endpoint; otherwise re-raise
        msg = str(e)
        if "ALREADY_EXISTS" not in msg and "RESOURCE_ALREADY_EXISTS" not in msg:
            # Adjust checks if your workspace uses different error codes/messages
            raise

        print(f"Endpoint {ENDPOINT_NAME} already exists, using existing one.")
        return client.get_endpoint(endpoint=ENDPOINT_NAME)

endpoint = get_or_create_serving_endpoint()
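`create_endpoint` can return before the endpoint is ready to serve traffic. Instead of waiting a fixed 15 minutes, you could poll a readiness check with a timeout. A minimal sketch follows; `wait_until` is a hypothetical helper, and `is_ready` is any zero-argument callable you supply, for example one that inspects the endpoint's `state.ready` field via the Databricks SDK (verify the exact field names for your SDK version). `sleep` and `clock` are injectable so the logic can be tested without real waiting.

```python
import time
from typing import Callable


def wait_until(
    is_ready: Callable[[], bool],
    timeout_s: float = 20 * 60,
    interval_s: float = 30,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Poll is_ready() every interval_s seconds; return False after timeout_s."""
    deadline = clock() + timeout_s
    while True:
        if is_ready():
            return True
        if clock() >= deadline:
            return False
        sleep(interval_s)


# Example with a fake readiness check that succeeds on the third poll.
calls = {"n": 0}


def fake_ready() -> bool:
    calls["n"] += 1
    return calls["n"] >= 3


print(wait_until(fake_ready, sleep=lambda s: None))  # True
```

Using `time.monotonic` for the deadline avoids surprises if the wall clock is adjusted while the notebook waits.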

In [0]:
from databricks import agents

def safe_deploy_agent(model_name: str, model_version: int, **kwargs):
    try:
        deployment = agents.deploy(
            model_name,
            str(model_version),
            **kwargs,
        )
        print("Deployed (or updated) endpoint:", deployment.endpoint_name)
        return deployment
    except ValueError as e:
        msg = str(e)
        # Adjust substring if your message text differs slightly
        if "already serves model" not in msg:
            raise

        print("Endpoint already serves this model/version; reusing existing deployment.")
        # agents.deploy raised because this model version is already deployed.
        # Return None; look up the existing endpoint (for example, in the
        # Serving UI) if you need its name.
        return None

deployment = safe_deploy_agent(
    UC_MODEL_NAME,
    uc_registered_model_info.version,
    tags={"endpointSource": "playground"},
    streaming_enabled=True,
)


## Next steps

After your agent is deployed, you can chat with it in AI Playground to perform additional checks, share it with SMEs in your organization for feedback, or embed it in a production application. See the [docs](https://docs.databricks.com/generative-ai/deploy-agent.html) for details.
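To embed the agent in an application, the endpoint can be called over REST. The following is a minimal standard-library sketch, assuming the `/serving-endpoints/<name>/invocations` path and a bearer token (for example, a personal access token). The workspace host is a placeholder, the endpoint name is whichever one your deployment produced, and the request body reuses the `{"input": [...]}` shape from the test cells. `build_invocation_request` and `query_endpoint` are hypothetical helpers.

```python
import json
import urllib.request
from typing import Any


def build_invocation_request(host: str, endpoint_name: str, question: str) -> tuple[str, bytes]:
    """Return the invocation URL and JSON body for a single-turn question."""
    url = f"{host.rstrip('/')}/serving-endpoints/{endpoint_name}/invocations"
    body = {"input": [{"role": "user", "content": question}]}
    return url, json.dumps(body).encode("utf-8")


def query_endpoint(host: str, endpoint_name: str, token: str, question: str) -> Any:
    """POST the question to the endpoint with a bearer token and parse the reply."""
    url, data = build_invocation_request(host, endpoint_name, question)
    request = urllib.request.Request(
        url,
        data=data,
        method="POST",
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())


# Placeholder workspace URL; replace it before calling query_endpoint.
url, data = build_invocation_request(
    "https://example.cloud.databricks.com/", "jcg_fmna_data_endpoint", "borrower data"
)
print(url)  # https://example.cloud.databricks.com/serving-endpoints/jcg_fmna_data_endpoint/invocations
```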