# Keyword Finder Agent - 動作確認用ノートブック

このノートブックでは `tools/src/agents/keyword_finder/agent.py` の各コンポーネントを
ブロックごとに確認できます。

## 1. 環境設定とインポート

In [None]:
# パスの設定（toolsディレクトリをパスに追加）
import sys
from pathlib import Path

# toolsディレクトリをパスに追加
tools_dir = Path.cwd().parent
if str(tools_dir) not in sys.path:
    sys.path.insert(0, str(tools_dir))

print(f"Tools directory: {tools_dir}")

In [None]:
# 環境変数の読み込み（.envファイルがある場合）
from dotenv import load_dotenv

load_dotenv(tools_dir / ".env")
print("環境変数を読み込みました")

In [None]:
# 基本的なインポート
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import END, StateGraph

print("LangChain/LangGraphのインポート完了")

In [None]:
# プロジェクトモジュールのインポート
from src.agents.keyword_finder.prompts import (
    INTEGRATOR_SYSTEM_PROMPT,
    INTEGRATOR_USER_PROMPT,
    PLANNER_SYSTEM_PROMPT,
    PLANNER_USER_PROMPT,
    REFLECTOR_SYSTEM_PROMPT,
    REFLECTOR_USER_PROMPT,
)
from src.agents.keyword_finder.schemas import (
    AgentState,
    KeywordSearchInput,
    KeywordSearchOutput,
    Plan,
    ReflectionResult,
    ToolResult,
)
from src.common import get_logger, settings
from src.models import get_chat_model, get_structured_model
from src.tools import get_related_keywords, search_web

print("プロジェクトモジュールのインポート完了")

In [None]:
# ロガーの設定
logger = get_logger(__name__)

# 利用可能なツール
TOOLS = [search_web, get_related_keywords]

print(f"利用可能なツール: {[t.name for t in TOOLS]}")

## 2. 入力データの準備

In [None]:
# テスト用の入力データを作成
input_data = KeywordSearchInput(
    category="エンジニア",
    seed_keywords=["Python", "機械学習"],
    depth=2,
)

print(f"カテゴリ: {input_data.category}")
print(f"シードキーワード: {input_data.seed_keywords}")
print(f"深掘りレベル: {input_data.depth}")

In [None]:
# 初期状態を作成
initial_state: AgentState = {
    "input": input_data,
    "messages": [],
    "plan": None,
    "tool_results": [],
    "discovered_keywords": [],
    "reflection": None,
    "retry_count": 0,
    "output": None,
}

print("初期状態を作成しました")
print(f"状態のキー: {list(initial_state.keys())}")

## 3. 計画立案ノード (Planner Node)

In [None]:
# プロンプトの確認
print("=== PLANNER_SYSTEM_PROMPT ===")
print(PLANNER_SYSTEM_PROMPT)
print()
print("=== PLANNER_USER_PROMPT ===")
print(PLANNER_USER_PROMPT)

In [None]:
# 計画立案ノードの関数を定義
def planner(state: AgentState) -> dict:
    """計画立案ノード"""
    logger.info("計画立案を開始")

    input_data = state["input"]
    model = get_structured_model(Plan)

    messages = [
        SystemMessage(content=PLANNER_SYSTEM_PROMPT),
        HumanMessage(
            content=PLANNER_USER_PROMPT.format(
                category=input_data.category,
                seed_keywords=", ".join(input_data.seed_keywords),
                depth=input_data.depth,
            )
        ),
    ]

    plan = model.invoke(messages)
    logger.info(f"計画立案完了: {len(plan.subtasks)}個のサブタスク")

    return {
        "plan": plan,
        "messages": messages,
    }

print("planner関数を定義しました")

In [None]:
# 計画立案を実行
planner_result = planner(initial_state)

print("=== 計画立案結果 ===")
print(f"サブタスク数: {len(planner_result['plan'].subtasks)}")
print()
for i, subtask in enumerate(planner_result['plan'].subtasks, 1):
    print(f"{i}. {subtask}")

In [None]:
# 状態を更新
state_after_plan = {**initial_state, **planner_result}
print(f"計画: {state_after_plan['plan']}")

## 4. ツール実行ノード (Executor Node)

