# Laboratório de Agentes com GenAI

## Criação do Agente por Código

In [0]:
%pip install -U -qqqq mlflow langchain langgraph==0.3.4 databricks-langchain pydantic databricks-agents unitycatalog-langchain[databricks] databricks-openai uv

# Restart the Python process so the packages installed above are the ones
# picked up by the imports in the following cells.
dbutils.library.restartPython()

Note: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.


In [0]:
# Unity Catalog location shared by later cells: the registered model name and
# the output table are both built as f"{catalog}.{schema}.<name>".
catalog = "baraldi_catalog_new"
schema = "genai"

In [0]:
%%writefile agent.py
import json
from typing import Annotated, Any, Generator, Optional, Sequence, TypedDict, Union
from uuid import uuid4

import mlflow
from databricks_langchain import (
    ChatDatabricks,
    UCFunctionToolkit,
    VectorSearchRetrieverTool,
)
from langchain_core.language_models import LanguageModelLike
from langchain_core.messages import (
    AIMessage,
    AIMessageChunk,
    BaseMessage,
    convert_to_openai_messages,
)
from langchain_core.runnables import RunnableConfig, RunnableLambda
from langchain_core.tools import BaseTool
from langgraph.graph import END, StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt.tool_node import ToolNode
from mlflow.entities import SpanType
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)

############################################
# Define your LLM endpoint and system prompt
############################################
# TODO: Replace with your model serving endpoint
LLM_ENDPOINT_NAME = "databricks-claude-3-7-sonnet"
llm = ChatDatabricks(endpoint=LLM_ENDPOINT_NAME)

# TODO: Update with your system prompt
system_prompt = "You are a helpful assistant that can run Python code."

###############################################################################
## Define tools for your agent, enabling it to retrieve data or take actions
## beyond text generation
## To create and see usage examples of more tools, see
## https://docs.databricks.com/en/generative-ai/agent-framework/agent-tool.html
###############################################################################
# Single list of every tool handed to the agent; each section below extends it.
tools = []

# You can use UDFs in Unity Catalog as agent tools
# Below, we add the `system.ai.python_exec` UDF, which provides
# a python code interpreter tool to our agent
# You can also add local LangChain python tools. See https://python.langchain.com/docs/concepts/tools

# TODO: Add additional tools
# UC_TOOL_NAMES is also imported by the logging cell to declare resources.
UC_TOOL_NAMES = ["system.ai.python_exec"]
uc_toolkit = UCFunctionToolkit(function_names=UC_TOOL_NAMES)
tools.extend(uc_toolkit.tools)

# Use Databricks vector search indexes as tools
# See https://docs.databricks.com/en/generative-ai/agent-framework/unstructured-retrieval-tools.html#locally-develop-vector-search-retriever-tools-with-ai-bridge
# List to store vector search tool instances for unstructured retrieval.
# It is also imported by the logging cell, which reads each tool's `resources`.
VECTOR_SEARCH_TOOLS = []

# To add a vector search retriever tool, create a VectorSearchRetrieverTool
# and append it to VECTOR_SEARCH_TOOLS.
# Example:
# VECTOR_SEARCH_TOOLS.append(
#     VectorSearchRetrieverTool(
#         index_name="",
#         # filters="..."
#     )
# )

tools.extend(VECTOR_SEARCH_TOOLS)

#####################
## Define agent logic
#####################


class AgentState(TypedDict):
    # State schema passed to StateGraph in create_tool_calling_agent.

    # Conversation history. The Annotated metadata attaches langgraph's
    # `add_messages` as the reducer for this key (presumably so node outputs
    # are merged into the existing list rather than replacing it — see the
    # langgraph docs).
    messages: Annotated[Sequence[BaseMessage], add_messages]
    # Optional free-form dicts; no node defined in this file reads or writes them.
    custom_inputs: Optional[dict[str, Any]]
    custom_outputs: Optional[dict[str, Any]]


