# Use Lakehouse Monitoring for GenAI to monitor your production agent

This notebook demonstrates how to monitor a deployed GenAI app / Agent using Lakehouse Monitoring for GenAI. It will:
1. Deploy a "hello world" agent using Agent Framework.
2. Configure quality monitoring using Agent Evaluation's LLM judges.
3. Send sample traffic to the deployed endpoint.

Lakehouse Monitoring for GenAI allows you to:
- Track quality and operational performance (latency, request volume, errors, etc.).
- Run LLM-based evaluations on production traffic to detect drift or regressions using Agent Evaluation's [LLM judges](https://docs.databricks.com/aws/en/generative-ai/agent-evaluation/llm-judge-reference).
- Deep dive into individual requests to debug and improve agent responses.
- Transform real-world logs into evaluation sets to drive continuous improvements.

**Note:** When you deploy agents authored with [ChatAgent](https://docs.databricks.com/aws/en/generative-ai/agent-framework/author-agent#-use-chatagent-to-author-agents) using Agent Framework's `agents.deploy(...)`, basic monitoring is automatically configured with operational metrics (request volume, latency, error rate, etc.). You can optionally configure quality metrics using Agent Evaluation's [proprietary LLM judges](https://docs.databricks.com/aws/en/generative-ai/agent-evaluation/llm-judge-reference).

In [0]:
%pip install -U -qqqq "databricks-agents>=0.17.0" "databricks-sdk[openai]" backoff uv
dbutils.library.restartPython()

## Select a Unity Catalog schema

Ensure you have CREATE TABLE and CREATE MODEL access in this schema.  By default, these values are set to your workspace's default catalog & schema.
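
The catalog and schema chosen here are later joined with a model or table name into a three-level Unity Catalog name. As a minimal sketch, the helper below (a hypothetical `uc_full_name`, not part of any Databricks library) builds that name and fails early on an empty or malformed widget value. It assumes simple identifiers only (letters, digits, underscores), which is stricter than what Unity Catalog itself accepts.

```python
import re

# Assumption: accept only simple identifiers. Unity Catalog allows more
# (for example, names quoted with backticks), so loosen this if needed.
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def uc_full_name(catalog: str, schema: str, name: str) -> str:
    """Join catalog, schema, and object name into a three-level name."""
    parts = [catalog, schema, name]
    for part in parts:
        if not _IDENTIFIER.match(part):
            raise ValueError(f"Not a simple identifier: {part!r}")
    return ".".join(parts)


print(uc_full_name("main", "default", "my_demo_agent"))
```

In this notebook the same helper could be called with the `UC_CATALOG` and `UC_SCHEMA` values read from the widgets.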

In [0]:
# Get the workspace default UC catalog / schema
uc_default_location = spark.sql("select current_catalog() as current_catalog, current_schema() as current_schema").collect()[0]
current_catalog = uc_default_location["current_catalog"]
current_schema = uc_default_location["current_schema"]


# Modify the UC catalog / schema here or at the top of the notebook in the widget editor
dbutils.widgets.text("uc_catalog", current_catalog)
dbutils.widgets.text("uc_schema", current_schema)
UC_CATALOG = dbutils.widgets.get("uc_catalog")
UC_SCHEMA = dbutils.widgets.get("uc_schema")
UC_PREFIX = f"{UC_CATALOG}.{UC_SCHEMA}"

## Agent creation and deployment

In this section, we will:
1. Create a simple agent by using Llama 70B
2. Log the agent using MLflow
3. Deploy the agent. This will automatically set up basic monitoring that tracks request volume, latency, and errors.

You can skip this step if you already have a deployed agent.
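
The agent defined below simulates retrieval-augmented generation: it wraps a fixed set of documents and the user's question into the last chat message before calling the LLM. Stripped of the MLflow types, the idea can be sketched with plain dictionaries. The function name `build_rag_messages` and the `[uri] text` document formatting are illustrative assumptions; the agent itself interpolates the `Document` objects directly.

```python
from typing import Any


def build_rag_messages(
    messages: list[dict[str, Any]], docs: list[dict[str, str]]
) -> list[dict[str, Any]]:
    """Return a copy of `messages` whose last message embeds the documents."""
    if not messages:
        raise ValueError("messages must contain at least one message")

    # Illustrative formatting: one "[uri] text" line per document
    doc_text = "\n".join(f"[{d['doc_uri']}] {d['page_content']}" for d in docs)
    question = messages[-1]["content"]

    # Copy the last message so the caller's list is left untouched
    augmented = dict(messages[-1])
    augmented["content"] = (
        "Answer the user's question based on the documents.\n"
        f"Documents: <documents>{doc_text}</documents>.\n"
        f"User's question: <user_question>{question}</user_question>"
    )
    return messages[:-1] + [augmented]


docs = [{"doc_uri": "uri2.txt", "page_content": "Example document text."}]
history = [{"role": "user", "content": "How do I monitor my genai app?"}]
print(build_rag_messages(history, docs)[-1]["content"])
```

Unlike the agent's `prepare_rag_prompt`, this sketch does not modify its input in place, which makes it easier to test in isolation.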

In [0]:
%%writefile hello_world_agent.py
from typing import Any, Generator, Optional

import mlflow
from databricks.sdk import WorkspaceClient
from mlflow.entities import SpanType
from mlflow.pyfunc.model import ChatAgent
from mlflow.types.agent import (
    ChatAgentChunk,
    ChatAgentMessage,
    ChatAgentResponse,
    ChatContext,
)

mlflow.openai.autolog()

# Optional: Replace with any model serving endpoint
LLM_ENDPOINT_NAME = "databricks-meta-llama-3-3-70b-instruct"


class SimpleChatAgent(ChatAgent):
    def __init__(self):
        self.workspace_client = WorkspaceClient()
        self.client = self.workspace_client.serving_endpoints.get_open_ai_client()
        self.llm_endpoint = LLM_ENDPOINT_NAME

        # Fake documents to simulate the retriever
        self.documents = [
            mlflow.entities.Document(
                metadata={"doc_uri": "uri1.txt"},
                page_content="""Lakehouse Monitoring for GenAI helps you monitor the quality, cost, and latency of production GenAI apps.  Lakehouse Monitoring for GenAI allows you to:\n- Track quality and operational performance (latency, request volume, errors, etc.).\n- Run LLM-based evaluations on production traffic to detect drift or regressions using Agent Evaluation's LLM judges.\n- Deep dive into individual requests to debug and improve agent responses.\n- Transform real-world logs into evaluation sets to drive continuous improvements.""",
            ),
            # This is a new document about spark.
            mlflow.entities.Document(
                metadata={"doc_uri": "uri2.txt"},
                page_content="The latest spark version in databricks is 3.5.0",
            ),
        ]

        # Tell Agent Evaluation's judges and review app about the schema of your retrieved documents
        mlflow.models.set_retriever_schema(
            name="fake_vector_search",
            primary_key="doc_uri",
            text_column="page_content",
            doc_uri="doc_uri"
            # other_columns=["column1", "column2"],
        )

    @mlflow.trace(span_type=SpanType.RETRIEVER)
    def dummy_retriever(self):
        # Fake retriever
        return self.documents
  

    def prepare_messages_for_llm(
        self, messages: list[ChatAgentMessage]
    ) -> list[dict[str, Any]]:
        """Filter out ChatAgentMessage fields that are not compatible with LLM message formats"""
        compatible_keys = ["role", "content", "name", "tool_calls", "tool_call_id"]
        return [
            {
                k: v
                for k, v in m.model_dump_compat(exclude_none=True).items()
                if k in compatible_keys
            }
            for m in messages
        ]

    @mlflow.trace(span_type=SpanType.PARSER)
    def prepare_rag_prompt(self, messages):

        docs = self.dummy_retriever()

        messages = self.prepare_messages_for_llm(messages)

        messages[-1]['content'] = f"Answer the user's question based on the documents.\nDocuments: <documents>{docs}</documents>.\nUser's question: <user_question>{messages[-1]['content']}</user_question>"

        return messages

    @mlflow.trace(span_type=SpanType.AGENT)
    def predict(
        self,
        messages: list[ChatAgentMessage],
        context: Optional[ChatContext] = None,
        custom_inputs: Optional[dict[str, Any]] = None,
    ) -> ChatAgentResponse:
        
        messages = self.prepare_rag_prompt(messages)

        resp = self.client.chat.completions.create(
            model=self.llm_endpoint,
            messages=messages,
        )

        return ChatAgentResponse(
            messages=[
                ChatAgentMessage(**resp.choices[0].message.to_dict(), id=resp.id)
            ],
        )

    @mlflow.trace(span_type=SpanType.AGENT)
    def predict_stream(
        self,
        messages: list[ChatAgentMessage],
        context: Optional[ChatContext] = None,
        custom_inputs: Optional[dict[str, Any]] = None,
    ) -> Generator[ChatAgentChunk, None, None]:
        
        messages = self.prepare_rag_prompt(messages)

        for chunk in self.client.chat.completions.create(
            model=self.llm_endpoint,
            messages=messages,
            stream=True,
        ):
            if not chunk.choices or not chunk.choices[0].delta.content:
                continue

            yield ChatAgentChunk(
                delta=ChatAgentMessage(
                    **{
                        "role": "assistant",
                        "content": chunk.choices[0].delta.content,
                        "id": chunk.id,
                    }
                )
            )


from mlflow.models import set_model

AGENT = SimpleChatAgent()
set_model(AGENT)

## Test the agent locally

In [0]:
%load_ext autoreload 
%autoreload 2

In [0]:
from hello_world_agent import AGENT
AGENT.predict({
        "messages": [{"role": "user", "content": "How do I monitor my genai app?"}]
    })

In [0]:
import mlflow
from mlflow.models.resources import DatabricksServingEndpoint

with mlflow.start_run():
  model_info = mlflow.pyfunc.log_model(
    python_model="hello_world_agent.py",
    artifact_path="agent",
    input_example={
        "messages": [{"role": "user", "content": "How do I monitor my genai app?"}]
    },  
    resources=[DatabricksServingEndpoint(endpoint_name="databricks-meta-llama-3-3-70b-instruct")],
    pip_requirements=["databricks-sdk[openai]", "mlflow", "databricks-agents", "backoff"],    
  )

In [0]:
# Let's validate that the model can be loaded, and try invoking it.
mlflow.models.predict(
    model_uri=model_info.model_uri,
    input_data={"messages": [{"role": "user", "content": "How do I monitor my genai app?"}]},
    env_manager="uv",
)

In [0]:
from databricks.agents import deploy
import mlflow

# Set the name of the model to use in your Unity Catalog schema defined at the top of this notebook

MODEL_NAME = "my_demo_agent"

# Register the model in Unity Catalog and deploy it as a serving endpoint
mlflow.set_registry_uri("databricks-uc")
uc_model_info = mlflow.register_model(
    model_uri=model_info.model_uri, name=f"{UC_CATALOG}.{UC_SCHEMA}.{MODEL_NAME}"
)
deployment = deploy(model_name=uc_model_info.name, model_version=uc_model_info.version)

In [0]:
from databricks.sdk.service.serving import EndpointStateReady, EndpointStateConfigUpdate
from databricks.sdk import WorkspaceClient
import time

print("\nWaiting for endpoint to deploy.  This can take 10 - 20 minutes.", end="")
w = WorkspaceClient()
# Poll until the endpoint is ready and no config update is in progress
while True:
    state = w.serving_endpoints.get(deployment.endpoint_name).state
    if (
        state.ready != EndpointStateReady.NOT_READY
        and state.config_update != EndpointStateConfigUpdate.IN_PROGRESS
    ):
        break
    print(".", end="")
    time.sleep(30)
print(deployment.endpoint_name)
print("\nREADY!")

In [0]:
deployment

## Configuring Quality Monitoring Metrics

Since our agent was deployed using `agents.deploy`, basic monitoring (request volume, latency, errors) is already set up automatically.  The monitor is attached to this notebook's MLflow Experiment by default.

Now we'll add quality evaluation metrics that use LLM judges to our monitoring. The monitoring configuration specified here will:
 - Sample 100% of requests for evaluation
 - Evaluate responses against safety, relevance, chunk relevance, groundedness (lack of hallucinations) and custom guidelines

##### Agent Evaluation's built-in judges
- Judges that run without ground-truth labels or retrieval in traces:
  - `guideline_adherence`: guidelines let developers write plain-language checklists or rubrics for their evaluation, which improves transparency and trust with business stakeholders through easy-to-understand, structured grading rubrics.
  - `safety`: making sure the response is safe
  - `relevance_to_query`: making sure the response is relevant
- For traces with retrieved docs (spans of type `RETRIEVER`):
   - `groundedness`: detect hallucinations
   - `chunk_relevance`: chunk-level relevance to the query

See the full list of built-in judges ([AWS](https://docs.databricks.com/aws/en/generative-ai/agent-evaluation/llm-judge-reference) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/generative-ai/agent-evaluation/llm-judge-reference)).
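
Before sending a configuration to the monitor, it can help to check it locally. The sketch below is a hypothetical helper, not part of `databricks.agents`. It builds a dictionary with the `sample`, `metrics`, and `global_guidelines` keys used in this notebook and rejects obvious mistakes. It assumes only the five judges named above; the linked reference lists more, so extend the set as needed.

```python
from typing import Optional

# Assumption: only the judges named in this notebook are recognized here.
BUILT_IN_JUDGES = {
    "guideline_adherence",
    "safety",
    "relevance_to_query",
    "groundedness",
    "chunk_relevance",
}


def build_monitoring_config(
    sample: float,
    metrics: list[str],
    global_guidelines: Optional[dict[str, list[str]]] = None,
) -> dict:
    """Validate the inputs and return a monitoring configuration dictionary."""
    if not 0 <= sample <= 1:
        raise ValueError(f"sample must be between 0 and 1, got {sample}")

    unknown = sorted(set(metrics) - BUILT_IN_JUDGES)
    if unknown:
        raise ValueError(f"Unknown judges: {unknown}")

    # Guidelines are scored by the guideline_adherence judge, so supplying
    # them without selecting that judge is treated as a mistake in this sketch.
    if global_guidelines and "guideline_adherence" not in metrics:
        raise ValueError("global_guidelines given without 'guideline_adherence'")

    config: dict = {"sample": sample, "metrics": list(metrics)}
    if global_guidelines:
        config["global_guidelines"] = global_guidelines
    return config


print(
    build_monitoring_config(
        sample=0.5,
        metrics=["safety", "guideline_adherence"],
        global_guidelines={"english": ["The response must be in English."]},
    )
)
```

The returned dictionary has the same shape as the `monitoring_config` argument passed to `update_monitor` in the next cells.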


In [0]:
from databricks.agents.evals.monitors import create_monitor, get_monitor, update_monitor, delete_monitor

# Get the current monitor configuration 
monitor = get_monitor(endpoint_name=deployment.endpoint_name)

In [0]:
# Update the monitor to add evaluation metrics
monitor = update_monitor(
    endpoint_name=deployment.endpoint_name,
    monitoring_config={
        "sample": 1,  # Sample 100% of requests - this can be any number from 0 (0%) to 1 (100%).
        # Select 0+ of Agent Evaluation's built-in judges
        "metrics": ['guideline_adherence', 'groundedness', 'safety', 'relevance_to_query', 'chunk_relevance'],
        # Customize these guidelines based on your business requirements.  These guidelines will be analyzed using Agent Evaluation's built-in guideline_adherence judge
        "global_guidelines": {
            "english": ["The response must be in English."],
            "clarity": ["The response must be clear, coherent, and concise."],
            "relevant_if_not_refusal": ["Determine if the response provides an answer to the user's request.  A refusal to answer is considered relevant.  However, if the response is NOT a refusal BUT also doesn't provide relevant information, then the answer is not relevant."],
            "no_answer_if_no_docs": ["If the agent can not find a relevant document, it should refuse to answer the question and not discuss the reasons why it could not answer."]
        }
    }
)

## Generate Sample Traffic

Now that our endpoint is deployed, we'll send some sample questions to generate traffic for monitoring.
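
Each request uses the same chat payload shape: a `messages` list with a single user message. As a rough sketch, the helpers below build that payload and retry a failed call with exponential backoff. The names `chat_payload`, `send_with_retry`, and `fake_send` are hypothetical, and whether your endpoint needs retries at all is an assumption; the stand-in `fake_send` only simulates one transient failure so the example runs anywhere.

```python
import time
from typing import Any, Callable


def chat_payload(question: str) -> dict[str, Any]:
    """Wrap a question in the single-message chat format used in this notebook."""
    return {"messages": [{"role": "user", "content": question}]}


def send_with_retry(
    send: Callable[[dict[str, Any]], Any],
    payload: dict[str, Any],
    max_attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Call send(payload), retrying with exponential backoff on any exception."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return send(payload)
        except Exception:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error
            sleep(base_delay * 2 ** (attempt - 1))


# Hypothetical stand-in for the endpoint: fails once, then echoes the question
_calls = {"n": 0}


def fake_send(payload: dict[str, Any]) -> dict[str, Any]:
    _calls["n"] += 1
    if _calls["n"] == 1:
        raise RuntimeError("simulated transient failure")
    return {"echo": payload["messages"][-1]["content"]}


print(
    send_with_retry(
        fake_send,
        chat_payload("What is the Databricks Lakehouse?"),
        sleep=lambda seconds: None,  # Skip real waiting in this demo
    )
)
```

In this notebook, `send` could be a small wrapper around `client.predict(endpoint=deployment.endpoint_name, inputs=payload)`.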

#### Helper function to send simulated traffic

In [0]:
from mlflow import deployments

client = deployments.get_deploy_client("databricks")

questions = [
    "What is Mosaic AI Agent Evaluation?",
    "How do you use MLflow with Databricks for experiment tracking?",
    "What should I use Databricks Feature Store for?",
    "How does AutoML work in Databricks?",
    "What is Model Serving in Databricks and what are its deployment options?",
    "How does Databricks handle distributed deep learning training?",
    "Does Unity Catalog support models?",
    "What is the Databricks Lakehouse?",
    "Which Llama models are supported on Databricks?",
    "How does Databricks integrate with popular ML frameworks like PyTorch and TensorFlow?"
]

for i, question in enumerate(questions, 1):
    print(f"\nQuestion {i}: {question}")  
    response = client.predict(
        endpoint=deployment.endpoint_name,
        inputs={
            "messages": [
                {"role": "user", "content": question}
            ]
        }
    )
    print(response)
    

## [Optional] Enable integration with Review App and Evaluation Sets

To fix any quality issues identified in the monitoring dashboard, you can:
1. Copy the production trace to an evaluation dataset to use it as a test case in `mlflow.evaluate(...)`
2. Send the production trace to the Review App to collect domain expert input/labels

To enable these features, you need to create an Evaluation Set and Labeling Session.  For more information on these concepts, see [the documentation](https://docs.databricks.com/aws/en/generative-ai/agent-evaluation/review-app#datasets).

#### 1. Create an evaluation dataset

Your monitor will show all evaluation datasets linked to the MLflow Experiment where the monitor is configured. By default, this is the notebook's MLflow Experiment.

In [0]:
from databricks.agents import datasets
from databricks.sdk.errors.platform import NotFound

# Make sure you have updated the uc_catalog & uc_schema widgets to a valid catalog/schema where you have CREATE TABLE permissions.
EVAL_DATASET_NAME = "agent_evaluation_set"

UC_TABLE_NAME = f'{UC_CATALOG}.{UC_SCHEMA}.{EVAL_DATASET_NAME}'

# Remove the evaluation dataset if it already exists
try:
  datasets.delete_dataset(UC_TABLE_NAME)
except NotFound:
  pass

# Create the evaluation dataset
dataset = datasets.create_dataset(UC_TABLE_NAME)

#### 2. Create a labeling session

Your monitor will show all labeling sessions linked to the MLflow Experiment where the monitor is configured. By default, this is the notebook's MLflow Experiment.

In [0]:
from databricks.agents import review_app

# OPTIONAL: Add a comma-separated list of domain experts who will provide feedback/labels
# If not provided, only the user running this notebook will be granted access to the review app.
DOMAIN_EXPERT_EMAILS = []

# Get the Review App from the current MLflow Experiment
my_review_app = review_app.get_review_app()

# Optional: Add a custom question for your domain experts
my_review_app.create_label_schema(
  name="good_response",
  # Type can be "expectation" or "feedback".
  type="feedback",
  title="Is this a good response?",
  input=review_app.label_schemas.InputCategorical(options=["Yes", "No"]),
  instruction="Optional: provide a rationale below.",
  enable_comment=True,
  overwrite=True
)

my_session = my_review_app.create_labeling_session(
    name="collect_facts",
    assigned_users=DOMAIN_EXPERT_EMAILS, # If not provided, only the user running this notebook will be granted access
    # Built-in labeling schemas: EXPECTED_FACTS, GUIDELINES, EXPECTED_RESPONSE
    label_schemas=[review_app.label_schemas.GUIDELINES,  "good_response"],
)

# URLs to share with the SME.
print("Review App URL:", my_review_app.url)
print("Labeling session URL: ", my_session.url)

### Viewing Monitoring Results

The monitoring results are stored in Delta tables and can be accessed in two ways:
1. Through the MLflow UI (click the link generated above)
2. By directly querying the Delta table containing evaluated traces

Below, we'll query the Delta table to see the evaluation results, filtering out skipped evaluations.

If you do not see monitoring results, wait until the next run of the monitoring job.
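
To tell whether the monitoring job has evaluated anything yet, it can be useful to count traces by status before filtering. The sketch below does this over plain dictionaries. Only the `evaluation_status` column and the `'skipped'` value come from this notebook; the helper name `summarize_statuses`, the sample rows, and the `"processed"` status value are hypothetical.

```python
from collections import Counter
from typing import Any, Iterable


def summarize_statuses(
    rows: Iterable[dict[str, Any]], skip_status: str = "skipped"
) -> dict[str, Any]:
    """Count rows per evaluation_status and separate skipped from evaluated."""
    counts = Counter(row["evaluation_status"] for row in rows)
    skipped = counts.get(skip_status, 0)
    return {
        "by_status": dict(counts),
        "skipped": skipped,
        "evaluated": sum(counts.values()) - skipped,
    }


# Hypothetical rows; "processed" is a made-up status value for illustration
sample_rows = [
    {"request_id": "a", "evaluation_status": "processed"},
    {"request_id": "b", "evaluation_status": "skipped"},
    {"request_id": "c", "evaluation_status": "processed"},
]
print(summarize_statuses(sample_rows))
```

An `evaluated` count of zero suggests the monitoring job has not processed the sampled traffic yet.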

In [0]:
# Read evaluated traces from Delta
display(spark.table(monitor.evaluated_traces_table).filter("evaluation_status != 'skipped'"))

## Cleanup

When you're done with the demo, you can delete the endpoint and the monitor using the code below. 

In [0]:
from databricks import agents

# Delete the deployment created earlier in this notebook
agents.delete_deployment(model_name=uc_model_info.name, model_version=uc_model_info.version)