In [0]:
%pip install -U mlflow
# Restart the Python process after the install so later imports see the upgraded package.
# NOTE(review): the mlflow version is not pinned here, so results may differ between runs — confirm whether a pin is wanted.
dbutils.library.restartPython()

🔧 Install and Initialize LangChain, LangGraph, and MLflow for Databricks Agent Integration

In [0]:
%pip install -U mlflow
# NOTE(review): this cell repeats the same install + restart as the first cell of the notebook; one of the two looks redundant — confirm before removing.
dbutils.library.restartPython()


In [0]:
from langchain_community.chat_models import ChatDatabricks
from langchain_community.tools.databricks import UCFunctionToolkit
import requests
from langchain_core.messages import AIMessage
from langchain_core.tools import Tool
from langgraph.prebuilt import ToolNode

# Load the agent configuration from config.yml (used as the development-time config).
# NOTE(review): the same `config` and `llm` objects are created again further down, after the
# pinned package install and Python restart — this early copy is presumably superseded; confirm.
from mlflow.models import ModelConfig
config = ModelConfig(development_config="config.yml")

# Build the chat model against the serving endpoint named by the `llm_endpoint` config key.
llm = ChatDatabricks(endpoint=config.get("llm_endpoint"))


In [0]:
import mlflow
# Turn on MLflow's LangChain autologging.
# NOTE(review): autologging is enabled again in a later cell after the Python restart — confirm this early call is still needed.
mlflow.langchain.autolog()


In [0]:

%pip install -U -qqqq mlflow-skinny langchain==0.2.16 langgraph-checkpoint==1.0.12 langchain_core langchain-community==0.2.16 langgraph==0.2.16 pydantic databricks_langchain
# langchain, langchain-community, langgraph and langgraph-checkpoint are pinned above;
# mlflow-skinny, langchain_core, pydantic and databricks_langchain are installed unpinned.
# Restart the Python process so the imports in the following cells see these versions.
dbutils.library.restartPython()

🔁 Restart Python Kernel to Finalize Package Installation

In [0]:
# NOTE(review): the install cell directly above already ends with restartPython(); this second restart looks redundant — confirm before removing.
dbutils.library.restartPython()

📦 Enable MLflow Autologging and Load Model Configuration

In [0]:
import mlflow
from mlflow.models import ModelConfig

# Enable MLflow's LangChain autologging before any LLM/agent objects are created below.
mlflow.langchain.autolog()
# Load agent settings from config.yml; later cells read the `llm_endpoint`, `uc_functions`,
# `warehouse_id` and `agent_prompt` keys from this object.
config = ModelConfig(development_config="config.yml")

🛠️ Initialize Databricks LLM, UC Tools, and Custom Weather Tool

In [0]:
from langchain_community.chat_models import ChatDatabricks
from langchain_community.tools.databricks import UCFunctionToolkit
import requests
from langchain_core.messages import AIMessage
from langchain_core.tools import Tool
from langgraph.prebuilt import ToolNode

# Create the chat model against the serving endpoint named by the `llm_endpoint` config key.
llm = ChatDatabricks(endpoint=config.get("llm_endpoint"))

# Load the optional list of Unity Catalog function names.
# `uc_functions` is treated as optional: a missing key (this notebook handles a missing
# `agent_prompt` key as KeyError elsewhere, so the same is assumed here), an empty list
# and None all mean "no UC functions".
try:
    uc_functions = config.get("uc_functions") or []
except KeyError:
    uc_functions = []

# Build LangChain tools for the selected UC functions, executed on the configured SQL warehouse.
tools = (
    UCFunctionToolkit(warehouse_id=config.get("warehouse_id"))
    .include(*uc_functions)
    .get_tools()
)