def create_tool_calling_agent(
    model: LanguageModelLike,
    tools: Union[ToolNode, Sequence[BaseTool]],
    system_prompt: Optional[str] = None,
):
    """Build and compile a LangGraph loop that alternates between the LLM and its tools.

    The graph has two nodes: ``"agent"`` (invokes the model) and ``"tools"``
    (a ``ToolNode`` over ``tools``). Execution starts at ``"agent"``; after each
    model turn the graph goes to ``"tools"`` when the last message is an
    ``AIMessage`` carrying tool calls, and stops otherwise. ``"tools"`` always
    hands control back to ``"agent"``.

    Args:
        model: Chat model; the tools are bound to it with ``bind_tools``.
        tools: Tools offered to the model and executed by the tool node.
        system_prompt: When truthy, prepended as a system message on every
            model invocation.

    Returns:
        The compiled graph.
    """
    model = model.bind_tools(tools)

    # Routing decision taken after every "agent" step.
    def should_continue(state: AgentState):
        latest = state["messages"][-1]
        wants_tools = isinstance(latest, AIMessage) and latest.tool_calls
        return "continue" if wants_tools else "end"

    # The preprocessor turns the graph state into the message list sent to the model.
    if not system_prompt:
        preprocessor = RunnableLambda(lambda state: state["messages"])
    else:
        preprocessor = RunnableLambda(
            lambda state: [{"role": "system", "content": system_prompt}] + state["messages"]
        )
    model_runnable = preprocessor | model

    # Node body for "agent": one model call, returned as a single new message.
    def call_model(
        state: AgentState,
        config: RunnableConfig,
    ):
        return {"messages": [model_runnable.invoke(state, config)]}

    graph = StateGraph(AgentState)
    graph.add_node("agent", RunnableLambda(call_model))
    graph.add_node("tools", ToolNode(tools))

    graph.set_entry_point("agent")
    routes = {"continue": "tools", "end": END}
    graph.add_conditional_edges("agent", should_continue, routes)
    graph.add_edge("tools", "agent")

    return graph.compile()


