# 1. Unity Catalog Function を用いたツール呼び出しエージェントシステム

## 概要
- **Databricks SDK** を使用して Genie Conversation API を呼び出す関数を定義
- **Unity Catalog** に Function として登録
- その関数を **ツールノード** として定義する
- **完全な制御とカスタマイズ性**

In [0]:
# Install Unity Catalog AI integration packages with the Databricks extra
%pip install -U -qqq unitycatalog-ai[databricks] unitycatalog-langchain[databricks] databricks-langchain mlflow-skinny[databricks] langgraph==0.3.4 databricks-agents uv
%pip uninstall -y databricks-connect pyspark
%pip install databricks-connect

dbutils.library.restartPython()

## Genie API を叩く関数を作る

In [0]:
def ask_bakehouse_genie(question: str, token: str) -> str:
    """
    DatabricksのText-to-SQL ツールであるGenieを呼び出し、質問文字列に対する最終回答テキストを返す。
    この Genie Space にはベーカリーフランチャイズビジネスのシミュレーションデータが含まれており、
    販売トランザクション、顧客情報、フランチャイズ情報、サプライヤー情報、メディアレビューなどが含まれています。

    Args:
        question: ユーザーの質問（日本語可）
        token: Databricks の Personal Access Token
    Returns:
        最終回答のテキスト
    """
    from databricks.sdk import WorkspaceClient
    import pandas as pd
    GENIE_SPACE_ID = "***"
    w = WorkspaceClient(
        host="***",
        token=token
    )

    # 会話を開始（完了まで待機）
    msg = w.genie.start_conversation_and_wait(
        space_id=GENIE_SPACE_ID,
        content=question,
    )

    # 添付から query 結果を探す
    for att in getattr(msg, "attachments", []) or []:
        att_id = getattr(att, "attachment_id", None)
        if not att_id:
            continue

        qres = w.genie.get_message_attachment_query_result(
            space_id=msg.space_id,
            conversation_id=msg.conversation_id,
            message_id=msg.message_id,
            attachment_id=att_id,
        )

        stmt = qres.as_dict().get("statement_response") or {}
        result = stmt.get("result") or {}
        manifest = stmt.get("manifest") or {}
        columns = (manifest.get("schema") or {}).get("columns") or []
        data = result.get("data_array") or []

        if not columns or not data:
            return None

        header = [str(c["name"]) for c in columns]
        df = pd.DataFrame(data, columns=header)

        # 表を文字列に整形
        return df.to_json()

    return None

In [0]:
token = dbutils.secrets.get(
    scope="20250826_CDDN", key="tkamei_PAT"
)
ask_bakehouse_genie("売上トップ3のフランチャイズ店舗はどこですか？", token)

In [0]:

def ask_weather_genie(question: str, token: str) -> str:
    """
    DatabricksのText-to-SQL ツールであるGenieを呼び出し、質問文字列に対する最終回答テキストを返す。
    このGenie SpaceにはAccuWeatherの気象データを含む2つのテーブルが含まれています。
    各テーブルは、トップ50のグローバル都市の1ヶ月分の予測および歴史的気象データを提供します。
    データは、温度、湿度、降水量、風速などの気象パラメータを含み、メートル法単位で表されています。

    Args:
        question: ユーザーの質問（日本語可）
        token: Databricks の Personal Access Token
    Returns:
        最終回答のテキスト
    """
    from databricks.sdk import WorkspaceClient
    import pandas as pd
    GENIE_SPACE_ID = "***"
    w = WorkspaceClient(
        host="***",
        token=token
    )

    # 会話を開始（完了まで待機）
    msg = w.genie.start_conversation_and_wait(
        space_id=GENIE_SPACE_ID,
        content=question,
    )

    # 添付から query 結果を探す
    for att in getattr(msg, "attachments", []) or []:
        att_id = getattr(att, "attachment_id", None)
        if not att_id:
            continue

        qres = w.genie.get_message_attachment_query_result(
            space_id=msg.space_id,
            conversation_id=msg.conversation_id,
            message_id=msg.message_id,
            attachment_id=att_id,
        )

        stmt = qres.as_dict().get("statement_response") or {}
        result = stmt.get("result") or {}
        manifest = stmt.get("manifest") or {}
        columns = (manifest.get("schema") or {}).get("columns") or []
        data = result.get("data_array") or []

        if not columns or not data:
            return None

        header = [str(c["name"]) for c in columns]
        df = pd.DataFrame(data, columns=header)

        # 表を文字列に整形（pandas に任せる）
        return df.to_json()

    return None