# Define a custom weather tool
def get_weather(location):
    """
    Look up the current weather description for a location via OpenWeatherMap.

    The location name is first geocoded to latitude/longitude, then the current
    weather is requested for those coordinates.

    Parameters:
        location (string): the place name to look up (e.g. 'Mumbai')

    Returns:
        weather (string): the weather description for the location, or a short
        message when the geocoding API returns no match for the location

    Raises:
        RuntimeError: if the OPENWEATHERMAP_API_KEY environment variable is not set
        requests.HTTPError: if either API call returns an error status
    """
    import os

    # The key is read from the environment instead of being embedded in the URL.
    api_key = os.environ.get("OPENWEATHERMAP_API_KEY")
    if not api_key:
        raise RuntimeError(
            "OPENWEATHERMAP_API_KEY is not set; export it before calling get_weather."
        )

    # `params=` lets requests URL-encode the location (spaces, commas, non-ASCII names).
    geo_response = requests.get(
        "https://api.openweathermap.org/geo/1.0/direct",
        params={"q": location, "limit": 1, "appid": api_key},
        timeout=10,
    )
    geo_response.raise_for_status()
    matches = geo_response.json()
    if not matches:
        # An unknown place yields an empty list; answer with text rather than an IndexError.
        return f"No location found matching '{location}'."

    weather_response = requests.get(
        "https://api.openweathermap.org/data/2.5/weather",
        params={"lat": matches[0]['lat'], "lon": matches[0]['lon'], "appid": api_key},
        timeout=10,
    )
    weather_response.raise_for_status()
    return weather_response.json()['weather'][0]['description']

# Wrap get_weather as a LangChain tool the agent can call.
weather_tool = Tool(
    name="get_weather",
    # The lambda resolves `get_weather` at call time, so a later redefinition of the
    # function in this notebook is what actually runs.
    func=lambda location: get_weather(location),
    description="Fetches weather information for a specified location. The location should be provided as a string (e.g., 'Mumbai')."
)

# Register idempotently: drop any tool already registered under the same name before
# appending, so re-running this cell never leaves two `get_weather` tools in the list.
tools[:] = [tool for tool in tools if getattr(tool, "name", None) != weather_tool.name]
tools.append(weather_tool)



🌦️ Define and Register a Custom Weather Tool Using OpenWeatherMap API

In [0]:
def get_weather(location):
    """
    Fetch the weather information for a particular location that the user is interested in.

    The location name is geocoded with the OpenWeatherMap geocoding API and the
    resulting coordinates are used to request the current weather.

    Parameters:
        location (string): the location whose weather information needs to be fetched

    Returns:
        weather (string): the weather description for the given location, or a
        short message when the geocoding API returns no match for the location

    Raises:
        RuntimeError: if the OPENWEATHERMAP_API_KEY environment variable is not set
        requests.HTTPError: if either API call returns an error status
    """
    import os

    # SECURITY: the API key used to be hard-coded in this function. It is now read from
    # the environment; the previously committed key should be treated as leaked and rotated.
    api_key = os.environ.get("OPENWEATHERMAP_API_KEY")
    if not api_key:
        raise RuntimeError(
            "OPENWEATHERMAP_API_KEY is not set; export it before calling get_weather."
        )

    # Passing `params=` makes requests URL-encode the location instead of concatenating
    # it raw into the query string; https replaces the plain-http geocoding call.
    geo_response = requests.get(
        "https://api.openweathermap.org/geo/1.0/direct",
        params={"q": location, "limit": 1, "appid": api_key},
        timeout=10,
    )
    geo_response.raise_for_status()
    candidates = geo_response.json()
    if not candidates:
        # Unknown place names yield an empty list; report that instead of raising IndexError.
        return f"No location found matching '{location}'."

    latitude = candidates[0]['lat']
    longitude = candidates[0]['lon']

    weather_response = requests.get(
        "https://api.openweathermap.org/data/2.5/weather",
        params={"lat": latitude, "lon": longitude, "appid": api_key},
        timeout=10,
    )
    weather_response.raise_for_status()
    return weather_response.json()['weather'][0]['description']
        

# Expose get_weather to the agent as a LangChain tool.
weather_tool = Tool(
    name="get_weather",
    # Looked up lazily through the lambda, so the most recent `get_weather` definition is used.
    func=lambda location: get_weather(location),
    description="Fetches weather information for a specified location. The location should be provided as a string (e.g., 'Mumbai')."
)

# A `get_weather` tool may already be in `tools` (an earlier cell registers one, and this
# cell may be re-run). Replace it rather than appending a second tool with the same name,
# so the model is never bound to duplicate tool names.
tools[:] = [tool for tool in tools if getattr(tool, "name", None) != weather_tool.name]
tools.append(weather_tool)

🧠 Utility Functions for Formatting and Parsing LangChain Agent Messages