class LangGraphResponsesAgent(ResponsesAgent):
    """Adapter exposing a compiled LangGraph agent through MLflow's ``ResponsesAgent`` API."""

    def __init__(self, agent):
        # Object whose ``stream`` method is driven by ``predict_stream``
        # (the compiled graph returned by ``create_tool_calling_agent``).
        self.agent = agent

    def _langchain_to_responses(self, messages: list[BaseMessage]) -> list[dict[str, Any]]:
        """Convert LangChain messages into Responses output item dictionaries.

        Every message in ``messages`` is converted, in order, and the items are
        concatenated. This matters when one graph update carries several
        messages (for example one tool message per parallel tool call):
        returning after the first message would silently drop the rest.

        Args:
            messages: Messages emitted by a graph node.

        Returns:
            A list of output items; empty when ``messages`` is empty or holds
            only message types that are not handled here.
        """
        items: list[dict[str, Any]] = []
        for message in messages:
            message = message.model_dump()
            role = message["type"]
            if role == "ai":
                if tool_calls := message.get("tool_calls"):
                    # One function-call item per requested tool call; they all
                    # share the id of the AI message that produced them.
                    items.extend(
                        self.create_function_call_item(
                            id=message.get("id") or str(uuid4()),
                            call_id=tool_call["id"],
                            name=tool_call["name"],
                            arguments=json.dumps(tool_call["args"]),
                        )
                        for tool_call in tool_calls
                    )
                else:
                    items.append(
                        self.create_text_output_item(
                            text=message["content"],
                            id=message.get("id") or str(uuid4()),
                        )
                    )
            elif role == "tool":
                items.append(
                    self.create_function_call_output_item(
                        call_id=message["tool_call_id"],
                        output=message["content"],
                    )
                )
            elif role == "user":
                # NOTE(review): kept from the original; confirm whether dumped
                # messages can ever carry type "user" here.
                items.append(message)
        return items

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        """Run the agent to completion and return all finished output items.

        Consumes ``predict_stream`` and keeps only the items of
        ``response.output_item.done`` events; other events are ignored.
        ``custom_outputs`` echoes the request's ``custom_inputs``.
        """
        outputs = [
            event.item
            for event in self.predict_stream(request)
            if event.type == "response.output_item.done"
        ]
        return ResponsesAgentResponse(output=outputs, custom_outputs=request.custom_inputs)

    def predict_stream(
        self,
        request: ResponsesAgentRequest,
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        """Stream the agent run as Responses stream events.

        Yields a ``response.output_item.done`` event for every item converted
        from a node update, and a text-delta event for every non-empty
        ``AIMessageChunk`` streamed while the model generates.
        """
        # prep_msgs_for_cc_llm is not defined in this class; presumably it is
        # inherited from ResponsesAgent and converts the request input into
        # chat-style messages — confirm against the MLflow docs.
        cc_msgs = self.prep_msgs_for_cc_llm([i.model_dump() for i in request.input])

        # With a list of stream modes, each event is indexed as (mode, payload).
        for event in self.agent.stream({"messages": cc_msgs}, stream_mode=["updates", "messages"]):
            if event[0] == "updates":
                for node_data in event[1].values():
                    for item in self._langchain_to_responses(node_data["messages"]):
                        yield ResponsesAgentStreamEvent(type="response.output_item.done", item=item)
            # filter the streamed messages to just the generated text messages
            elif event[0] == "messages":
                # Best effort: a malformed chunk is reported and skipped so it
                # does not abort the whole stream.
                try:
                    chunk = event[1][0]
                    if isinstance(chunk, AIMessageChunk) and (content := chunk.content):
                        yield ResponsesAgentStreamEvent(
                            **self.create_text_delta(delta=content, item_id=chunk.id),
                        )
                except Exception as e:
                    print(e)


# Enable MLflow autologging for LangChain before the agent is built.
mlflow.langchain.autolog()
# Build the graph from the module-level llm, tools and system prompt, then wrap
# it in the ResponsesAgent adapter. AGENT is the object imported by the notebook.
agent = create_tool_calling_agent(llm, tools, system_prompt)
AGENT = LangGraphResponsesAgent(agent)
# Specify AGENT as the object to use when this file is loaded back for
# inference (the file is logged with python_model="agent.py").
mlflow.models.set_model(AGENT)

Overwriting agent.py


In [0]:
from agent import AGENT

# Smoke-test the agent in-process with a single user turn before logging it.
request = {"input": [{"role": "user", "content": "What is 6*7 in Python?"}]}
result = AGENT.predict(request)
print(result.model_dump(exclude_none=True))



{'object': 'response', 'output': [{'type': 'function_call', 'id': 'run--d359113d-46d6-44d5-a0c0-124f6337dcca', 'call_id': 'toolu_bdrk_01PUFG7yHZTYTUh8Pbowf5VH', 'name': 'system__ai__python_exec', 'arguments': '{"code": "print(6 * 7)"}'}, {'type': 'function_call_output', 'call_id': 'toolu_bdrk_01PUFG7yHZTYTUh8Pbowf5VH', 'output': '{"format": "SCALAR", "value": "42\\n"}'}, {'type': 'message', 'id': 'run--598f9abf-7f16-4887-ba01-376a7ca03161', 'content': [{'text': 'In Python, 6*7 equals 42. The asterisk (*) is the multiplication operator in Python, and this simple expression multiplies the number 6 by the number 7 to give the result 42.', 'type': 'output_text'}], 'role': 'assistant'}]}


Trace(trace_id=tr-32b78d2b11bc7f3ea4fa703adfa8ebf0)

In [0]:
# Determine Databricks resources to specify for automatic auth passthrough at deployment time
from importlib.metadata import version as installed_version

from agent import LLM_ENDPOINT_NAME, UC_TOOL_NAMES, VECTOR_SEARCH_TOOLS
import mlflow
from mlflow.models.resources import DatabricksFunction, DatabricksServingEndpoint

# The agent calls the LLM serving endpoint on every turn, so it is declared
# alongside the tools it uses.
resources = [DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT_NAME)]
for tool in VECTOR_SEARCH_TOOLS:
    resources.extend(tool.resources)