In [0]:
token = dbutils.secrets.get(
    scope="20250826_CDDN", key="tkamei_PAT"
)
ask_weather_genie("東京の2024年7月の平均気温は何度ですか？", token)

## Genie API を叩く関数を Unity Catalog に登録する

In [0]:
from unitycatalog.ai.core.databricks import DatabricksFunctionClient

client = DatabricksFunctionClient()

bakehouse genie の登録

In [0]:
CATALOG = "20250826_cddn"
SCHEMA = "agent_with_custom_tool"

function_info = client.create_python_function(
  func=ask_bakehouse_genie,
  catalog=CATALOG,
  schema=SCHEMA,
  replace=True,
  dependencies=[
    "databricks-sdk>=0.30.0","pandas"
  ]
)

In [0]:
# token を secret から取得して渡すラッパー関数
raw_func = f"{CATALOG}.{SCHEMA}.ask_bakehouse_genie"
tool_func = f"{CATALOG}.{SCHEMA}.ask_bakehouse_genie_tool"

sql_body = f"""
CREATE OR REPLACE FUNCTION {tool_func}(
  question STRING COMMENT '質問（日本語可）'
)
RETURNS STRING
LANGUAGE SQL
COMMENT 'DatabricksのText-to-SQL ツールであるGenieを呼び出し、質問文字列に対する最終回答テキストを返す。この Genie Space にはベーカリーフランチャイズビジネスのシミュレーションデータが含まれており、販売トランザクション、顧客情報、フランチャイズ情報、サプライヤー情報、メディアレビューなどが含まれています。'
RETURN {raw_func}(question, secret('20250826_CDDN', 'tkamei_PAT'));
"""
client.create_function(sql_function_body=sql_body)

In [0]:
result = client.execute_function(
  function_name=f"{CATALOG}.{SCHEMA}.ask_bakehouse_genie_tool",
  parameters={"question": "売上トップ3のフランチャイズ店舗はどこですか？"}
)

result.value 

weather genie の登録

In [0]:
CATALOG = "20250826_cddn"
SCHEMA = "agent_with_custom_tool"

function_info = client.create_python_function(
  func=ask_weather_genie,
  catalog=CATALOG,
  schema=SCHEMA,
  replace=True,
  dependencies=[
    "databricks-sdk>=0.30.0","pandas"
  ]
)

In [0]:
# token を secret から取得して渡すラッパー関数
raw_func = f"{CATALOG}.{SCHEMA}.ask_weather_genie"
tool_func = f"{CATALOG}.{SCHEMA}.ask_weather_genie_tool"

sql_body = f"""
CREATE OR REPLACE FUNCTION {tool_func}(
  question STRING COMMENT '質問（日本語可）'
)
RETURNS STRING
LANGUAGE SQL
COMMENT 'DatabricksのText-to-SQL ツールであるGenieを呼び出し、質問文字列に対する最終回答テキストを返す。このGenie SpaceにはAccuWeatherの気象データを含む2つのテーブルが含まれています。各テーブルは、トップ50のグローバル都市の1ヶ月分の予測および歴史的気象データを提供します。データは、温度、湿度、降水量、風速などの気象パラメータを含み、メートル法単位で表されています。'
RETURN {raw_func}(question, secret('20250826_CDDN', 'tkamei_PAT'));
"""
function_info = client.create_function(sql_function_body=sql_body)

