In [None]:
from langchain_core.messages.ai import AIMessage
from botocore.config import Config
from langchain.chat_models import init_chat_model
from langchain_community.agent_toolkits import FileManagementToolkit
from langchain_tavily import TavilySearch
from langchain_core.messages import (
    BaseMessage,
    SystemMessage,
    AIMessage,
    ToolMessage,
    ToolCall
)
from langgraph.types import interrupt
from langgraph.checkpoint.memory import MemorySaver
from langgraph.func import entrypoint, task
from langgraph.graph import add_messages

from dotenv import load_dotenv
load_dotenv(override=True)

True

In [2]:
web_search = TavilySearch(max_results=2, topic="general")

In [3]:
working_directory = "report"

file_toolkit = FileManagementToolkit(
    root_dir = str(working_directory),
    selected_tools = ["write_file"]
)

In [4]:
file_toolkit

FileManagementToolkit(root_dir='report', selected_tools=['write_file'])

In [5]:
file_toolkit.get_tools()

[WriteFileTool(root_dir='report')]

In [7]:
file_toolkit.get_tools()[0]

WriteFileTool(root_dir='report')

In [8]:
write_file = file_toolkit.get_tools()[0]

In [9]:
tools = [web_search, write_file]
tools

[TavilySearch(max_results=2, topic='general', api_wrapper=TavilySearchAPIWrapper(tavily_api_key=SecretStr('**********'), api_base_url=None)),
 WriteFileTool(root_dir='report')]

In [11]:
print(tools[0].name)
print(tools[1].name)

tavily_search
write_file


In [12]:
from rich import print
# 使用するツールのリスト
tools = [web_search, write_file]
tools_by_name = {tool.name: tool for tool in tools}

print(tools_by_name)

In [None]:
cfg = Config(
    read_timeout=300,
)

llm_with_tools = init_chat_model(
    # model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
    model="us.anthropic.claude-3-5-haiku-20241022-v1:0",
    model_provider="bedrock_converse",
    config=cfg,
).bind_tools(tools)

print(llm_with_tools.invoke("こんにちは"))



In [14]:
# システムプロンプト
system_prompt = """
あなたの責務はユーザからのリクエストを調査し、調査結果をファイル出力することです。
- ユーザーのリクエスト調査にWeb検索が必要であれば、Web検索ツールを使ってください。
- 必要な情報が集まったと判断したら検索は終了して下さい。
- 検索は最大2回までとしてください。
- ファイル出力はHTML形式(.html)に変換して保存してください。
  * Web検索が拒否された場合、Web検索を中止してレポート作成してください。
  * レポート保存を拒否された場合、レポート作成を中止し、内容をユーザーに直接伝えて下さい。
"""

In [15]:
@task
def invoke_llm(messages: list[BaseMessage]) -> AIMessage:
    response = llm_with_tools.invoke(
        [SystemMessage(content=system_prompt)] + messages
    )
    return response

In [None]:
@task
def use_tool(tool_call):
    tool = tools_by_name[tool_call["name"]]
    observation = tool.invoke(tool_call["args"])

    return ToolMessage(content=observation, tool_call_id=tool_call["id"])

In [None]:
def ask_human(tool_call: ToolCall):
    tool_name = tool_call["name"]
    tool_args = tool_call["args"]
    tool_data = {"name": tool_name}
    if tool_name == web_search.name:
        args =  f'* ツール名\n'
        args += f'  * {tool_name}\n'
        args += "* 引数\n"
        for key, value in tool_args.items():
            args += f'  * {key}\n'
            args += f'    * {value}\n'

        tool_data["args"] = args
    elif tool_name == write_file.name:
        args =  f'* ツール名\n'
        args += f'  * {tool_name}\n'
        args += f'* 保存ファイル名\n'
        args += f'  * {tool_args["file_path"]}'
        tool_data["html"] = tool_args["text"]
    tool_data["args"] = args

    feedback = interrupt(tool_data)
    
    if feedback == "APPROVE": # ユーザーがツール利用を承認したとき
        return tool_call

    # ユーザーがツール利用を承認しなかったとき(DENY)
    return ToolMessage(
        content="ツール利用が拒否されたため、処理を終了してください。", 
        name=tool_name, 
        tool_call_id=tool_call["id"]
    )

In [None]:
checkpointer = MemorySaver()
@entrypoint(checkpointer)
def agent(messages):
    llm_response = invoke_llm(messages.result())
    print(llm_response)

    while True:
        if not llm_response.tool_calls:
            break

        approve_tools = []
        tool_results = []

        for tool_call in llm_response.tool_calls:
            feedback = ask_human(tool_call)

            if isinstance(feedback, ToolMessage):
                tool_results.append(feedback)
            else:
                approve_tools.append(feedback)

        tool_futures = []
        for tool_call in approve_tools:
            future = use_tool(tool_call)
            tool_futures.append(future)

        tool_use_results = []
        for future in tool_futures:
            result = future.result()
            tool_use_results.append(result)

        messages = add_messages(
            messages,
            [llm_response, *tool_use_results, *tool_results]
        )

        llm_response = invoke_llm(messages).result()
    
    return llm_response


In [None]:
# チェックポインターの設定
checkpointer = MemorySaver()
@entrypoint(checkpointer)
def agent(messages):
    # LLMを呼び出し
    llm_response = invoke_llm(messages).result()
    
    # ツール呼び出しがある限り繰り返す
    while True:
        if not llm_response.tool_calls:
            break

        approve_tools = []
        tool_results = []
        
        # 各ツール呼び出しに対してユーザーの承認を求める
        for tool_call in llm_response.tool_calls:
            feedback = ask_human(tool_call)
            if isinstance(feedback, ToolMessage):
                tool_results.append(feedback)
            else:
                approve_tools.append(feedback)

        # 承認されたツールを実行
        tool_futures = []
        for tool_call in approve_tools:
            future = use_tool(tool_call)   # 非同期実行を開始
            tool_futures.append(future)

        # Future が完了するのを待って結果だけを集める
        tool_use_results = []
        for future in tool_futures:
            result = future.result()       # 完了まで待ち、結果を取得
            tool_use_results.append(result)

        # メッセージリストに追加
        messages = add_messages(
            messages,
            [llm_response, *tool_use_results, *tool_results]
        )

        # モデルを再度呼び出し
        llm_response = invoke_llm(messages).result()
    
    # 最終結果を返す
    return llm_response