for tool_name in UC_TOOL_NAMES:
    resources.append(DatabricksFunction(function_name=tool_name))

# Log agent.py as a models-from-code pyfunc model. langgraph and
# databricks-connect are pinned to the versions installed in this environment;
# importlib.metadata replaces the deprecated pkg_resources lookup.
with mlflow.start_run():
    logged_agent_info = mlflow.pyfunc.log_model(
        name="agent",
        python_model="agent.py",
        pip_requirements=[
            "databricks-langchain",
            f"langgraph=={installed_version('langgraph')}",
            f"databricks-connect=={installed_version('databricks-connect')}",
        ],
        resources=resources,
    )

🔗 View Logged Model at: https://e2-demo-field-eng.cloud.databricks.com/ml/experiments/2964497923461911/models/m-1b7f8e5773df47b8838d70a8eca68d9c?o=1444828305810485
2025/09/24 18:49:46 INFO mlflow.pyfunc: Predicting on input example to validate output


In [0]:
# Validate the logged model in an isolated environment before registering it.
# Use the URI reported by log_model (as the registration cell does) instead of
# rebuilding it from the run id and a hard-coded artifact name.
mlflow.models.predict(
    model_uri=logged_agent_info.model_uri,
    input_data={"input": [{"role": "user", "content": "What is 6*7 in Python?!"}]},
    env_manager="uv",
)

Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]

Downloading artifacts:   0%|          | 0/12 [00:00<?, ?it/s]

2025/09/24 18:49:57 INFO mlflow.models.flavor_backend_registry: Selected backend for flavor 'python_function'


Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]

Downloading artifacts:   0%|          | 0/12 [00:00<?, ?it/s]

2025/09/24 18:50:00 INFO mlflow.utils.virtualenv: Environment /tmp/virtualenv_envs/mlflow-3cac0f5ff7910cc1e1c603974e701a0fbefa1f52 already exists
2025/09/24 18:50:00 INFO mlflow.utils.environment: === Running command '['bash', '-c', 'source /tmp/virtualenv_envs/mlflow-3cac0f5ff7910cc1e1c603974e701a0fbefa1f52/bin/activate && python -c ""']'
2025/09/24 18:50:00 INFO mlflow.utils.environment: === Running command '['bash', '-c', 'source /tmp/virtualenv_envs/mlflow-3cac0f5ff7910cc1e1c603974e701a0fbefa1f52/bin/activate && python /local_disk0/.ephemeral_nfs/envs/pythonEnv-8a584c9d-8065-448b-bb15-3fd0175fac07/lib/python3.12/site-packages/mlflow/pyfunc/_mlflow_pyfunc_backend_predict.py --model-uri file:///local_disk0/user_tmp_data/spark-8a584c9d-8065-448b-bb15-3f/tmppec2hwww/agent --content-type json --input-path /local_disk0/user_tmp_data/spark-8a584c9d-8065-448b-bb15-3f/tmpk3sc11oh/input.json']'


{"object": "response", "output": [{"type": "function_call", "id": "run--69497ed2-bea4-4d2d-8546-a650dfb6c753", "call_id": "toolu_bdrk_01LP8EmUkG73mf544XtTGHxz", "name": "system__ai__python_exec", "arguments": "{\"code\": \"print(6 * 7)\"}"}, {"type": "function_call_output", "call_id": "toolu_bdrk_01LP8EmUkG73mf544XtTGHxz", "output": "{\"format\": \"SCALAR\", \"value\": \"42\\n\"}"}, {"type": "message", "id": "run--a7a2d12c-92e3-47b6-bb33-c439b425b17c", "content": [{"text": "In Python, 6*7 equals 42.", "type": "output_text"}], "role": "assistant"}]}

