
# Create, evaluate, and deploy an AI agent

This notebook demonstrates how to use Mosaic AI to evaluate and improve the quality, cost, and latency of a tool-calling agent. It also shows you how to deploy the resulting agent to a web-based chat UI.

Using Mosaic AI Agent Evaluation ([AWS](https://docs.databricks.com/en/generative-ai/agent-evaluation/index.html) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/generative-ai/agent-evaluation/)), Agent Framework ([AWS](https://docs.databricks.com/en/generative-ai/agent-framework/build-genai-apps.html) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/generative-ai/agent-framework/build-genai-apps)), MLflow ([AWS](https://docs.databricks.com/en/generative-ai/agent-framework/log-agent.html) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/generative-ai/agent-framework/log-agent)) and Model Serving ([AWS](https://docs.databricks.com/en/generative-ai/agent-framework/deploy-agent.html) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/generative-ai/agent-framework/deploy-agent)), this notebook:
1. Generates synthetic evaluation data from a document corpus.
2. Creates a tool-calling agent with a retriever tool.
3. Evaluates the agent's quality, cost, and latency across several foundation models.
4. Deploys the agent to a web-based chat app.

## Requirements
* Use serverless compute or a cluster running Databricks Runtime 14.3 or above.
* Databricks Serverless and Unity Catalog enabled.
* CREATE MODEL access to a Unity Catalog schema.
* Permission to create Model Serving endpoints.

<img src="https://docs.databricks.com/_static/images/generative-ai/synth-evals/demo-overview-optimized.gif"/>

For videos that go deeper into the capabilities, see this [YouTube channel](https://www.youtube.com/@EricPeter-q6o).

## Want to use your own data?

Alternatively, if you already have a Databricks Vector Search index set up, you can use the version of this notebook designed to use your own data ([AWS](https://docs.databricks.com/generative-ai/tutorials/agent-framework-notebook.html) | [Azure](https://learn.microsoft.com/azure/databricks/generative-ai/tutorials/agent-framework-notebook)).

## Setup

In [0]:
%pip install -U -qqqq databricks-agents mlflow databricks-sdk[openai] backoff

dbutils.library.restartPython()

In [0]:
%reload_ext autoreload
%autoreload 2


## Step 1. Generate synthetic evaluation data to measure quality

**Challenges addressed**
1. How to start quality evaluation with diverse, representative data without SMEs spending months labeling?

**What is happening?**
- We pass the documents to the Synthetic API along with `num_evals` and the prompt-like `agent_description` and `question_guidelines`, which tailor the generated questions to our use case. This API uses a proprietary synthetic generation pipeline developed by Mosaic AI Research.
- The API produces `num_evals` questions, each coupled with the source document and a list of facts, generated based on the source document. Each fact must be present in the agent's response for it to be considered correct.

*Why does the API generate a list of facts rather than a fully written answer? This...*
- Makes SME review more efficient: by focusing on facts rather than a full response, they can review and edit more quickly.
- Improves the accuracy of our proprietary LLM judges.
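
To make the fact-based format concrete, the sketch below models one evaluation row as a plain dictionary and runs a deliberately naive check against two candidate responses. The field names `request` and `expected_facts` are assumptions about the generated schema, the row itself is made up, and the helper `missing_facts` is hypothetical. The real judges are LLM-based and tolerate paraphrasing, which this substring match does not.

```python
# Hypothetical evaluation row; field names and values are illustrative assumptions.
eval_row = {
    "request": "What does Lakehouse Monitoring track?",
    "expected_facts": [
        "data quality",
        "model performance",
    ],
}


def missing_facts(response: str, expected_facts: list[str]) -> list[str]:
    """Return the expected facts that do not appear verbatim in the response.

    Naive stand-in for an LLM judge: case-insensitive substring matching only,
    so a correctly paraphrased fact would still be reported as missing.
    """
    lowered = response.lower()
    return [fact for fact in expected_facts if fact.lower() not in lowered]


good = "Lakehouse Monitoring tracks data quality and model performance over time."
partial = "It tracks data quality."

# A response counts as correct only when no expected fact is missing.
print(missing_facts(good, eval_row["expected_facts"]))
print(missing_facts(partial, eval_row["expected_facts"]))
```

Because each fact is a short, separately checkable statement, a reviewer can edit or delete a single fact without rewriting a full reference answer.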

Interested in having your SMEs review the data? Check out a [video demo of the Eval Set UI](https://youtu.be/avY9724q4e4?feature=shared&t=130).


### Load the docs corpus
First, load the documents (Databricks documentation) used by the agent, filtering for a subset of the documentation.

For your agent, replace this step to instead load your parsed documents.
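
If your own documents are not yet chunked, one possible way to shape them is sketched below. The generation step later in this notebook expects the columns `content` and `doc_uri`. Everything else here is an assumption for illustration: the helper names `chunk_text` and `to_records`, the whitespace-based chunking, the chunk size, and the example URLs.

```python
def chunk_text(text: str, max_chars: int = 200) -> list[str]:
    """Split text on whitespace into chunks of roughly max_chars.

    A single word longer than max_chars is kept whole rather than cut.
    """
    chunks, current = [], ""
    for word in text.split():
        candidate = f"{current} {word}".strip()
        if len(candidate) > max_chars and current:
            chunks.append(current)
            current = word
        else:
            current = candidate
    if current:
        chunks.append(current)
    return chunks


def to_records(docs: dict[str, str], max_chars: int = 200) -> list[dict]:
    """Turn {doc_uri: full_text} into one record per chunk."""
    records = []
    for doc_uri, text in docs.items():
        for chunk in chunk_text(text, max_chars):
            records.append({"content": chunk, "doc_uri": doc_uri})
    return records


# Made-up documents standing in for your parsed corpus.
my_docs = {
    "https://example.com/docs/a": "alpha " * 50,
    "https://example.com/docs/b": "A short page.",
}
records = to_records(my_docs, max_chars=60)
print(len(records), records[-1])
```

Passing `records` to `pd.DataFrame(records)` yields a DataFrame with the two expected columns.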

In [0]:
import pandas as pd

databricks_docs_url = "https://raw.githubusercontent.com/databricks/genai-cookbook/refs/heads/main/quick_start_demo/chunked_databricks_docs_filtered.jsonl"
parsed_docs_df = pd.read_json(databricks_docs_url, lines=True)

display(parsed_docs_df)

### Call API to generate synthetic evaluation data

In [0]:
# Use the synthetic eval generation API to get some evals
from databricks.agents.evals import generate_evals_df

# "Ghost text" for agent description and question guidelines - feel free to modify as you see fit.
agent_description = f"""
The agent is a RAG chatbot that answers questions about Databricks. Questions unrelated to Databricks are irrelevant.
"""
question_guidelines = f"""
# User personas
- A developer who is new to the Databricks platform
- An experienced, highly technical Data Scientist or Data Engineer

# Example questions
- what API lets me parallelize operations over rows of a delta table?
- Which cluster settings will give me the best performance when using Spark?

# Additional Guidelines
- Questions should be succinct, and human-like
"""

num_evals = 25
evals = generate_evals_df(
    docs=parsed_docs_df[
        :500
    ],  # Pass your docs. They should be in a Pandas or Spark DataFrame with columns `content STRING` and `doc_uri STRING`.
    num_evals=num_evals,  # How many synthetic evaluations to generate
    agent_description=agent_description,
    question_guidelines=question_guidelines,
)
display(evals)


## Step 2. Write the agent's code

### Function-calling agent that uses a retriever tool

**Challenges addressed**
- How do I track different versions of my agent's code or configuration?
- How do I enable observability, monitoring, and debugging of my agent’s logic?

**What is happening?**

First, create a function-calling agent with access to a retriever tool using the OpenAI SDK and Python code. To keep the demo simple, the retriever is a function that performs keyword lookup rather than a vector search index.
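
The control flow of such an agent can be sketched without any LLM at all. In the minimal example below, `fake_llm` is a hypothetical stand-in that requests the tool once and then answers, the two-document corpus is made up, and the tool-call dictionaries are simplified compared with the OpenAI SDK's response objects. It is meant only to show the loop: call the model, run any requested tools, append the results, and repeat until the model returns text.

```python
import json

# Toy corpus; the real agent loads documentation chunks instead.
DOCS = [
    {"chunk_id": 1, "text": "Delta tables support time travel."},
    {"chunk_id": 2, "text": "Clusters can autoscale worker nodes."},
]


def search_docs(query: list[str]) -> list[dict]:
    """Keyword lookup: rank chunks by how many keywords they contain."""
    scored = [
        (sum(k.lower() in d["text"].lower() for k in query), d) for d in DOCS
    ]
    ranked = sorted(scored, key=lambda pair: pair[0], reverse=True)
    return [doc for score, doc in ranked if score > 0]


TOOLS = {"search_docs": search_docs}


def fake_llm(messages: list[dict]) -> dict:
    """Hypothetical stand-in for the LLM: request the tool once, then answer."""
    if not any(m["role"] == "tool" for m in messages):
        return {
            "role": "assistant",
            "tool_calls": [
                {
                    "id": "call_1",
                    "name": "search_docs",
                    "arguments": json.dumps({"query": ["delta", "travel"]}),
                }
            ],
        }
    tool_output = json.loads(messages[-1]["content"])
    return {"role": "assistant", "content": f"Found: {tool_output[0]['text']}"}


def run_agent(user_message: str, max_iter: int = 5) -> str:
    """Loop until the model stops requesting tools or max_iter is reached."""
    messages = [{"role": "user", "content": user_message}]
    for _ in range(max_iter):
        reply = fake_llm(messages)
        tool_calls = reply.get("tool_calls")
        if not tool_calls:
            return reply["content"]  # final generation, no tools requested
        messages.append(reply)
        for call in tool_calls:
            result = TOOLS[call["name"]](**json.loads(call["arguments"]))
            messages.append(
                {
                    "role": "tool",
                    "tool_call_id": call["id"],
                    "content": json.dumps(result),
                }
            )
    raise RuntimeError("max_iter reached without a final answer")


answer = run_agent("How do I query an old version of a table?")
print(answer)
```

The iteration cap guards against a model that keeps requesting tools indefinitely.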

When creating your agent, you can either:
1. Generate template agent code from the AI Playground
2. Use a template from our Cookbook
3. Start from an example in popular frameworks such as LangGraph, AutoGen, LlamaIndex, and others.

**NOTE: It is not necessary to understand how this agent works to understand the rest of this demo notebook.**  

*A few things to note about the code:*
1. The code is written to `fc_agent.py` in order to use [MLflow Models from Code](https://www.mlflow.org/blog/models_from_code) for logging, enabling easy tracking of each iteration as you tune the agent for quality.
2. The code is parameterized with an MLflow Model Configuration ([AWS](https://docs.databricks.com/en/generative-ai/agent-framework/create-agent.html#use-parameters-to-configure-the-agent) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/generative-ai/agent-framework/create-agent#agent-parameters)), enabling easy tuning of these parameters for quality improvement.
3. The code is wrapped in an MLflow [ChatModel](https://mlflow.org/docs/latest/llms/chat-model-intro/index.html), making the agent's code deployment-ready so any iteration can be shared with stakeholders for testing.
4. The code implements MLflow Tracing ([AWS](https://docs.databricks.com/en/mlflow/mlflow-tracing.html) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/mlflow/mlflow-tracing)) for unified observability during development and production. The same trace defined here will be logged for every production request post-deployment. For agent authoring frameworks like LangChain and LlamaIndex, you can perform tracing with one line of code: `mlflow.langchain.autolog()` or `mlflow.llama_index.autolog()`

In [0]:
%%writefile fc_agent.py
from databricks.sdk import WorkspaceClient
from openai import OpenAI
import openai
import pandas as pd
from typing import Any, Union, Dict, List, Optional
import mlflow
from mlflow.pyfunc import ChatModel
from mlflow.types.llm import ChatCompletionResponse, ChatMessage, ChatParams, ChatChoice
from dataclasses import asdict
import dataclasses
import json
import backoff  # for exponential backoff on LLM rate limits


# Default configuration for the agent.
DEFAULT_CONFIG = {
    'endpoint_name': "databricks-meta-llama-3-1-70b-instruct",
    'temperature': 0.01,
    'max_tokens': 1000,
    'system_prompt': """You are a helpful assistant that answers questions about Databricks. Questions unrelated to Databricks are irrelevant.

    You answer questions using a set of tools. If needed, you ask the user follow-up questions to clarify their request.
    """,
    'max_context_chars': 4096 * 4
}

# OpenAI-formatted function for the retriever tool
RETRIEVER_TOOL_SPEC = [{
    "type": "function",
    "function": {
        "name": "search_product_docs",
        "description": "Use this tool to search for Databricks product documentation.",
        "parameters": {
            "type": "object",
            "required": ["query"],
            "additionalProperties": False,
            "properties": {
                "query": {
                    "description": "a set of individual keywords to find relevant docs for. each item of the array must be a single word.",
                    "type": "array",
                    "items": {
                        "type": "string"
                    }
                }
            },
        },
    },
}]

class FunctionCallingAgent(mlflow.pyfunc.ChatModel):
    """
    Class representing a function-calling agent that has one tool: a retriever using keyword-based search.
    """

    def __init__(self):
        """
        Initialize the OpenAI SDK client connected to Model Serving.
        Load the agent's configuration from MLflow Model Config.
        """
        # Initialize OpenAI SDK connected to Model Serving
        w = WorkspaceClient()
        self.model_serving_client: OpenAI = w.serving_endpoints.get_open_ai_client()

        # Load config
        # When this agent is deployed to Model Serving, the configuration loaded here is replaced with the config passed to mlflow.pyfunc.log_model(model_config=...)
        self.config = mlflow.models.ModelConfig(development_config=DEFAULT_CONFIG)

        # Configure playground, review app, and agent evaluation to display the chunks from the retriever 
        mlflow.models.set_retriever_schema(
            name="db_docs",
            primary_key="chunk_id",
            text_column="chunked_text",
            doc_uri="doc_uri",
        )

        # Load the retriever tool's docs.
        raw_docs_parquet = "https://github.com/databricks/genai-cookbook/raw/refs/heads/main/quick_start_demo/chunked_databricks_docs.snappy.parquet"
        self.docs = pd.read_parquet(raw_docs_parquet).to_dict("records")

        # Identify the function used as the retriever tool
        self.tool_functions = {
            'search_product_docs': self.search_product_docs
        }

    @mlflow.trace(name="rag_agent", span_type="AGENT")
    def predict(
        self, context=None, messages: List[ChatMessage]=None, params: Optional[ChatParams] = None
    ) -> ChatCompletionResponse:
        """
        Primary function that takes a user's request and generates a response.
        """
        if messages is None:
            raise ValueError("predict(...) called without `messages` parameter.")
        
        # Convert all input messages to dict from ChatMessage
        messages = convert_chat_messages_to_dict(messages)

        # Add system prompt
        request = {
                "messages": [
                    {"role": "system", "content": self.config.get('system_prompt')},
                    *messages,
                ],
            }
            
        # Ask the LLM to call tools and generate the response
        output= self.recursively_call_and_run_tools(
            **request
        )
        
        # Convert response to ChatCompletionResponse dataclass
        return ChatCompletionResponse.from_dict(output)
    
    @mlflow.trace(span_type="RETRIEVER")
    def search_product_docs(self, query: list[str]) -> list[dict]:
        """
        Retriever tool. Simple keyword-based retriever - would be replaced with a Vector Index
        """
        keywords = query
        if len(keywords) == 0:
            return []
        result = []
        for chunk in self.docs:
            score = sum(
                (keyword.lower() in chunk["chunked_text"].lower())
                for keyword in keywords
            )
            result.append(
                {
                    "page_content": chunk["chunked_text"],
                    "metadata": {
                        "doc_uri": chunk["url"],
                        "score": score,
                        "chunk_id": chunk["chunk_id"],
                    },
                }
            )
        ranked_docs = sorted(result, key=lambda x: x["metadata"]["score"], reverse=True)
        cutoff_docs = []
        context_budget_left = self.config.get("max_context_chars")
        for doc in ranked_docs:
            content = doc["page_content"]
            doc_len = len(content)
            if context_budget_left < doc_len:
                cutoff_docs.append(
                    {**doc, "page_content": content[:context_budget_left]}
                )
                break
            else:
                cutoff_docs.append(doc)
            context_budget_left -= doc_len
        return cutoff_docs

    ##
    # Helper functions below
    ##
    @backoff.on_exception(backoff.expo, openai.RateLimitError)
    def completions_with_backoff(self, **kwargs):
        """
        Helper: back off exponentially if the LLM's rate limit is exceeded.
        """
        traced_chat_completions_create_fn = mlflow.trace(
            self.model_serving_client.chat.completions.create,
            name="chat_completions_api",
            span_type="CHAT_MODEL",
        )
        return traced_chat_completions_create_fn(**kwargs)

    def chat_completion(self, messages: List[ChatMessage]) -> ChatCompletionResponse:
        """
        Helper: Call the LLM configured via the ModelConfig using the OpenAI SDK
        """
        request = {"messages": messages, "temperature": self.config.get("temperature"), "max_tokens": self.config.get("max_tokens"),  "tools": RETRIEVER_TOOL_SPEC}
        return self.completions_with_backoff(
            model=self.config.get("endpoint_name"), **request,
                
        )

    @mlflow.trace(span_type="CHAIN")
    def recursively_call_and_run_tools(self, max_iter=10, **kwargs):
        """
        Helper: Recursively calls the LLM using the tools in the prompt. Either executes the tools and recalls the LLM or returns the LLM's generation.
        """
        messages = kwargs["messages"]
        del kwargs["messages"]
        i = 0
        while i < max_iter:
            with mlflow.start_span(name=f"iteration_{i}", span_type="CHAIN") as span:
                response = self.chat_completion(messages=messages)
                assistant_message = response.choices[0].message  # openai client
                tool_calls = assistant_message.tool_calls  # openai
                if tool_calls is None:
                    # the tool execution finished, and we have a generation
                    return response.to_dict()
                tool_messages = []
                for tool_call in tool_calls:  # TODO: should run in parallel
                    with mlflow.start_span(
                        name="execute_tool", span_type="TOOL"
                    ) as span:
                        function = tool_call.function  
                        args = json.loads(function.arguments)  
                        span.set_inputs(
                            {
                                "function_name": function.name,
                                "function_args_raw": function.arguments,
                                "function_args_loaded": args,
                            }
                        )
                        result = self.execute_function(
                            self.tool_functions[function.name], args
                        )
                        tool_message = {
                            "role": "tool",
                            "tool_call_id": tool_call.id,
                            "content": result,
                        } 

                        tool_messages.append(tool_message)
                        span.set_outputs({"new_message": tool_message})
                assistant_message_dict = assistant_message.dict().copy()  
                del assistant_message_dict["content"]
                del assistant_message_dict["function_call"] 
                if "audio" in assistant_message_dict:
                    del assistant_message_dict["audio"]  # hack to make llama70b work
                messages = (
                    messages
                    + [
                        assistant_message_dict,
                    ]
                    + tool_messages
                )
                i += 1
        # TODO: Handle more gracefully
        raise RuntimeError("ERROR: max iter reached")  # raising a bare string is a TypeError in Python 3

    def execute_function(self, tool, args):
        """
        Execute a tool and return the result as a JSON string
        """
        result = tool(**args)
        return json.dumps(result)
        
def convert_chat_messages_to_dict(messages: List[ChatMessage]):
    new_messages = []
    for message in messages:
        if isinstance(message, ChatMessage):
            # Remove any keys with None values
            new_messages.append({k: v for k, v in asdict(message).items() if v is not None})
        else:
            new_messages.append(message)
    return new_messages
    

# tell MLflow logging where to find the agent's code
mlflow.models.set_model(FunctionCallingAgent())

Create an empty `__init__.py` so that `FunctionCallingAgent` can be imported.

In [0]:
%%writefile __init__.py

# Empty file


### Vibe check the agent

Test the agent for a sample query to see the MLflow Trace.

In [0]:
from fc_agent import FunctionCallingAgent

# Use a distinct variable name so the instance does not shadow the `fc_agent` module
agent = FunctionCallingAgent()

response = agent.predict(messages=[{"role": "user", "content": "What is lakehouse monitoring?"}])


## Step 3. Evaluate the agent

### Initial evaluation

**Challenges addressed**
- What are the right metrics to evaluate quality? How do I trust the outputs of these metrics?
- I need to evaluate many ideas - how do I…
    - …run evaluation quickly so the majority of my time isn’t spent waiting?
    - …quickly compare these different versions of my agent on quality, cost, and latency?
- How do I quickly identify the root cause of any quality problems?

**What is happening?**

Now, run Agent Evaluation's proprietary LLM judges using the synthetic evaluation set to see the quality, cost, and latency of the agent and identify any root causes of quality issues. Agent Evaluation is tightly integrated with `mlflow.evaluate()`. 

Mosaic AI Research has invested significantly in the quality and speed of the LLM judges, optimizing the judges to agree with human raters. Read more [details in our blog](https://www.databricks.com/blog/databricks-announces-significant-improvements-built-llm-judges-agent-evaluation) about how our judges outperform the competition. 

After evaluation runs, click `View Evaluation Results` to open the MLflow UI for this Run. This lets you:
- See summary metrics
- See root cause analysis that identifies the most important issues to fix
- Inspect individual responses to gain intuition about how the agent is performing
- See the judge outputs to understand why the responses were graded as pass or fail
- Compare between multiple runs to see how quality changed between experiments

You can also inspect the other tabs:
- `Overview` lets you see the agent's configuration and parameters
- `Artifacts` lets you see the agent's code

These UIs, coupled with the speed of evaluation, help you efficiently test your hypotheses to improve quality, letting you reach the production quality bar in less time. 

<img src="https://docs.databricks.com/_static/images/generative-ai/synth-evals/eval-1-optimized.gif"/>


In [0]:
from mlflow.models.resources import DatabricksServingEndpoint
import mlflow

# First, define a helper function so you can compare the agent across multiple parameters and LLMs.
def log_and_evaluate_agent(agent_config: dict, run_name: str):

    # Define the databricks resources so this logged agent is deployment ready
    resources = [DatabricksServingEndpoint(endpoint_name=agent_config["endpoint_name"])]

    # Start a run to contain the agent. `run_name` is a human-readable label for this run.
    with mlflow.start_run(run_name=run_name):
        # Log the agent's code and configuration to MLflow
        model_info = mlflow.pyfunc.log_model(
            python_model="fc_agent.py",
            artifact_path="agent",
            model_config=agent_config,
            resources=resources,
            input_example={
                "messages": [
                    {"role": "user", "content": "What is lakehouse monitoring?"}
                ]
            },
            pip_requirements=["databricks-sdk[openai]", "mlflow", "databricks-agents", "backoff"],
        )

        # Run evaluation
        eval_results = mlflow.evaluate(
            data=evals,  # Your evaluation set
            model=model_info.model_uri,  # Logged agent from above
            model_type="databricks-agent",  # activate Mosaic AI Agent Evaluation
        )

        return (model_info, eval_results)


# Now, call the helper function to run evaluation.
# The configuration keys must match those defined in `fc_agent.py`
model_info_llama_70b, eval_results = log_and_evaluate_agent(
    agent_config={
        "endpoint_name": "databricks-meta-llama-3-1-70b-instruct",
        "temperature": 0.01,
        "max_tokens": 1000,
        "system_prompt": """You are a helpful assistant that answers questions about Databricks. Questions unrelated to Databricks are irrelevant.

    You answer questions using a set of tools. If needed, you ask the user follow-up questions to clarify their request.
    """,
        "max_context_chars": 4096 * 4,
    },
    run_name="llama-3-1-70b-instruct",
)


### Compare multiple LLMs on quality, cost, and latency

**Challenges addressed**
- How do I determine which foundation model offers the right balance of quality, cost, and latency?

**What is happening?**

Normally, you would use the evaluation results to inform your hypotheses to improve quality, iteratively implementing, evaluating, and comparing each idea to the baseline. This demo assumes that you have fixed any root causes identified above and now want to optimize the agent for quality, cost, and latency. 

Here, you run evaluation for several LLMs. After the evaluation runs, click `View Evaluation Results` to open the MLflow UI for one of the runs. In the MLflow Evaluations UI, use the **Compare to Run** dropdown to select another run name. This comparison view helps you quickly identify where the agent got better, worse, or stayed the same.

Then, go to the MLflow Experiment page and click the chart icon in the upper left corner by `Runs`. Here, you can compare the models quantitatively across quality, cost, and latency metrics. The number of tokens used serves as a proxy for cost.

This helps you make informed tradeoffs in partnership with your business stakeholders about quality, cost, and latency. Further, you can use this view to provide quantitative updates to your stakeholders so they can follow your progress improving quality.
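
One way to turn such a comparison into a decision rule is sketched below: set a quality floor and a latency ceiling with your stakeholders, then pick the cheapest run that satisfies both. The run names, metric names, and numbers are all invented for illustration and do not come from Agent Evaluation. Token count stands in for cost, as described above.

```python
from typing import Optional

# Hypothetical per-run summaries; names and numbers are invented for illustration.
runs = {
    "model-a": {"pass_rate": 0.80, "avg_tokens": 2100, "avg_latency_s": 3.2},
    "model-b": {"pass_rate": 0.88, "avg_tokens": 2400, "avg_latency_s": 7.5},
    "model-c": {"pass_rate": 0.60, "avg_tokens": 1500, "avg_latency_s": 1.9},
}


def pick_run(
    runs: dict, min_pass_rate: float, max_latency_s: float
) -> Optional[str]:
    """Return the lowest-token run meeting both constraints, or None."""
    eligible = {
        name: metrics
        for name, metrics in runs.items()
        if metrics["pass_rate"] >= min_pass_rate
        and metrics["avg_latency_s"] <= max_latency_s
    }
    if not eligible:
        return None
    return min(eligible, key=lambda name: eligible[name]["avg_tokens"])


# Strict latency budget versus strict quality bar lead to different choices.
print(pick_run(runs, min_pass_rate=0.75, max_latency_s=5.0))
print(pick_run(runs, min_pass_rate=0.85, max_latency_s=10.0))
```

Making the thresholds explicit keeps the tradeoff discussion about the constraints rather than about individual runs.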

<img src="https://docs.databricks.com/_static/images/generative-ai/synth-evals/eval-2-optimized.gif"/>

In [0]:
baseline_config = {
    "endpoint_name": "databricks-meta-llama-3-1-70b-instruct",
    "temperature": 0.01,
    "max_tokens": 1000,
    "system_prompt": """You are a helpful assistant that answers questions about Databricks. Questions unrelated to Databricks are irrelevant.

    You answer questions using a set of tools. If needed, you ask the user follow-up questions to clarify their request.
    """,
    "max_context_chars": 4096 * 4,
}

llama405b_config = baseline_config.copy()
llama405b_config["endpoint_name"] = "databricks-meta-llama-3-1-405b-instruct"
# Store the returned model info under its own name so the config dict is not overwritten
model_info_llama_405b, _ = log_and_evaluate_agent(
    agent_config=llama405b_config,
    run_name="llama-3-1-405b-instruct",
)

# If you have an External Model, such as OpenAI, uncomment this code, and replace `<my-external-model-endpoint-name>` to include this model in the evaluation
# my_model_config = baseline_config.copy()
# my_model_config['endpoint_name'] = '<my-external-model-endpoint-name>'

# model_info_my_model_config, _ = log_and_evaluate_agent(
#     agent_config=my_model_config,
#     run_name=my_model_config['endpoint_name'],
# )


## Step 4. [Optional] Deploy the agent

### Deploy to pre-production for stakeholder testing

**Challenges addressed**
- How do I quickly create a Chat UI for stakeholders to test the agent?
- How do I track each piece of feedback and have it linked to what is happening in the bot so I can debug issues – without resorting to spreadsheets?

**What is happening?**

First, register one of the agent models that you logged above to Unity Catalog. Then, use Agent Framework to deploy the agent to Model Serving using one line of code: `agents.deploy()`.

The resulting Model Serving endpoint:
- Is connected to the review app, which is a lightweight chat UI that can be shared with any user in your company, even if they don't have Databricks workspace access
- Is integrated with AI Gateway so every request and response and its accompanying MLflow trace and user feedback is stored in an Inference Table

Optionally, you can turn on Agent Evaluation’s monitoring capabilities, which are unified with the offline experience used above, and get a ready-to-go dashboard that runs judges on a sample of the traffic.

<img src="https://docs.databricks.com/_static/images/generative-ai/synth-evals/review-app-optimized.gif"/>


In [0]:
from databricks import agents
import mlflow

# Connect to the Unity Catalog model registry
mlflow.set_registry_uri("databricks-uc")

# Configure UC model location
UC_MODEL_NAME = f"catalog.schema.db_docs_agent"  # REPLACE WITH UC CATALOG/SCHEMA THAT YOU HAVE `CREATE MODEL` permissions in
assert (
    UC_MODEL_NAME != "catalog.schema.db_docs_agent"
), "Please replace 'catalog.schema.db_docs_agent' with your actual UC catalog and schema."

# Register the Llama 70b version to Unity Catalog
uc_registered_model_info = mlflow.register_model(
    model_uri=model_info_llama_70b.model_uri, name=UC_MODEL_NAME
)
# Deploy to enable the review app and create an API endpoint
deployment_info = agents.deploy(
    model_name=UC_MODEL_NAME, model_version=uc_registered_model_info.version
)


## Step 5. Deploy to production and monitor

**Challenges addressed**
- How do I host my agent as a production-ready, scalable service?
- How do I execute tool code securely and ensure it respects my governance policies?
- How do I enable telemetry or observability in development and production?
- How do I monitor my agent’s quality at scale in production? How do I quickly investigate and fix any quality issues?

With Agent Framework, the deployment process is the same for pre-production and production: you already have a highly scalable REST API that can be integrated into your application. This API provides an endpoint to get agent responses and to pass back user feedback so you can use that feedback to improve quality.
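
As a rough illustration of calling the agent from an application, the sketch below builds (but does not send) an HTTP request using only the standard library. The workspace host, endpoint name, and token are placeholders, the helper `build_agent_request` is hypothetical, and the `/serving-endpoints/<name>/invocations` path and bearer-token header reflect the usual Model Serving conventions as an assumption; check your endpoint's page for the exact URL. The payload mirrors the `messages` format used when the agent was logged.

```python
import json
import urllib.request


def build_agent_request(
    host: str, endpoint_name: str, token: str, question: str
) -> urllib.request.Request:
    """Build a POST request carrying a single user message to the agent."""
    url = f"{host.rstrip('/')}/serving-endpoints/{endpoint_name}/invocations"
    payload = {"messages": [{"role": "user", "content": question}]}
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Placeholder values; replace with your workspace URL, endpoint, and token.
req = build_agent_request(
    host="https://example.cloud.databricks.com/",
    endpoint_name="my-agent-endpoint",
    token="dapi-PLACEHOLDER",
    question="What is lakehouse monitoring?",
)
print(req.full_url)
# To send the request: urllib.request.urlopen(req)
```

In practice, load the token from a secret store rather than hard-coding it.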

To learn more about how monitoring works (in summary, Databricks has adapted a version of the above UIs and LLM judges for monitoring), read the documentation ([AWS](https://docs.databricks.com/en/generative-ai/agent-evaluation/evaluating-production-traffic.html) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/generative-ai/agent-evaluation/evaluating-production-traffic)) or watch this [2 minute video](https://www.youtube.com/watch?v=ldAzmKkvQTU).