# 初めてのインテリジェントエージェントチームを構築する：ADK を使用した段階的な天気予報ボット


## 事前準備

### CLI 環境で Google CLoud にログイン

以下のコマンドを実行してログインしてください。 Cloud Shell の場合はこの操作は不要です。
```
gcloud auth application-default login
```


### プロジェクトの設定

以下のプロジェクト ID (GOOGLE_CLOUD_PROJECT) と STAGING_BUCKET をワークショップ用の ID と Cloud Storage Bucket に修正してください。

In [None]:
# @title Vertex AI の設定
GOOGLE_CLOUD_PROJECT = "agentspace-demo-466305" # @param {type:"string"}
STAGING_BUCKET = "gs://tayzar-bucket" #@param {type:"string"}

In [None]:
# @title Vertex AI SDK の初期化
import os
import vertexai


LOCATION = "us-central1"

os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "True"
os.environ["GOOGLE_CLOUD_LOCATION"] = LOCATION
os.environ["GOOGLE_CLOUD_PROJECT"] = GOOGLE_CLOUD_PROJECT

vertexai.init(
    project=GOOGLE_CLOUD_PROJECT,
    location=LOCATION,
    staging_bucket=STAGING_BUCKET,
)

# --- 使いやすくするためにモデル定数を定義します ---
DEFAULT_MODEL = "gemini-2.5-flash"

import logging