In [0]:
from typing import Iterator, Dict, Any
from langchain_core.messages import (
    AIMessage,
    HumanMessage,
    ToolMessage,
    MessageLikeRepresentation,
)

import json

def stringify_tool_call(tool_call: Dict[str, Any]) -> str:
    """
    Convert a raw tool call into a formatted string that the playground UI expects if there is enough information in the tool_call

    Args:
        tool_call: mapping that may carry "id", "name" and "args" entries.

    Returns:
        An indented JSON string with "id", "name" and "arguments" fields, where
        "arguments" is itself the JSON-encoded "args" value. If the call cannot be
        serialised, ``str(tool_call)`` is returned instead.
    """
    try:
        return json.dumps(
            {
                "id": tool_call.get("id"),
                "name": tool_call.get("name"),
                "arguments": json.dumps(tool_call.get("args", {})),
            },
            indent=2,
        )
    except Exception:
        # Best-effort fallback for malformed or non-serialisable calls. `Exception` (not a
        # bare `except`) so KeyboardInterrupt / SystemExit still propagate.
        return str(tool_call)


def stringify_tool_result(tool_msg: ToolMessage) -> str:
    """
    Convert a ToolMessage into a formatted string that the playground UI expects if there is enough information in the ToolMessage

    Args:
        tool_msg: the tool result message; its ``tool_call_id`` and ``content`` are read.

    Returns:
        An indented JSON string with "id" and "content" fields, or ``str(tool_msg)``
        when the message cannot be serialised.
    """
    try:
        return json.dumps(
            {"id": tool_msg.tool_call_id, "content": tool_msg.content}, indent=2
        )
    except Exception:
        # Best-effort fallback; `Exception` rather than a bare `except` so that
        # KeyboardInterrupt / SystemExit are not swallowed.
        return str(tool_msg)


def parse_message(msg) -> str:
    """
    Render a single agent message as text.

    Tool results and tool-call requests are serialised through the stringify
    helpers; plain human/AI messages contribute their content. Any other type is
    reported on stdout and rendered with ``str``.
    """
    # Result coming back from a tool.
    if isinstance(msg, ToolMessage):
        return stringify_tool_result(msg)

    # AI message asking for one or more tool invocations.
    if isinstance(msg, AIMessage) and msg.tool_calls:
        return "".join(stringify_tool_call(call) for call in msg.tool_calls)

    # Ordinary human turn, or AI reasoning / final answer.
    if isinstance(msg, (AIMessage, HumanMessage)):
        return msg.content

    print(f"Unexpected message type: {type(msg)}")
    return str(msg)


def wrap_output(stream: Iterator[MessageLikeRepresentation]) -> Iterator[str]:
    """
    Process and yield formatted outputs from the message stream.
    The invoke and stream langchain functions produce different output formats.
    This function handles both cases.

    Args:
        stream: iterable of events. An event either contains a top-level "messages"
            entry (invoke-style) or maps node names to dicts whose values are lists
            of messages (stream-style).

    Yields:
        One string per message, terminated by a blank line. A non-list value in a
        stream-style event is reported on stdout and yielded via ``str``.
    """
    for event in stream:
        # the agent was called with invoke()
        if "messages" in event:
            for msg in event["messages"]:
                yield parse_message(msg) + "\n\n"
        # the agent was called with stream()
        else:
            for node in event:
                for key, messages in event[node].items():
                    if isinstance(messages, list):
                        for msg in messages:
                            yield parse_message(msg) + "\n\n"
                    else:
                        # f-string so the offending value and key actually appear in the warning
                        # (the placeholders were previously printed literally).
                        print(f"Unexpected value {messages} for key {key}. Expected a list of `MessageLikeRepresentation`'s")
                        yield str(messages)

🔁 Custom LangGraph Tool-Calling Agent Workflow

In [0]:
from typing import (
    Annotated,
    Optional,
    Sequence,
    TypedDict,
    Union,
)

from langchain_core.language_models import LanguageModelLike
from langchain_core.messages import (
    BaseMessage,
    SystemMessage,
)
from langchain_core.runnables import RunnableConfig, RunnableLambda
from langchain_core.tools import BaseTool

from langgraph.graph import END, StateGraph
from langgraph.graph.graph import CompiledGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt.tool_executor import ToolExecutor
from langgraph.prebuilt.tool_node import ToolNode


