<a href="https://colab.research.google.com/github/cwbdayi638/Dayi/blob/main/MCP_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install google-generativeai gradio_client python-dotenv

Collecting python-dotenv
  Downloading python_dotenv-1.1.1-py3-none-any.whl.metadata (24 kB)
Downloading python_dotenv-1.1.1-py3-none-any.whl (20 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.1.1


In [2]:
import os
import google.generativeai as genai
from gradio_client import Client
from dotenv import load_dotenv
from google.colab import userdata


# --- 1. 初始化與設定 ---
load_dotenv()
genai.configure(api_key=userdata.get('GOOGLE_API_KEY'))
MCP_SERVER_URL = "https://cwadayi-mcp-1.hf.space/"

# --- 2. 「轉接器」函式 ---
def call_mcp_letter_counter(word: str, letter: str) -> int:
    """
    連接到遠端的 Gradio MCP 伺服器並執行 letter_counter 工具。
    """
    try:
        print(f"--- 正在呼叫遠端 MCP 伺服器 ---")
        print(f"    伺服器: {MCP_SERVER_URL}")
        print(f"    參數: word='{word}', letter='{letter}'")

        client = Client(src=MCP_SERVER_URL)

        #    <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
        #    <<<<< 唯一的修改在這裡 >>>>
        #    <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
        # 移除 api_name，讓 client 自動呼叫預設的端點
        result = client.predict(word, letter)

        print(f"--- MCP 伺服器回傳結果: {result} ---")
        return int(result)
    except Exception as e:
        # 將更詳細的錯誤訊息印出，方便除錯
        print(f"呼叫 MCP 伺服器失敗: {e}")
        return -1

# --- 3. 向 Gemini 定義可用的工具 ---
letter_counter_tool_declaration = {
    "name": "call_letter_counter_tool",
    "description": "計算一個指定的字母在一段文字中出現了幾次。",
    "parameters": {
        "type": "OBJECT",
        "properties": {
            "word": { "type": "STRING", "description": "要在其中搜尋的完整文字。" },
            "letter": { "type": "STRING", "description": "要計數的單一字元。" }
        },
        "required": ["word", "letter"]
    }
}

# --- 4. 建立 Gemini 模型 ---
model = genai.GenerativeModel(
    model_name='gemini-1.5-flash-latest',
    tools=[letter_counter_tool_declaration]
)

# --- 5. 主執行迴圈 ---
print("你好！我是 Gemini，已經連接了您的字母計數器工具。您可以開始提問了。（輸入 'exit' 結束）")
chat = model.start_chat(enable_automatic_function_calling=False)

while True:
    prompt = input("您: ")
    if prompt.lower() == 'exit':
        break

    response = chat.send_message(prompt)

    function_call = response.candidates[0].content.parts[0].function_call
    if function_call:
        print(f"--- Gemini 決定呼叫工具: {function_call.name} ---")

        if function_call.name == "call_letter_counter_tool":
            args = function_call.args
            tool_result = call_mcp_letter_counter(word=args['word'], letter=args['letter'])

            print("--- 將工具結果回傳給 Gemini，讓它產生最終回覆 ---")
            response = chat.send_message(
                genai.protos.Part(
                    function_response={
                        "name": "call_letter_counter_tool",
                        "response": {"result": tool_result},
                    }
                )
            )
            print(f"Gemini: {response.text}")
    else:
        print(f"Gemini: {response.text}")

你好！我是 Gemini，已經連接了您的字母計數器工具。您可以開始提問了。（輸入 'exit' 結束）
您: 請問下列字串中初先過幾次y :  jjkashkdhsjkafhkahfksjyyfasyasyyasdy
--- Gemini 決定呼叫工具: call_letter_counter_tool ---
--- 正在呼叫遠端 MCP 伺服器 ---
    伺服器: https://cwadayi-mcp-1.hf.space/
    參數: word='jjkashkdhsjkafhkahfksjyyfasyasyyasdy', letter='y'
Loaded as API: https://cwadayi-mcp-1.hf.space/ ✔
--- MCP 伺服器回傳結果: 6 ---
--- 將工具結果回傳給 Gemini，讓它產生最終回覆 ---
Gemini: 有6個y。

您: exit


In [7]:
import os
import json
import google.generativeai as genai
from gradio_client import Client
from dotenv import load_dotenv
from google.colab import userdata # 假設在 Colab 環境
from datetime import datetime

# --- 1. 初始化與設定 ---
# 載入環境變數 (如果您的 API key 儲存在 .env 檔案中)
# load_dotenv()
# genai.configure(api_key=os.getenv('GOOGLE_API_KEY'))

# 直接從 Colab secrets 讀取
genai.configure(api_key=userdata.get('GOOGLE_API_KEY'))

# 更新為新的 MCP-2 伺服器位址
# MCP_SERVER_URL = "cwadayi/MCP-2"
MCP_SERVER_URL = "https://cwadayi-mcp-2.hf.space"
# --- 2. 新的「轉接器」函式 (用於地震查詢) ---
def call_mcp_earthquake_search(
    start_date: str,
    end_date: str,
    min_magnitude: float = 4.5,
    max_magnitude: float = 8.0,
    lat_from: float = 21.0,
    lat_to: float = 26.0,
    lon_from: float = 119.0,
    lon_to: float = 123.0,
    depth_from: float = 0.0,
    depth_to: float = 100.0,
    start_time: str = "00:00:00",
    end_time: str = "23:59:59"
) -> str:
    """
    連接到遠端的 Gradio MCP 伺服器並執行地震資料查詢。
    """
    try:
        print(f"--- 正在呼叫遠端地震 MCP 伺服器 ---")
        print(f"    伺服器: {MCP_SERVER_URL}")
        print(f"    查詢條件: {start_date} 到 {end_date}, 規模 {min_magnitude} 以上")

        client = Client(src=MCP_SERVER_URL)

        # 根據 MCP-2 的 API 文件，呼叫 `/gradio_fetch_and_plot_data`
        # 並按照順序傳入 12 個參數
        result = client.predict(
            param_0=start_date,      # Start Date
            param_1=start_time,      # Start Time
            param_2=end_date,        # End Date
            param_3=end_time,        # End Time
            param_4=lat_from,        # Latitude From
            param_5=lat_to,          # To
            param_6=lon_from,        # Longitude From
            param_7=lon_to,          # To
            param_8=depth_from,      # Depth From
            param_9=depth_to,        # To
            param_10=min_magnitude,  # Magnitude From
            param_11=max_magnitude,  # To
            api_name="/gradio_fetch_and_plot_data"
        )

        # API 回傳一個包含兩個元素的 tuple: (dataframe_dict, plot_dict)
        # 我們需要的是第一個元素的資料部分
        dataframe_dict = result[0]
        headers = dataframe_dict.get('headers', [])
        data = dataframe_dict.get('data', [])

        if not data:
            print("--- MCP 伺服器回傳：未找到符合條件的地震 ---")
            return "查詢完成，但未找到任何符合條件的地震資料。"

        # 將回傳的資料轉換為更易讀的 JSON 格式
        formatted_results = [dict(zip(headers, row)) for row in data]
        json_result = json.dumps(formatted_results, indent=2, ensure_ascii=False)

        print(f"--- MCP 伺服器成功回傳 {len(data)} 筆資料 ---")
        return json_result

    except Exception as e:
        print(f"呼叫 MCP 伺服器失敗: {e}")
        return f"工具執行失敗，錯誤訊息: {e}"


# --- 3. 向 Gemini 定義新的地震查詢工具 ---
earthquake_search_tool_declaration = {
    "name": "call_earthquake_search_tool",
    "description": "根據指定的條件（時間、地點、規模等）從台灣中央氣象署的資料庫中搜尋地震事件。預設搜尋台灣周邊地區。",
    "parameters": {
        "type": "OBJECT",
        "properties": {
            "start_date": {
                "type": "STRING",
                "description": "搜尋的開始日期，格式為 'YYYY-MM-DD'。例如：'2024-01-01'。"
            },
            "end_date": {
                "type": "STRING",
                "description": f"搜尋的結束日期，格式為 'YYYY-MM-DD'。例如：'2024-07-31'。預設為今天: {datetime.now().strftime('%Y-%m-%d')}。"
            },
            "min_magnitude": {
                "type": "NUMBER",
                "description": "要搜尋的最小地震規模。預設為 4.5。"
            },
            "max_magnitude": {
                "type": "NUMBER",
                "description": "要搜尋的最大地震規模。預設為 8.0。"
            },
            "lat_from": {"type": "NUMBER", "description": "緯度範圍的起始值。預設為 21.0。"},
            "lat_to": {"type": "NUMBER", "description": "緯度範圍的結束值。預設為 26.0。"},
            "lon_from": {"type": "NUMBER", "description": "經度範圍的起始值。預設為 119.0。"},
            "lon_to": {"type": "NUMBER", "description": "經度範圍的結束值。預設為 123.0。"},
        },
        "required": ["start_date", "end_date", "min_magnitude"]
    }
}

# --- 4. 建立 Gemini 模型 ---
# 將 tools 參數更新為新的工具定義
model = genai.GenerativeModel(
    model_name='gemini-1.5-flash-latest',
    tools=[earthquake_search_tool_declaration]
)


# --- 5. 主執行迴圈 ---
print("你好！我是 Gemini，已經連接了您的地震查詢工具 🗺️。您可以開始提問了。（輸入 'exit' 結束）")
chat = model.start_chat(enable_automatic_function_calling=False) # 我們手動處理 Function Call

while True:
    prompt = input("您: ")
    if prompt.lower() == 'exit':
        break

    try:
        response = chat.send_message(prompt)
        part = response.candidates[0].content.parts[0]

        if part.function_call:
            function_call = part.function_call
            print(f"--- Gemini 決定呼叫工具: {function_call.name} ---")

            if function_call.name == "call_earthquake_search_tool":
                args = function_call.args
                # 確保 end_date 有預設值
                if 'end_date' not in args:
                    args['end_date'] = datetime.now().strftime('%Y-%m-%d')

                tool_result = call_mcp_earthquake_search(**args) # 使用 **args 將字典解包為關鍵字參數

                print("--- 將工具結果回傳給 Gemini，讓它產生最終回覆 ---")
                response = chat.send_message(
                    genai.protos.Part(
                        function_response={
                            "name": "call_earthquake_search_tool",
                            "response": {"result": tool_result},
                        }
                    )
                )
                print(f"Gemini: {response.text}")
        else:
            print(f"Gemini: {response.text}")

    except Exception as e:
        print(f"發生錯誤：{e}")

你好！我是 Gemini，已經連接了您的地震查詢工具 🗺️。您可以開始提問了。（輸入 'exit' 結束）
您: what is the largest earthquake in 2024 ? show lon,lat,mag,dep,time
--- Gemini 決定呼叫工具: call_earthquake_search_tool ---
--- 正在呼叫遠端地震 MCP 伺服器 ---
    伺服器: https://cwadayi-mcp-2.hf.space
    查詢條件: 2024-01-01 到 2024-12-31, 規模 0.0 以上
Loaded as API: https://cwadayi-mcp-2.hf.space/ ✔
--- MCP 伺服器成功回傳 1000 筆資料 ---
--- 將工具結果回傳給 Gemini，讓它產生最終回覆 ---


ERROR:tornado.access:503 POST /v1beta/models/gemini-1.5-flash-latest:generateContent?%24alt=json%3Benum-encoding%3Dint (127.0.0.1) 46951.54ms
ERROR:tornado.access:503 POST /v1beta/models/gemini-1.5-flash-latest:generateContent?%24alt=json%3Benum-encoding%3Dint (127.0.0.1) 5569.23ms
ERROR:tornado.access:503 POST /v1beta/models/gemini-1.5-flash-latest:generateContent?%24alt=json%3Benum-encoding%3Dint (127.0.0.1) 5014.81ms
ERROR:tornado.access:503 POST /v1beta/models/gemini-1.5-flash-latest:generateContent?%24alt=json%3Benum-encoding%3Dint (127.0.0.1) 1503.26ms


發生錯誤：429 POST https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash-latest:generateContent?%24alt=json%3Benum-encoding%3Dint: You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits.




您: exit


In [6]:
import os
import json
import google.generativeai as genai
from gradio_client import Client
from dotenv import load_dotenv
from datetime import datetime

# --- Colab 使用者請注意 ---
# 如果您在 Google Colab 中執行，請使用以下程式碼來載入您的 API Key。
# 在左側面板點擊鑰匙圖示 (Secrets)，新增一個名為 'GOOGLE_API_KEY' 的 secret，
# 並將您的 API Key 貼入。
try:
    from google.colab import userdata
    GOOGLE_API_KEY = userdata.get('GOOGLE_API_KEY')
# --- 本地端使用者請注意 ---
# 如果您在本地端執行，請確保您的 .env 檔案中有 'GOOGLE_API_KEY="您的金鑰"'
except (ImportError, ModuleNotFoundError):
    load_dotenv()
    GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY')

# --- 1. 初始化與設定 ---
genai.configure(api_key=GOOGLE_API_KEY)

# 地震資料查詢 MCP 伺服器位址
MCP_SERVER_URL = "https://cwadayi-mcp-2.hf.space"


# --- 2. 「轉接器」函式 (用於地震查詢) ---
def call_mcp_earthquake_search(
    start_date: str,
    end_date: str,
    min_magnitude: float = 4.0,
    max_magnitude: float = 9.0,
    lat_from: float = 21.0,
    lat_to: float = 26.0,
    lon_from: float = 119.0,
    lon_to: float = 123.0,
    depth_from: float = 0.0,
    depth_to: float = 100.0,
    start_time: str = "00:00:00",
    end_time: str = "23:59:59"
) -> str:
    """
    連接到遠端的 Gradio MCP 伺服器並執行地震資料查詢。
    """
    try:
        print(f"--- 正在呼叫遠端地震 MCP 伺服器 ---")
        print(f"    伺服器: {MCP_SERVER_URL}")
        print(f"    查詢條件: {start_date} 到 {end_date}, 規模 {min_magnitude} 以上")

        client = Client(src=MCP_SERVER_URL)

        # 根據 MCP-2 的 API 文件，呼叫 `/gradio_fetch_and_plot_data`
        # 並按照順序傳入 12 個參數
        result = client.predict(
            param_0=start_date,      # Start Date
            param_1=start_time,      # Start Time
            param_2=end_date,        # End Date
            param_3=end_time,        # End Time
            param_4=lat_from,        # Latitude From
            param_5=lat_to,          # To
            param_6=lon_from,        # Longitude From
            param_7=lon_to,          # To
            param_8=depth_from,      # Depth From
            param_9=depth_to,        # To
            param_10=min_magnitude,  # Magnitude From
            param_11=max_magnitude,  # To
            api_name="/gradio_fetch_and_plot_data"
        )

        # API 回傳一個包含兩個元素的 tuple: (dataframe_dict, plot_dict)
        # 我們需要的是第一個元素的資料部分
        dataframe_dict = result[0]
        headers = dataframe_dict.get('headers', [])
        data = dataframe_dict.get('data', [])

        if not data:
            print("--- MCP 伺服器回傳：未找到符合條件的地震 ---")
            return "查詢完成，但未找到任何符合條件的地震資料。"

        # 將回傳的資料轉換為更易讀的 JSON 格式
        formatted_results = [dict(zip(headers, row)) for row in data]
        json_result = json.dumps(formatted_results, indent=2, ensure_ascii=False)

        print(f"--- MCP 伺服器成功回傳 {len(data)} 筆資料 ---")
        return json_result

    except Exception as e:
        print(f"呼叫 MCP 伺服器失敗: {e}")
        return f"工具執行失敗，錯誤訊息: {e}"


# --- 3. 向 Gemini 定義新的地震查詢工具 ---
earthquake_search_tool_declaration = {
    "name": "call_earthquake_search_tool",
    "description": "根據指定的條件（時間、地點、規模等）從台灣中央氣象署的資料庫中搜尋地震事件。預設搜尋台灣周邊地區。",
    "parameters": {
        "type": "OBJECT",
        "properties": {
            "start_date": {
                "type": "STRING",
                "description": "搜尋的開始日期，格式為 'YYYY-MM-DD'。例如：'2025-01-01'。"
            },
            "end_date": {
                "type": "STRING",
                "description": f"搜尋的結束日期，格式為 'YYYY-MM-DD'。例如：'2025-08-14'。預設為今天。"
            },
            "min_magnitude": {
                "type": "NUMBER",
                "description": "要搜尋的最小地震規模。預設為 4.0。"
            },
            "max_magnitude": {
                "type": "NUMBER",
                "description": "要搜尋的最大地震規模。預設為 9.0。"
            },
            "lat_from": {"type": "NUMBER", "description": "緯度範圍的起始值。預設為 21.0。"},
            "lat_to": {"type": "NUMBER", "description": "緯度範圍的結束值。預設為 26.0。"},
            "lon_from": {"type": "NUMBER", "description": "經度範圍的起始值。預設為 119.0。"},
            "lon_to": {"type": "NUMBER", "description": "經度範圍的結束值。預設為 123.0。"},
        },
        "required": ["start_date", "end_date"] # 讓 Gemini 必須提供日期
    }
}

# --- 4. 建立 Gemini 模型 ---
model = genai.GenerativeModel(
    model_name='gemini-1.5-flash-latest',
    tools=[earthquake_search_tool_declaration]
)


# --- 5. 主執行迴圈 (修正後版本) ---
print("你好！我是 Gemini，已經連接了您的地震查詢工具 🗺️。您可以開始提問了。（輸入 'exit' 結束）")
chat = model.start_chat(enable_automatic_function_calling=False) # 我們手動處理 Function Call

while True:
    prompt = input("您: ")
    if prompt.lower() == 'exit':
        break

    try:
        response = chat.send_message(prompt)

        # --- 穩健的回應處理邏輯 ---
        has_function_call = False
        text_response = ""

        for part in response.candidates[0].content.parts:
            if part.function_call:
                function_call = part.function_call
                print(f"--- Gemini 決定呼叫工具: {function_call.name} ---")

                if function_call.name == "call_earthquake_search_tool":
                    has_function_call = True
                    args = function_call.args
                    # 如果使用者沒提供結束日期，自動設為今天
                    if 'end_date' not in args:
                        args['end_date'] = datetime.now().strftime('%Y-%m-%d')

                    tool_result = call_mcp_earthquake_search(**args)

                    print("--- 將工具結果回傳給 Gemini，讓它產生最終回覆 ---")
                    # 將工具的執行結果回傳給模型
                    response = chat.send_message(
                        genai.protos.Part(
                            function_response={
                                "name": "call_earthquake_search_tool",
                                "response": {"result": tool_result},
                            }
                        )
                    )
                    # 處理工具執行後的最終回覆
                    for final_part in response.candidates[0].content.parts:
                        text_response += final_part.text

            elif part.text:
                 text_response += part.text

        # 迴圈結束後，印出收集到的所有文字回應
        if text_response:
             print(f"Gemini: {text_response}")
        # 如果模型沒有產生任何文字，也沒有呼叫函式，給一個通用回覆
        elif not has_function_call:
             print("Gemini: 我不確定該如何回應。")

    except Exception as e:
        print(f"發生錯誤：{e}")

你好！我是 Gemini，已經連接了您的地震查詢工具 🗺️。您可以開始提問了。（輸入 'exit' 結束）
您: what is the largest earthquake in 2024 ?
Gemini: I need to know the available data to answer your question.  The available tools don't specify how to get the largest earthquake within a year.  Can you provide more information or different tools?

您: exit


In [None]:
import google.generativeai as genai
import os
from dotenv import load_dotenv

# 載入您的 API 金鑰
load_dotenv()
genai.configure(api_key=userdata.get('GOOGLE_API_KEY'))

print("正在查詢您帳號可用的模型...\n")

# 遍歷所有模型
for model in genai.list_models():
  # 檢查該模型是否支援 'generateContent' 方法（聊天和生成內容所必需的）
  if 'generateContent' in model.supported_generation_methods:
    print(f"-> 可用模型: {model.name}")

正在查詢您帳號可用的模型...

-> 可用模型: models/gemini-1.5-pro-latest
-> 可用模型: models/gemini-1.5-pro-002
-> 可用模型: models/gemini-1.5-pro
-> 可用模型: models/gemini-1.5-flash-latest
-> 可用模型: models/gemini-1.5-flash
-> 可用模型: models/gemini-1.5-flash-002
-> 可用模型: models/gemini-1.5-flash-8b
-> 可用模型: models/gemini-1.5-flash-8b-001
-> 可用模型: models/gemini-1.5-flash-8b-latest
-> 可用模型: models/gemini-2.5-pro-preview-03-25
-> 可用模型: models/gemini-2.5-flash-preview-05-20
-> 可用模型: models/gemini-2.5-flash
-> 可用模型: models/gemini-2.5-flash-lite-preview-06-17
-> 可用模型: models/gemini-2.5-pro-preview-05-06
-> 可用模型: models/gemini-2.5-pro-preview-06-05
-> 可用模型: models/gemini-2.5-pro
-> 可用模型: models/gemini-2.0-flash-exp
-> 可用模型: models/gemini-2.0-flash
-> 可用模型: models/gemini-2.0-flash-001
-> 可用模型: models/gemini-2.0-flash-exp-image-generation
-> 可用模型: models/gemini-2.0-flash-lite-001
-> 可用模型: models/gemini-2.0-flash-lite
-> 可用模型: models/gemini-2.0-flash-preview-image-generation
-> 可用模型: models/gemini-2.0-flash-lite-preview-02-05
-

In [None]:
import os
import google.generativeai as genai
from gradio_client import Client
from dotenv import load_dotenv
from google.colab import userdata


# --- 1. 初始化與設定 ---
load_dotenv()
genai.configure(api_key=userdata.get('GOOGLE_API_KEY'))
MCP_SERVER_URL = "https://cwadayi-mcp-2.hf.space/"

# --- 2. 「轉接器」函式 ---
def call_mcp_letter_counter(word: str, letter: str) -> int:
    """
    連接到遠端的 Gradio MCP 伺服器並執行 letter_counter 工具。
    """
    try:
        print(f"--- 正在呼叫遠端 MCP 伺服器 ---")
        print(f"    伺服器: {MCP_SERVER_URL}")
        print(f"    參數: word='{word}', letter='{letter}'")

        client = Client(src=MCP_SERVER_URL)

        #    <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
        #    <<<<< 唯一的修改在這裡 >>>>
        #    <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
        # 移除 api_name，讓 client 自動呼叫預設的端點
        result = client.predict(word, letter)

        print(f"--- MCP 伺服器回傳結果: {result} ---")
        return int(result)
    except Exception as e:
        # 將更詳細的錯誤訊息印出，方便除錯
        print(f"呼叫 MCP 伺服器失敗: {e}")
        return -1

# --- 3. 向 Gemini 定義可用的工具 ---
letter_counter_tool_declaration = {
    "name": "call_letter_counter_tool",
    "description": "計算一個指定的字母在一段文字中出現了幾次。",
    "parameters": {
        "type": "OBJECT",
        "properties": {
            "word": { "type": "STRING", "description": "要在其中搜尋的完整文字。" },
            "letter": { "type": "STRING", "description": "要計數的單一字元。" }
        },
        "required": ["word", "letter"]
    }
}

# --- 4. 建立 Gemini 模型 ---
model = genai.GenerativeModel(
    model_name='gemini-1.5-flash-latest',
    tools=[letter_counter_tool_declaration]
)

# --- 5. 主執行迴圈 ---
print("你好！我是 Gemini，已經連接了您的字母計數器工具。您可以開始提問了。（輸入 'exit' 結束）")
chat = model.start_chat(enable_automatic_function_calling=False)

while True:
    prompt = input("您: ")
    if prompt.lower() == 'exit':
        break

    response = chat.send_message(prompt)

    function_call = response.candidates[0].content.parts[0].function_call
    if function_call:
        print(f"--- Gemini 決定呼叫工具: {function_call.name} ---")

        if function_call.name == "call_letter_counter_tool":
            args = function_call.args
            tool_result = call_mcp_letter_counter(word=args['word'], letter=args['letter'])

            print("--- 將工具結果回傳給 Gemini，讓它產生最終回覆 ---")
            response = chat.send_message(
                genai.protos.Part(
                    function_response={
                        "name": "call_letter_counter_tool",
                        "response": {"result": tool_result},
                    }
                )
            )
            print(f"Gemini: {response.text}")
    else:
        print(f"Gemini: {response.text}")

你好！我是 Gemini，已經連接了您的字母計數器工具。您可以開始提問了。（輸入 'exit' 結束）
您: 2015年以後規模大於5.0的地震有幾個
Gemini: I do not have access to real-time information, including earthquake data.  Therefore, I cannot answer how many earthquakes larger than magnitude 5.0 occurred after 2015.  To find this information, you would need to consult a seismic database such as those maintained by the USGS (United States Geological Survey) or other similar organizations.

您: exit


In [None]:
api_key = userdata.get('GOOGLE_API_KEY')

In [None]:
#!/usr/bin/env python3
"""
Google Gemini MCP Client for Earthquake Data Server
Connects to the earthquake data MCP server deployed on Hugging Face Spaces
and uses Gemini AI to provide intelligent earthquake data analysis.
"""

import asyncio
import json
import logging
import aiohttp
import google.generativeai as genai
from typing import Dict, Any, List, Optional
from datetime import datetime
import os
from dataclasses import dataclass

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class MCPResponse:
    """Data class for MCP response"""
    success: bool
    data: Optional[Dict[str, Any]] = None
    error: Optional[str] = None

class EarthquakeMCPClient:
    """MCP Client for connecting to earthquake data server"""

    def __init__(self, server_url: str):
        self.server_url = server_url.rstrip('/')
        self.mcp_endpoint = f"{self.server_url}/mcp"  # Assuming MCP endpoint
        self.session = None
        self.request_id = 0

    async def __aenter__(self):
        """Async context manager entry"""
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit"""
        if self.session:
            await self.session.close()

    def _get_next_id(self) -> int:
        """Get next request ID"""
        self.request_id += 1
        return self.request_id

    async def _make_mcp_request(self, method: str, params: Dict[str, Any] = None) -> MCPResponse:
        """Make MCP request to the server"""
        if not self.session:
            raise RuntimeError("Client not initialized. Use async context manager.")

        request_data = {
            "jsonrpc": "2.0",
            "id": self._get_next_id(),
            "method": method,
            "params": params or {}
        }

        try:
            # Try direct HTTP request to Gradio MCP interface
            async with self.session.post(
                f"{self.server_url}/api/predict",
                json={
                    "data": [json.dumps(request_data)],
                    "fn_index": 1  # Assuming MCP handler is at index 1
                },
                headers={"Content-Type": "application/json"},
                timeout=30
            ) as response:
                if response.status == 200:
                    result = await response.json()
                    # Extract response from Gradio format
                    if "data" in result and result["data"]:
                        mcp_response = json.loads(result["data"][0])
                        if "error" in mcp_response:
                            return MCPResponse(success=False, error=mcp_response["error"]["message"])
                        return MCPResponse(success=True, data=mcp_response.get("result"))
                    else:
                        return MCPResponse(success=False, error="Invalid response format")
                else:
                    return MCPResponse(success=False, error=f"HTTP {response.status}")

        except Exception as e:
            logger.error(f"MCP request failed: {e}")
            return MCPResponse(success=False, error=str(e))

    async def initialize(self) -> MCPResponse:
        """Initialize MCP connection"""
        return await self._make_mcp_request("initialize", {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {
                "name": "gemini-mcp-client",
                "version": "1.0.0"
            }
        })

    async def list_tools(self) -> MCPResponse:
        """List available tools"""
        return await self._make_mcp_request("tools/list")

    async def call_tool(self, tool_name: str, arguments: Dict[str, Any] = None) -> MCPResponse:
        """Call a specific tool"""
        return await self._make_mcp_request("tools/call", {
            "name": tool_name,
            "arguments": arguments or {}
        })

class GeminiEarthquakeAnalyst:
    """Gemini AI analyst for earthquake data"""

    def __init__(self, api_key: str, model_name: str = "gemini-pro"):
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel(model_name)
        self.mcp_client = None

    async def connect_to_mcp_server(self, server_url: str):
        """Connect to MCP server"""
        self.mcp_client = EarthquakeMCPClient(server_url)

    async def initialize_connection(self) -> bool:
        """Initialize MCP connection"""
        if not self.mcp_client:
            logger.error("MCP client not connected")
            return False

        async with self.mcp_client as client:
            # Initialize connection
            init_response = await client.initialize()
            if not init_response.success:
                logger.error(f"Failed to initialize: {init_response.error}")
                return False

            # List available tools
            tools_response = await client.list_tools()
            if not tools_response.success:
                logger.error(f"Failed to list tools: {tools_response.error}")
                return False

            logger.info("Successfully connected to MCP server")
            logger.info(f"Available tools: {list(tools_response.data.get('tools', []))}")
            return True

    async def query_earthquakes(self, **kwargs) -> Optional[Dict[str, Any]]:
        """Query earthquake data"""
        async with self.mcp_client as client:
            response = await client.call_tool("query_earthquakes", kwargs)
            if response.success:
                return json.loads(response.data["content"][0]["text"])
            else:
                logger.error(f"Query failed: {response.error}")
                return None

    async def get_earthquake_stats(self, **kwargs) -> Optional[Dict[str, Any]]:
        """Get earthquake statistics"""
        async with self.mcp_client as client:
            response = await client.call_tool("get_earthquake_stats", kwargs)
            if response.success:
                return json.loads(response.data["content"][0]["text"])
            else:
                logger.error(f"Stats query failed: {response.error}")
                return None

    async def create_earthquake_map(self, **kwargs) -> Optional[Dict[str, Any]]:
        """Create earthquake map"""
        async with self.mcp_client as client:
            response = await client.call_tool("create_earthquake_map",
                                            {**kwargs, "return_base64": True})
            if response.success:
                return json.loads(response.data["content"][0]["text"])
            else:
                logger.error(f"Map creation failed: {response.error}")
                return None

    def _format_earthquake_data_for_gemini(self, data: Dict[str, Any]) -> str:
        """Format earthquake data for Gemini analysis"""
        if not data.get("success"):
            return f"Error retrieving data: {data.get('error', 'Unknown error')}"

        if data.get("count", 0) == 0:
            return "No earthquake data found for the specified criteria."

        formatted = f"""
Earthquake Data Summary:
- Total earthquakes: {data.get('count', 0)}
- Date range: Query parameters used
        """

        if "summary" in data:
            summary = data["summary"]
            formatted += f"""
- Magnitude range: {summary.get('magnitude_range', [])}
- Depth range: {summary.get('depth_range', [])} km
- Location bounds: {summary.get('location_bounds', {})}
            """

        if "data" in data and data["data"]:
            formatted += f"\nSample earthquakes:\n"
            for i, eq in enumerate(data["data"][:5]):  # Show first 5
                formatted += f"- Date: {eq.get('date', 'N/A')}, Magnitude: {eq.get('ML', 'N/A')}, Depth: {eq.get('depth', 'N/A')}km\n"

        return formatted.strip()

    def _format_stats_for_gemini(self, stats: Dict[str, Any]) -> str:
        """Format statistics for Gemini analysis"""
        if not stats.get("success"):
            return f"Error retrieving statistics: {stats.get('error', 'Unknown error')}"

        formatted = f"""
Earthquake Statistics:
- Total earthquakes: {stats.get('total_earthquakes', 0)}
- Date range: {stats.get('date_range', {})}

Magnitude Statistics:
- Range: {stats.get('magnitude_stats', {}).get('min', 'N/A')} - {stats.get('magnitude_stats', {}).get('max', 'N/A')}
- Mean: {stats.get('magnitude_stats', {}).get('mean', 'N/A'):.2f}
- Median: {stats.get('magnitude_stats', {}).get('median', 'N/A'):.2f}

Depth Statistics:
- Range: {stats.get('depth_stats', {}).get('min', 'N/A')} - {stats.get('depth_stats', {}).get('max', 'N/A')} km
- Mean: {stats.get('depth_stats', {}).get('mean', 'N/A'):.2f} km

Magnitude Distribution:
{json.dumps(stats.get('magnitude_distribution', {}), indent=2)}

Depth Distribution:
{json.dumps(stats.get('depth_distribution', {}), indent=2)}
        """

        return formatted.strip()

    async def analyze_earthquake_data(self, query_params: Dict[str, Any] = None) -> str:
        """Analyze earthquake data using Gemini"""
        try:
            # Get earthquake data
            data = await self.query_earthquakes(**(query_params or {}))
            if not data:
                return "Failed to retrieve earthquake data."

            # Get statistics
            stats = await self.get_earthquake_stats(**(query_params or {}))

            # Format data for Gemini
            data_text = self._format_earthquake_data_for_gemini(data)
            stats_text = self._format_stats_for_gemini(stats) if stats else ""

            # Create prompt for Gemini
            prompt = f"""
Please analyze the following earthquake data and provide insights:

{data_text}

{stats_text}

Please provide:
1. A summary of the earthquake activity
2. Notable patterns or trends
3. Risk assessment based on the data
4. Recommendations for further analysis
5. Any significant observations about magnitude, depth, or location patterns

Be specific and reference the actual data in your analysis.
            """

            # Generate analysis using Gemini
            response = self.model.generate_content(prompt)
            return response.text

        except Exception as e:
            logger.error(f"Analysis failed: {e}")
            return f"Analysis failed: {str(e)}"

    async def answer_earthquake_question(self, question: str,
                                       query_params: Dict[str, Any] = None) -> str:
        """Answer specific questions about earthquake data"""
        try:
            # Get relevant data based on the question
            data = await self.query_earthquakes(**(query_params or {}))
            stats = await self.get_earthquake_stats(**(query_params or {}))

            if not data:
                return "I couldn't retrieve earthquake data to answer your question."

            # Format data for context
            data_context = self._format_earthquake_data_for_gemini(data)
            stats_context = self._format_stats_for_gemini(stats) if stats else ""

            prompt = f"""
Based on the following earthquake data, please answer this question: "{question}"

Earthquake Data:
{data_context}

Statistics:
{stats_context}

Please provide a detailed, accurate answer based on the actual data provided. If the data doesn't contain enough information to answer the question, please say so.
            """

            response = self.model.generate_content(prompt)
            return response.text

        except Exception as e:
            logger.error(f"Question answering failed: {e}")
            return f"I encountered an error: {str(e)}"

async def main():
    """Main function demonstrating the MCP client usage"""

    # Configuration
    GEMINI_API_KEY = userdata.get('GOOGLE_API_KEY')
    MCP_SERVER_URL = "https://cwadayi-mcp-2.hf.space"

    if not GEMINI_API_KEY:
        print("Please set GEMINI_API_KEY environment variable")
        return

    # Initialize Gemini analyst
    analyst = GeminiEarthquakeAnalyst(GEMINI_API_KEY)
    await analyst.connect_to_mcp_server(MCP_SERVER_URL)

    # Test connection
    if not await analyst.initialize_connection():
        print("Failed to connect to MCP server")
        return

    print("=== Gemini Earthquake Data Analyst ===\n")

    # Example 1: General analysis
    print("1. General earthquake analysis for Taiwan region (2024):")
    analysis = await analyst.analyze_earthquake_data({
        "start_date": "2024-01-01",
        "end_date": "2024-12-31",
        "ML_min": 4.0
    })
    print(analysis)
    print("\n" + "="*50 + "\n")

    # Example 2: Answer specific question
    print("2. Answering specific question:")
    question = "What was the strongest earthquake in the first quarter of 2024?"
    answer = await analyst.answer_earthquake_question(
        question,
        {
            "start_date": "2024-01-01",
            "end_date": "2024-03-31",
            "ML_min": 4.0
        }
    )
    print(f"Q: {question}")
    print(f"A: {answer}")
    print("\n" + "="*50 + "\n")

    return analyst  # Return analyst for interactive use

def run_main():
    """Run main function with proper event loop handling"""
    try:
        # Check if we're in a running event loop (like Jupyter)
        loop = asyncio.get_running_loop()
        if loop and loop.is_running():
            # We're in a running event loop, create a task instead
            import nest_asyncio
            nest_asyncio.apply()
            return asyncio.run(main())
        else:
            return asyncio.run(main())
    except RuntimeError:
        # Fallback: try to run in existing loop
        try:
            import nest_asyncio
            nest_asyncio.apply()
            return asyncio.run(main())
        except ImportError:
            print("For Jupyter notebook, please install nest_asyncio: pip install nest_asyncio")
            print("Or use the notebook-friendly functions below:")
            return None

# Notebook-friendly wrapper functions
async def quick_analysis(api_key: str,
                        start_date: str = "2024-01-01",
                        end_date: str = "2024-12-31",
                        ml_min: float = 4.0):
    """Quick analysis function for notebook use"""
    analyst = GeminiEarthquakeAnalyst(api_key)
    await analyst.connect_to_mcp_server("https://cwadayi-mcp-2.hf.space")

    if await analyst.initialize_connection():
        analysis = await analyst.analyze_earthquake_data({
            "start_date": start_date,
            "end_date": end_date,
            "ML_min": ml_min
        })
        return analysis
    return "Failed to connect to MCP server"

async def ask_earthquake_question(api_key: str,
                                question: str,
                                start_date: str = "2024-01-01",
                                end_date: str = "2024-12-31",
                                ml_min: float = 4.0):
    """Ask a question about earthquakes"""
    analyst = GeminiEarthquakeAnalyst(api_key)
    await analyst.connect_to_mcp_server("https://cwadayi-mcp-2.hf.space")

    if await analyst.initialize_connection():
        answer = await analyst.answer_earthquake_question(
            question,
            {
                "start_date": start_date,
                "end_date": end_date,
                "ML_min": ml_min
            }
        )
        return answer
    return "Failed to connect to MCP server"

async def create_analyst(api_key: str):
    """Create and initialize analyst for interactive use"""
    analyst = GeminiEarthquakeAnalyst(api_key)
    await analyst.connect_to_mcp_server("https://cwadayi-mcp-2.hf.space")

    if await analyst.initialize_connection():
        return analyst
    return None

if __name__ == "__main__":
    # Try to run main, handle event loop issues
    analyst = run_main()

    # If main didn't run due to event loop issues, provide alternative
    if analyst is None:
        print("\n" + "="*60)
        print("EVENT LOOP ISSUE DETECTED")
        print("="*60)
        print("If you're in Jupyter notebook, use these commands instead:")
        print()
        print("# Set your API key")
        print("api_key = 'your-gemini-api-key'")
        print()
        print("# Quick analysis")
        print("analysis = await quick_analysis(api_key)")
        print("print(analysis)")
        print()
        print("# Ask a question")
        print("answer = await ask_earthquake_question(api_key, 'What was the strongest earthquake?')")
        print("print(answer)")
        print()
        print("# Create analyst for multiple queries")
        print("analyst = await create_analyst(api_key)")
        print("if analyst:")
        print("    result = await analyst.analyze_earthquake_data({'ML_min': 5.0})")
        print("    print(result)")
        print("="*60)

ERROR:__main__:Failed to initialize: HTTP 404


Failed to connect to MCP server

EVENT LOOP ISSUE DETECTED
If you're in Jupyter notebook, use these commands instead:

# Set your API key
api_key = 'your-gemini-api-key'

# Quick analysis
analysis = await quick_analysis(api_key)
print(analysis)

# Ask a question
answer = await ask_earthquake_question(api_key, 'What was the strongest earthquake?')
print(answer)

# Create analyst for multiple queries
analyst = await create_analyst(api_key)
if analyst:
    result = await analyst.analyze_earthquake_data({'ML_min': 5.0})
    print(result)
