# Mosaic AI Agent Framework: Author and deploy an MCP tool-calling OpenAI Responses API agent

https://docs.databricks.com/aws/en/notebooks/source/generative-ai/openai-mcp-tool-calling-agent.html

This notebook shows how to author an OpenAI agent that connects to MCP servers hosted on Databricks. You can connect to Databricks-managed MCP servers, custom MCP servers hosted as a Databricks App, or both simultaneously. To learn more about these options, see [MCP on Databricks](https://docs.databricks.com/aws/en/generative-ai/mcp/).


This notebook uses the [`ResponsesAgent`](https://mlflow.org/docs/latest/api_reference/python_api/mlflow.pyfunc.html#mlflow.pyfunc.ResponsesAgent) interface for compatibility with Mosaic AI features. In this notebook you learn to:

- Author an [OpenAI Responses API](https://platform.openai.com/docs/api-reference/responses) agent (wrapped with `ResponsesAgent`) that calls MCP tools
- Manually test the agent
- Evaluate the agent using Mosaic AI Agent Evaluation
- Log and deploy the agent

To learn more about authoring an agent using Mosaic AI Agent Framework, see Databricks documentation ([AWS](https://docs.databricks.com/aws/generative-ai/agent-framework/author-agent) | [Azure](https://learn.microsoft.com/azure/databricks/generative-ai/agent-framework/create-chat-model)).

## Prerequisites
- Address all `TODO`s in this notebook.

In [0]:
%pip install -U -qqqq backoff databricks-openai openai uv "databricks-agents>=1.0.0" databricks-mcp
dbutils.library.restartPython()


## Define the agent in code
Define the agent code in a single cell below. This lets you easily write the agent code to a local Python file, using the `%%writefile` magic command, for subsequent logging and deployment.

#### Agent tools
This agent code loads tools from the managed MCP server for Unity Catalog functions in the `system.ai` schema, which includes the built-in `system.ai.python_exec` function. It also connects to a custom MCP server hosted as a Databricks App and registers a local `streaming_adder` tool that streams its intermediate steps.

For more examples of tools to add to your agent, see Databricks documentation ([AWS](https://docs.databricks.com/aws/generative-ai/agent-framework/agent-tool) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/generative-ai/agent-framework/agent-tool)).
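
The `streaming_adder` tool defined in the agent code below is a Python generator: it yields each intermediate step and then returns a final summary. A generator's return value is delivered through `StopIteration.value`, which is what lets the agent's `handle_tool_call` method tell streamed chunks apart from the final text. The following is a minimal standalone sketch of that pattern. The names `adder_steps` and `drain` are hypothetical and exist only for illustration; the sketch has no MLflow or MCP dependencies.

```python
from typing import Generator, List, Tuple


def adder_steps(text: str) -> Generator[str, None, str]:
    """Toy generator tool: parse 'a b', add, then double the sum."""
    a, b = (int(part) for part in text.split())
    yield f"parsed: a={a}, b={b}"
    total = a + b
    yield f"sum: {total}"
    doubled = total * 2
    yield f"double: {doubled}"
    # The return value travels to the caller inside StopIteration.value.
    return f"final: {doubled}"


def drain(gen: Generator[str, None, str]) -> Tuple[List[str], str]:
    """Collect every yielded chunk, then read the generator's return value."""
    chunks: List[str] = []
    while True:
        try:
            chunks.append(next(gen))
        except StopIteration as stop:
            # Fall back to the joined chunks if the generator returned nothing.
            final = stop.value if stop.value is not None else "\n".join(chunks)
            return chunks, final


chunks, final = drain(adder_steps("6 7"))
print(chunks)
print(final)
```

In the agent, each yielded chunk is forwarded to the caller as its own stream event, while only the final text is appended to the message history that the LLM sees on the next turn.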


In [0]:
%%writefile agent.py
import asyncio
import json
import os
import warnings
from typing import Any, Callable, Generator, List, Optional
from uuid import uuid4

import backoff
import mlflow
import nest_asyncio
import openai

from databricks.sdk import WorkspaceClient
from databricks_mcp import DatabricksMCPClient, DatabricksOAuthClientProvider
from databricks_ai_bridge import ModelServingUserCredentials

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client as connect
from mlflow.entities import SpanType
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)
from openai import OpenAI
from pydantic import BaseModel


nest_asyncio.apply()

############################################
## Define your LLM endpoint and system prompt
############################################
LLM_ENDPOINT_NAME = "databricks-claude-3-7-sonnet"

# TODO: Update with your system prompt
SYSTEM_PROMPT = """
You are a helpful assistant that has access to tools.
"""

###############################################################################
# Configure Custom Tools for your agent
###############################################################################

# TODO: Define an execution function and a function spec for each custom tool

@mlflow.trace(name="parse_inputs", span_type=SpanType.TOOL)
def _parse_inputs(s: str) -> dict:
    a_str, b_str = s.strip().split()
    return {"a": int(a_str), "b": int(b_str)}

@mlflow.trace(name="add_numbers", span_type=SpanType.TOOL)
def _add_numbers(p: dict) -> dict:
    return {"sum": p["a"] + p["b"]}

@mlflow.trace(name="double_result", span_type=SpanType.TOOL)
def _double_result(p: dict) -> int:
    return p["sum"] * 2

def streaming_adder_exec(**kwargs):
    """Generator that streams each step, then returns final summary."""
    text = kwargs.get("text", "").strip()
    p1 = _parse_inputs(text); yield f"parsed: {p1}"
    p2 = _add_numbers(p1);    yield f"sum: {p2['sum']}"
    out = _double_result(p2); yield f"double: {out}"
    return f"parsed: {p1}\nsum: {p2['sum']}\ndouble: {out}"

STREAMING_ADDER_SPEC = {
    "type": "function",
    "function": {
        "name": "streaming_adder",
        "description": "Parse 'a b', add, then double the result.",
        "parameters": {
            "type": "object",
            "properties": {"text": {"type": "string"}},
            "required": ["text"],
        },
    },
}
###############################################################################
# Configure MCP Servers for your agent
# This section sets up server connections so your agent can retrieve data or take actions.
###############################################################################

#### TODO: Choose your MCP server connection type.

# ----- Simple: Managed MCP Server (no extra setup required) -----
# Uses your Databricks Workspace settings and Personal Access Token (PAT) auth.
# workspace_client = WorkspaceClient()

# Managed MCP Servers: Ready to use with default settings above
host = "https://e2-demo-field-eng.cloud.databricks.com"
MANAGED_MCP_SERVER_URLS = [
    f"{host}/api/2.0/mcp/functions/system/ai",
]

# ----- Advanced (optional): Custom MCP Server with OAuth -----
# For Databricks Apps hosting custom MCP servers, OAuth with a service principal is required.
# Uncomment and fill in your settings ONLY if connecting to a custom MCP server.

import os
workspace_client = WorkspaceClient(
    host= host,
    client_id=os.getenv("DATABRICKS_CLIENT_ID"),
    client_secret=os.getenv("DATABRICKS_CLIENT_SECRET"),
    auth_type="oauth-m2m"
)

# Add custom MCP servers here if needed
app_url ="https://databricks-mcp-server-1444828305810485.aws.databricksapps.com/mcp/"

# Custom MCP Servers: Add URLs below if needed (requires custom setup and OAuth above)
CUSTOM_MCP_SERVER_URLS = [
    app_url
    # e.g. "https://<custom-mcp-url>/mcp"
]


class ToolInfo(BaseModel):
    """
    Class representing a tool for the agent.
    - "name" (str): The name of the tool.
    - "spec" (dict): JSON description of the tool (OpenAI function-tool format, as passed to `chat.completions.create`)
    - "exec_fn" (Callable): Function that implements the tool logic
    """

    name: str
    spec: dict
    exec_fn: Callable


async def _run_custom_async(server_url: str, tool_name: str, ws: WorkspaceClient, **kwargs) -> str:
    """Executes a tool from a custom MCP server asynchronously using OAuth."""
    async with connect(server_url, auth=DatabricksOAuthClientProvider(ws)) as (
        read_stream,
        write_stream,
        _,
    ):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            response = await session.call_tool(tool_name, kwargs)
            return "".join([c.text for c in response.content])


async def get_custom_mcp_tools(ws: WorkspaceClient, server_url: str):
    """Retrieves the list of tools available from a custom MCP server."""
    async with connect(server_url, auth=DatabricksOAuthClientProvider(ws)) as (
        read_stream,
        write_stream,
        _,
    ):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            tools_response = await session.list_tools()
            return tools_response.tools


async def create_mcp_tools(
    ws: WorkspaceClient,
    managed_server_urls: Optional[List[str]] = None,
    custom_server_urls: Optional[List[str]] = None,
) -> List[ToolInfo]:
    """Aggregates all available tools from both managed and custom MCP servers into OpenAI-compatible ToolInfo objects."""
    tools = []

    #### TODO: Uncomment the two lines below to use an on-behalf-of (OBO) client for OBO auth. ####
    #### Use an OAuth client for custom MCP servers hosted as Databricks Apps, or PAT auth for managed MCP only or no MCP. ####
    # obo_ws_client = WorkspaceClient(credentials_strategy=ModelServingUserCredentials())
    # ws = obo_ws_client


    if managed_server_urls:
        for server_url in managed_server_urls:
            try:
                mcp_client = DatabricksMCPClient(server_url=server_url, workspace_client=ws)
                mcp_tools = mcp_client.list_tools()

                for mcp_tool in mcp_tools:
                    # The description belongs inside the "function" object of the tool spec.
                    tool_spec = {
                        "type": "function",
                        "function": {
                            "name": mcp_tool.name,
                            "description": mcp_tool.description or f"Tool: {mcp_tool.name}",
                            "parameters": mcp_tool.inputSchema,
                        },
                    }

                    def create_managed_exec_fn(server_url, tool_name, ws):
                        def exec_fn(**kwargs):
                            client = DatabricksMCPClient(server_url=server_url, workspace_client=ws)
                            response = client.call_tool(tool_name, kwargs)
                            return "".join([c.text for c in response.content])

                        return exec_fn

                    exec_fn = create_managed_exec_fn(server_url, mcp_tool.name, ws)

                    tools.append(ToolInfo(name=mcp_tool.name, spec=tool_spec, exec_fn=exec_fn))
            except Exception as e:
                print(f"Error loading tools from managed server {server_url}: {e}")

    if custom_server_urls:
        for server_url in custom_server_urls:
            try:
                mcp_tools = await get_custom_mcp_tools(ws, server_url)

                for mcp_tool in mcp_tools:
                    # The description belongs inside the "function" object of the tool spec.
                    tool_spec = {
                        "type": "function",
                        "function": {
                            "name": mcp_tool.name,
                            "description": mcp_tool.description or f"Tool: {mcp_tool.name}",
                            "parameters": mcp_tool.inputSchema,
                        },
                    }

                    def create_custom_exec_fn(server_url, tool_name, ws):
                        def exec_fn(**kwargs):
                            return asyncio.run(
                                _run_custom_async(server_url, tool_name, ws, **kwargs)
                            )

                        return exec_fn

                    exec_fn = create_custom_exec_fn(server_url, mcp_tool.name, ws)

                    tools.append(ToolInfo(name=mcp_tool.name, spec=tool_spec, exec_fn=exec_fn))
            except Exception as e:
                print(f"Error loading tools from custom server {server_url}: {e}")

    return tools


class MCPToolCallingAgent(ResponsesAgent):
    def __init__(self, llm_endpoint: str, tools: list[ToolInfo]):
        """Initializes the MCP Tool Calling Agent."""
        self.llm_endpoint = llm_endpoint
        # Uses the module-level client configured above; swap in WorkspaceClient() for default auth.
        self.workspace_client = workspace_client
        self.model_serving_client = self.workspace_client.serving_endpoints.get_open_ai_client()
        self._tools_dict = {tool.name: tool for tool in tools}

    def get_tool_specs(self) -> list[dict]:
        """Returns tool specifications in the format OpenAI expects."""
        return [tool_info.spec for tool_info in self._tools_dict.values()]

    @mlflow.trace(span_type=SpanType.TOOL)
    def execute_tool(self, tool_name: str, args: dict) -> Any:
        """Executes the specified tool with the given arguments."""
        return self._tools_dict[tool_name].exec_fn(**args)

    @backoff.on_exception(backoff.expo, openai.RateLimitError)
    @mlflow.trace(span_type=SpanType.LLM)
    def call_llm(self, messages: list[dict[str, Any]]) -> Generator[dict[str, Any], None, None]:
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", message="PydanticSerializationUnexpectedValue")
            for chunk in self.model_serving_client.chat.completions.create(
                model=self.llm_endpoint,
                messages=self.prep_msgs_for_cc_llm(messages),
                tools=self.get_tool_specs(),
                stream=True,
            ):
                yield chunk.to_dict()

    # def handle_tool_call(
    #     self, tool_call: dict[str, Any], messages: list[dict[str, Any]]
    # ) -> ResponsesAgentStreamEvent:
    #     """
    #     Execute tool calls, add them to the running message history, and return a ResponsesStreamEvent w/ tool output
    #     """
    #     args = json.loads(tool_call["arguments"])
    #     result = str(self.execute_tool(tool_name=tool_call["name"], args=args))

    #     tool_call_output = self.create_function_call_output_item(tool_call["call_id"], result)
    #     messages.append(tool_call_output)
    #     return ResponsesAgentStreamEvent(type="response.output_item.done", item=tool_call_output)

    # Updated handler for tool outputs: returns a generator to accommodate step-by-step output from custom tools.
    def handle_tool_call(self, tool_call: dict[str, Any], messages: list[dict[str, Any]]) -> Generator[ResponsesAgentStreamEvent, None, None]:
        name, args, call_id = tool_call["name"], json.loads(tool_call["arguments"] or "{}"), tool_call["call_id"]
        result = self.execute_tool(name, args)
        import inspect
        if inspect.isgenerator(result):
            chunks = []
            try:
                while True:
                    chunk = next(result)
                    chunks.append(str(chunk))
                    yield ResponsesAgentStreamEvent(type="response.output_item.done", item=self.create_function_call_output_item(call_id, str(chunk)))
            except StopIteration as stop:
                final_text = stop.value if stop.value is not None else "\n".join(chunks)
        else:
            final_text = str(result)
        final_item = self.create_function_call_output_item(call_id, final_text)
        messages.append(
            final_item.model_dump() if hasattr(final_item, "model_dump") else final_item
        )
        yield ResponsesAgentStreamEvent(type="response.output_item.done", item=final_item)

    def call_and_run_tools(
        self,
        messages: list[dict[str, Any]],
        max_iter: int = 10,
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        for _ in range(max_iter):
            last_msg = messages[-1]
            if last_msg.get("role", None) == "assistant":
                return
            elif last_msg.get("type", None) == "function_call":
                yield from self.handle_tool_call(last_msg, messages)
            else:
                yield from self.output_to_responses_items_stream(
                    chunks=self.call_llm(messages), aggregator=messages
                )

        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item("Max iterations reached. Stopping.", str(uuid4())),
        )

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        outputs = [
            event.item
            for event in self.predict_stream(request)
            if event.type == "response.output_item.done"
        ]
        return ResponsesAgentResponse(output=outputs, custom_outputs=request.custom_inputs)

    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        messages = [{"role": "system", "content": SYSTEM_PROMPT}] + [
            i.model_dump() for i in request.input
        ]

        yield from self.call_and_run_tools(messages)


# Create MCP tools from the configured servers
mcp_tools = asyncio.run(
    create_mcp_tools(
        ws=workspace_client,
        managed_server_urls=MANAGED_MCP_SERVER_URLS,
        custom_server_urls=CUSTOM_MCP_SERVER_URLS,
    )
)

#### TODO: add custom tool here ####
mcp_tools.append(ToolInfo(name="streaming_adder", spec=STREAMING_ADDER_SPEC, exec_fn=streaming_adder_exec))

# Log the model using MLflow
mlflow.openai.autolog()
AGENT = MCPToolCallingAgent(llm_endpoint=LLM_ENDPOINT_NAME, tools=mcp_tools)
mlflow.models.set_model(AGENT)

## Test the agent

Interact with the agent to test its output. Since we manually traced methods within `ResponsesAgent`, you can view the trace for each step the agent takes, with any LLM calls made via the OpenAI SDK automatically traced by autologging.

Replace this placeholder input with an appropriate domain-specific example for your agent.
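
In the agent code above, `predict` is built on top of `predict_stream`: it keeps only the events whose type is `response.output_item.done` and returns their items. The sketch below reproduces that filtering with plain Python objects. `StreamEvent`, `fake_predict_stream`, and `collect_done_items` are hypothetical stand-ins for illustration, not MLflow classes, and the event payloads are made up.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Iterator, List


@dataclass
class StreamEvent:
    """Hypothetical stand-in for a streamed agent event."""
    type: str
    item: Dict[str, Any]


def fake_predict_stream() -> Iterator[StreamEvent]:
    """Emit one partial delta followed by two completed output items."""
    yield StreamEvent("response.output_text.delta", {"delta": "6*7 is"})
    yield StreamEvent(
        "response.output_item.done",
        {"type": "function_call_output", "output": "42"},
    )
    yield StreamEvent(
        "response.output_item.done",
        {"type": "message", "text": "6*7 is 42"},
    )


def collect_done_items(events: Iterable[StreamEvent]) -> List[Dict[str, Any]]:
    """Keep only completed output items, as the agent's predict method does."""
    return [e.item for e in events if e.type == "response.output_item.done"]


done_items = collect_done_items(fake_predict_stream())
for item in done_items:
    print(item["type"])
```

Because the non-streaming result is derived from the stream, any event a tool emits as `response.output_item.done` (including intermediate chunks from a streaming tool) also shows up in the `predict` output.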

In [0]:
dbutils.library.restartPython()

In [0]:
# ==============================================================================
# TODO: ONLY UNCOMMENT AND EDIT THIS SECTION IF YOU ARE USING OAUTH/SERVICE PRINCIPAL FOR CUSTOM MCP SERVERS.
#       For managed MCP (the default), LEAVE THIS SECTION COMMENTED OUT.
# ==============================================================================

# # Set your Databricks client ID and client secret for service principal authentication.
# DATABRICKS_CLIENT_ID = "<YOUR_CLIENT_ID>"
# client_secret_scope_name = "<YOUR_SECRET_SCOPE>"
# client_secret_key_name = "<YOUR_SECRET_KEY_NAME>"

# # Load your service principal credentials into environment variables
# os.environ["DATABRICKS_CLIENT_ID"] = DATABRICKS_CLIENT_ID
# os.environ["DATABRICKS_CLIENT_SECRET"] = dbutils.secrets.get(scope=client_secret_scope_name, key=client_secret_key_name)

import os
client_secret_scope_name = "cindy_demos"
client_secret_key_name = "cindy_demo_service_principal_secret"
client_id_key_name = "cindy_demo_service_principal_client"
os.environ["DATABRICKS_CLIENT_ID"] = dbutils.secrets.get(scope=client_secret_scope_name, key=client_id_key_name)
os.environ["DATABRICKS_CLIENT_SECRET"] = dbutils.secrets.get(scope=client_secret_scope_name, key=client_secret_key_name)

In [0]:
from agent import AGENT

result = AGENT.predict({"input": [{"role": "user", "content": "What is 6*7 in Python"}]})
print(result.model_dump(exclude_none=True))

In [0]:
import time
req = {"input": [{"role": "user", "content": "What is 6 7 with streaming adder"}]}
for ev in AGENT.predict_stream(req):
    if ev.type == "response.output_item.done":
        print('TIME:', time.time())
        print('EVENT:', ev.item)

In [0]:
for chunk in AGENT.predict_stream(
    {"input": [{"role": "user", "content": "6 7 with adder"}]}
):
    print(chunk.model_dump(exclude_none=True))

## Log the agent as an MLflow model

Log the agent as code from the `agent.py` file. See [MLflow - Models from Code](https://mlflow.org/docs/latest/models.html#models-from-code).

### Enable automatic authentication for Databricks resources
For the most common Databricks resource types, Databricks supports and recommends declaring resource dependencies for the agent upfront during logging. This enables automatic authentication passthrough when you deploy the agent. With automatic authentication passthrough, Databricks automatically provisions, rotates, and manages short-lived credentials to securely access these resource dependencies from within the agent endpoint.

To enable automatic authentication, specify the dependent Databricks resources when calling `mlflow.pyfunc.log_model()`.

  - **TODO**: If your Unity Catalog tool queries a [vector search index](docs link) or leverages [external functions](docs link), you need to include the dependent vector search index and UC connection objects, respectively, as resources. See docs ([AWS](https://docs.databricks.com/generative-ai/agent-framework/log-agent.html#specify-resources-for-automatic-authentication-passthrough) | [Azure](https://learn.microsoft.com/azure/databricks/generative-ai/agent-framework/log-agent#resources)).



In [0]:
import mlflow
from agent import MANAGED_MCP_SERVER_URLS, CUSTOM_MCP_SERVER_URLS, workspace_client, LLM_ENDPOINT_NAME
from mlflow.models.resources import DatabricksServingEndpoint, DatabricksFunction, DatabricksApp
from databricks_mcp import DatabricksOAuthClientProvider, DatabricksMCPClient
from databricks.sdk import WorkspaceClient
from pkg_resources import get_distribution

# Agent auth: the agent authenticates as a service principal (SP)
app_name = "databricks-mcp-server"

resources = [
    DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT_NAME),
    DatabricksApp(app_name=app_name)]

for mcp_server_url in MANAGED_MCP_SERVER_URLS:
    mcp_client = DatabricksMCPClient(server_url=mcp_server_url, workspace_client=workspace_client)
    resources.extend(mcp_client.get_databricks_resources())

resources

In [0]:
with mlflow.start_run():
    logged_agent_info = mlflow.pyfunc.log_model(
        name="agent",
        python_model="agent.py",
        resources=resources,
        pip_requirements=[
            f"databricks-mcp=={get_distribution('databricks-mcp').version}",
            "backoff",
            f"mlflow=={get_distribution('mlflow').version}",
            f"mcp=={get_distribution('mcp').version}",
            f"databricks-openai=={get_distribution('databricks-openai').version}",
        ]
    )

## Evaluate the agent with Agent Evaluation

Use Mosaic AI Agent Evaluation to evaluate the agent's responses based on expected responses and other evaluation criteria. Use the evaluation criteria you specify to guide iterations, using MLflow to track the computed quality metrics.
See Databricks documentation ([AWS](https://docs.databricks.com/aws/generative-ai/agent-evaluation) | [Azure](https://learn.microsoft.com/azure/databricks/generative-ai/agent-evaluation/)).


To evaluate your tool calls, add custom metrics. See Databricks documentation ([AWS](https://docs.databricks.com/en/generative-ai/agent-evaluation/custom-metrics.html#evaluating-tool-calls) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/generative-ai/agent-evaluation/custom-metrics#evaluating-tool-calls)).
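
As a rough illustration of what a tool-call check can look at, the sketch below scans a list of output items for `function_call` entries and reports which tools were invoked. It assumes the items are plain dicts carrying the `type`, `name`, `arguments`, and `call_id` keys that the agent code above reads; `called_tools` and `tool_was_called` are hypothetical helpers, and the sample data is invented. Registering such a check as a custom metric is covered in the linked documentation and is not shown here.

```python
from typing import Any, Dict, List


def called_tools(output_items: List[Dict[str, Any]]) -> List[str]:
    """Return the names of all tools invoked, in call order."""
    return [
        item["name"]
        for item in output_items
        if item.get("type") == "function_call"
    ]


def tool_was_called(output_items: List[Dict[str, Any]], expected_tool: str) -> bool:
    """True if the expected tool appears among the function calls."""
    return expected_tool in called_tools(output_items)


# Invented sample shaped like an agent response's output items.
sample_output = [
    {
        "type": "function_call",
        "name": "streaming_adder",
        "arguments": '{"text": "6 7"}',
        "call_id": "call_1",
    },
    {"type": "function_call_output", "call_id": "call_1", "output": "double: 26"},
    {"type": "message", "role": "assistant", "content": "The final result is 26."},
]

print(called_tools(sample_output))
print(tool_was_called(sample_output, "streaming_adder"))
```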

In [0]:
import mlflow
from mlflow.genai.scorers import RelevanceToQuery, RetrievalGroundedness, RetrievalRelevance, Safety

eval_dataset = [
    {
        "inputs": {"input": [{"role": "user", "content": "what is 6 7 based on the adder tool."}]},
        "expected_response": "The final result is 26.",
    }
]

eval_results = mlflow.genai.evaluate(
    data=eval_dataset,
    predict_fn=lambda input: AGENT.predict({"input": input}),
    scorers=[RelevanceToQuery(), Safety()],  # add more scorers here if they're applicable
)

# Review the evaluation results in the MLflow UI (see console output)

## Pre-deployment agent validation
Before registering and deploying the agent, perform pre-deployment checks using the [mlflow.models.predict()](https://mlflow.org/docs/latest/python_api/mlflow.models.html#mlflow.models.predict) API. See Databricks documentation ([AWS](https://docs.databricks.com/en/machine-learning/model-serving/model-serving-debug.html#validate-inputs) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/machine-learning/model-serving/model-serving-debug#before-model-deployment-validation-checks)).

In [0]:
mlflow.models.predict(
    model_uri=f"runs:/{logged_agent_info.run_id}/agent",
    input_data={"input": [{"role": "user", "content": "Hello!"}]},
    env_manager="uv",
)

## Register the model to Unity Catalog

Before you deploy the agent, you must register the agent to Unity Catalog.

- **TODO** Update the `catalog`, `schema`, and `model_name` below to register the MLflow model to Unity Catalog.

In [0]:
mlflow.set_registry_uri("databricks-uc")

# TODO: define the catalog, schema, and model name for your UC model
catalog = "cindy_demo_catalog"
schema = "default"
model_name = "mcp_tool_agent"
UC_MODEL_NAME = f"{catalog}.{schema}.{model_name}"

# register the model to UC
uc_registered_model_info = mlflow.register_model(model_uri=logged_agent_info.model_uri, name=UC_MODEL_NAME)

## Deploy the agent
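
The deployment cell in this section passes the service principal credentials as secret references of the form `{{secrets/<scope>/<key>}}` rather than as plaintext values. Because the references are built with f-strings, every literal brace has to be doubled, which is why the cell contains runs of four and five braces. The sketch below shows how that escaping resolves. `secret_ref` is a hypothetical helper, and the scope and key names are placeholders.

```python
def secret_ref(scope: str, key: str) -> str:
    """Build a {{secrets/<scope>/<key>}} reference string.

    In an f-string, '{{' renders as '{' and '}}' renders as '}', so four
    braces produce the two literal braces that the reference syntax needs.
    """
    return f"{{{{secrets/{scope}/{key}}}}}"


ref = secret_ref("my_scope", "my_key")
print(ref)  # {{secrets/my_scope/my_key}}
```

Keeping the reference construction in one small function makes it harder to miscount braces when more environment variables are added.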

In [0]:
from databricks import agents

agents.deploy(
    UC_MODEL_NAME, 
    uc_registered_model_info.version,
    environment_vars={
    # ==============================================================================
    # TODO: ONLY UNCOMMENT AND CONFIGURE THE ENVIRONMENT_VARS SECTION BELOW
    #       IF YOU ARE USING OAUTH/SERVICE PRINCIPAL FOR CUSTOM MCP SERVERS.
    #       For managed MCP (the default), LEAVE THIS SECTION COMMENTED OUT.
    # ==============================================================================
        "DATABRICKS_CLIENT_ID": f"{{{{secrets/{client_secret_scope_name}/{client_id_key_name}}}}}",
        "DATABRICKS_CLIENT_SECRET": f"{{{{secrets/{client_secret_scope_name}/{client_secret_key_name}}}}}"
    },
    tags = {"endpointSource": "docs"}
)


## Test the agent endpoint

After your agent is deployed, you can chat with it in AI Playground to perform additional checks, share it with SMEs in your organization for feedback, or embed it in a production application. See docs ([AWS](https://docs.databricks.com/en/generative-ai/deploy-agent.html) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/generative-ai/deploy-agent)) for details.

In [0]:
from databricks.sdk import WorkspaceClient
import warnings
warnings.filterwarnings("ignore")

endpoint = "agents_cindy_demo_catalog-default-mcp_tool_agent"

w = WorkspaceClient()  # Ensure databricks-sdk[openai] is installed
client = w.serving_endpoints.get_open_ai_client()

In [0]:
streaming_resp = client.responses.create(
    model=endpoint,
    stream= True,
    input=[{"role": "user", "content": "What is answer for 12 12 with streaming adder"}],
    extra_body = {"databricks_options": {"return_trace": True}} # returns detailed traces
)
for chunk in streaming_resp:
  if chunk.type == "response.output_item.done":
    if chunk.item.type=='function_call_output':
      print('TOOL OUTPUT:',chunk.item.output)
    if chunk.item.type=='message':
      print('AGENT OUTPUT:',chunk.item.content[0].text)


In [0]:
streaming_resp = client.responses.create(
    model=endpoint,
    stream= True,
    input=[{"role": "user", "content": "what tools do you have"}],
    extra_body = {"databricks_options": {"return_trace": True}}
)
for chunk in streaming_resp:
  if chunk.type == "response.output_item.done":
    print(chunk)

In [0]:
chunk  # inspect the last streamed event, e.g. chunk.item.content[0].text