In [0]:
result = client.execute_function(
  function_name=f"{CATALOG}.{SCHEMA}.ask_weather_genie_tool",
  parameters={"question": "東京の2024年7月の平均気温は何度ですか？"}
)

result.value 

## Agent.py の作成（Models From Code）

%%writefile を使用してagent.pyファイルを作成します。これはMLflow Models From Codeで使用されます。

In [0]:
%%writefile agent.py
from typing import TypedDict, Annotated, Sequence, Optional, Any, List, Generator
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.graph.message import add_messages

from langchain_core.messages import HumanMessage, AIMessage, BaseMessage, convert_to_openai_messages
from langchain_core.runnables import RunnableLambda
from databricks_langchain import ChatDatabricks, UCFunctionToolkit
import mlflow
from mlflow.pyfunc import ChatAgent
from mlflow.langchain.chat_agent_langgraph import ChatAgentState
from mlflow.types.agent import (
    ChatAgentChunk,
    ChatAgentMessage,
    ChatAgentResponse,
    ChatContext,
)

#######################################
# ツールコールエージェントグラフの定義
#######################################
CATALOG = "20250826_cddn"
SCHEMA = "agent_with_custom_tool"
bakehouse_genie_func_name = f"{CATALOG}.{SCHEMA}.ask_bakehouse_genie_tool"
weather_genie_func_name = f"{CATALOG}.{SCHEMA}.ask_weather_genie_tool"

toolkit = UCFunctionToolkit(function_names=[bakehouse_genie_func_name, weather_genie_func_name])
tools = toolkit.tools

LLM_ENDPOINT_NAME = "databricks-meta-llama-3-3-70b-instruct"
llm = ChatDatabricks(endpoint=LLM_ENDPOINT_NAME).bind_tools(tools)

system_prompt = """You are a tool-routing assistant. Reply in the user’s language (default: Japanese).

# 重要方針
- ツール（UC Function）を使うと決めたら、ユーザーの直近メッセージを **一字一句そのまま** `question` 引数に渡すこと。
  - **改変禁止**：翻訳・要約・言い換え・付け足し（例:「ください」「最終回答のみで」など）・メタ注釈・前置き・後置き・句読点や空白の正規化・大文字小文字/全角半角の変更・改行の除去/追加・引用符/コードブロックの付与・不要なトリミングは一切しない。
  - 連続メッセージがある場合は「直近のユーザーメッセージ」だけを渡す（ユーザーが「前の質問も含めて」と明示した場合のみ、指示どおりの順序で **原文ママ連結** する）。
- トークンやシークレットには触れない。`ask_bakehouse_genie_tool(question: STRING)` と `ask_weather_genie_tool(question: STRING)` は内部で Secret を注入するため、追加引数は渡さない。
- 非同期実行はしない。待機指示・時間見積もりはしない。

# ルーティング規則
- ベーカリーの販売・顧客・フランチャイズ・サプライヤー・メディアレビュー等に関するデータ質問 → `ask_bakehouse_genie_tool`
- 天気・気象に関するデータ質問（都市/日付/気温/降水量など） → `ask_weather_genie_tool`
- 上記以外の一般的な会話やツール不要の雑談 → ツール呼び出しを行わず、通常応答。
- どちらか迷った場合は **質問文を改変せず**、より関連が強い方のツールを1つだけ呼び出す。

# 出力ポリシー
- ツールが返す文字列（JSON を想定）を解析し、表形式が適切なら見やすいテーブルに整形して提示する。
  - レコードが多い場合は上位のみを表示し、件数と省略を明記。
  - 数値・単位はそのまま保持し、勝手な丸めや再解釈をしない。
- ツールが空・不正・エラーっぽい場合は、「元の質問はそのまま送ったが結果が取得できなかった」旨を簡潔に伝え、**質問文の改変を提案しない**まま、ユーザーに追加の条件（期間/対象/粒度など）を任意で示せる選択肢を短く提示する。
- 思考過程は開示しない。確信のない推測や補完をしない。データの範囲・限界は簡潔に示す。

# ツール仕様（参照用）
- `ask_bakehouse_genie_tool(question: STRING) -> STRING`
- `ask_weather_genie_tool(question: STRING) -> STRING`
（どちらも `question` のみをそのまま渡す。余計な文字は絶対に付けない。）
"""