# Graph state shared by every node of the agent workflow.
# It holds a single field: the running list of conversation messages.
class AgentState(TypedDict):
    """State passed between graph nodes: the conversation so far."""

    # Annotated with `add_messages`, which the graph uses as the reducer for this key
    # (presumably merging node updates into the existing list — confirm against LangGraph docs).
    messages: Annotated[Sequence[BaseMessage], add_messages]


def create_tool_calling_agent(
    model: LanguageModelLike,
    tools: Union[ToolExecutor, Sequence[BaseTool]],
    agent_prompt: Optional[str] = None,
) -> CompiledGraph:
    """
    Build and compile a two-node agent graph: "agent" (the model) and "tools".

    The model is bound to ``tools``. After each model turn the graph routes to the
    "tools" node when the latest message carries tool calls, and finishes otherwise;
    the "tools" node always hands control back to "agent".

    Args:
        model: chat model to drive the agent; must support ``bind_tools``.
        tools: tools made available to the model and executed by the tool node.
        agent_prompt: optional system prompt placed in front of the conversation on
            every model call.

    Returns:
        The compiled graph.
    """
    tool_bound_model = model.bind_tools(tools)

    # Optional preprocessing step: put the system message ahead of the conversation.
    if agent_prompt:
        system_message = SystemMessage(content=agent_prompt)
        preprocessor = RunnableLambda(
            lambda state: [system_message] + state["messages"]
        )
    else:
        preprocessor = RunnableLambda(lambda state: state["messages"])
    model_runnable = preprocessor | tool_bound_model

    def should_continue(state: AgentState):
        # Keep looping through the tool node while the newest message requests tool calls.
        return "continue" if state["messages"][-1].tool_calls else "end"

    def call_model(
        state: AgentState,
        config: RunnableConfig,
    ):
        # Run the model on the current state and return its reply as a state update.
        return {"messages": [model_runnable.invoke(state, config)]}

    graph_builder = StateGraph(AgentState)

    graph_builder.add_node("agent", RunnableLambda(call_model))
    graph_builder.add_node("tools", ToolNode(tools))
    graph_builder.set_entry_point("agent")

    # After "agent": go to "tools" on "continue", stop at END on "end".
    graph_builder.add_conditional_edges(
        "agent",
        should_continue,
        {"continue": "tools", "end": END},
    )
    # After "tools": always return to the model.
    graph_builder.add_edge("tools", "agent")

    return graph_builder.compile()

🧠 Build and Visualize a LangGraph Agent with Optional System Prompt

In [0]:
from langchain_core.runnables import RunnableGenerator
from mlflow.langchain.output_parsers import ChatCompletionsOutputParser
from IPython.display import Image, display

# Read the optional system prompt. Only the config lookup is guarded: a missing
# `agent_prompt` key simply means the agent runs without a system message.
try:
    agent_prompt = config.get("agent_prompt")
except KeyError:
    agent_prompt = None

agent_with_raw_output = create_tool_calling_agent(
    llm, tools, agent_prompt=agent_prompt
)

# Drawing the graph is a notebook convenience only. It is kept outside the config
# lookup and made best-effort so a rendering failure cannot prevent `agent` from
# being defined below.
try:
    display(Image(agent_with_raw_output.get_graph().draw_mermaid_png()))
except Exception as render_error:
    print(f"Skipping agent graph rendering: {render_error}")

# Final pipeline: raw graph output -> formatted strings -> chat-completions shaped output.
agent = agent_with_raw_output | RunnableGenerator(wrap_output) | ChatCompletionsOutputParser()

💬 Stream and Display the Agent's Response to a Sample Query

In [0]:
# TODO: replace this placeholder input example with an appropriate domain-specific example for your agent
sample_request = {"messages": [{"role": "user", "content": "how much does green webcam cost?"}]}
event_separator = "---" * 20 + "\n"

# Print each streamed chunk followed by a dashed separator line.
for event in agent.stream(sample_request):
    print(event, event_separator)
     

📦 Register the LangChain Agent with MLflow (`set_model`) for Tracking and Deployment

In [0]:

# Hand the composed `agent` pipeline to MLflow via set_model.
# NOTE(review): presumably this lets MLflow pick up the agent when this notebook is logged as model code — confirm.
mlflow.models.set_model(agent)