In [None]:
# ツール実行ノードの関数を定義
def executor(state: AgentState) -> dict:
    """ツール実行ノード"""
    logger.info("ツール実行を開始")

    plan = state["plan"]
    if not plan:
        return {}

    tool_results = []
    discovered_keywords = list(state.get("discovered_keywords", []))

    model = get_chat_model().bind_tools(TOOLS)

    for subtask in plan.subtasks:
        logger.info(f"サブタスク実行: {subtask}")

        # ツールを実行
        messages = [
            SystemMessage(content="サブタスクを実行してください。"),
            HumanMessage(content=subtask),
        ]

        response = model.invoke(messages)

        # ツール呼び出しがある場合は実行
        if response.tool_calls:
            for tool_call in response.tool_calls:
                tool_name = tool_call["name"]
                tool_args = tool_call["args"]

                logger.info(f"ツール呼び出し: {tool_name}({tool_args})")

                # ツールを実行
                if tool_name == "search_web":
                    results = search_web.invoke(tool_args)
                    tool_results.append(
                        ToolResult(
                            tool_name=tool_name,
                            query=tool_args.get("query", ""),
                            results=results,
                        )
                    )
                elif tool_name == "get_related_keywords":
                    results = get_related_keywords.invoke(tool_args)
                    discovered_keywords.extend(results)
                    tool_results.append(
                        ToolResult(
                            tool_name=tool_name,
                            query=tool_args.get("keyword", ""),
                            results=results,
                        )
                    )

    logger.info(
        f"ツール実行完了: {len(tool_results)}回実行, "
        f"{len(discovered_keywords)}個のキーワード発見"
    )

    return {
        "tool_results": tool_results,
        "discovered_keywords": discovered_keywords,
    }

print("executor関数を定義しました")

In [None]:
# ツール実行を実行
executor_result = executor(state_after_plan)

print("=== ツール実行結果 ===")
print(f"ツール実行回数: {len(executor_result['tool_results'])}")
print(f"発見したキーワード数: {len(executor_result['discovered_keywords'])}")
print()
print("発見したキーワード（最初の10個）:")
for kw in executor_result['discovered_keywords'][:10]:
    print(f"  - {kw}")

In [None]:
# 状態を更新
state_after_execute = {**state_after_plan, **executor_result}
print(f"発見したキーワード数: {len(state_after_execute['discovered_keywords'])}")

## 5. 内省ノード (Reflector Node)

In [None]:
# プロンプトの確認
print("=== REFLECTOR_SYSTEM_PROMPT ===")
print(REFLECTOR_SYSTEM_PROMPT)
print()
print("=== REFLECTOR_USER_PROMPT ===")
print(REFLECTOR_USER_PROMPT)

In [None]:
# 内省ノードの関数を定義
def reflector(state: AgentState) -> dict:
    """内省ノード"""
    logger.info("内省を開始")

    input_data = state["input"]
    discovered_keywords = state.get("discovered_keywords", [])
    tool_results = state.get("tool_results", [])

    # ツール結果のサマリーを作成
    tool_summary = "\n".join(
        f"- {r.tool_name}: {r.query} → {len(r.results)}件"
        for r in tool_results
    )

    model = get_structured_model(ReflectionResult)

    messages = [
        SystemMessage(content=REFLECTOR_SYSTEM_PROMPT),
        HumanMessage(
            content=REFLECTOR_USER_PROMPT.format(
                category=input_data.category,
                seed_keywords=", ".join(input_data.seed_keywords),
                discovered_keywords="\n".join(
                    f"- {kw}" for kw in discovered_keywords[:30]
                ),
                tool_results_summary=tool_summary,
            )
        ),
    ]

    reflection = model.invoke(messages)
    logger.info(f"内省完了: 十分={reflection.is_sufficient}")

    return {
        "reflection": reflection,
        "retry_count": state.get("retry_count", 0) + 1,
    }

print("reflector関数を定義しました")

In [None]:
# 内省を実行
reflector_result = reflector(state_after_execute)

print("=== 内省結果 ===")
print(f"十分か: {reflector_result['reflection'].is_sufficient}")
print(f"フィードバック: {reflector_result['reflection'].feedback}")
print(f"追加クエリ: {reflector_result['reflection'].additional_queries}")
print(f"リトライ回数: {reflector_result['retry_count']}")

In [None]:
# 状態を更新
state_after_reflect = {**state_after_execute, **reflector_result}
print(f"内省結果: {state_after_reflect['reflection']}")

## 6. 継続判定 (Should Continue)

In [None]:
# 継続判定関数を定義
def should_continue(state: AgentState) -> str:
    """継続判定"""
    reflection = state.get("reflection")
    retry_count = state.get("retry_count", 0)

    if reflection and reflection.is_sufficient:
        return "integrate"

    if retry_count >= settings.max_retry_count:
        logger.warning(f"最大リトライ回数（{settings.max_retry_count}）に達しました")
        return "integrate"

    return "execute"