class _NoFunctionCallWarning(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        message = record.getMessage()
        if "there are non-text parts in the response:" in message:
            return False
        else:
            return True

logging.getLogger("google_genai.types").addFilter(_NoFunctionCallWarning())

---

## 第１章：初めてのエージェント - 基本的な天気情報検索


###  1\. Toolの定義 (get_weather)

**docstring は重要です！**
エージェントの LLM は、関数の docstring から以下の情報を取得します。
*  ツールが何をするか。
*  いつ使用するか。
*  どの引数が必要か（city: str）。
*  どの情報を返すか。

In [None]:
# 天気情報取得関数
def get_weather(city: str) -> dict:
    """指定された都市の現在の天気予報を取得します。

    Args:
        city (str): 都市名（例：「ニューヨーク」、「ロンドン」、「東京」）。

    Returns:
        dict: 天気情報を含む辞書。
              'status' キー（'success' または 'error'）を含みます。
              'success' の場合、天気の詳細情報を持つ 'report' キーを含みます。
              'error' の場合、'error_message' キーを含みます。
    """

    mock_weather_db = {
        "ニューヨーク": {"status": "success", "report": "ニューヨークの天気は晴れ、気温は25℃です。"},
        "ロンドン": {"status": "success", "report": "ロンドンは曇り、気温は15℃です。"},
        "東京": {"status": "success", "report": "東京は小雨、気温は18℃です。"},
    }

    if city in mock_weather_db:
        return mock_weather_db[city]
    else:
        return {"status": "error", "error_message": f"申し訳ありませんが、「{city}」の天気情報はありません。"}

print(get_weather("ニューヨーク"))
print(get_weather("パリ"))

### 2\. エージェントの定義 (`weather_agent`)

In [None]:
from google.adk.agents import LlmAgent

# LLM Agentオブジェクト初期化
weather_agent = LlmAgent(
    name="weather_agent_v1",
    model=DEFAULT_MODEL,
    description="特定の都市の天気情報を提供します。",
    instruction="あなたは親切な天気アシスタントです。気象キャスターのように回答してください",
    tools=[get_weather],
)

### 3\.Local Agent Wrapper

In [None]:
import copy, datetime, json, os, pprint, time, uuid
from google.genai.types import Part, Content
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner

# ADKの主要オブジェクト三兄弟のクラス
class LocalAgent:
    def __init__(self, agent, app_name='default_app', user_id='default_user', debug=False, initial_state = None):
        self._agent = agent
        self._app_name = app_name
        self._user_id = user_id
        self._runner = Runner(
            app_name=self._app_name,
            agent=self._agent,
            session_service=InMemorySessionService()
        )
        self._session = None
        self._debug = debug
        self._initial_state = initial_state

    def print_session_state(self):
        return self._session
    
    async def stream(self, query):
        if not self._session:
            self._session = await self._runner.session_service.create_session(
                app_name=self._app_name,
                user_id=self._user_id,
                session_id=uuid.uuid4().hex,
                state = self._initial_state
            )
        content = Content(role='user', parts=[Part.from_text(text=query)])
        async_events = self._runner.run_async(
            user_id=self._user_id,
            session_id=self._session.id,
            new_message=content,
        )
        result = []

        async for event in async_events:
            if self._debug:
                print(f"呼び出しAgent: {event.author}")
            if (event.content and event.content.parts):

                response = '\n'.join([p.text for p in event.content.parts if p.text])
                if self._debug:
                    function_call = '\n'.join([p.function_call.name for p in event.content.parts if p.function_call])
                    if function_call:
                        print(f"呼び出しTool: {function_call}")
                if response:
                    print(response)
                    result.append(response)
        return result

### 4\. テストしてみましょう！


In [None]:
client = LocalAgent(weather_agent)
print("------------------message (1)------------------")
_ = await client.stream("東京の天気はどうですか？")

print("\n------------------message (2)------------------")
_ = await client.stream("パリはどうですか？")

print("\n------------------message (3)------------------")
_ = await client.stream("ニューヨークの天気を教えてください")


おめでとうございます！最初の ADK エージェントの構築と対話に成功しました。エージェントはユーザーのリクエストを理解し、ツールを使用して情報を見つけ、ツールの結果に基づいて適切に応答します。
次のステップでは、このエージェントを強化する基盤となる言語モデルを簡単に切り替える方法を探ります。

---


## 第２章：エージェントチームの構築 - 挨拶と別れの委任


### 1\. サブエージェント用のツールを定義する


In [None]:
def say_hello(name: str) -> str:
    """名前を指定して簡単な挨拶を提供します。

    Args:
        name (str, optional): 挨拶する相手の名前

    Returns:
        str: 友好的な挨拶メッセージ。
    """
    print(f"--- ツール: say_hello が名前: {name} で呼び出されました ---")
    if name:
      return f"こんにちは、{name}さん！"
    else:
      return "こんにちは！"

def say_goodbye() -> str:
    """会話を締めくくるための簡単な別れのメッセージを提供します。"""
    print(f"--- ツール: say_goodbye が呼び出されました ---")
    return "さようなら！良い一日を。"

print("Greeting and Farewell tools defined.")


print(say_hello("田中"))
print(say_goodbye())

### 2\.  サブエージェント（挨拶と別れ）を定義する


In [None]:
greeting_agent = LlmAgent(
    # シンプルなタスクにはより高速で安価なモデルを使用
    # Lite modelですと、Toolを使ってくれないこともあるので、instructionで必ず使うように指示しています。
    model = DEFAULT_MODEL,
    name="greeting_agent",
    instruction="""
                あなたは挨拶エージェントです。あなたの唯一のタスクはユーザーに友好的な挨拶を提供することです。
                必ず'say_hello'ツールを使用して挨拶を生成してください。
                """,
    description="簡単な挨拶を処理します。", # タスク振り分ける時に重要
    tools=[say_hello],
)

farewell_agent = LlmAgent(
    model = DEFAULT_MODEL,
    name="farewell_agent",
    instruction="""
                あなたは別れのエージェントです。あなたの唯一のタスクは丁寧な別れのメッセージを提供することです。
                ユーザーが会話を終了する意思を示した場合（例：「バイバイ」、「さようなら」、「ありがとう、バイバイ」、「また会いましょう」などの言葉を使用）、
                必ず'say_goodbye'ツールを使用してください。
                """,
    description="簡単な別れの挨拶を処理します。",
    tools=[say_goodbye],
)


### 3\.  サブエージェントを持つルートエージェントを定義する（Weather Agent v2）



In [None]:
if greeting_agent and farewell_agent and 'get_weather' in globals():
    weather_agent_team = LlmAgent(
        name="weather_agent_v2", # 新しいバージョン名を付ける
        model=DEFAULT_MODEL,
        description="メインコーディネーターエージェント。天気リクエストを処理し、挨拶/別れを専門家に委任します。",
        instruction=""""
                    あなたはチームを調整するメイン天気エージェントです。あなたの主な責任は天気情報を提供することです。
                    """,
        tools=[get_weather],
        sub_agents=[greeting_agent, farewell_agent]
    )
    print(f"✅ Root Agent '{weather_agent_team.name}' created with sub-agents: {[sa.name for sa in weather_agent_team.sub_agents]}")

else:
    print("❌ Cannot create root agent because one or more sub-agents failed to initialize or 'get_weather' tool is missing.")
    if not greeting_agent: print(" - Greeting Agent is missing.")
    if not farewell_agent: print(" - Farewell Agent is missing.")
    if 'get_weather' not in globals(): print(" - get_weather function is missing.")



### 4\. テストしてみましょう！

In [None]:
client = LocalAgent(weather_agent_team, debug=True)

print("------------------message (1)------------------")
_ = await client.stream("こんにちは!")

print("\n------------------message (2)------------------")
_ = await client.stream("ニューヨークの天気はどうですか？")

print("\n------------------message (3)------------------")
_ = await client.stream("ありがとう、さようなら！")

---

## 第３章：Session Stateでメモリとパーソナライゼーションを追加する


### 1\.  状態を意識した天気予報ツールを作成する (get_weather_stateful)

In [None]:
from google.adk.tools.tool_context import ToolContext

def get_weather_stateful(city: str, tool_context: ToolContext) -> dict:
    """指定された都市の現在の天気予報を取得します、セッションの状態に基づいて温度の単位を変換します。

    Args:
        city (str): 日本語での都市名（例：「ニューヨーク」、「ロンドン」、「東京」）。
        tool_context (ToolContext): ツール呼び出しのコンテキストを提供し、呼び出しコンテキスト、関数呼び出しID、イベントアクション、認証レスポンスへのアクセスを含みます。

    Returns:
        dict: 天気情報を含む辞書。
              'status' キー（'success' または 'error'）を含みます。
              'success' の場合、天気の詳細情報を持つ 'report' キーを含みます。
              'error' の場合、'error_message' キーを含みます。
    """

    print(f"--- ツール: get_weather_stateful が {city} のために呼び出されました ---")

    # --- 状態から設定を読み込み ---
    preferred_unit = tool_context.state.get("user_preference_temperature_unit", "Celsius")
    print(f"--- ツール: 状態 'user_preference_temperature_unit' を読み込み中: {preferred_unit} ---")


    # モックの天気データ（内部では常に摂氏で保存）
    mock_weather_db = {
        "ニューヨーク": {"temp_c": 25, "condition": "晴れ"},
        "ロンドン": {"temp_c": 15, "condition": "曇り"},
        "東京": {"temp_c": 18, "condition": "雨"},
    }

    if city in mock_weather_db:
        data = mock_weather_db[city]
        temp_c = data["temp_c"]
        condition = data["condition"]

        if preferred_unit == "Fahrenheit":
            temp_value = (temp_c * 9/5) + 32
            temp_unit = "°F"
        else:
            temp_value = temp_c
            temp_unit = "°C"

        report = f"{city.capitalize()}の天気は{condition}で、気温は{temp_value:.0f}{temp_unit}です。"
        result = {"status": "success", "report": report}
        print(f"--- ツール: {preferred_unit}でレポートを生成しました。結果: {result} ---")
        return result
    else:
        # 都市が見つからない場合の処理
        error_msg = f"申し訳ありませんが、'{city}'の天気情報はありません。"
        print(f"--- ツール: 都市 '{city}' が見つかりませんでした。 ---")
        return {"status": "error", "error_message": error_msg}

print("✅ 状態認識ツール 'get_weather_stateful' が定義されました。")

In [None]:
def set_temperature_preference(unit: str, tool_context: ToolContext) -> dict:
    """Sets the user's preferred temperature unit (Celsius or Fahrenheit).

    Args:
        unit (str): The preferred temperature unit ("Celsius" or "Fahrenheit").
        tool_context (ToolContext): The ADK tool context providing access to session state.

    Returns:
        dict: A dictionary confirming the action or reporting an error.
    """
    print(f"--- Tool: set_temperature_preference called with unit: {unit} ---")
    normalized_unit = unit.strip().capitalize()

    if normalized_unit in ["Celsius", "Fahrenheit"]:
        tool_context.state["user_preference_temperature_unit"] = normalized_unit
        print(f"--- Tool: Updated state 'user_preference_temperature_unit': {normalized_unit} ---")
        return {"status": "success", "message": f"Temperature preference set to {normalized_unit}."}
    else:
        error_msg = f"Invalid temperature unit '{unit}'. Please specify 'Celsius' or 'Fahrenheit'."
        print(f"--- Tool: Invalid unit provided: {unit} ---")
        return {"status": "error", "error_message": error_msg}

### 2\.  ルートエージェントを更新する

In [None]:
# ルートエージェントを作成する前に前提条件を確認
root_agent_stateful = LlmAgent(
    name="weather_agent_v3_stateful", # 新しいバージョン名
    model=DEFAULT_MODEL,
    description="メインエージェント: 天気情報を提供し（状態認識ユニット）、挨拶/別れを委任し、レポートを状態に保存します。",
    instruction="あなたはメインの天気エージェントです。あなたの仕事は 'get_weather_stateful' を使って天気情報を提供することです。"
                "このツールは、状態に保存されているユーザーの好みに基づいて温度の形式を設定します。"
                "簡単な挨拶は 'greeting_agent' に、別れの挨拶は 'farewell_agent' に委任してください。"
                "天気に関するリクエスト、挨拶、別れの挨拶のみを処理してください。",
    tools=[get_weather_stateful,set_temperature_preference],
    output_key="last_weather_report" # <<< エージェントの最終的な天気応答を自動保存
)

### 3\. テストしてみましょう！

In [None]:
initial_state = {
    "user_preference_temperature_unit": "Celsius"
}


client = LocalAgent(root_agent_stateful, debug=True, initial_state = initial_state)

_ = await client.stream("ロンドンの天気はどうですか？")

_ = await client.stream("これからは気温を華氏で教えてください")

_ = await client.stream("ニューヨークの天気を教えてください。")

---

## 第４章：Agent Engine にDeploy

In [None]:
from vertexai import agent_engines
from vertexai.preview.reasoning_engines import AdkApp

In [None]:
app = AdkApp(
    agent=weather_agent,
    enable_tracing=True,
)

In [None]:
remote_agent = agent_engines.create(
    app,
    requirements=[
        'google-adk==1.4.1',
        'google-cloud-aiplatform==1.97.0',
        'google-genai==1.20.0'
    ],
    display_name="Weather Agent 1.0",
    description="Agent Engine workshop sample",
)

In [None]:
class RemoteApp:
    def __init__(self, remote_agent, user_id="default_user"):
        self._remote_agent = remote_agent
        self._user_id = user_id
        self._session = remote_agent.create_session(user_id=self._user_id)
    
    def _stream(self, query):
        events = self._remote_agent.stream_query(
            user_id=self._user_id,
            session_id=self._session['id'],
            message=query,
        )
        result = []
        for event in events:
            if ('content' in event and 'parts' in event['content']):
                response = '\n'.join(
                    [p['text'] for p in event['content']['parts'] if 'text' in p]
                )
                if response:
                    print(response)
                    result.append(response)
        return result

    def stream(self, query):
        # Retry 4 times in case of resource exhaustion 
        for c in range(4):
            if c > 0:
                time.sleep(2**(c-1))
            result = self._stream(query)
            if result:
                return result
            if DEBUG:
                print('----\nRetrying...\n----')
        return None # Permanent error

In [None]:
remote_client = RemoteApp(remote_agent)
_ = remote_client.stream('東京の天気はどうですか？')

### 補足

#### ADK web の UI を使用する場合

GUI のチャット画面（ADK web）を試したい場合は、Cloud Workstationから次の手順で試す事ができます。

※ あくまでお試し用の手順なので、ADK web のすべての機能は使用できません。簡易的な動作確認として利用してください。

1. 作業用ディレクトリ `workdir` を作成して、カレントディレクトリに変更します。

```
mkdir workdir
cd workdir
```

2. `google-adk` のパッケージをインストールします。

```
python -m venv .venv
source .venv/bin/activate
pip install google-adk==1.2.1
```

3. リモートエージェントに接続するコードを用意します。

```
mkdir agent
cat <<EOF >agent/agent.py
import os
from uuid import uuid4
from dotenv import load_dotenv
from google.adk.agents.callback_context import CallbackContext
from google.adk.models import LlmResponse, LlmRequest
from google.adk.agents.llm_agent import LlmAgent
from google.genai.types import Content, Part

import vertexai
from vertexai import agent_engines

load_dotenv('.env')
PROJECT_ID = os.environ['PROJECT_ID']
AGENT_ID = os.environ['AGENT_ID']
LOCATION = 'us-central1'

vertexai.init(project=PROJECT_ID, location=LOCATION)
remote_agent = agent_engines.get(AGENT_ID)

async def call_remote_agent(
    callback_context: CallbackContext, llm_request: LlmRequest
) -> LlmResponse:
    session = remote_agent.create_session(user_id='default_user')
    events = remote_agent.stream_query(
                user_id='default_user',
                session_id=session['id'],
                message=str(llm_request.contents)
             )
    content = list(events)[-1]['content']
    remote_agent.delete_session(
        user_id='default_user',
        session_id=session['id'],
    )
    return LlmResponse(content=content)

root_agent = LlmAgent(
    name='remote_agent_proxy',
    model='gemini-2.5-flash', # not used
    description='Interactive agent',
    before_model_callback=call_remote_agent,
)
EOF
```

4. 設定ファイル `agent/.env` を次の内容で作成します。（`your project ID` と `your agent ID` は実際のプロジェクト ID と先ほど確認したエージェントの ID を記入します。）

```
PROJECT_ID="your project ID"
AGENT_ID="your agent ID"
```

5. チャットアプリ（ADK web）を起動します。

```
adk web
```

6. Cloud Shell の「Web でプレビュー」ボタンからポート 8000 に接続して使用します。

#### トレースの確認
Cloud Console の https://console.cloud.google.com/traces/list Trace Explorer から Agent Engine 上で実行されたエージェントのトレースが確認できます。

In [None]:
for remote_agent in agent_engines.list():
  print(remote_agent.resource_name)

### 後片付け
デプロイしたエージェントを削除します。

In [None]:
# 削除
for remote_agent in agent_engines.list():
  remote_agent.delete(force=True)

## [オプション]： 外部データと連携 (OpenAPI Tool, MCP Toolを使用)

In [None]:
# Google Weather APIキー(https://developers.google.com/maps/documentation/weather/get-api-key?hl=ja)
GOOGLE_WEATHER_API_KEY="AIzaSyAZa9wMR7vsRc1VQVnk8RcrDvXnuhWVJ7k" # @param {type:"string"}
# Google Maps APIキー(https://developers.google.com/maps/documentation/geocoding/get-api-key?hl=ja)
GOOGLE_MAPS_API_KEY="AIzaSyAZa9wMR7vsRc1VQVnk8RcrDvXnuhWVJ7k" # @param {type:"string"}
os.environ["GOOGLE_WEATHER_API_KEY"] = GOOGLE_WEATHER_API_KEY
os.environ["GOOGLE_MAPS_API_KEY"] = GOOGLE_MAPS_API_KEY


### REST API を直接呼び出す

In [None]:
# requests ライブラリをインポート
import requests

# 天気情報を取得するための Python 関数ツールを定義
def get_current_weather_conditions_rest(latitude: float, longitude: float) -> dict:
    """
    指定された緯度と経度の現在の天気状況を取得します。

    Args:
        latitude (float): 場所の緯度
        longitude (float): 場所の経度

    Returns:
        dict: 天気データを含む辞書。エラーの場合はエラーメッセージを含む辞書。
    """
    # API キーを環境変数から取得
    google_api_key = os.environ.get("GOOGLE_WEATHER_API_KEY")
    if not google_api_key:
        return {"error": "GOOGLE_WEATHER_API_KEY 環境変数が設定されていません。"}

    # API エンドポイントと必要なパラメータを設定
    base_url = "https://weather.googleapis.com/v1/currentConditions:lookup"
    params = {
        "location.latitude": latitude,
        "location.longitude": longitude,
        "key": google_api_key
    }

    try:
        # API リクエストを送信
        response = requests.get(base_url, params=params)

        # レスポンスのステータスコードを確認
        if response.status_code == 200:
            # レスポンスを JSON として解析して返す
            return response.json()
        else:
            return {
                "error": f"API リクエストが失敗しました。ステータスコード: {response.status_code}",
                "details": response.text
            }
    except Exception as e:
        return {"error": f"API リクエスト中にエラーが発生しました: {str(e)}"}

# ツールをテストするために、ロンドンの緯度と経度を使用
london_latitude = 51.5074
london_longitude = -0.1278

# ツールを使用して天気データを取得
weather_result = get_current_weather_conditions_rest(london_latitude, london_longitude)

# 結果を表示
if "error" in weather_result:
    print(f"❌ {weather_result['error']}")
    if "details" in weather_result:
        print(f"詳細: {weather_result['details']}")
else:
    print("✅ 天気データの取得に成功しました:")
    print(f"日時: {weather_result.get('dateTime', 'N/A')}")

    # 温度情報を取得
    temperature = weather_result.get('temperature', {})
    temp_value = temperature.get('value', 'N/A')
    temp_unit = temperature.get('unit', 'N/A')
    print(f"温度: {temp_value} {temp_unit}")

    # 風の情報を取得
    wind = weather_result.get('wind', {})
    wind_speed = wind.get('speed', 'N/A')
    wind_direction = wind.get('direction', 'N/A')
    print(f"風速: {wind_speed}, 風向: {wind_direction}度")

    # 天気の簡単な説明
    print(f"天気: {weather_result.get('shortDescription', 'N/A')}")

print("\n注意: 上記のコードは、Python 関数をツールとして使用する方法を示しています。")
print("次のセクションでは、同じ API を OpenAPITool を使用して呼び出す方法を示します。")


### OpenAPI Spec からツールを生成する


In [None]:
# この仕様は、デモンストレーション目的で手動作成されたもので、実際のAPIエンドポイントを対象としています。
openapi_spec_google_weather_api_str = """
{
  "openapi": "3.0.0",
  "info": {
    "title": "Simplified Google Weather API",
    "version": "v1",
    "description": "A simplified, manually-created OpenAPI spec for interacting with select Google Maps Weather API functionalities."
  },
  "servers": [
    {
      "url": "https://weather.googleapis.com/v1",
      "description": "Google Maps Weather API Server"
    }
  ],
  "paths": {
    "/currentConditions:lookup": {
      "get": {
        "operationId": "getCurrentWeatherConditions",
        "summary": "Get the current weather conditions for a location using Google Weather API.",
        "description": "Fetches real-time weather data including temperature, wind, and precipitation for the specified latitude and longitude. Requires an API key.",
        "parameters": [
          {
            "name": "location.latitude",
            "in": "query",
            "required": true,
            "description": "The latitude of the location.",
            "schema": {
              "type": "number",
              "format": "double"
            }
          },
          {
            "name": "location.longitude",
            "in": "query",
            "required": true,
            "description": "The longitude of the location.",
            "schema": {
              "type": "number",
              "format": "double"
            }
          },
          {
            "name": "key",
            "in": "query",
            "required": true,
            "description": "Your Google Cloud API key.",
            "schema": {
              "type": "string"
            }
          }
        ],
        "responses": {
          "200": {
            "description": "Successful response with current weather conditions.",
            "content": {
              "application/json": {
                "schema": {
                  "type": "object",
                  "properties": {
                    "dateTime": { "type": "string", "format": "date-time"},
                    "temperature": { "type": "object", "properties": {"value": {"type": "number"}, "unit": {"type": "string"}}},
                    "wind": { "type": "object", "properties": {"speed": {"type": "number"}, "direction": {"type": "number"}}},
                    "shortDescription": { "type": "string" }
                  }
                }
              }
            }
          },
          "400": { "description": "Invalid request parameters." },
          "403": { "description": "Forbidden - API key missing, invalid, or Weather API not enabled." }
        }
      }
    }
  },
  "components": {
    "securitySchemes": {
      "ApiKeyAuth": {
        "in": "query",
        "name": "key",
        "required": true,
        "description": "Your Google Cloud API key.",
        "schema": { "type": "string" }
      }
    }
  }
}
"""

print("✅ Simplified Google Weather API OpenAPI spec string defined, targeting actual API endpoint.")

In [None]:
from google.adk.tools.openapi_tool.openapi_spec_parser.openapi_toolset import OpenAPIToolset
from google.adk.tools.openapi_tool.auth.auth_helpers import token_to_scheme_credential
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, StdioServerParameters
import os

# --- OpenAPIツールセットの作成 ---
google_weather_toolset = None

try:
    # Retrieve the API key from environment variable for security
    google_api_key = os.environ.get("GOOGLE_WEATHER_API_KEY")
    if not google_api_key:
        raise ValueError("GOOGLE_WEATHER_API_KEY environment variable not set.")

    # Create auth scheme and credential using the helper function
    # This call is synchronous, and its return values are used directly.
    auth_scheme, auth_credential = token_to_scheme_credential(
        "apikey",  # Authentication type
        "query",   # API key location (query parameter)
        "key",     # Parameter name in the API
        google_api_key  # The actual API key value
    )

    google_weather_toolset = OpenAPIToolset(
        spec_str=openapi_spec_google_weather_api_str, # Use the new spec string
        spec_str_type="json",
        auth_scheme=auth_scheme,        # Use the scheme directly
        auth_credential=auth_credential # Use the credential directly
    )
    generated_google_weather_api_tools = await google_weather_toolset.get_tools()
    print(f"✅ OpenAPI 仕様から {len(generated_google_weather_api_tools)} 個のツールが生成されました:")
    for tool in generated_google_weather_api_tools:
        print(f"  - ツール名: '{tool.name}', 説明: {tool.description[:80]}...")

except ValueError as ve:
    print(f"❌ OpenAPIToolset の作成中の検証エラー: {ve}")
except Exception as e:
    print(f"❌ OpenAPIToolset の作成中の予期しないエラー: {e}")



### Google Maps MCP ツールセットの作成

In [None]:
# Google Maps MCP ツールセットを作成して、都市名から緯度と経度を取得します
from mcp import StdioServerParameters
google_maps_mcp_toolset = None
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset

try:
    google_maps_api_key = os.environ.get("GOOGLE_MAPS_API_KEY")
    if not google_maps_api_key:
        raise ValueError("GOOGLE_MAPS_API_KEY environment variable not set.")

    google_maps_mcp_toolset = MCPToolset(
        connection_params=StdioServerParameters(
            command='npx',
            args=[
                "-y",
                "@modelcontextprotocol/server-google-maps",
            ],
            env={
                "GOOGLE_MAPS_API_KEY": google_maps_api_key
            }
        )
    )
    print("✅ Google Maps MCP ツールセットが作成されました。")
except Exception as e:
    print(f"❌ Google Maps MCP ツールセットの作成中にエラーが発生しました: {e}")


### 生成されたツールをエージェントに統合する

In [None]:
from google.adk.agents import Agent

external_data_weather_agent = Agent(
    name="google_weather_api_assistant",
    model=DEFAULT_MODEL,
    tools=[google_weather_toolset, google_maps_mcp_toolset],
    # tools=[get_current_weather_conditions_rest, google_maps_mcp_toolset],
    instruction=f"""あなたは天気予報アシスタントです。
    ユーザーが都市の天気について尋ねた場合（例：「ロンドンの天気は？」）、Maps tools と　Weather toolsを利用して対応してください。
    ツールはAPIキーを自動的に処理します。
    気象キャスターのように回答してください""",
    description="Maps tools と Weather tools を使用して、都市名から天気情報を提供します。"
)


### エージェントとの対話 (シミュレーション)

In [None]:
client = LocalAgent(external_data_weather_agent, debug = True)

_ = await client.stream("ロンドンの天気はどうですか？")
_ = await client.stream("パリはどうですか？")
_ = await client.stream("ニューヨークの天気を教えてください")


## [オプション]：安全性の追加 - 入力ガードレール (`before_model_callback`)


### 1\.  ガードレールコールバック関数を定義する

In [None]:
# 必要なインポートが利用可能であることを確認
from google.adk.agents.callback_context import CallbackContext
from google.adk.models.llm_request import LlmRequest
from google.adk.models.llm_response import LlmResponse
from google.genai import types # 応答コンテンツ作成のため
from typing import Optional

def block_keyword_guardrail(
    callback_context: CallbackContext, llm_request: LlmRequest
) -> Optional[LlmResponse]:
    """
    最新のユーザーメッセージに 'BLOCK' が含まれているか検査します。見つかった場合、LLM呼び出しをブロックし、
    事前定義されたLlmResponseを返します。それ以外の場合は、処理を続行するためにNoneを返します。
    """
    # モデル呼び出しがインターセプトされているエージェントの名前を取得
    agent_name = callback_context.agent_name
    print(f"--- コールバック: block_keyword_guardrail がエージェント: {agent_name} のために実行中 ---")

    # リクエスト履歴から最新のユーザーメッセージのテキストを抽出
    last_user_message_text = ""
    if llm_request.contents:
        # 'user' ロールを持つ最新のメッセージを検索
        for content in reversed(llm_request.contents):
            if content.role == 'user' and content.parts:
                # 簡単のため、テキストは最初のパートにあると仮定
                if content.parts[0].text:
                    last_user_message_text = content.parts[0].text
                    break # 最後のユーザーメッセージのテキストを発見

    # 最初の100文字をログに記録
    print(f"--- コールバック: 最後のユーザーメッセージを検査中: '{last_user_message_text[:100]}...' ---")

    # --- ガードレールロジック ---
    keyword_to_block = "パリ"
    # 大文字と小文字を区別しないチェック
    if keyword_to_block in last_user_message_text.upper():
        print(f"--- コールバック: '{keyword_to_block}' が見つかりました。LLM呼び出しをブロックします！ ---")
        # オプションで、ブロックイベントを記録するために状態にフラグを設定
        callback_context.state["guardrail_block_keyword_triggered"] = True
        print(f"--- コールバック: 状態 'guardrail_block_keyword_triggered' を True に設定しました ---")

        # フローを停止し、代わりにこれを送り返すためにLlmResponseを構築して返す
        return LlmResponse(
            content=types.Content(
                role="model", # エージェントの視点からの応答を模倣
                parts=[types.Part(text=f"ブロックされたキーワード '{keyword_to_block}' が含まれているため、このリクエストを処理できません。")],
            )
            # 注: 必要であれば、ここで error_message フィールドを設定することもできます
        )
    else:
        # キーワードが見つからなかったため、LLMへのリクエストを許可
        print(f"--- コールバック: キーワードが見つかりませんでした。{agent_name} のLLM呼び出しを許可します。 ---")
        # Noneを返すと、ADKは通常通り続行することを示します
        return None

print("✅ block_keyword_guardrail 関数が定義されました。")

### 2\. コールバックを使用するようにルートエージェントを更新する

In [None]:
# --- サブエージェントの再定義（このコンテキストに存在することを確認）---
greeting_agent = None
try:
    # 定義済みのモデル定数を使用
    greeting_agent = Agent(
        model=DEFAULT_MODEL,
        name="greeting_agent", # 一貫性を保つため元の名前を維持
        instruction="あなたは挨拶エージェントです。あなたの唯一のタスクは 'say_hello' ツールを使って友好的な挨拶を提供することです。それ以外のことは何もしないでください。",
        description="'say_hello' ツールを使用して簡単な挨拶やハローを処理します。",
        tools=[say_hello],
    )
    print(f"✅ サブエージェント '{greeting_agent.name}' が再定義されました。")
except Exception as e:
    print(f"❌ 挨拶エージェントを再定義できませんでした。モデル/APIキー ({greeting_agent.model}) を確認してください。エラー: {e}")

farewell_agent = None
try:
    # 定義済みのモデル定数を使用
    farewell_agent = Agent(
        model=DEFAULT_MODEL,
        name="farewell_agent", # 元の名前を維持
        instruction="あなたは別れのエージェントです。あなたの唯一のタスクは 'say_goodbye' ツールを使って丁寧な別れのメッセージを提供することです。他のアクションは実行しないでください。",
        description="'say_goodbye' ツールを使用して簡単な別れやさようならを処理します。",
        tools=[say_goodbye],
    )
    print(f"✅ サブエージェント '{farewell_agent.name}' が再定義されました。")
except Exception as e:
    print(f"❌ 別れのエージェントを再定義できませんでした。モデル/APIキー ({farewell_agent.model}) を確認してください。エラー: {e}")


root_agent_model_guardrail = Agent(
    name="weather_agent_model_guardrail", # 明確化のための新しいバージョン名
    model=DEFAULT_MODEL,
    description="メインエージェント: 天気を処理し、挨拶/別れを委任し、入力キーワードのガードレールを含みます。",
    instruction="あなたはメインの天気エージェントです。'get_weather_stateful' を使って天気情報を提供してください。"
                "簡単な挨拶は 'greeting_agent' に、別れの挨拶は 'farewell_agent' に委任してください。"
                "天気に関するリクエスト、挨拶、別れの挨拶のみを処理してください。",
    tools=[get_weather],
    sub_agents=[greeting_agent, farewell_agent], # 再定義されたサブエージェントを参照
    output_key="last_weather_report", # ステップ4のoutput_keyを維持
    before_model_callback=block_keyword_guardrail # <<< ガードレールコールバックを割り当て
)
print(f"✅ ルートエージェント '{root_agent_model_guardrail.name}' が before_model_callback を使用して作成されました。")

### 3\.  対話してガードレールをテストする

In [None]:
client = LocalAgent(root_agent_model_guardrail, debug = True)

_ = await client.stream("ロンドンの天気はどうですか？")
_ = await client.stream("パリはどうですか？")
_ = await client.stream("ニューヨークの天気を教えてください")

## [オプション]：安全性の追加 - ツール引数ガードレール（`before_tool_callback`）

### 1\.  ツールガードレールコールバック関数を定義する

In [None]:
# 必要なインポートが利用可能であることを確認
from google.adk.tools.base_tool import BaseTool
from google.adk.tools.tool_context import ToolContext
from typing import Optional, Dict, Any # 型ヒントのため

def block_paris_tool_guardrail(
    tool: BaseTool, args: Dict[str, Any], tool_context: ToolContext
) -> Optional[Dict]:
    """
    'get_weather_stateful' が 'Paris' に対して呼び出されたかを確認します。
    もしそうなら、ツールの実行をブロックし、特定のエラー辞書を返します。
    それ以外の場合は、Noneを返してツールの呼び出しを許可します。
    """
    tool_name = tool.name
    agent_name = tool_context.agent_name # ツールを呼び出そうとしているエージェント
    print(f"--- コールバック: block_paris_tool_guardrail がエージェント '{agent_name}' のツール '{tool_name}' のために実行中 ---")
    print(f"--- コールバック: 引数を検査中: {args} ---")

    # --- ガードレールロジック ---
    target_tool_name = "get_weather_stateful" # FunctionToolで使用される関数名と一致させる
    blocked_city = "paris"

    # 正しいツールであるか、そして都市の引数がブロックされた都市と一致するかを確認
    if tool_name == target_tool_name:
        city_argument = args.get("city", "") # 'city'引数を安全に取得
        if city_argument and city_argument.lower() == blocked_city:
            print(f"--- コールバック: ブロックされた都市 '{city_argument}' を検出しました。ツールの実行をブロックします！ ---")
            # オプションで状態を更新
            tool_context.state["guardrail_tool_block_triggered"] = True
            print(f"--- コールバック: 状態 'guardrail_tool_block_triggered' を True に設定しました ---")

            # エラー時にツールが期待する出力形式に一致する辞書を返す
            # この辞書がツールの結果となり、実際のツールの実行はスキップされます。
            return {
                "status": "error",
                "error_message": f"ポリシー制限: '{city_argument.capitalize()}' の天気チェックは、ツールガードレールによって現在無効化されています。"
            }
        else:
             print(f"--- コールバック: 都市 '{city_argument}' はツール '{tool_name}' で許可されています。 ---")
    else:
        print(f"--- コールバック: ツール '{tool_name}' は対象のツールではありません。許可します。 ---")


    # 上記のチェックで辞書が返されなかった場合、ツールの実行を許可
    print(f"--- コールバック: ツール '{tool_name}' の続行を許可します。 ---")
    # Noneを返すと、実際のツール関数が実行されます
    return None

print("✅ block_paris_tool_guardrail 関数が定義されました。")

### 2\.  両方のコールバックを使用するようにルートエージェントを更新する


In [None]:
# --- サブエージェントの再定義（このコンテキストに存在することを確認）---
greeting_agent = None
try:
    # 定義済みのモデル定数を使用
    greeting_agent = Agent(
        model=DEFAULT_MODEL,
        name="greeting_agent", # 一貫性を保つため元の名前を維持
        instruction="あなたは挨拶エージェントです。あなたの唯一のタスクは 'say_hello' ツールを使って友好的な挨拶を提供することです。それ以外のことは何もしないでください。",
        description="'say_hello' ツールを使用して簡単な挨拶やハローを処理します。",
        tools=[say_hello],
    )
    print(f"✅ サブエージェント '{greeting_agent.name}' が再定義されました。")
except Exception as e:
    print(f"❌ 挨拶エージェントを再定義できませんでした。モデル/APIキー ({greeting_agent.model}) を確認してください。エラー: {e}")

farewell_agent = None
try:
    # 定義済みのモデル定数を使用
    farewell_agent = Agent(
        model=DEFAULT_MODEL,
        name="farewell_agent", # 元の名前を維持
        instruction="あなたは別れのエージェントです。あなたの唯一のタスクは 'say_goodbye' ツールを使って丁寧な別れのメッセージを提供することです。他のアクションは実行しないでください。",
        description="'say_goodbye' ツールを使用して簡単な別れやさようならを処理します。",
        tools=[say_goodbye],
    )
    print(f"✅ サブエージェント '{farewell_agent.name}' が再定義されました。")
except Exception as e:
    print(f"❌ 別れのエージェントを再定義できませんでした。モデル/APIキー ({farewell_agent.model}) を確認してください。エラー: {e}")

# --- 両方のコールバックを持つルートエージェントを定義 ---
root_agent_tool_guardrail = None

if ('greeting_agent' in globals() and greeting_agent and
    'farewell_agent' in globals() and farewell_agent and
    'get_weather_stateful' in globals() and
    'block_keyword_guardrail' in globals() and
    'block_paris_tool_guardrail' in globals()):

    root_agent_model = DEFAULT_MODEL

    root_agent_tool_guardrail = Agent(
        name="weather_agent_tool_guardrail", # 新しいバージョン名
        model=root_agent_model,
        description="メインエージェント: 天気を処理し、委任を行い、入力とツールの両方のガードレールを含みます。",
        instruction="あなたはメインの天気エージェントです。'get_weather_stateful' を使って天気情報を提供してください。"
                    "挨拶は 'greeting_agent' に、別れは 'farewell_agent' に委任してください。"
                    "天気、挨拶、別れのみを処理してください。",
        tools=[get_weather],
        sub_agents=[greeting_agent, farewell_agent],
        output_key="last_weather_report",
        before_model_callback=block_keyword_guardrail, # モデルガードレールを維持
        before_tool_callback=block_paris_tool_guardrail # <<< ツールガードレールを追加
    )
    print(f"✅ ルートエージェント '{root_agent_tool_guardrail.name}' が両方のコールバックを使用して作成されました。")

else:
    print("❌ ツールガードレール付きのルートエージェントを作成できません。前提条件がありません。")

### 3\.  対話してツールガードレールをテストする

In [None]:
client = LocalAgent(root_agent_tool_guardrail)

_ = await client.stream("ロンドンの天気はどうですか？")
_ = await client.stream("パリはどうですか？")
_ = await client.stream("ニューヨークの天気を教えてください")