2025/09/24 18:50:12 INFO mlflow.tracing.export.async_export_queue: Flushing the async trace logging queue before program exit. This may take a while...


In [0]:
# Point the MLflow registry client at Unity Catalog.
mlflow.set_registry_uri("databricks-uc")

# TODO: define the catalog, schema, and model name for your UC model
model_name = "agentemodelo"
UC_MODEL_NAME = f"{catalog}.{schema}.{model_name}"

# Register the logged agent under its three-level UC name; the returned info
# (including the new version) is used by the deployment cell.
uc_registered_model_info = mlflow.register_model(
    name=UC_MODEL_NAME,
    model_uri=logged_agent_info.model_uri,
)

Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]

Successfully registered model 'baraldi_catalog_new.genai.agentemodelo'.


Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]

Downloading artifacts:   0%|          | 0/12 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/13 [00:00<?, ?it/s]

🔗 Created version '1' of model 'baraldi_catalog_new.genai.agentemodelo': https://e2-demo-field-eng.cloud.databricks.com/explore/data/models/baraldi_catalog_new/genai/agentemodelo/version/1?o=1444828305810485


In [0]:
from databricks import agents

# Deploy the model version that was just registered in Unity Catalog.
deployment_tags = {"endpointSource": "docs"}
agents.deploy(UC_MODEL_NAME, uc_registered_model_info.version, tags=deployment_tags)

## Criação do Agente por UI

![AgentUI.png](./AgentUI.png "AgentUI.png")

### Criação da Genie

![Genie.png](./Genie.png "Genie.png")

### Criação do Knowledge Assistant

In [0]:
# Load the PDF from the volume as a binary file and add a "parsed" column
# produced by the ai_parse_document SQL function applied to its content.
from pyspark.sql.functions import expr

pdf_path = "/Volumes/baraldi_catalog/teste/volumepdf/NIPS-2017-attention-is-all-you-need-Paper.pdf"
raw_pdf = spark.read.format("binaryFile").load(pdf_path)
df = raw_pdf.withColumn("parsed", expr("ai_parse_document(content)"))
display(df)

In [0]:
# Re-parse the parsed document as JSON and keep only the file path and the
# document's `elements` field.
from pyspark.sql.functions import col, parse_json

parsed_as_json = parse_json(col("parsed").cast("string"))
df_copy = df.withColumn("parsed_json", parsed_as_json).selectExpr(
    "path",
    "parsed_json:document:elements",
)
display(df_copy)

In [0]:
# Turn the `elements` column into one row per element.
from pyspark.sql.functions import col, explode, from_json
from pyspark.sql.types import ArrayType, StringType

# Each element is kept as a string entry of the array.
array_schema = ArrayType(StringType())

# `elements` is cast to a string and parsed with from_json into an array column.
elements_as_array = from_json(col("elements").cast("string"), array_schema)
df_copy2 = df_copy.withColumn("elements_array", elements_as_array)

# Explode the array so every entry becomes its own row next to its source path.
exploded_element = explode(col("elements_array")).alias("element")
df_copy3 = df_copy2.select("path", exploded_element)
display(df_copy3)

In [0]:
# Shows the exploded elements again; the previous cell already displays this DataFrame.
display(df_copy3)

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

# Add an "id" column to every element row. Per the Spark documentation the
# generated ids are unique and increasing but not necessarily consecutive.
row_id = monotonically_increasing_id()
df_copy4 = df_copy3.withColumn("id", row_id)
display(df_copy4)

In [0]:
# Write the DataFrame to the table {catalog}.{schema}.documentos, replacing any
# existing data and schema.
target_table = f"{catalog}.{schema}.documentos"
table_writer = df_copy4.write.mode("overwrite").option("overwriteSchema", "true")
table_writer.saveAsTable(target_table)

### Criação do Agente Orquestrador com Agent Bricks

![agentbricks.png](./agentbricks.png "agentbricks.png")