print("should_continue関数を定義しました")

In [None]:
# 継続判定を実行
next_step = should_continue(state_after_reflect)
print(f"次のステップ: {next_step}")
print(f"（最大リトライ回数: {settings.max_retry_count}）")

## 7. 結果統合ノード (Integrator Node)

In [None]:
# プロンプトの確認
print("=== INTEGRATOR_SYSTEM_PROMPT ===")
print(INTEGRATOR_SYSTEM_PROMPT)
print()
print("=== INTEGRATOR_USER_PROMPT ===")
print(INTEGRATOR_USER_PROMPT)

In [None]:
# 結果統合ノードの関数を定義
def integrator(state: AgentState) -> dict:
    """結果統合ノード"""
    logger.info("結果統合を開始")

    input_data = state["input"]
    discovered_keywords = state.get("discovered_keywords", [])
    tool_results = state.get("tool_results", [])

    # 検索結果のサマリー
    search_summary = []
    for r in tool_results:
        if r.tool_name == "search_web":
            for result in r.results[:3]:
                search_summary.append(f"- {result.title}: {result.snippet}")

    model = get_structured_model(KeywordSearchOutput)

    messages = [
        SystemMessage(content=INTEGRATOR_SYSTEM_PROMPT),
        HumanMessage(
            content=INTEGRATOR_USER_PROMPT.format(
                category=input_data.category,
                seed_keywords=", ".join(input_data.seed_keywords),
                discovered_keywords="\n".join(
                    f"- {kw}" for kw in set(discovered_keywords)
                ),
                search_results="\n".join(search_summary) or "なし",
            )
        ),
    ]

    output = model.invoke(messages)
    logger.info(f"結果統合完了: {len(output.results)}個のキーワード")

    return {"output": output}

print("integrator関数を定義しました")

In [None]:
# 結果統合を実行
integrator_result = integrator(state_after_reflect)

print("=== 結果統合 ===")
output = integrator_result['output']
print(f"カテゴリ: {output.category}")
print(f"シードキーワード: {output.seed_keywords}")
print(f"結果数: {len(output.results)}")
print()
print("=== キーワード結果 ===")
for result in output.results:
    print(f"\nキーワード: {result.keyword}")
    print(f"  競合度: {result.competition}")
    print(f"  関連度: {result.relevance_score}")
    print(f"  トピック案: {result.suggested_topics}")

In [None]:
# サマリーを表示
print("=== 分析サマリー ===")
print(output.summary)

## 8. グラフ全体の実行

In [None]:
# グラフ作成関数をインポート
from src.agents.keyword_finder.agent import create_keyword_finder_graph

# グラフを作成
graph = create_keyword_finder_graph()
print("グラフを作成しました")

In [None]:
# グラフを可視化（mermaidが利用可能な場合）
try:
    from IPython.display import Image, display
    display(Image(graph.get_graph().draw_mermaid_png()))
except Exception as e:
    print(f"グラフの可視化に失敗: {e}")
    print("mermaidまたはpygraphvizが必要です")

In [None]:
# グラフ全体を実行
from src.agents.keyword_finder.agent import run_keyword_finder

# 新しい入力で実行
test_input = KeywordSearchInput(
    category="資産形成",
    seed_keywords=["投資信託", "NISA"],
    depth=1,
)

result = run_keyword_finder(test_input)
print("=== 実行完了 ===")

In [None]:
# 結果を表示
print(f"カテゴリ: {result.category}")
print(f"結果数: {len(result.results)}")
print()
for r in result.results:
    print(f"- {r.keyword} (競合: {r.competition}, 関連度: {r.relevance_score})")
print()
print(f"サマリー: {result.summary}")

## 9. 個別ツールのテスト

In [None]:
# search_web ツールのテスト
search_results = search_web.invoke({"query": "Python 入門"})
print("=== search_web 結果 ===")
for r in search_results[:3]:
    print(f"- {r.title}")
    print(f"  URL: {r.url}")
    print(f"  {r.snippet[:100]}...")
    print()

In [None]:
# get_related_keywords ツールのテスト
related_keywords = get_related_keywords.invoke({"keyword": "Python"})
print("=== get_related_keywords 結果 ===")
for kw in related_keywords:
    print(f"- {kw}")

## 10. 設定の確認

In [None]:
# 現在の設定を確認
print("=== 設定 ===")
print(f"最大リトライ回数: {settings.max_retry_count}")
print(f"デバッグモード: {settings.debug}")