#Step 0: Dependencies and Demo Data

We need to install the necessary libraries and load some demo data so that the demo runs properly.

In [0]:
%run ./config

In [0]:
%run ./rag_setup/rag_setup

# Disclaimer 

In order for Langgraph to run, we need to write the code into a separate file called **agent.py**. We will go through each step of setting up Langgraph piece by piece to explain what is happening, then combine everything in one cell to create the file.
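
To make the "separate file" idea concrete, here is a minimal sketch of writing Python source to a file and then loading it as a module, which is roughly what the `%%writefile agent.py` cell and the later `from agent import AGENT` do together. The file name `demo_agent.py` and the `GREETING` variable are hypothetical and exist only for this illustration.

```python
import importlib.util
import tempfile
from pathlib import Path

# Hypothetical module source; the real agent.py is much longer.
source = 'GREETING = "hello from the generated module"\n'

with tempfile.TemporaryDirectory() as tmp:
    # Write the source to a file, as %%writefile does for a notebook cell.
    module_path = Path(tmp) / "demo_agent.py"
    module_path.write_text(source)

    # Load the file as a module, similar in spirit to `from agent import AGENT`.
    spec = importlib.util.spec_from_file_location("demo_agent", module_path)
    demo_agent = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(demo_agent)

print(demo_agent.GREETING)
```

Keeping the agent in its own file means the same code can be imported for testing and later handed to MLflow as a file path when logging.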

#What is Langgraph? 

Langgraph is a stateful, agentic framework from Langchain that simplifies agent development. It allows developers to create complex agent workflows by organizing language model calls, tool use, and memory into structured, composable components that can be deployed as robust applications. LangGraph's key innovation is enabling dynamic, multi-step interactions between different "agents" while maintaining contextual awareness and providing clear visualization of the application's flow.

We will break each portion of Langgraph out to demonstrate this structure and explain what is happening.

#Step 0: Import the necessary libraries

In [0]:
from typing import Any, Generator, Optional, Sequence, Union

from config_import import demo_schema, catalog, chatBotModel, vectorSearchIndexName, embeddings_endpoint, agent_schema #we have some default configurations that we will import. You are welcome to change these. 

import mlflow
from databricks_langchain import (
    ChatDatabricks,
    UCFunctionToolkit,
    VectorSearchRetrieverTool,
    DatabricksEmbeddings
)
from unitycatalog.ai.core.databricks import DatabricksFunctionClient
from unitycatalog.ai.core.base import set_uc_function_client

from langchain_core.language_models import LanguageModelLike
from langchain_core.runnables import RunnableConfig, RunnableLambda
from langchain_core.tools import BaseTool
from langgraph.graph import END, StateGraph
from langgraph.graph.graph import CompiledGraph
from langgraph.graph.state import CompiledStateGraph
from langgraph.prebuilt.tool_node import ToolNode
from mlflow.langchain.chat_agent_langgraph import ChatAgentState, ChatAgentToolNode
from mlflow.pyfunc import ChatAgent
from mlflow.types.agent import (
    ChatAgentChunk,
    ChatAgentMessage,
    ChatAgentResponse,
    ChatContext,
)

#Step 1: Define your LLM Endpoint and System Prompt

We need to specify which LLM we plan to use for this application. On Databricks, any LLM that you host on Model Serving, whether it is an open source model or an external model, can be used. Model Serving handles compatibility between different providers, so you can use any model you bring to Databricks.



In [0]:

# chatBotModel = "databricks-meta-llama-3-3-70b-instruct"
LLM_ENDPOINT_NAME = chatBotModel
llm = ChatDatabricks(endpoint=LLM_ENDPOINT_NAME) # LangChain chat model that wraps the serving endpoint; Langgraph calls the LLM through it

# TODO: Update with your system prompt
system_prompt = f"""## Instructions for Testing the Databricks Documentation Assistant chatbot

Your inputs are invaluable for the development team. By providing detailed feedback and corrections, you help us fix issues and improve the overall quality of the application. We rely on your expertise to identify any gaps or areas needing enhancement."""

#Step 2: Define our Tools

### Mosaic AI Tools on Unity Catalog

You can create and host functions/tools on Unity Catalog! You get the benefits of Unity Catalog, but for your functions!

While you can create your own tools in the same code that defines your agent (i.e. local Python functions) with the Mosaic AI Agent Framework, Unity Catalog provides additional benefits. Here is a comparison:

1. **Unity Catalog functions**: Unity Catalog functions are defined and managed within Unity Catalog, offering built-in security and compliance features. Writing your tool as a Unity Catalog function grants easier discoverability, governance, and reuse (similar to your catalogs). Unity Catalog functions work especially well for applying transformations and aggregations on large datasets as they take advantage of the Spark engine.

2. **Agent code tools**: These tools are defined in the same code that defines the AI agent. This approach is useful when calling REST APIs, using arbitrary code or libraries, or executing low-latency tools. However, this approach lacks the built-in discoverability and governance provided by Unity Catalog functions.

Unity Catalog functions are subject to the limitations described here: https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-ddl-create-sql-function.html

Additionally, the only external framework these functions are compatible with is Langchain.

So, if you're planning on using complex Python code for your tool, you will likely need to create Agent Code Tools instead.
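
As a rough illustration of an agent code tool, the sketch below defines a plain Python function next to the agent code. The function name `convert_currency` and the rate table are hypothetical, made-up values for this example only. In a Langchain-based agent, a function like this would typically be wrapped as a tool (for example with the `tool` decorator from `langchain_core.tools`) before being added to the `tools` list; the wrapping is left out here so the snippet runs without extra dependencies.

```python
# Hypothetical rate table: units of each currency per 1 USD (made-up values).
HYPOTHETICAL_RATES = {"USD": 1.0, "EUR": 0.5, "GBP": 0.25}


def convert_currency(amount: float, source: str, target: str) -> float:
    """Convert an amount between two currencies using the made-up table above.

    The docstring matters: tool-calling frameworks usually show it to the LLM
    so the model can decide when to call the tool.
    """
    if source not in HYPOTHETICAL_RATES or target not in HYPOTHETICAL_RATES:
        raise ValueError(f"Unsupported currency pair: {source} -> {target}")
    amount_in_usd = amount / HYPOTHETICAL_RATES[source]
    return round(amount_in_usd * HYPOTHETICAL_RATES[target], 2)


print(convert_currency(10, "USD", "EUR"))
```

Because the function lives in the agent's own code, it can use any library or call any API, but it is not discoverable or governed through Unity Catalog.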

###Langgraph Implementation
For Langgraph, we need to define and list our tools beforehand so that we can pass them into the application.

###First, let's define our own UDFs or Unity Catalog Functions

In [0]:
%sql
CREATE OR REPLACE FUNCTION identifier(CONCAT(:catalog_name||'.'||:agent_schema||'.','purchase_location'))()
    RETURNS Table(name STRING, purchases INTEGER)
    COMMENT 'Use this tool to find total purchase information about a particular location. This tool will provide a list of destinations that you will use to help you answer questions. Only use this if the user asks about locations.'
    RETURN SELECT dl.name AS Destination, count(tp.destination_id) AS Total_Purchases_Per_Destination
             FROM main.dbdemos_fs_travel.travel_purchase tp join main.dbdemos_fs_travel.destination_location dl on tp.destination_id = dl.destination_id
             group by dl.name
             order by count(tp.destination_id) desc
             LIMIT 10

###Next, let's add it as one of the tools that the Langgraph Agent can call

We need to specify the location of the function as catalog.schema.tool_name, using the values we set in the configuration file. The * simply means that we want the agent to be able to use all UDFs located in that schema. If the schema contains multiple functions, the agent will consider all of them for use.
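
The naming convention can be sketched with plain strings. The catalog and schema values below are hypothetical placeholders; in this notebook the real values come from `config_import`.

```python
# Hypothetical values; the notebook imports the real ones from config_import.
catalog = "main"
agent_schema = "agents_demo"

# Wildcard: every function in the schema is offered to the agent.
all_tools = [f"{catalog}.{agent_schema}.*"]

# Explicit list: only the named function is offered to the agent.
selected_tools = [f"{catalog}.{agent_schema}.purchase_location"]

print(all_tools)
print(selected_tools)
```

Listing functions explicitly is a reasonable choice when a schema holds functions that the agent should not be able to call.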

In [0]:
###############################################################################
## To create and see usage examples of more tools, see https://docs.databricks.com/en/generative-ai/agent-framework/agent-tool.html
###############################################################################
tools = []

# Add additional tools
client = DatabricksFunctionClient()
set_uc_function_client(client)
uc_tool_names = [f"{catalog}.{agent_schema}.*"] # you can list individual tools instead, but we select all of them for this demo
uc_toolkit = UCFunctionToolkit(function_names=uc_tool_names)
tools.extend(uc_toolkit.tools)

# Use Databricks vector search indexes as tools
# See https://docs.databricks.com/en/generative-ai/agent-framework/unstructured-retrieval-tools.html for details

# Add vector search indexes; we set this one up using rag_setup.
vector_search_tools = [
        VectorSearchRetrieverTool(
          index_name=f"{catalog}.{demo_schema}.{vectorSearchIndexName}",
          tool_name="databricks_docs_retriever",
          tool_description="Retrieves information about Databricks products from official Databricks documentation.",
          columns=["id", "url", "content"],
          embedding=DatabricksEmbeddings(endpoint=embeddings_endpoint),
          text_column="content",
        )
]
tools.extend(vector_search_tools)

#Step 3: Define the Agent's Logic

Now we set up the code that defines the logic the Agent follows. Langgraph uses Nodes and Edges to dictate and enforce which step an Agent takes next, which could be a call to another Agent or to a tool. At the end, we compile the Langgraph workflow into a graph that can be invoked. The ChatAgentState is passed between nodes so that the Agent remembers what has happened previously.
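
Before looking at the real Langgraph code, it may help to see the same control flow in plain Python. The sketch below is not Langgraph itself: `fake_model`, `fake_tools`, and `run_graph` are hypothetical stand-ins, and the tool result is a made-up string. It only illustrates the loop between an "agent" node and a "tools" node, with a conditional edge that ends the run when no tool call is requested.

```python
def should_continue(state: dict) -> str:
    """Conditional edge: go to the tools node if the last message requests a tool."""
    last_message = state["messages"][-1]
    return "continue" if last_message.get("tool_calls") else "end"


def fake_model(state: dict) -> dict:
    """Stand-in for the LLM: request a tool once, then give a final answer."""
    already_used_tool = any(m["role"] == "tool" for m in state["messages"])
    if already_used_tool:
        return {"role": "assistant", "content": "Final answer based on the tool result."}
    return {"role": "assistant", "content": "", "tool_calls": [{"name": "purchase_location"}]}


def fake_tools(state: dict) -> dict:
    """Stand-in for the tools node: pretend to run the requested tool."""
    call = state["messages"][-1]["tool_calls"][0]
    return {"role": "tool", "name": call["name"], "content": "made-up tool result"}


def run_graph(user_message: str) -> list[dict]:
    # The state carries the full message history from node to node.
    state = {"messages": [{"role": "user", "content": user_message}]}
    node = "agent"  # entry point
    while node != "end":
        if node == "agent":
            state["messages"].append(fake_model(state))
            node = "tools" if should_continue(state) == "continue" else "end"
        else:
            state["messages"].append(fake_tools(state))
            node = "agent"  # the "tools" -> "agent" edge
    return state["messages"]


history = run_graph("Which destination sells best?")
print([m["role"] for m in history])
```

In the real implementation, `StateGraph` handles this routing, and the state is a `ChatAgentState` rather than a plain dict.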

In [0]:
def create_tool_calling_agent(
    model: LanguageModelLike,
    tools: Union[ToolNode, Sequence[BaseTool]],
    system_prompt: Optional[str] = None,
) -> CompiledGraph:
    model = model.bind_tools(tools)

    # Define the function that determines which node to go to
    def should_continue(state: ChatAgentState):
        messages = state["messages"]
        last_message = messages[-1]
        # If there are function calls, continue. else, end
        if last_message.get("tool_calls"):
            return "continue"
        else:
            return "end"

    if system_prompt:
        preprocessor = RunnableLambda(
            lambda state: [{"role": "system", "content": system_prompt}]
            + state["messages"]
        )
    else:
        preprocessor = RunnableLambda(lambda state: state["messages"])
    model_runnable = preprocessor | model

    def call_model(
        state: ChatAgentState,
        config: RunnableConfig,
    ):
        response = model_runnable.invoke(state, config)

        return {"messages": [response]}

    workflow = StateGraph(ChatAgentState)

    workflow.add_node("agent", RunnableLambda(call_model))
    workflow.add_node("tools", ChatAgentToolNode(tools))

    workflow.set_entry_point("agent")
    workflow.add_conditional_edges(
        "agent",
        should_continue,
        {
            "continue": "tools",
            "end": END,
        },
    )
    workflow.add_edge("tools", "agent")

    return workflow.compile()

#Step 4: Create a ChatAgent Model

ChatAgent is a new MLflow interface that enforces a chat schema for authoring conversational agents. It is an important step for reviewing our Agent's performance, confirming tool calls, and supporting multi-agent scenarios. We will see this in action with the Databricks Review App.

Check out MLflow's documentation here: https://mlflow.org/docs/latest/api_reference/python_api/mlflow.pyfunc.html#mlflow.pyfunc.ChatAgent

We complete the setup by telling MLflow which object is the Agent (via mlflow.models.set_model) and by enabling mlflow.langchain.autolog, which includes Langgraph. This enables MLflow traces, experiments, and other logging capabilities critical to managing our Agent in production.
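
The `predict` method in the next cell walks over the events that the graph streams back and gathers every message into one response. The sketch below mimics that flattening step with made-up events; the event shape (one dict per step, keyed by node name, holding that node's new messages) is an assumption based on how the loop in `predict` reads them, and `collect_messages` is a hypothetical helper.

```python
# Made-up events shaped the way the predict loop expects them.
events = [
    {"agent": {"messages": [{"role": "assistant", "content": "", "tool_calls": [{"name": "purchase_location"}]}]}},
    {"tools": {"messages": [{"role": "tool", "content": "made-up tool result"}]}},
    {"agent": {"messages": [{"role": "assistant", "content": "Final answer."}]}},
]


def collect_messages(stream):
    """Flatten per-node updates into a single ordered list of messages."""
    collected = []
    for event in stream:
        for node_data in event.values():
            # A node update without messages contributes nothing.
            collected.extend(node_data.get("messages", []))
    return collected


messages = collect_messages(events)
print(len(messages), messages[-1]["content"])
```

The last message in the flattened list is the Agent's final answer, which is why the test cell later reads `result.messages[-1].content`.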

In [0]:
class LangGraphChatAgent(ChatAgent):
    def __init__(self, agent: CompiledStateGraph):
        self.agent = agent

    def predict(
        self,
        messages: list[ChatAgentMessage],
        context: Optional[ChatContext] = None,
        custom_inputs: Optional[dict[str, Any]] = None,
    ) -> ChatAgentResponse:
        request = {"messages": self._convert_messages_to_dict(messages)}

        messages = []
        for event in self.agent.stream(request, stream_mode="updates"):
            for node_data in event.values():
                messages.extend(
                    ChatAgentMessage(**msg) for msg in node_data.get("messages", [])
                )
        return ChatAgentResponse(messages=messages)

    def predict_stream(
        self,
        messages: list[ChatAgentMessage],
        context: Optional[ChatContext] = None,
        custom_inputs: Optional[dict[str, Any]] = None,
    ) -> Generator[ChatAgentChunk, None, None]:
        request = {"messages": self._convert_messages_to_dict(messages)}
        for event in self.agent.stream(request, stream_mode="updates"):
            for node_data in event.values():
                yield from (
                    ChatAgentChunk(**{"delta": msg}) for msg in node_data["messages"]
                )

mlflow.langchain.autolog()
agent = create_tool_calling_agent(llm, tools, system_prompt)
AGENT = LangGraphChatAgent(agent)
mlflow.models.set_model(AGENT)

#Completed Code

Now we combine all of the code above into a single cell that writes the agent.py file we will use to run the Langgraph agent.

In [0]:
%%writefile agent.py
from typing import Any, Generator, Optional, Sequence, Union

from config_import import demo_schema, catalog, chatBotModel, vectorSearchIndexName, embeddings_endpoint, agent_schema
import mlflow
from databricks_langchain import (
    ChatDatabricks,
    UCFunctionToolkit,
    VectorSearchRetrieverTool,
    DatabricksEmbeddings
)
from unitycatalog.ai.core.databricks import DatabricksFunctionClient
from unitycatalog.ai.core.base import set_uc_function_client
from langchain_core.language_models import LanguageModelLike
from langchain_core.runnables import RunnableConfig, RunnableLambda
from langchain_core.tools import BaseTool
from langgraph.graph import END, StateGraph
from langgraph.graph.graph import CompiledGraph
from langgraph.graph.state import CompiledStateGraph
from langgraph.prebuilt.tool_node import ToolNode
from mlflow.langchain.chat_agent_langgraph import ChatAgentState, ChatAgentToolNode
from mlflow.pyfunc import ChatAgent
from mlflow.types.agent import (
    ChatAgentChunk,
    ChatAgentMessage,
    ChatAgentResponse,
    ChatContext,
)
############################################
# Define your LLM endpoint and system prompt
############################################
# TODO: Replace with your model serving endpoint
# LLM_ENDPOINT_NAME = "databricks-meta-llama-3-3-70b-instruct"
LLM_ENDPOINT_NAME = chatBotModel
llm = ChatDatabricks(endpoint=LLM_ENDPOINT_NAME)

# TODO: Update with your system prompt
system_prompt = f"""## Instructions for Testing the Databricks Documentation Assistant chatbot

Your inputs are invaluable for the development team. By providing detailed feedback and corrections, you help us fix issues and improve the overall quality of the application. We rely on your expertise to identify any gaps or areas needing enhancement."""

###############################################################################
## Define tools for your agent, enabling it to retrieve data or take actions
## beyond text generation
## To create and see usage examples of more tools, see
## https://docs.databricks.com/en/generative-ai/agent-framework/agent-tool.html
###############################################################################
tools = []

# You can use UDFs in Unity Catalog as agent tools
# Below, we add every UDF in the agent schema by using the `*` wildcard
# You can also add local LangChain python tools. See https://python.langchain.com/docs/concepts/tools

# Add additional tools
client = DatabricksFunctionClient()
set_uc_function_client(client)
uc_tool_names = [f"{catalog}.{agent_schema}.*"]
uc_toolkit = UCFunctionToolkit(function_names=uc_tool_names)
tools.extend(uc_toolkit.tools)

# Use Databricks vector search indexes as tools
# See https://docs.databricks.com/en/generative-ai/agent-framework/unstructured-retrieval-tools.html
# for details

# Add vector search indexes
vector_search_tools = [
        VectorSearchRetrieverTool(
          index_name=f"{catalog}.{demo_schema}.{vectorSearchIndexName}",
          tool_name="databricks_docs_retriever",
          tool_description="Retrieves information about Databricks products from official Databricks documentation. This must be used",
          columns=["id", "url", "content"]
        )
]
tools.extend(vector_search_tools)

#####################
## Define agent logic
#####################


def create_tool_calling_agent(
    model: LanguageModelLike,
    tools: Union[ToolNode, Sequence[BaseTool]],
    system_prompt: Optional[str] = None,
) -> CompiledGraph:
    model = model.bind_tools(tools)

    # Define the function that determines which node to go to
    def should_continue(state: ChatAgentState):
        messages = state["messages"]
        last_message = messages[-1]
        # If there are function calls, continue. else, end
        if last_message.get("tool_calls"):
            return "continue"
        else:
            return "end"

    if system_prompt:
        preprocessor = RunnableLambda(
            lambda state: [{"role": "system", "content": system_prompt}]
            + state["messages"]
        )
    else:
        preprocessor = RunnableLambda(lambda state: state["messages"])
    model_runnable = preprocessor | model

    def call_model(
        state: ChatAgentState,
        config: RunnableConfig,
    ):
        response = model_runnable.invoke(state, config)

        return {"messages": [response]}

    workflow = StateGraph(ChatAgentState)

    workflow.add_node("agent", RunnableLambda(call_model))
    workflow.add_node("tools", ChatAgentToolNode(tools))

    workflow.set_entry_point("agent")
    workflow.add_conditional_edges(
        "agent",
        should_continue,
        {
            "continue": "tools",
            "end": END,
        },
    )
    workflow.add_edge("tools", "agent")

    return workflow.compile()


class LangGraphChatAgent(ChatAgent):
    def __init__(self, agent: CompiledStateGraph):
        self.agent = agent

    def predict(
        self,
        messages: list[ChatAgentMessage],
        context: Optional[ChatContext] = None,
        custom_inputs: Optional[dict[str, Any]] = None,
    ) -> ChatAgentResponse:
        request = {"messages": self._convert_messages_to_dict(messages)}

        messages = []
        for event in self.agent.stream(request, stream_mode="updates"):
            for node_data in event.values():
                messages.extend(
                    ChatAgentMessage(**msg) for msg in node_data.get("messages", [])
                )
        return ChatAgentResponse(messages=messages)

    def predict_stream(
        self,
        messages: list[ChatAgentMessage],
        context: Optional[ChatContext] = None,
        custom_inputs: Optional[dict[str, Any]] = None,
    ) -> Generator[ChatAgentChunk, None, None]:
        request = {"messages": self._convert_messages_to_dict(messages)}
        for event in self.agent.stream(request, stream_mode="updates"):
            for node_data in event.values():
                yield from (
                    ChatAgentChunk(**{"delta": msg}) for msg in node_data["messages"]
                )


# Create the agent object, and specify it as the agent object to use when
# loading the agent back for inference via mlflow.models.set_model()
mlflow.langchain.autolog()
agent = create_tool_calling_agent(llm, tools, system_prompt)
AGENT = LangGraphChatAgent(agent)
mlflow.models.set_model(AGENT)

#Test the Langgraph Agent

Now that we have a theoretically working Agent, let's make sure it works!

In [0]:
dbutils.library.restartPython()

In [0]:
from agent import AGENT

result = AGENT.predict({"messages": [{"role": "user", "content": "Hello! what is databricks?"}]})
print(f"Full Payload which shows the Agent determining what tool to use and the tool it called before providing answer: {result}.\n\n**Actual Answer**: {result.messages[-1].content}")

#Log the Langgraph Agent

Now that we have a confirmed working Langgraph Agent, we need to log the model to MLflow. This will capture all the dependencies and necessary credentials we need to deploy the Langgraph Agent.

If you plan on using other Databricks features that need credentials, you can use automatic authentication passthrough. This is what our _resources_ variable is for: https://docs.databricks.com/aws/en/generative-ai/agent-framework/deploy-agent#authentication-for-dependent-resources

In [0]:
import mlflow
from agent import tools, LLM_ENDPOINT_NAME
from databricks_langchain import VectorSearchRetrieverTool
from mlflow.models.resources import DatabricksFunction, DatabricksServingEndpoint
from unitycatalog.ai.langchain.toolkit import UnityCatalogTool

mlflow.set_registry_uri("databricks-uc")

resources = [DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT_NAME)]
for tool in tools:
    if isinstance(tool, VectorSearchRetrieverTool):
        resources.extend(tool.resources)
    elif isinstance(tool, UnityCatalogTool):
        resources.append(DatabricksFunction(function_name=tool.uc_function_name))


with mlflow.start_run():
    logged_agent_info = mlflow.pyfunc.log_model(
        artifact_path="agent",
        python_model="agent.py",
        pip_requirements=[
            "mlflow",
            "langgraph==0.3.4",
            "databricks-langchain",
        ],
        resources=resources,
        code_paths=['config_import.py']
    )

#Register the MLflow Model to Unity Catalog

To prepare our model for serving, we must register it to Unity Catalog.

In [0]:
from config_import import demo_schema, catalog, finalchatBotModelName

# TODO: define the catalog, schema, and model name for your UC model
catalog = catalog
schema = demo_schema
model_name = finalchatBotModelName
UC_MODEL_NAME = f"{catalog}.{schema}.{model_name}"

# register the model to UC
uc_registered_model_info = mlflow.register_model(
    model_uri=logged_agent_info.model_uri, name=UC_MODEL_NAME
)

# RAG in Production

This workshop does not cover how to set up RAG on Databricks. Please check out our self-paced learning here: <insert link here>

You can follow the notebooks in the folder called RAG to set one up. In this workshop, however, we will demonstrate what it looks like to prepare and monitor your RAG application in production.



### Evaluate your bot's quality with Mosaic AI Agent Evaluation specialized LLM judge models

Evaluation is a key part of deploying a RAG application. Databricks simplifies this task with specialized LLM judge models tuned to evaluate your bot's quality, cost, and latency, even if ground truth is not available.

Agent Evaluation's specialized AI evaluator is integrated into `mlflow.evaluate(...)`; all you need to do is pass `model_type="databricks-agent"`.

Mosaic AI Agent Evaluation evaluates:
1. Answer correctness - requires ground truth
2. Hallucination / groundedness - no ground truth required
3. Answer relevance - no ground truth required
4. Retrieval precision - no ground truth required
5. (Lack of) Toxicity - no ground truth required

In this example, we'll use an evaluation set curated by our internal experts using the Mosaic AI Agent Evaluation review app interface. This evaluation dataset is saved as a Delta table.
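
Since only answer correctness requires ground truth, it can be useful to know which rows of an evaluation set carry an expected answer. The sketch below uses made-up rows and a hypothetical helper, `split_by_ground_truth`; the column names `request` and `expected_response` are assumptions for illustration and should be checked against the actual evaluation table.

```python
# Made-up evaluation rows; column names are assumed, not read from the workshop table.
eval_rows = [
    {"request": "What is Unity Catalog?", "expected_response": "A governance layer for data and AI assets."},
    {"request": "How do I create a Delta table?"},  # no ground truth provided
]


def split_by_ground_truth(rows):
    """Separate rows that have an expected answer from rows that do not."""
    with_truth = [row for row in rows if row.get("expected_response")]
    without_truth = [row for row in rows if not row.get("expected_response")]
    return with_truth, without_truth


with_truth, without_truth = split_by_ground_truth(eval_rows)
print(len(with_truth), "rows can be scored for correctness;", len(without_truth), "cannot")
```

Rows without ground truth can still be scored on the metrics listed above that do not require it.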

In [0]:
from databricks.agents.evals import generate_evals_df
import mlflow

agent_description = "A chatbot that answers questions about Databricks Documentation."
question_guidelines = """
Questions must strictly be about Databricks and its documentation. 
# User personas
- A developer new to the Databricks platform and documentation
# Example questions
- What API lets me parallelize operations over rows of a delta table?
"""
# TODO: Spark/Pandas DataFrame with "content" and "doc_uri" columns.
docs = spark.table(f"{catalog}.{demo_schema}.databricks_documentation")
docs = docs.withColumnRenamed("url", "doc_uri")
evals = generate_evals_df(
    docs=docs,
    num_evals=10,
    agent_description=agent_description,
    question_guidelines=question_guidelines,
)
print(evals)
eval_result = mlflow.evaluate(data=evals, model=logged_agent_info.model_uri, model_type="databricks-agent")

In [0]:
eval_dataset = spark.table(f"{catalog}.{demo_schema}.eval_set_databricks_documentation").sample(fraction=0.1, seed=2).limit(10).toPandas()
display(eval_dataset)

In [0]:
with mlflow.start_run():
    # Evaluate the logged model
    eval_results = mlflow.evaluate(
        data=eval_dataset, # Your evaluation set
        model=logged_agent_info.model_uri,
        model_type="databricks-agent", # activates Mosaic AI Agent Evaluation
    )

#Deploy your Agent/Model

Databricks provides an easy function called deploy() from the Mosaic AI Agent Framework. This deploys a Model Serving Endpoint on a CPU resource, a Feedback Model, and a Review App UI with a shareable link. This is a critical step in reviewing your application, as you can send the Review App to subject matter experts. They can then provide feedback directly in the app for you to review later.

In [0]:
from databricks import agents
agents.deploy(UC_MODEL_NAME, uc_registered_model_info.version, tags = {"endpointSource": "docs"})

# Mosaic AI Model Training for Fine Tuning LLMs 

We do not expect you to fine-tune an LLM in this workshop. However, we will demonstrate the performance impact of fine-tuning a Llama-1B model through the Playground!

We trained this model on a dataset containing medical terms. While larger models can handle these words well, the smaller models struggle with them since they are rarely used. 