# Mosaic AI Agent Framework: Author and deploy a tool-calling DSPy agent (single turn)

This notebook shows how to author a DSPy agent and wrap it using the [`ResponsesAgent`](https://mlflow.org/docs/latest/api_reference/python_api/mlflow.pyfunc.html#mlflow.pyfunc.ResponsesAgent) interface to make it compatible with Mosaic AI. In this notebook you learn to:

- Author a tool-calling DSPy agent wrapped with `ResponsesAgent`
- Manually test the agent's output
- Evaluate the agent using Mosaic AI Agent Evaluation
- Log and deploy the agent

To learn more about authoring an agent using Mosaic AI Agent Framework, see Databricks documentation ([AWS](https://docs.databricks.com/aws/generative-ai/agent-framework/author-agent) | [Azure](https://learn.microsoft.com/azure/databricks/generative-ai/agent-framework/create-chat-model)).

**Note: The agent built in this tutorial is single-turn, which means it does not pass conversation history to subsequent LLM calls.**
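
To make the single-turn behavior concrete, here is a minimal sketch in plain Python (no DSPy or MLflow required) of how only the last user message is taken from a request. The helper name `latest_user_question` is hypothetical; it mirrors the check that `predict_stream` performs later in this notebook, and the example messages are made up.

```python
def latest_user_question(request: dict) -> str:
    """Return the content of the final message, which must come from the user."""
    messages = request.get("input", [])
    if not messages or messages[-1].get("role") != "user":
        raise ValueError("No user question detected!")
    # Earlier messages (the conversation history) are ignored on purpose.
    return messages[-1]["content"]


request = {
    "input": [
        {"role": "user", "content": "What is 6*7 in Python?"},
        {"role": "assistant", "content": "42"},
        {"role": "user", "content": "And 6*8?"},
    ]
}
question = latest_user_question(request)
print(question)
```

Because the history is dropped, a follow-up question such as the one above reaches the LLM without the earlier exchange, so follow-ups should be phrased as self-contained questions.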

## Prerequisites

- Address all `TODO`s in this notebook.

In [0]:
%pip install -U -qqqq dspy uv databricks-agents unitycatalog-ai mlflow-skinny[databricks]
dbutils.library.restartPython()


## Define the agent in code
Define the agent code in a single cell below. This lets you easily write the agent code to a local Python file, using the `%%writefile` magic command, for subsequent logging and deployment.
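
As a rough illustration of the write-then-import round trip, the sketch below writes a one-line stand-in for the agent code to a temporary file and loads it back as a module. The file content, the module name `agent_sketch`, and the placeholder `AGENT` value are assumptions for illustration only; in the notebook, `%%writefile agent.py` and `from agent import AGENT` do this work.

```python
import importlib.util
import tempfile
from pathlib import Path

# Stand-in for the cell body that `%%writefile agent.py` would save.
AGENT_SOURCE = 'AGENT = "placeholder agent object"\n'

with tempfile.TemporaryDirectory() as tmp_dir:
    agent_path = Path(tmp_dir) / "agent.py"
    agent_path.write_text(AGENT_SOURCE)

    # Load the file as a module, roughly what `from agent import AGENT` does.
    spec = importlib.util.spec_from_file_location("agent_sketch", agent_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    loaded_agent = module.AGENT

print(loaded_agent)
```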

#### Agent tools
This agent code adds the built-in Unity Catalog function `system.ai.python_exec` to the agent.

For more examples of tools to add to your agent, see Databricks documentation ([AWS](https://docs.databricks.com/aws/generative-ai/agent-framework/agent-tool) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/generative-ai/agent-framework/agent-tool)). To convert your own tools to DSPy tools, see the [DSPy tool guide](https://dspy.ai/learn/programming/tools/).
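
A tool wrapper generally needs a name, a description, and argument information for the function it wraps. The sketch below uses only the standard library to show the kind of metadata that can be read from a plain Python function. `describe_tool` is a hypothetical helper, not part of DSPy, and the function body is a placeholder that executes nothing.

```python
import inspect


def execute_python_code(code: str) -> str:
    """Execute Python code and return its output as text."""
    return f"(pretend result of running {code!r})"  # placeholder, nothing is executed


def describe_tool(func) -> dict:
    """Collect the function metadata that a tool wrapper can expose to the LLM."""
    signature = inspect.signature(func)
    return {
        "name": func.__name__,
        "description": inspect.getdoc(func),
        "args": {
            name: getattr(param.annotation, "__name__", str(param.annotation))
            for name, param in signature.parameters.items()
        },
    }


tool_info = describe_tool(execute_python_code)
print(tool_info)
```

A descriptive docstring and type hints give the model more to work with when it decides whether and how to call a tool.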

#### Wrap the DSPy agent using the `ResponsesAgent` interface

Databricks recommends using `ResponsesAgent` as it simplifies authoring multi-turn conversational agents using an open source standard. See MLflow's [ResponsesAgent documentation](https://www.mlflow.org/docs/latest/llms/responses-agent-intro/).
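
One detail of the Responses format that matters for tool-calling agents is that each tool result refers back to its tool call through a shared `call_id`. The sketch below uses plain dictionaries rather than the MLflow helper methods. The field layout is a simplified assumption based on the Responses item types, and the helper names are hypothetical.

```python
from uuid import uuid4


def function_call_item(name: str, arguments: str, call_id: str) -> dict:
    """Output item announcing that the agent invoked a tool."""
    return {
        "type": "function_call",
        "id": str(uuid4()),
        "call_id": call_id,
        "name": name,
        "arguments": arguments,
    }


def function_call_output_item(call_id: str, output: str) -> dict:
    """Output item carrying the tool result; `call_id` must match the call."""
    return {"type": "function_call_output", "call_id": call_id, "output": output}


def unmatched_call_ids(items: list) -> set:
    """Return call IDs that have a call without a result, or a result without a call."""
    calls = {i["call_id"] for i in items if i["type"] == "function_call"}
    results = {i["call_id"] for i in items if i["type"] == "function_call_output"}
    return calls ^ results


call_id = str(uuid4())
items = [
    function_call_item("execute_python_code", '{"code": "print(6*7)"}', call_id),
    function_call_output_item(call_id, "42"),
]
print(unmatched_call_ids(items))
```

The agent code in this notebook follows the same idea: it stores the ID generated for a tool call and reuses it when the tool result arrives.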



In [0]:
%%writefile agent.py
import json
import re
from typing import Any, Generator
from uuid import NAMESPACE_DNS, uuid3, uuid4

import mlflow
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)
from unitycatalog.ai.core.base import get_uc_function_client
import dspy

############################################
# Define your LLM endpoint
############################################
# TODO: Replace with your model serving endpoint. Make sure to include the "databricks/" prefix.
LLM_ENDPOINT_NAME = "databricks/databricks-claude-3-7-sonnet"
lm = dspy.LM(LLM_ENDPOINT_NAME, cache=False)

###############################################################################
## Define tools for your agent, enabling it to retrieve data or take actions
## beyond text generation
## To create and see usage examples of more tools, see
## https://docs.databricks.com/en/generative-ai/agent-framework/agent-tool.html
###############################################################################
tools = []

# You can use UDFs in Unity Catalog as agent tools
# Below, we add the `system.ai.python_exec` UDF, which provides
# a python code interpreter tool to our agent
# You can also add local DSPy tools: https://dspy.ai/learn/programming/tools/
uc_client = get_uc_function_client()


def execute_python_code(code):
    """Execute the python code"""
    outputs = uc_client.execute_function(function_name="system.ai.python_exec", parameters={"code": f"{code}"})
    if getattr(outputs, "error", None):
        tool_result = f"Encountered error: {outputs.error}"
    else:
        tool_result = outputs.value
    return tool_result

# Convert the UDF to a `dspy.Tool`.
tools.append(dspy.Tool(execute_python_code))


# Create custom tool calling message for streaming purposes.
# Please refer to the [DSPy streaming guide](https://dspy.ai/tutorials/streaming/) for more details.
class MyStatusMessageProvider(dspy.streaming.StatusMessageProvider):
    def tool_start_status_message(self, instance, inputs):
        tool_calling_info = {
            "tool_name": instance.name,
            "tool_args": inputs["kwargs"],
        }
        return json.dumps(tool_calling_info)

    def tool_end_status_message(self, outputs):
        tool_result = {"tool_result": outputs.value}
        return json.dumps(tool_result)


############################################
# Define the ResponsesAgent
############################################
class DSPyResponsesAgent(ResponsesAgent):
    def __init__(self, agent: dspy.Module, lm: dspy.LM):
        self.agent = agent
        self.lm = lm
        # Convert the agent to be streaming-compatible.
        self._streamified_agent = dspy.streamify(
            agent,
            status_message_provider=MyStatusMessageProvider(),
            stream_listeners=[
                dspy.streaming.StreamListener(signature_field_name="next_thought", allow_reuse=True),
                dspy.streaming.StreamListener(signature_field_name="answer"),
                dspy.streaming.StreamListener(signature_field_name="reasoning"),
            ],
            async_streaming=False,
        )
        # Agent internal states.
        self._concated_stream_chunks = [[]]
        self._last_tool_call_id = None

    def _dspy_stream_chunk_to_responses(self, chunk) -> dict[str, Any]:
        "Convert from DSPy streaming chunks to Responses output item dictionaries"

        if isinstance(chunk, dspy.streaming.StatusMessage):
            # Extract tool calling information to form the clean streaming chunks.
            message_dict = json.loads(chunk.message)
            if "tool_name" in message_dict:
                # Set a new tool call ID when detecting a new tool call.
                self._last_tool_call_id = str(uuid4())
                return self.create_function_call_item(
                    id=str(uuid4()),
                    call_id=self._last_tool_call_id,
                    name=message_dict["tool_name"],
                    arguments=json.dumps(message_dict["tool_args"]),
                )
            elif "tool_result" in message_dict:
                # `call_id` in the result chunk has to match the `call_id` from tool invocation chunk.
                call_id = self._last_tool_call_id
                self._last_tool_call_id = None
                return self.create_function_call_output_item(
                    call_id=call_id,
                    output=message_dict["tool_result"],
                )
        elif isinstance(chunk, dspy.streaming.StreamResponse):
            stream_chunk = self.create_text_delta(
                delta=chunk.chunk,
                item_id=str(
                    uuid3(
                        NAMESPACE_DNS,
                        f"{chunk.predict_name}.{chunk.signature_field_name}.stream_count{len(self._concated_stream_chunks)}",
                    )
                ),  # Generate a deterministic ID because streaming chunks from the same LM call should be grouped together.
            )
            self._concated_stream_chunks[-1].append(chunk.chunk)
            return stream_chunk
        

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        outputs = [event.item for event in self.predict_stream(request) if event.type == "response.output_item.done"]
        return ResponsesAgentResponse(output=[outputs[-1]])

    def predict_stream(
        self,
        request: ResponsesAgentRequest,
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        last_message = request.input[-1]
        if last_message.role != "user":
            raise ValueError("No user question detected!")
        # This is a single-turn agent, so only the last message (the latest user message) is used.
        question = last_message.content

        with dspy.context(lm=self.lm):
            output = self._streamified_agent(question=question)
            for chunk in output:
                converted_chunk = self._dspy_stream_chunk_to_responses(chunk)
                if isinstance(chunk, dspy.streaming.StatusMessage):
                    yield ResponsesAgentStreamEvent(type="response.output_item.done", item=converted_chunk)
                elif isinstance(chunk, dspy.streaming.StreamResponse):
                    yield ResponsesAgentStreamEvent(**converted_chunk)
                    if chunk.is_last_chunk:
                        # The output field is finished, we yield the concatenated message with the same id.
                        text = "".join(self._concated_stream_chunks[-1])
                        self._concated_stream_chunks.append([])
                        yield ResponsesAgentStreamEvent(
                            type="response.output_item.done",
                            item=self.create_text_output_item(
                                text=text,
                                id=converted_chunk["item_id"],
                            ),
                        )

# Create the agent object, and specify it as the agent object to use when
# loading the agent back for inference via mlflow.models.set_model()
mlflow.dspy.autolog()
# For more context about DSPy signature, please read https://dspy.ai/learn/programming/signatures/.
agent = dspy.ReAct("question->answer", tools=tools, max_iters=5)
AGENT = DSPyResponsesAgent(agent, lm)
mlflow.models.set_model(AGENT)  

## Test the agent

Interact with the agent to test its output and tool-calling abilities. Because `agent.py` calls `mlflow.dspy.autolog()`, you can view the trace for each step the agent takes.

Replace this placeholder input with an appropriate domain-specific example for your agent.
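
When reading the streamed output, it helps to know that text deltas belonging to the same output field share one `item_id`, which the agent derives deterministically with `uuid3`. The sketch below simulates a few delta events as plain dictionaries and reassembles the text per item. The predictor names, field names, and event contents are illustrative assumptions, not captured agent output.

```python
from collections import defaultdict
from uuid import NAMESPACE_DNS, uuid3


def stream_item_id(predict_name: str, field_name: str, stream_count: int) -> str:
    """Derive a stable ID so that all chunks of one output field share it."""
    name = f"{predict_name}.{field_name}.stream_count{stream_count}"
    return str(uuid3(NAMESPACE_DNS, name))


# Simulated text-delta events; names and text are made up for illustration.
thought_id = stream_item_id("react", "next_thought", 1)
answer_id = stream_item_id("extract", "answer", 2)
events = [
    {"type": "response.output_text.delta", "item_id": thought_id, "delta": "I should "},
    {"type": "response.output_text.delta", "item_id": thought_id, "delta": "run the code."},
    {"type": "response.output_text.delta", "item_id": answer_id, "delta": "6*7 "},
    {"type": "response.output_text.delta", "item_id": answer_id, "delta": "is 42."},
]

# Group deltas by item ID and concatenate them in arrival order.
text_by_item = defaultdict(str)
for event in events:
    if event["type"] == "response.output_text.delta":
        text_by_item[event["item_id"]] += event["delta"]

for item_id, text in text_by_item.items():
    print(item_id, "->", text)
```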

In [0]:
dbutils.library.restartPython()

In [0]:
from agent import AGENT

result = AGENT.predict({"input": [{"role": "user", "content": "What is 6*7 in Python?"}]})
print(result.model_dump(exclude_none=True))

In [0]:
for chunk in AGENT.predict_stream({"input": [{"role": "user", "content": "What is 6*7 in Python?"}]}):
    print(chunk.model_dump(exclude_none=True))

## Log the agent as an MLflow model

Log the agent as code from the `agent.py` file. See [MLflow - Models from Code](https://mlflow.org/docs/latest/models.html#models-from-code).

### Enable automatic authentication for Databricks resources
For the most common Databricks resource types, Databricks supports and recommends declaring resource dependencies for the agent upfront during logging. This enables automatic authentication passthrough when you deploy the agent. With automatic authentication passthrough, Databricks automatically provisions, rotates, and manages short-lived credentials to securely access these resource dependencies from within the agent endpoint.

To enable automatic authentication, specify the dependent Databricks resources when calling `mlflow.pyfunc.log_model()`.

  - **TODO**: If your Unity Catalog tool queries a [vector search index](docs link) or leverages [external functions](docs link), you need to include the dependent vector search index and UC connection objects, respectively, as resources. See docs ([AWS](https://docs.databricks.com/generative-ai/agent-framework/log-agent.html#specify-resources-for-automatic-authentication-passthrough) | [Azure](https://learn.microsoft.com/azure/databricks/generative-ai/agent-framework/log-agent#resources)).



In [0]:
# Determine Databricks resources to specify for automatic auth passthrough at deployment time
from importlib.metadata import version

import mlflow
from mlflow.models.resources import DatabricksFunction

resources = [DatabricksFunction(function_name="system.ai.python_exec")]

with mlflow.start_run():
    logged_agent_info = mlflow.pyfunc.log_model(
        name="agent",
        python_model="agent.py",
        pip_requirements=[
            "unitycatalog-ai",
            # Pin to the versions installed in this notebook environment.
            f"dspy=={version('dspy')}",
            f"databricks-connect=={version('databricks-connect')}",
        ],
        resources=resources,
    )

## Evaluate the agent with Agent Evaluation

Use Mosaic AI Agent Evaluation to evaluate the agent's responses based on expected responses and other evaluation criteria. Use the evaluation criteria you specify to guide iterations, using MLflow to track the computed quality metrics.
See Databricks documentation ([AWS](https://docs.databricks.com/aws/generative-ai/agent-evaluation) | [Azure](https://learn.microsoft.com/azure/databricks/generative-ai/agent-evaluation/)).


To evaluate your tool calls, add custom metrics. See Databricks documentation ([AWS](https://docs.databricks.com/en/generative-ai/agent-evaluation/custom-metrics.html#evaluating-tool-calls) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/generative-ai/agent-evaluation/custom-metrics#evaluating-tool-calls)).
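
The core of a tool-call check can be quite small. The plain-Python sketch below inspects a list of output items and reports whether a given tool was called. The helper `called_tool` and the example items are hypothetical, and wiring such a check into Agent Evaluation as a custom metric is not shown here; follow the linked documentation for that step. Note that `predict` in this notebook returns only the final output item, so the items to check would come from elsewhere, for example from the `response.output_item.done` events of `predict_stream`.

```python
def called_tool(output_items: list, tool_name: str) -> bool:
    """Return True if any output item is a function call to `tool_name`."""
    return any(
        item.get("type") == "function_call" and item.get("name") == tool_name
        for item in output_items
    )


# Hand-written example items; a real run would supply the agent's own output items.
example_items = [
    {
        "type": "function_call",
        "call_id": "call-1",
        "name": "execute_python_code",
        "arguments": '{"code": "print(610)"}',
    },
    {"type": "function_call_output", "call_id": "call-1", "output": "610"},
    {
        "type": "message",
        "role": "assistant",
        "content": [{"type": "output_text", "text": "The 15th Fibonacci number is 610."}],
    },
]

print(called_tool(example_items, "execute_python_code"))
```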

In [0]:
import mlflow
from mlflow.genai.scorers import RelevanceToQuery, RetrievalGroundedness, RetrievalRelevance, Safety

eval_dataset = [
    {
        "inputs": {"input": [{"role": "user", "content": "Calculate the 15th Fibonacci number"}]},
        "expected_response": "The 15th Fibonacci number is 610.",
    }
]

eval_results = mlflow.genai.evaluate(
    data=eval_dataset,
    predict_fn=lambda input: AGENT.predict({"input": input}),
    scorers=[RelevanceToQuery(), Safety()],  # add more scorers here if they're applicable
)

# Review the evaluation results in the MLflow UI (see console output)

## Pre-deployment agent validation
Before registering and deploying the agent, perform pre-deployment checks using the [mlflow.models.predict()](https://mlflow.org/docs/latest/python_api/mlflow.models.html#mlflow.models.predict) API. See Databricks documentation ([AWS](https://docs.databricks.com/en/machine-learning/model-serving/model-serving-debug.html#validate-inputs) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/machine-learning/model-serving/model-serving-debug#before-model-deployment-validation-checks)).

In [0]:
mlflow.models.predict(
    model_uri=f"runs:/{logged_agent_info.run_id}/agent",
    input_data={"input": [{"role": "user", "content": "What is 6*7 in Python?!"}]},
    env_manager="uv",
)

## Register the model to Unity Catalog

Before you deploy the agent, you must register the agent to Unity Catalog.

- **TODO**: Update the `catalog`, `schema`, and `model_name` below to register the MLflow model to Unity Catalog.
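
Leaving any of the three values empty produces a malformed name such as `..`, so it can help to validate them before registering. The following is a minimal sketch of such a check; `build_uc_model_name` is a hypothetical helper, and the values `main`, `default`, and `dspy_agent` are placeholders rather than names taken from this notebook.

```python
def build_uc_model_name(catalog: str, schema: str, model_name: str) -> str:
    """Join the three parts into `catalog.schema.model_name`, rejecting empty parts."""
    parts = {"catalog": catalog, "schema": schema, "model_name": model_name}
    missing = [label for label, value in parts.items() if not value.strip()]
    if missing:
        raise ValueError(f"Set a value for: {', '.join(missing)}")
    return ".".join(value.strip() for value in parts.values())


# Placeholder values for illustration only.
example_name = build_uc_model_name("main", "default", "dspy_agent")
print(example_name)
```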

In [0]:
mlflow.set_registry_uri("databricks-uc")

# TODO: define the catalog, schema, and model name for your UC model
catalog = ""
schema = ""
model_name = ""
UC_MODEL_NAME = f"{catalog}.{schema}.{model_name}"

# register the model to UC
uc_registered_model_info = mlflow.register_model(model_uri=logged_agent_info.model_uri, name=UC_MODEL_NAME)

## Deploy the agent

In [0]:
from databricks import agents

agents.deploy(
    UC_MODEL_NAME,
    uc_registered_model_info.version,
    tags={"endpointSource": "docs"},
)

## Next steps

After your agent is deployed, you can chat with it in AI Playground to perform additional checks, share it with subject matter experts (SMEs) in your organization for feedback, or embed it in a production application. See Databricks documentation ([AWS](https://docs.databricks.com/en/generative-ai/deploy-agent.html) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/generative-ai/deploy-agent)) for details.