# Mosaic AI Agent Framework: Author and deploy a multi-agent system with Genie and Serving Endpoints

This notebook demonstrates how to build a multi-agent system using Mosaic AI Agent Framework and [LangGraph](https://blog.langchain.dev/langgraph-multi-agent-workflows/), where [Genie](https://www.databricks.com/product/ai-bi/genie) is one of the agents.
In this notebook, you:
1. Author a multi-agent system using LangGraph.
1. Wrap the LangGraph agent with MLflow `ResponsesAgent` to ensure compatibility with Databricks features.
1. Manually test the multi-agent system's output.
1. Log and deploy the multi-agent system.

This example is based on [LangGraph documentation - Multi-agent supervisor example](https://github.com/langchain-ai/langgraph/blob/main/docs/docs/tutorials/multi_agent/agent_supervisor.md).

## Why use a Genie agent?

Multi-agent systems consist of multiple AI agents working together, each with specialized capabilities. As one of those agents, Genie lets users interact with their structured data using natural language. Unlike SQL functions, which can only run predefined queries, Genie can generate novel queries to answer user questions.

## Prerequisites

- Address all `TODO`s in this notebook.
- Create a Genie Space. See Databricks documentation ([AWS](https://docs.databricks.com/aws/genie/set-up) | [Azure](https://learn.microsoft.com/azure/databricks/genie/set-up)).

In [0]:
%pip install -U -qqq "mlflow-skinny[databricks]" databricks-langchain databricks-agents uv langgraph-supervisor==0.0.31 "langchain>=1.0.2" deepagents
# dbutils.library.restartPython()

Note: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.


In [0]:
dbutils.library.restartPython()

In [0]:
import langchain
print(langchain.__version__)

1.2.7



## Define the multi-agent system

Create a multi-agent system in LangGraph using a supervisor agent node with one or more of the following subagents:
- **GenieAgent**: A LangChain runnable that allows you to easily interact with your Genie Space to query structured data.
- **Custom serving agent**: An agent that is already hosted as an existing endpoint on Databricks.
- **In-code tool-calling agent**: An agent that calls Unity Catalog function tools, defined within this notebook. This example uses custom Unity Catalog functions such as `ankit_yadav.demo.get_accounts_by_account_executive`. For examples of other tools you can add to your agents, see Databricks documentation ([AWS](https://docs.databricks.com/aws/generative-ai/agent-framework/agent-tool) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/generative-ai/agent-framework/agent-tool)).

The supervisor agent is responsible for creating and routing tool calls to each of your subagents, passing only the context necessary. You can modify this behavior and pass along the entire message history if desired. See the [LangGraph docs](https://langchain-ai.github.io/langgraph/reference/supervisor/) for more information.
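
The routing idea can be sketched without any framework. The following is a minimal, hypothetical illustration in plain Python, not the LangGraph implementation: `route_tool_call`, `SUBAGENTS`, and the two stub subagents are invented names, and each subagent receives only the arguments of its own tool call rather than the full message history.

```python
from typing import Callable


# Hypothetical stand-ins for subagents. Real ones would call Genie or a serving endpoint.
def genie_stub(question: str) -> str:
    return f"[genie] answered: {question}"


def lookup_stub(question: str) -> str:
    return f"[lookup] answered: {question}"


# Registry that maps a tool name to the subagent that handles it.
SUBAGENTS: dict[str, Callable[[str], str]] = {
    "genie": genie_stub,
    "account_lookup": lookup_stub,
}


def route_tool_call(tool_call: dict) -> str:
    """Dispatch one tool call to the matching subagent, passing only its arguments."""
    name = tool_call["name"]
    if name not in SUBAGENTS:
        raise KeyError(f"Unknown subagent: {name}")
    return SUBAGENTS[name](tool_call["args"]["question"])


result = route_tool_call({"name": "genie", "args": {"question": "Total spend last quarter?"}})
print(result)
```

Keeping the registry as a plain dictionary makes it easy to see which context each subagent receives. A real supervisor would let the model choose the tool name and arguments.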

#### Write agent code to file

Define the agent code in a single cell below. This lets you write the agent code to a local Python file, using the `%%writefile` magic command, for subsequent logging and deployment.
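
Outside a notebook, the same pattern amounts to writing source text to a `.py` file and importing it as a module. The sketch below illustrates that with the standard library only. The module name `toy_agent` and its contents are hypothetical and are not part of this notebook.

```python
import importlib.util
import tempfile
from pathlib import Path

# Source text for a tiny stand-in module (hypothetical, for illustration only).
source = 'GREETING = "hello from a file-backed module"\n'

with tempfile.TemporaryDirectory() as tmp_dir:
    # Write the source to disk, similar to what %%writefile does for a cell.
    module_path = Path(tmp_dir) / "toy_agent.py"
    module_path.write_text(source)

    # Load the file as a module so its top-level names can be used.
    spec = importlib.util.spec_from_file_location("toy_agent", module_path)
    toy_agent = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(toy_agent)

print(toy_agent.GREETING)
```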


In [0]:
%%writefile agent_dev.py
import json
import re
from datetime import datetime
from typing import Any, Generator
from uuid import uuid4

import mlflow
from databricks_langchain import (
    ChatDatabricks,
    DatabricksFunctionClient,
    UCFunctionToolkit,
    set_uc_function_client,
)
from databricks_langchain.genie import GenieAgent
from langchain.agents import create_agent
from langchain.agents.middleware import TodoListMiddleware
from langchain.agents.middleware.summarization import SummarizationMiddleware
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import BaseMessage
from langchain_core.tools import tool
from langgraph.graph import StateGraph, END
from langgraph.graph.state import CompiledStateGraph
from langgraph.prebuilt import create_react_agent
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)
from pydantic import BaseModel

from deepagents.middleware.filesystem import FilesystemMiddleware, FilesystemState
from deepagents.middleware.patch_tool_calls import PatchToolCallsMiddleware

client = DatabricksFunctionClient()
set_uc_function_client(client)


########################################
# Formatting Helper Functions
########################################

def remove_file_references(content: str) -> str:
    """Remove references to virtual file operations from agent responses.

    Strips mentions of file operations (write_file, read_file, ls) that are
    implementation details not relevant to end users.
    """
    if not content:
        return content

    patterns = [
        r"I(?:'ve| have) saved.*?to [\w_\-\./]+\.\w+",
        r"I(?:'ll| will) save.*?to [\w_\-\./]+\.\w+",
        r"Let me save.*?to [\w_\-\./]+\.\w+",
        r"I(?:'ve| have) written.*?to [\w_\-\./]+\.\w+",
        r"Saved to [\w_\-\./]+\.\w+",
        r"Writing to [\w_\-\./]+\.\w+",
        r"I(?:'ll| will) write.*?to [\w_\-\./]+\.\w+",
        r"Saving.*?to [\w_\-\./]+\.\w+",
        r"Reading from [\w_\-\./]+\.\w+",
        r"I(?:'ve| have) read.*?from [\w_\-\./]+\.\w+",
        r"Let me read.*?from [\w_\-\./]+\.\w+",
        r"The file [\w_\-\./]+\.\w+ contains",
        r"From [\w_\-\./]+\.\w+:",
        r"I(?:'ve| have) saved.*?to /large_tool_results/[\w_\-\./]+",
        r"The (?:full )?results? (?:were|was|has been) (?:automatically )?saved to /large_tool_results/[\w_\-\./]+",
    ]

    cleaned = content
    for pattern in patterns:
        cleaned = re.sub(pattern, "", cleaned, flags=re.IGNORECASE)

    cleaned = re.sub(r'\n\s*\n\s*\n', '\n\n', cleaned)
    cleaned = cleaned.strip()

    return cleaned


def apply_format_template(content: str, state: dict) -> str:
    """Apply consistent formatting to final response."""
    return content.strip()


########################################
# Configuration Models
########################################

GENIE = "genie"


class ServedSubAgent(BaseModel):
    endpoint_name: str
    name: str
    task: str
    description: str


class Genie(BaseModel):
    space_id: str
    name: str
    task: str = GENIE
    description: str


class InCodeSubAgent(BaseModel):
    tools: list[str]
    name: str
    description: str


def stringify_content(state):
    msgs = state["messages"]
    if isinstance(msgs[-1].content, list):
        msgs[-1].content = json.dumps(msgs[-1].content, indent=4)
    return {"messages": msgs}


########################################
# Build Domain Tools (Genie + UC Functions)
########################################

def build_domain_tools(
    externally_served_agents: list[ServedSubAgent | Genie],
    in_code_agents: list[InCodeSubAgent],
) -> list:
    """Build the domain-specific tools (Genie, UC functions, served agents).

    These are the tools the agent uses for actual work. The planning tool
    (write_todos) and filesystem tools (ls, read_file, write_file,
    edit_file, glob, grep) are provided by middleware, not assembled here.

    Returns:
        List of LangChain tool objects
    """
    domain_tools = []

    # UC function tools
    for agent_config in in_code_agents:
        uc_toolkit = UCFunctionToolkit(function_names=agent_config.tools)
        domain_tools.extend(uc_toolkit.tools)

    # Genie and served endpoint tools
    for agent_config in externally_served_agents:
        if isinstance(agent_config, Genie):
            genie_agent = GenieAgent(
                genie_space_id=agent_config.space_id,
                genie_agent_name=agent_config.name,
                description=agent_config.description,
            )

            @tool
            def genie_query_tool(question: str, _agent=genie_agent) -> str:
                """Query the Genie Space for data about Databricks consumption.

                Use this tool to get data from tables including:
                - Customer accounts and consumption metrics
                - DBU usage and costs
                - Use case details, timelines, and status
                - Databricks SKU information

                Genie can execute SQL queries based on natural language questions.
                You can call this tool multiple times with different questions.

                IMPORTANT: To prevent context overflow, use aggregations and limits:
                - Request aggregated/summarized data when possible (GROUP BY, SUM, AVG)
                - Add "LIMIT 50" or similar to avoid retrieving thousands of rows

                Args:
                    question: Natural language question about the data (include LIMIT clauses)

                Returns:
                    Data results and insights from Genie
                """
                result = _agent.invoke({"messages": [{"role": "user", "content": question}]})
                return result["messages"][-1].content

            genie_query_tool.name = agent_config.name.replace(" ", "_").replace("-", "_")
            domain_tools.append(genie_query_tool)

        else:
            model = ChatDatabricks(
                endpoint=agent_config.endpoint_name,
                use_responses_api="responses" in agent_config.task
            )
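            # Bind the original method first so the override does not call itself recursively.
            model._stream = lambda x, _orig_stream=model._stream: _orig_stream(**x, stream=False)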
            served_agent = create_react_agent(
                model,
                tools=[],
                name=agent_config.name,
                post_model_hook=stringify_content,
            )

            # An f-string is not a docstring, so pass the description to @tool explicitly.
            @tool(description=f"Delegate task to {agent_config.name}. {agent_config.description}")
            def served_agent_tool(task: str, _agent=served_agent) -> str:
                # `task` is the task or question to send to the served agent.
                result = _agent.invoke({"messages": [{"role": "user", "content": task}]})
                return result["messages"][-1].content

            served_agent_tool.name = agent_config.name.replace(" ", "_").replace("-", "_")
            domain_tools.append(served_agent_tool)

    return domain_tools


########################################
# System Prompt (domain-only, no tool instructions)
########################################

def build_system_prompt() -> str:
    """Build the system prompt with domain-specific instructions only.

    Tool usage instructions for planning (write_todos) and
    filesystem (ls, read_file, write_file, etc.) are injected by their
    respective middleware, so they should NOT be duplicated here.
    """
    current_date = datetime.now().strftime("%B %d, %Y")

    return f"""You are an AI assistant for analyzing Databricks consumption at various customers and creating reports.

CURRENT DATE: {current_date}
NOTE: Databricks fiscal year starts February 1st. Use fiscal quarters/years for all time-based queries (FY Q1=Feb-Apr, Q2=May-Jul, Q3=Aug-Oct, Q4=Nov-Jan) where needed.

KEY ANALYSIS GUIDELINES:
- Focus on DOLLARS as the primary metric (only analyze DBUs if explicitly requested)
- Use COMPLETED time periods by default (completed months, quarters, weeks) - exclude current/ongoing periods unless specifically asked to include them
- When working with Account Executives, call get_accounts_by_account_executive FIRST to identify which accounts you need to analyze

PLANNING AND EXECUTION:
For COMPLEX tasks (reports, multi-step analysis, questions requiring multiple data sources):
1. Use write_todos at the start to create a comprehensive plan with ALL anticipated steps
2. If querying by AE name, include a step to call get_accounts_by_account_executive to identify accounts
3. Execute each step by calling the appropriate tools
4. After completing each step, call write_todos to update the status of completed items
5. Review your current todo list periodically to stay focused and see what's left
6. Synthesize comprehensive answer when all steps are complete

For SIMPLE tasks (single data query, straightforward question):
1. Call the appropriate tool directly
2. Return the result

QUERY OPTIMIZATION:
- When querying Genie, use aggregations (SUM, AVG, COUNT, GROUP BY) instead of raw data when possible
- Add LIMIT clauses (e.g., "LIMIT 50") to Genie queries to avoid retrieving thousands of rows

IMPORTANT GUIDELINES:
- You can call any tool multiple times with different inputs
- Break down complex questions into specific, answerable sub-questions
- For data queries, use the Genie tool which can execute SQL
- Always synthesize results into a clear, comprehensive answer
- Track your progress with TODOs to avoid losing focus
- DO NOT continuously revise your TODO plan - create it once, then execute
- When you have answered the question, STOP - do not look for additional work
- When using a UC tool, return the complete table output as returned by the tool itself."""


########################################
# Create Agent with Middleware
########################################

def create_langgraph_supervisor(
    llm: BaseChatModel,
    externally_served_agents: list[ServedSubAgent | Genie] = [],
    in_code_agents: list[InCodeSubAgent] = [],
) -> CompiledStateGraph:
    """Create a planning supervisor agent using langchain create_agent with middleware.

    Middleware provides (automatically, no prompt engineering needed):
    - TodoListMiddleware: write_todos tool + planning system prompt
    - FilesystemMiddleware: ls, read_file, write_file, edit_file, glob, grep tools
      + auto-eviction of large tool results to /large_tool_results/
    - SummarizationMiddleware: token-aware context management, summarizes
      conversation when approaching context budget
    - PatchToolCallsMiddleware: handles dangling tool calls in message history

    Args:
        llm: Foundation model (ChatDatabricks instance)
        externally_served_agents: Genie spaces and served endpoints
        in_code_agents: UC function agents

    Returns:
        Compiled agent graph
    """
    domain_tools = build_domain_tools(externally_served_agents, in_code_agents)
    system_prompt = build_system_prompt()

    middleware = [
        TodoListMiddleware(),
        FilesystemMiddleware(
            # Auto-evict tool results larger than ~60K chars (15K tokens * 4 chars/token)
            # to /large_tool_results/ in the virtual filesystem.
            # This prevents Genie results from consuming the 128K context window.
            tool_token_limit_before_evict=15000,
        ),
        SummarizationMiddleware(
            model=llm,
            # Summarize conversation when it exceeds ~90K tokens (~70% of 128K).
            # Keeps 6 most recent messages after summarization.
            max_tokens_before_summary=90000,
            messages_to_keep=6,
        ),
        PatchToolCallsMiddleware(),
    ]

    agent = create_agent(
        model=llm,
        system_prompt=system_prompt,
        tools=domain_tools,
        middleware=middleware,
    )

    return agent


def create_supervisor_with_formatter(
    llm: BaseChatModel,
    externally_served_agents: list[ServedSubAgent | Genie] = [],
    in_code_agents: list[InCodeSubAgent] = [],
) -> CompiledStateGraph:
    """Create supervisor agent with post-processing formatter node.

    Wraps the base supervisor with a formatter that:
    1. Removes file operation references from responses
    2. Applies consistent formatting templates
    """
    supervisor = create_langgraph_supervisor(llm, externally_served_agents, in_code_agents)

    def format_final_response(state: dict) -> dict:
        """Reformat final AI message to remove file references and apply templates."""
        messages = state["messages"]

        last_ai_message = None
        for msg in reversed(messages):
            if msg.type == "ai":
                last_ai_message = msg
                break

        if last_ai_message and last_ai_message.content:
            content = last_ai_message.content
            content = remove_file_references(content)
            content = apply_format_template(content, state)
            last_ai_message.content = content

        return {"messages": messages}

    # Use FilesystemState so the outer graph propagates the 'files' channel
    # from the supervisor subgraph (needed for custom_outputs in predict()).
    workflow = StateGraph(FilesystemState)
    workflow.add_node("supervisor", supervisor)
    workflow.add_node("formatter", format_final_response)

    workflow.set_entry_point("supervisor")
    workflow.add_edge("supervisor", "formatter")
    workflow.add_edge("formatter", END)

    return workflow.compile()


##########################################
# Wrap as MLflow ResponsesAgent
##########################################

class LazyLangGraphResponsesAgent(ResponsesAgent):
    """MLflow ResponsesAgent wrapper with lazy initialization.

    Message sanitization, trimming, and truncation are handled by middleware:
    - PatchToolCallsMiddleware: fixes dangling tool calls
    - SummarizationMiddleware: token-aware context management
    - FilesystemMiddleware: auto-evicts large tool results
    """

    def __init__(self):
        self._agent = None
        self._final_state = None

    @property
    def agent(self):
        if self._agent is None:
            self._agent = get_supervisor()
        return self._agent

    def _langchain_to_responses(self, message: BaseMessage) -> list[dict[str, Any]]:
        """Convert LangChain message to Responses API output items."""
        message = message.model_dump()
        role = message["type"]
        output = []
        if role == "ai":
            if message.get("content"):
                output.append(
                    self.create_text_output_item(
                        text=message["content"],
                        id=message.get("id") or str(uuid4()),
                    )
                )
            if tool_calls := message.get("tool_calls"):
                output.extend(
                    [
                        self.create_function_call_item(
                            id=message.get("id") or str(uuid4()),
                            call_id=tool_call["id"],
                            name=tool_call["name"],
                            arguments=json.dumps(tool_call["args"]),
                        )
                        for tool_call in tool_calls
                    ]
                )
        elif role == "tool":
            output.append(
                self.create_function_call_output_item(
                    call_id=message["tool_call_id"],
                    output=message["content"],
                )
            )
        return output

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        outputs = [
            event.item
            for event in self.predict_stream(request)
            if event.type == "response.output_item.done"
        ]

        files = {}
        if self._final_state:
            files = self._final_state.get("files", {})

        return ResponsesAgentResponse(
            output=outputs,
            custom_outputs={
                **(request.custom_inputs or {}),
                "files": files,
            }
        )

    def predict_stream(
        self,
        request: ResponsesAgentRequest,
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        cc_msgs = self.prep_msgs_for_cc_llm([i.model_dump() for i in request.input])

        # Middleware handles sanitization, trimming, and truncation:
        # - PatchToolCallsMiddleware: fixes dangling tool calls
        # - SummarizationMiddleware: token-aware context management
        # - FilesystemMiddleware: auto-evicts large tool results

        first_name = True
        seen_ids = set()

        initial_state = {"messages": cc_msgs}
        config = {"recursion_limit": 100}

        for event_name, events in self.agent.stream(initial_state, config=config, stream_mode=["updates"]):
            if event_name == "updates":
                if not first_name:
                    node_name = tuple(events.keys())[0]
                    if node_name != "formatter":
                        yield ResponsesAgentStreamEvent(
                            type="response.output_item.done",
                            item=self.create_text_output_item(
                                text=f"<name>{node_name}</name>",
                                id=str(uuid4()),
                            ),
                        )
                for node_data in events.values():
                    self._final_state = node_data

                    for msg in node_data["messages"]:
                        if msg.id not in seen_ids:
                            seen_ids.add(msg.id)
                            for item in self._langchain_to_responses(msg):
                                yield ResponsesAgentStreamEvent(
                                    type="response.output_item.done", item=item
                                )
            first_name = False


#######################################################
# Configure the Foundation Model and Serving Sub-Agents
#######################################################

LLM_ENDPOINT_NAME = "databricks-gpt-oss-120b"
llm = ChatDatabricks(endpoint=LLM_ENDPOINT_NAME)

EXTERNALLY_SERVED_AGENTS = [
    Genie(
        space_id="01f0fd193cf412f7a40f97d24851c0d1",
        name="logfood-genie",
        description="This Genie agent can answer questions based on a database containing tables related to Databricks consumption at different customers, including accounts, dollars, dbus, the Databricks SKU along with Use Case details such as target live dates, use case descriptions and updates. Use Genie to fetch and analyze data from these tables by specifying the relevant columns and filters. Genie can execute SQL queries to provide precise data insights based on your questions.",
    ),
]

IN_CODE_AGENTS = [
    InCodeSubAgent(
        tools=["ankit_yadav.demo.get_accounts_by_account_executive"],
        name="account lookup",
        description="Returns selected account details managed by a specific Account Executive. Use this tool FIRST when working with AE names to identify which accounts you need to analyze before querying consumption data.",
    ),
    InCodeSubAgent(
        tools=["ankit_yadav.demo.get_account_summaries"],
        name="account summary",
        description="Generates AI-powered account-level summaries for all accounts managed by a specific Account Executive. Analyzes use case patterns, pipeline health, opportunities, and risks across all use cases per account. Use for comprehensive account overviews and strategic insights.",
    ),
    InCodeSubAgent(
        tools=["ankit_yadav.demo.get_live_date_follow_up_messages"],
        name="follow-up messages",
        description="Generates AI-powered follow-up messages for use cases targeting go-live in current or next month. Pass specific AE name, multiple names separated by comma, or NULL/'ALL' for all AEs. Creates concise Slack/Teams-ready messages for AE and SA based on next steps and use case details. Returns account, use case info, target live date, days until live, current stage, monthly dollars, and tailored follow-up message.",
    )
]

#################################################
# Build TOOLS list for resource registration
#################################################

# UC function tools (needed by resource setup cell for DatabricksFunction registration)
TOOLS = []
for _agent_cfg in IN_CODE_AGENTS:
    _uc_toolkit = UCFunctionToolkit(function_names=_agent_cfg.tools)
    TOOLS.extend(_uc_toolkit.tools)

#################################################
# Create supervisor and set up MLflow for tracing
#################################################

_supervisor = None

def get_supervisor():
    """Lazily create supervisor on first use to avoid model loading timeout."""
    global _supervisor
    if _supervisor is None:
        _supervisor = create_supervisor_with_formatter(llm, EXTERNALLY_SERVED_AGENTS, IN_CODE_AGENTS)
    return _supervisor


mlflow.langchain.autolog()
AGENT = LazyLangGraphResponsesAgent()
mlflow.models.set_model(AGENT)


Writing agent_dev.py
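
The system prompt in `agent_dev.py` states a fiscal calendar that starts on February 1 (Q1=Feb-Apr, Q2=May-Jul, Q3=Aug-Oct, Q4=Nov-Jan). As a small worked example of that mapping, the sketch below computes the fiscal quarter for a date. The helper name `fiscal_quarter` is hypothetical and is not used by the agent.

```python
from datetime import date


def fiscal_quarter(day: date) -> int:
    """Return the fiscal quarter (1-4) for a fiscal year that starts on February 1."""
    # Shift months so that February maps to 0 and January maps to 11.
    months_since_february = (day.month - 2) % 12
    return months_since_february // 3 + 1


for sample in (date(2025, 2, 1), date(2025, 7, 31), date(2025, 10, 15), date(2026, 1, 29)):
    print(sample.isoformat(), "-> Q", fiscal_quarter(sample))
```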


## Test the agent

Interact with the agent to test its output. Because `agent_dev.py` calls `mlflow.langchain.autolog()`, you can view the trace for each step the agent takes.

**Important:** LangGraph internally uses exceptions (such as `Command` or `ParentCommand`) to switch between nodes. These exceptions may appear in your MLflow traces as events. This behavior is expected and is not a cause for concern.
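
The agent takes a Responses-style request: a dictionary with an `input` list of role/content messages. The sketch below shows one way to build such a request and to pull the text out of dumped stream events. The helpers `make_request` and `extract_text` and the sample events are hypothetical. The event shape is assumed from the `model_dump(exclude_none=True)` output printed in this notebook.

```python
def make_request(user_text: str) -> dict:
    """Build a single-turn request in the shape the agent expects."""
    return {"input": [{"role": "user", "content": user_text}]}


def extract_text(events: list[dict]) -> str:
    """Join the text blocks found in dumped 'response.output_item.done' events."""
    parts = []
    for event in events:
        if event.get("type") != "response.output_item.done":
            continue
        # Function-call items carry no 'content' list, so they are skipped here.
        for block in event.get("item", {}).get("content") or []:
            if isinstance(block, dict) and "text" in block:
                parts.append(block["text"])
    return "\n".join(parts)


# Hypothetical events that mimic the dumped stream output.
sample_events = [
    {"type": "response.output_item.done",
     "item": {"id": "msg-1", "content": [{"text": "Hello"}]}},
    {"type": "response.output_item.done",
     "item": {"id": "call-1", "name": "some_tool", "arguments": "{}"}},
]

print(make_request("What tools are available to you"))
print(extract_text(sample_events))
```

With the real agent, the events could come from `[e.model_dump(exclude_none=True) for e in AGENT.predict_stream(request)]`.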

In [0]:
# dbutils.library.restartPython()

In [0]:
import langchain
print(langchain.__version__)

1.2.7


In [0]:
from agent_dev import AGENT

# TODO: Replace this placeholder `input_example` with a domain-specific prompt for your agent.
input_example = {"input": [{"role": "user", "content": "What tools are available to you"}]}

AGENT.predict(input_example)



ResponsesAgentResponse(tool_choice=None, truncation=None, id=None, created_at=None, error=None, incomplete_details=None, instructions=None, metadata=None, model=None, object='response', output=[OutputItem(type='message', id='lc_run--019c0a23-6705-71f1-bb01-45ab0500f69f-0', content=[{'text': 'Here are the tools I can use to help you:\n\n**Data & Account‑related tools**\n- **ankit_yadav__demo__get_accounts_by_account_executive** – Retrieve a list of accounts managed by a specific Account Executive.  \n- **ankit_yadav__demo__get_account_summaries** – Generate AI‑powered summaries for all accounts of a given AE (use‑case patterns, pipeline health, opportunities, risks, etc.).  \n- **ankit_yadav__demo__get_live_date_follow_up_messages** – Create concise follow‑up messages for use cases that are slated to go live this month or next.  \n- **logfood_genie** – Query the Genie data warehouse for Databricks consumption, costs, DBU usage, SKU info, timelines, and other analytics via natural‑langua

Trace(trace_id=tr-746b7de531add45aa16b06498288c33d)

In [0]:
# from agent_dev import AGENT

# # TODO: Replace this placeholder `input_example` with a domain-specific prompt for your agent.
# input_example = {"input": [{"role": "user", "content": "What is the trend for JOsh Hermans accounts over the last three quarters? Give me a quarter over quarter % change for his accounts"}]}

# AGENT.predict(input_example)



In [0]:
for event in AGENT.predict_stream(input_example):
  print(event.model_dump(exclude_none=True))

{'type': 'response.output_item.done', 'item': {'id': 'lc_run--019c0a23-a324-7a93-9446-d30446a200f0-0', 'content': [{'text': 'Here’s a quick rundown of the tools I can use while working with you:\n\n### Project‑management / planning\n- **`write_todos`** – Create, update, and track a structured todo list for multi‑step tasks.\n\n### Filesystem utilities\n- **`ls`** – List the contents of a directory.  \n- **`read_file`** – Read the contents of a file (with optional pagination).  \n- **`write_file`** – Create a new file with supplied content.  \n- **`edit_file`** – Perform exact‑string replacements in an existing file.  \n- **`glob`** – Find files that match a glob pattern (e.g., `**/*.py`).  \n- **`grep`** – Search for literal text across files, returning matching file names or line content.\n\n### Databricks‑specific data APIs\n- **`ankit_yadav__demo__get_accounts_by_account_executive`** – Retrieve a list of customer accounts managed by a given Account Executive.  \n- **`ankit_yadav__de

Trace(trace_id=tr-974e3a20f3ab439d75c25e90fb1458d0)

## Log the agent as an MLflow model

Log the agent as code from the `agent_dev.py` file. See [MLflow - Models from Code](https://mlflow.org/docs/latest/models.html#models-from-code).

### Enable automatic authentication for Databricks resources
For the most common Databricks resource types, Databricks supports and recommends declaring resource dependencies for the agent upfront during logging. This enables automatic authentication passthrough when you deploy the agent. With automatic authentication passthrough, Databricks automatically provisions, rotates, and manages short-lived credentials to securely access these resource dependencies from within the agent endpoint.

To enable automatic authentication, specify the dependent Databricks resources when calling `mlflow.pyfunc.log_model()`.
  - **TODO**: If your Unity Catalog tool queries a vector search index or leverages external functions, you need to include the dependent vector search index and UC connection objects, respectively, as resources. See docs ([AWS](https://docs.databricks.com/aws/generative-ai/agent-framework/agent-authentication#supported-resources-for-automatic-authentication-passthrough) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/generative-ai/agent-framework/agent-authentication#supported-resources-for-automatic-authentication-passthrough)).

  - **TODO**: Add the SQL Warehouse or tables powering your Genie Space to enable passthrough authentication. See docs ([AWS](https://docs.databricks.com/aws/generative-ai/agent-framework/agent-authentication#supported-resources-for-automatic-authentication-passthrough) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/generative-ai/agent-framework/agent-authentication#supported-resources-for-automatic-authentication-passthrough)). If your Genie Space uses embedded credentials, you do not have to add this.
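
Tables and functions in Unity Catalog are addressed by three-level names such as `ankit_yadav.demo.dim_accounts`. Before declaring resources, it can help to check that each name has that shape. The sketch below uses a hypothetical helper, `split_uc_name`, which is not part of MLflow or Databricks. It assumes simple names without backtick-quoted dots.

```python
def split_uc_name(full_name: str) -> tuple[str, str, str]:
    """Split a catalog.schema.name string and fail early if it is malformed."""
    parts = full_name.split(".")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"Expected catalog.schema.name, got {full_name!r}")
    catalog, schema, name = parts
    return catalog, schema, name


table_names = [
    "ankit_yadav.demo.dim_accounts",
    "ankit_yadav.demo.dim_use_cases",
]

for table_name in table_names:
    print(split_uc_name(table_name))
```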

In [0]:
# Determine Databricks resources to specify for automatic auth passthrough at deployment time
import mlflow
from agent_dev import EXTERNALLY_SERVED_AGENTS, LLM_ENDPOINT_NAME, TOOLS, Genie
from databricks_langchain import UnityCatalogTool, VectorSearchRetrieverTool
from mlflow.models.resources import (
    DatabricksFunction,
    DatabricksGenieSpace,
    DatabricksServingEndpoint,
    DatabricksSQLWarehouse,
    DatabricksTable
)
from pkg_resources import get_distribution

# TODO: Manually include underlying resources if needed. See the TODO in the markdown above for more information.
resources = [DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT_NAME)]
# TODO: Add SQL Warehouses and delta tables powering the Genie Space
resources.append(DatabricksSQLWarehouse(warehouse_id="9f759a35683b2137"))
resources.append(DatabricksTable(table_name="ankit_yadav.demo.dim_accounts"))
resources.append(DatabricksTable(table_name="ankit_yadav.demo.fact_consumption_weekly"))
resources.append(DatabricksTable(table_name="ankit_yadav.demo.fact_consumption_daily"))
resources.append(DatabricksTable(table_name="ankit_yadav.demo.fact_consumption_monthly"))
resources.append(DatabricksTable(table_name="ankit_yadav.demo.dim_use_cases"))



# Add tools from Unity Catalog
for tool in TOOLS:
    if isinstance(tool, VectorSearchRetrieverTool):
        resources.extend(tool.resources)
    elif isinstance(tool, UnityCatalogTool):
        resources.append(DatabricksFunction(function_name=tool.uc_function_name))

# Add serving endpoints and Genie Spaces
for agent in EXTERNALLY_SERVED_AGENTS:
    if isinstance(agent, Genie):
        resources.append(DatabricksGenieSpace(genie_space_id=agent.space_id))
    else:
        resources.append(DatabricksServingEndpoint(endpoint_name=agent.endpoint_name))

with mlflow.start_run():
    logged_agent_info = mlflow.pyfunc.log_model(
        name="agent_dev",
        python_model="agent_dev.py",
        resources=resources,
        # Pin the package versions currently installed in this notebook environment.
        pip_requirements=[
            f"databricks-connect=={get_distribution('databricks-connect').version}",
            f"mlflow=={get_distribution('mlflow').version}",
            f"databricks-langchain=={get_distribution('databricks-langchain').version}",
            f"langgraph=={get_distribution('langgraph').version}",
            f"langgraph-supervisor=={get_distribution('langgraph-supervisor').version}",
            f"langchain=={get_distribution('langchain').version}",
            f"deepagents=={get_distribution('deepagents').version}",
        ],
    )

🔗 View Logged Model at: https://fe-sandbox-serverless-9thezy.cloud.databricks.com/ml/experiments/1642964254361159/models/m-e1dd47eba9cf42cda4db417556abc942?o=7474651123329331
2026/01/29 14:24:12 INFO mlflow.pyfunc: Predicting on input example to validate output
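
The `pip_requirements` list above pins each dependency to the version installed in the notebook environment. A minimal sketch of the same idea as a reusable helper follows. The names `pin_requirements` and `lookup` are hypothetical and not part of MLflow; the helper assumes you would rather fail early than log a model with a missing dependency.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Callable, Iterable


def pin_requirements(
    packages: Iterable[str],
    lookup: Callable[[str], str] = version,
) -> list[str]:
    """Return 'name==version' strings for packages installed in this environment.

    `lookup` defaults to importlib.metadata.version and is injectable for testing.
    """
    pinned, missing = [], []
    for name in packages:
        try:
            pinned.append(f"{name}=={lookup(name)}")
        except PackageNotFoundError:
            missing.append(name)
    if missing:
        # Fail loudly rather than log a model with unpinned dependencies.
        raise RuntimeError(f"Packages not installed: {', '.join(missing)}")
    return pinned
```

The returned list could be passed as `pip_requirements=pin_requirements([...])` in place of the hand-written f-strings.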


## Pre-deployment agent validation
Before registering and deploying the agent, perform pre-deployment checks using the [mlflow.models.predict()](https://mlflow.org/docs/latest/python_api/mlflow.models.html#mlflow.models.predict) API. See Databricks documentation ([AWS](https://docs.databricks.com/en/machine-learning/model-serving/model-serving-debug.html#validate-inputs) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/machine-learning/model-serving/model-serving-debug#before-model-deployment-validation-checks)).

In [0]:
import mlflow
mlflow.models.predict(
    model_uri=f"runs:/{logged_agent_info.run_id}/agent_dev",
    input_data=input_example,
    env_manager="uv",
)

Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]

Downloading artifacts:   0%|          | 0/12 [00:00<?, ?it/s]

2026/01/29 14:24:25 INFO mlflow.models.flavor_backend_registry: Selected backend for flavor 'python_function'

2026/01/29 14:24:28 INFO mlflow.utils.virtualenv: Creating a new environment in /tmp/virtualenv_envs/mlflow-04a354c89f2e7c0da1c602cd033dbef842728f7f with python version 3.12.3 using uv
Using CPython 3.12.3 interpreter at: /usr/bin/python3.12
Creating virtual environment at: /tmp/virtualenv_envs/mlflow-04a354c89f2e7c0da1c602cd033dbef842728f7f
Activate with: source /tmp/virtualenv_envs/mlflow-04a354c89f2e7c0da1c602cd033dbef842728f7f/bin/activate
2026/01/29 14:24:29 INFO mlflow.utils.virtualenv: Installing dependencies
Using Python 3.12.3 environment at: /tmp/virtualenv_envs/mlflow-04a354c89f2e7c0da1c602cd033dbef842728f7f
Resolved 3 packages in 37ms
Downloading pip (1.8MiB)
Downloading setuptools (1.2MiB)
 Downloaded pip
 Downloaded setuptools
Prepared 3 packages in 169ms
Installed 3 packages in 21ms

{"object": "response", "output": [{"type": "message", "id": "lc_run--019c0a24-8019-7090-abbd-affdea1ff187-0", "content": [{"text": "Here\u2019s a quick overview of the tools I can use while helping you:\n\n| Category | Tool | What it does |\n|----------|------|--------------|\n| **Task Management** | `write_todos` | Create, update, and track a structured to\u2011do list for multi\u2011step projects. |\n| **Filesystem** | `ls` | List the contents of a directory. |\n| | `read_file` | Read a file (with optional pagination). |\n| | `write_file` | Create a brand\u2011new file with supplied content. |\n| | `edit_file` | Perform an exact\u2011string replacement inside an existing file. |\n| | `glob` | Find files that match a glob pattern (e.g., `**/*.py`). |\n| | `grep` | Search for a literal text pattern across files (can return file names, matching lines, or counts). |\n| **Databricks Consumption Data** | `ankit_yadav__demo__get_accounts_by_account_executive` | Retrieve account details for 

2026/01/29 14:24:57 INFO mlflow.tracing.export.async_export_queue: Flushing the async trace logging queue before program exit. This may take a while...
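
The validation output above is a JSON object with an `output` list whose `message` items carry `content` parts with a `text` field. As a rough sketch, a small helper can pull the readable text out of a dict with that shape when checking results by hand. The function name `extract_output_text` is hypothetical, and the code assumes only the fields visible in the printed output; other item types are skipped.

```python
from typing import Any


def extract_output_text(response: dict[str, Any]) -> str:
    """Concatenate the text parts of every 'message' item in a Responses-style dict."""
    parts = []
    for item in response.get("output", []):
        if item.get("type") != "message":
            continue  # skip tool calls and other non-message items
        for content in item.get("content", []):
            text = content.get("text")
            if text:
                parts.append(text)
    return "\n".join(parts)
```

An empty string signals that the response contained no message text, which is worth investigating before registering the model.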


## Register the model to Unity Catalog

Update the `catalog`, `schema`, and `model_name` below to register the MLflow model to Unity Catalog.

In [0]:
mlflow.set_registry_uri("databricks-uc")

# TODO: define the catalog, schema, and model name for your UC model
catalog = "ankit_yadav"
schema = "demo"
model_name = "logfood_agent_dev"
UC_MODEL_NAME = f"{catalog}.{schema}.{model_name}"

# register the model to UC
uc_registered_model_info = mlflow.register_model(
    model_uri=logged_agent_info.model_uri, name=UC_MODEL_NAME
)

Successfully registered model 'ankit_yadav.demo.logfood_agent_dev'.


Downloading artifacts:   0%|          | 0/12 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/13 [00:00<?, ?it/s]

🔗 Created version '1' of model 'ankit_yadav.demo.logfood_agent_dev': https://fe-sandbox-serverless-9thezy.cloud.databricks.com/explore/data/models/ankit_yadav/demo/logfood_agent_dev/version/1?o=7474651123329331


## Deploy the agent

In [0]:
from databricks import agents

# Deploy the registered model version to a Model Serving endpoint.
deployment = agents.deploy(
    UC_MODEL_NAME,
    uc_registered_model_info.version,
    workload_type="CPU",            # compute type
    workload_size="Medium",         # maps to 8–16 provisioned concurrency
    tags={"endpointSource": "docs"}
)

Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]

For more information, see: https://docs.databricks.com/aws/en/generative-ai/agent-framework/feedback-model



    Deployment of ankit_yadav.demo.logfood_agent_dev version 1 initiated.  This can take up to 15 minutes and the Review App & Query Endpoint will not work until this deployment finishes.

    View status: https://fe-sandbox-serverless-9thezy.cloud.databricks.com/ml/endpoints/agents_ankit_yadav-demo-logfood_agent_dev/?o=7474651123329331
    Review App: https://fe-sandbox-serverless-9thezy.cloud.databricks.com/ml/review-v2/df920ad2af9d41cfb878d272eba71636/chat?o=7474651123329331

You can refer back to the links above from the endpoint detail page at https://fe-sandbox-serverless-9thezy.cloud.databricks.com/ml/endpoints/agents_ankit_yadav-demo-logfood_agent_dev/?o=7474651123329331.

To set up monitoring for your deployed agent, see:
https://docs.databricks.com/aws/en/mlflow3/genai/eval-monitor/production-monitoring


## Next steps

After your agent is deployed, you can chat with it in AI Playground to perform additional checks, share it with subject matter experts (SMEs) in your organization for feedback, or embed it in a production application. See Databricks documentation ([AWS](https://docs.databricks.com/en/generative-ai/deploy-agent.html) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/generative-ai/deploy-agent)).
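
For embedding the agent in an application, one option is to call the serving endpoint's `/serving-endpoints/<endpoint_name>/invocations` REST path. The sketch below only builds the HTTP request and does not send it. The function name, the `host` and `token` arguments, and the assumption that the endpoint accepts the same `{"input": [...]}` payload as `input_example` are illustrative; check them against your workspace before use.

```python
import json
import urllib.request


def build_invocation_request(
    host: str, endpoint_name: str, token: str, user_message: str
) -> urllib.request.Request:
    """Build (but do not send) a POST request for a serving endpoint invocation."""
    url = f"{host.rstrip('/')}/serving-endpoints/{endpoint_name}/invocations"
    # Same request shape as the notebook's `input_example`.
    payload = {"input": [{"role": "user", "content": user_message}]}
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
```

Passing the returned object to `urllib.request.urlopen` would send the request; keep the token in a secret store rather than in notebook source.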

# Testing

In [0]:
%skip
from agent_dev import AGENT

# TODO: Replace this placeholder `input_example` with a domain-specific prompt for your agent.
input_example = {"input": [{"role": "user", "content": "Which use cases in the Evaluating or Confirming stage are associated with accounts that have shown declining revenue in the last 3 months? For these accounts, what's their primary product mix, and what recommendations would you make? Do this analysis for Josh Herman accounts."}]}

AGENT.predict(input_example)

In [0]:
%skip
# Note: the agent's answer to this prompt was not correct.
from agent_dev import AGENT

# TODO: Replace this placeholder `input_example` with a domain-specific prompt for your agent.
input_example = {"input": [{"role": "user", "content": "Compare accounts that adopted AI products in the last 6 months vs those that haven't. What's the difference in total revenue growth, use case progression speed, and workspace activity? Which AE's accounts show the strongest AI adoption?"}]}

AGENT.predict(input_example)

In [0]:
%skip
# Observed issues: `write_todos` was not called, and the answer incorrectly stated: "All accounts have 0 active use‑cases and 0 days since the last use‑case update, giving them a perfect “Update Score” of 100 but an “Active UC Score” of 0".
from agent_dev import AGENT

# TODO: Replace this placeholder `input_example` with a domain-specific prompt for your agent.
input_example = {"input": [{"role": "user", "content": "Create a health score for each Josh Herman accounts based on: (1) revenue trend last 90 days, (2) number of active use cases,(3) product diversity (# of different products used), and (4) days since last use case update. Rank accounts by health score and identify the top 5 at-risk accounts."}]}

AGENT.predict(input_example)

In [0]:
%skip
from agent_dev import AGENT

# TODO: Replace this placeholder `input_example` with a domain-specific prompt for your agent.
input_example = {"input": [{"role": "user", "content": "Analyze weekly consumption patterns over the last 12 weeks for Josh Herman accounts. Are there any day-of-week or week-of-month patterns? Which accounts show the most volatility, and what might explain it based on their use case stages?"}]}

AGENT.predict(input_example)

In [0]:
%skip
from agent_dev import AGENT

# TODO: Replace this placeholder `input_example` with a domain-specific prompt for your agent.
input_example = {"input": [{"role": "user", "content": "For accounts that eventually adopted AI products, what was their typical journey? What products did they start with? How long before AI adoption? What was their consumption pattern leading up to AI adoption? Use this to create a playbook for accelerating AI adoption in current SQL-only accounts."}]}

AGENT.predict(input_example)
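
The test cells above repeat the same pattern: build a request, call `AGENT.predict`, and inspect the result by hand. A minimal sketch of a loop that runs several prompts and records whether each call succeeded is shown below. `PromptResult` and `run_prompts` are hypothetical names, and `predict` stands for any callable that accepts the request dict, for example `AGENT.predict`. A successful call only means no exception was raised, not that the answer was correct.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable


@dataclass
class PromptResult:
    prompt: str
    ok: bool      # True if predict() returned without raising
    detail: str   # truncated repr of the response, or the error message


def run_prompts(
    predict: Callable[[dict[str, Any]], Any], prompts: Iterable[str]
) -> list[PromptResult]:
    """Send each prompt through `predict` and collect one result per prompt."""
    results = []
    for prompt in prompts:
        request = {"input": [{"role": "user", "content": prompt}]}
        try:
            response = predict(request)
            results.append(PromptResult(prompt, True, repr(response)[:200]))
        except Exception as exc:  # record the failure and keep going
            results.append(PromptResult(prompt, False, f"{type(exc).__name__}: {exc}"))
    return results
```

Reviewing the `detail` field for each prompt still requires a human or a separate evaluation step to judge correctness.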