class AgentState(ChatAgentState):
    """拡張されたエージェント状態"""
    messages: Annotated[Sequence[BaseMessage], add_messages]

def call_model(state: AgentState):
    preprocessor = RunnableLambda(
        lambda s: [{"role": "system", "content": system_prompt}] + list(s["messages"])
    )
    model_runnable = preprocessor | llm
    ai_msg = model_runnable.invoke(state)
    return {"messages": [ai_msg]}

tool_node = ToolNode(tools)

graph = StateGraph(AgentState)
graph.add_node("agent", RunnableLambda(call_model))
graph.add_node("tools", tool_node)

graph.set_entry_point("agent")
graph.add_conditional_edges("agent", tools_condition, {"tools": "tools", END: END})
graph.add_edge("tools", "agent")

toolcall_agent = graph.compile()

class LangGraphChatAgent(ChatAgent):
    def __init__(self, agent):
        self.agent = agent

    def predict(
        self,
        messages: list[ChatAgentMessage],
        context: Optional[ChatContext] = None,
        custom_inputs: Optional[dict[str, Any]] = None,
    ) -> ChatAgentResponse:
        request = {
            "messages": [m.model_dump_compat(exclude_none=True) for m in messages]
        }

        config: dict[str, Any] = {}
        if custom_inputs:
            config.setdefault("configurable", {}).update(custom_inputs)

        out_messages: list[ChatAgentMessage] = []
        for event in self.agent.stream(request, stream_mode="updates", config=config):
            for node_data in event.values():
                out_messages.extend(
                    ChatAgentMessage(**convert_to_openai_messages(msg), id=msg.id) for msg in node_data.get("messages", [])
                )
        return ChatAgentResponse(messages=out_messages)

    def predict_stream(
        self,
        messages: list[ChatAgentMessage],
        context: Optional[ChatContext] = None,
        custom_inputs: Optional[dict[str, Any]] = None,
    ) -> Generator[ChatAgentChunk, None, None]:
        request = {
            "messages": [m.model_dump_compat(exclude_none=True) for m in messages]
        }

        config: dict[str, Any] = {}
        if custom_inputs:
            config.setdefault("configurable", {}).update(custom_inputs)

        for event in self.agent.stream(request, stream_mode="updates", config=config):
            for node_data in event.values():
                yield from (
                    ChatAgentChunk(**{"delta": convert_to_openai_messages(msg)}, id=msg.id)
                    for msg in node_data.get("messages", [])
                )

mlflow.langchain.autolog()
AGENT = LangGraphChatAgent(toolcall_agent)
mlflow.models.set_model(AGENT)

## エージェントのテスト

In [0]:
dbutils.library.restartPython()

In [0]:
from agent import AGENT

toolcall_agent = AGENT.agent
toolcall_agent

In [0]:
from agent import AGENT

response = AGENT.predict({"messages": [{"role": "user", "content": "売上トップ3のフランチャイズ店舗はどこですか？"}]})

print(response.messages[-1].content)

In [0]:
response = AGENT.predict({"messages": [{"role": "user", "content": "東京の2024年7月の平均気温は何度ですか？"}]})

print(response.messages[-1].content)

## まとめ

このノートブックでは、以下を実装しました：
1. **Databricks SDK** を使用した Genie Conversation API の呼び出し関数作成
2. **Unity Catalog Function** としての登録
3. **UCFunctionToolkit** を使用したツールの作成
4. **LangGraph** によるTool-Callエージェントシステムの構築
5. **MLflow Models From Code** によるモデル管理

### 嬉しさ
- Genie 呼び出し関数を Unity Catalog に登録するので、他のエージェントシステム等と共有できる
- Genie 呼び出しの出力形式を比較的自由に整形できる