In [0]:
from langchain_core.runnables import RunnableGenerator
from mlflow.langchain.output_parsers import ChatCompletionsOutputParser
from IPython.display import Image, display


def build_agent_pipeline(llm, tools, config, wrap_output):
    """
    Constructs a LangChain agent pipeline that supports tool calling and output parsing.

    Args:
        llm: A language model (e.g., OpenAI, AzureOpenAI).
        tools: List of LangChain-compatible tools.
        config: Configuration object exposing ``get(key)``. The 'agent_prompt' entry
            is optional: both a ``None`` value and a ``KeyError`` from ``get`` are
            treated as "no system prompt".
        wrap_output: A function to format/stream output.

    Returns:
        A composed agent pipeline ready for streaming.
    """
    # Guard only the config lookup, so a KeyError raised elsewhere is not mistaken
    # for "no prompt configured".
    try:
        agent_prompt = config.get("agent_prompt")
    except KeyError:
        agent_prompt = None

    agent_with_raw_output = create_tool_calling_agent(
        llm, tools, agent_prompt=agent_prompt
    )

    # Graph rendering is a notebook nicety; a failure here must not prevent the
    # pipeline from being built and returned.
    try:
        display(Image(agent_with_raw_output.get_graph().draw_mermaid_png()))
    except Exception as render_error:
        print(f"Skipping agent graph rendering: {render_error}")

    # Build final runnable agent pipeline
    return agent_with_raw_output | RunnableGenerator(wrap_output) | ChatCompletionsOutputParser()


In [0]:
import ipywidgets as widgets
from IPython.display import display, clear_output

# Free-text box where the user types the question for the agent.
query_input = widgets.Text(
    description='Query:',
    placeholder='Ask about a product...',
    value='',
    layout=widgets.Layout(width='80%'),
    style={'description_width': 'initial'},
)

# Area that receives everything printed while the agent runs.
output = widgets.Output()

# Button that triggers a run.
run_button = widgets.Button(button_style='success', description='Run Agent')

# Callback
def run_agent(b):
    """
    Button callback: stream the agent's answer for the current query into `output`.

    Clears the output area first. With an empty query only a prompt to enter one is
    shown; otherwise each streamed chunk is printed without a trailing newline.

    Args:
        b: the clicked button (unused).
    """
    output.clear_output()
    user_query = query_input.value

    with output:
        if not user_query:
            print("Please enter a query.")
            return

        print(f"🧠 Agent running on: '{user_query}'")
        request = {"messages": [{"role": "user", "content": user_query}]}
        for chunk in agent.stream(request):
            print(chunk, end="")

# Run `run_agent` whenever the button is clicked.
run_button.on_click(run_agent)

# Render the query box, the button and the output area together, in that order.
display(query_input, run_button, output)


In [0]:
# Print basic facts about the attached cluster.
# Every conf lookup passes a fallback string, so an unset key prints "Not available"
# instead of raising and aborting the cell.
print("🔧 Spark Version:", spark.version)

cluster_conf_keys = [
    ("🧠 Cluster Node Type:", "spark.databricks.clusterUsageTags.clusterNodeType"),
    ("🧠 Driver Node Type:", "spark.databricks.clusterUsageTags.driverNodeType"),
    ("🔢 Number of Executors:", "spark.databricks.clusterUsageTags.numExecutors"),
    ("🕒 Auto Termination (mins):", "spark.databricks.cluster.profile.autoTerminationMinutes"),
    ("📦 Cluster ID:", "spark.databricks.clusterUsageTags.clusterId"),
]
for label, conf_key in cluster_conf_keys:
    print(label, spark.conf.get(conf_key, "Not available"))

# NOTE(review): this prints the first dot-separated component of the workspace URL;
# confirm that component really identifies the region for this deployment.
print("📍 Region:", spark.conf.get("spark.databricks.workspaceUrl", "Not available").split('.')[0])






In [0]:
# Look up the cluster node type, passing "Not available" as the fallback value for spark.conf.get.
print("🧠 Cluster Node Type:", spark.conf.get("spark.databricks.clusterUsageTags.clusterNodeType", "Not available"))

In [0]:
# List every Spark conf entry whose key mentions "databricks".
for conf_key, conf_value in spark.conf.getAll.items():
    if "databricks" not in conf_key:
        continue
    print(f"{conf_key}: {conf_value}")
