# 第 5 章 Agent 的基礎–用 Function Calling 幫 AI 長手腳

## 前置工作

請先執行以下這幾個儲存格，便於後續測試：

In [None]:
from IPython.display import Markdown, display
from google.colab import userdata
from rich.pretty import pprint
from pydantic import BaseModel, Field, ConfigDict
from io import BytesIO
import requests
import openai
import sys
import os
import pickle
import json

In [None]:
client = openai.OpenAI(api_key=userdata.get('OPENAI_API_KEY'))

In [None]:
def upload_file(file_path):
    try:
        if (file_path.startswith('http://')
            or file_path.startswith('https://')):
            response = requests.get(file_path)
            filename = response.headers.get(
                'content-disposition',
                None
            )
            if filename:
                filename = filename.split('filename=')[-1]
                filename = filename.strip('\'"')
            else:
                filename = file_path.split('/')[-1]
            response = client.files.create(
                file=(filename, response.content),
                purpose='user_data'
            )
        else:
            with open(file_path, 'rb') as file:
                response = client.files.create(
                    file=file,
                    purpose='user_data'
                )
    except:
        return None
    return response.id

In [None]:
class BaseComand:
    def __init__(self, command, tool_name, icon, verbose=False):
        self.command = command      # 指令
        self.tool_name = tool_name  # 工具名稱
        self.icon = icon            # 工具圖示字元
        self.verbose = verbose      # 啟用詳細輸出
        self.extra_args = {}        # 要額外送給模型的參數

    def handle_command(self, chat, cmd):
        # 不是正確的指令開頭（空字串會是 True）
        if not cmd.startswith(self.command):
            return False
        return True

    def handle_event(self, chat, stream, event):
        return None # 預設不處理交給下一個指令處理器

In [None]:
from urllib.parse import unquote

class WebSearchCommand(BaseComand):
    def __init__(self, verbose=False):
        super().__init__(
            '/w',
            'web_search_preview',
            '🌐',
            verbose
        )

    def handle_command(self, chat, cmd):
        if not super().handle_command(chat, cmd):
            return False
        idx = chat.find_tool_index(self.tool_name)
        if idx == -1:
            chat.tools.append({
                'type': self.tool_name
            })
        else:
            chat.tools.pop(idx)
        return True

    def show_search_results(self, response):
        if response.output[0].type != "web_search_call":
            return
        content = response.output[1].content[0]
        for i, annotaion in enumerate(
            content.annotations, start=1
        ):
            print(f'{i}. {annotaion.title}')
            print(f'   {unquote(annotaion.url)}')

    def handle_event(self, chat, stream, event):
        if not self.verbose: return None
        if event.type == 'response.completed':
            self.show_search_results(event.response)
        return None

In [None]:
class FileSearchCommand(BaseComand):
    def __init__(self, vector_store_id=None, verbose=False):
        super().__init__(
            '/f',
            'file_search',
            '🔍',
            verbose
        )
        self.vector_store_id = vector_store_id

    def handle_command(self, chat, cmd):
        if not super().handle_command(chat, cmd):
            return False
        idx = chat.find_tool_index(self.tool_name)
        if len(cmd) < 4: # /f[.]，不是冒號加檔名/網址
            if self.vector_store_id == None:
                print('請先使用 /f:[路徑|網址]上傳檔案')
                return True # 沒有向量資料庫無法切換
            turn_on = (idx == -1) # 切換開/關檔案檢索
        else: # /f:檔名|網址，上傳檔案並開啟檢索功能
            turn_on = True
            file_path = cmd[3:]
            file_id= upload_file(file_path)
            if not file_id:
                print(f'無法上傳檔案：{file_path}')
                return True
            if self.vector_store_id == None:
                vector_store = client.vector_stores.create(
                    name="temp",
                    file_ids=[file_id],
                )
                self.vector_store_id = vector_store.id
            else:
                client.vector_stores.files.create(
                    self.vector_store_id,
                    file_id=file_id
                )
        if turn_on:
            chat.tools.append({
                'type': self.tool_name,
                'vector_store_ids': [self.vector_store_id]
            })
            self.extra_args = {
                'include': ['file_search_call.results']
            }
        else:
            chat.tools.pop(idx)
            self.extra_args = {}
        return True

    def remove_vector_store(self, chat):
        if self.vector_store_id == None: return
        idx = chat.find_tool_index('file_search')
        if idx: chat.tools.pop(idx)
        response = client.vector_stores.files.list(
            self.vector_store_id
        )
        for vector_file in response.data:
            client.files.delete(vector_file.id)
        client.vector_stores.delete(self.vector_store_id)

    def show_file_search_results(self, response):
        if response.output[0].type != 'file_search_call':
            return
        results = response.output[0].results
        if not results: return
        for i, result in enumerate(results, start=1):
            display(Markdown('---'))
            print(f'{i}. {result.filename}({result.score})')
            display(Markdown('---'))
            display(Markdown(result.text))

    def handle_event(self, chat, stream, event):
        if not self.verbose: return None
        if event.type == 'response.completed':
            self.show_file_search_results(event.response)
        return None

## 5-1 認識 function calling 機制

## 5-2 提供客製搜尋功能

### 提供 Google 搜尋的函式

In [None]:
!pip -q install googlesearch-python

In [None]:
from googlesearch import search
for url in search('高中排球聯賽'):
    print(url)

- `lang` 參數使用[這裡](https://developers.google.com/custom-search/docs/json_api_reference#supported-interface-languages)的值。
- `region` 參數可用這裡查到的[國別碼](https://developers.google.com/custom-search/docs/json_api_reference#country-codes)

In [None]:
for i, result in enumerate(
    search(
        '高中排球聯賽',
        advanced=True, # 進階模式可取得標題、摘要與網址
        num_results=3, # 搜尋筆數
        lang='zh-TW',  # 限定語言
        region='tw',   # 限定區域
        unique=True    # 篩除重複連結
    ),
    start=1
):
    print(f'{i}. {result.title}')
    print(result.description)
    print(result.url)

用 JSON 整理搜尋結果經過測試效果似乎沒有 Markdown 好：

In [None]:
def google_res(keyword, num_results=5):
    content = ""
    num_results = max(num_results, 5) # 最少 5 筆
    for result in search( # 一一串接搜尋結果
        keyword,
        advanced=True,
        num_results=num_results,
        lang='zh-TW'
    ):
        # 使用 markdown 格式整理搜尋結果
        content += (f"- [{result.title}]({result.url})\n"
                    f"    {result.description}\n")
    return content

In [None]:
display(Markdown(google_res('高中排球聯賽冠軍')))

### 使用 JSON Schema 描述函式

In [None]:
# 描述 google_res 工具函式的參數
class GoogleRes(BaseModel):
    keyword: str = Field(description='要搜尋的關鍵字')
    num_results: int = Field(
        description='搜尋結果數目，不提供預設 5 筆'
    )

# 實際要送給模型描述參數的內容 (字典)
pprint(GoogleRes.model_json_schema())

In [None]:
tools = [{
    "type":"function",
    "name": "google_res",                  # 函式名稱
    "description": "取得 Google 搜尋結果", # 函式說明
    "parameters": GoogleRes.model_json_schema(), # 參數規格
}]

### 使用 function calling

In [None]:
query = "高中排球聯賽冠軍" # 接著要詢問的問題

response = client.responses.create(
    model = "gpt-4.1-nano",
    input = query,
    tools = tools,
    parallel_tool_calls=False, # 限制只能叫用一次
)

In [None]:
pprint(response.output_text)

In [None]:
pprint(response)

### 依據模型指示叫用函式

In [None]:
# 取得建議叫用函式的資訊
if response.output[0].type == 'function_call':
    tool_call = response.output[0]
    tool_info = f'{tool_call.name}(**{tool_call.arguments})'
    print(tool_info)

In [None]:
response1 = client.responses.create(
    model='gpt-4.1-nano',
    input=[
        {"role": "user", "content": query},
        # 傳回 AI 傳給我們的 function calling 指示
        tool_call,
        {   # 建立可傳回函式執行結果的字典
            "type": "function_call_output", # 工具執行結果類型
            "call_id": tool_call.call_id, # 叫用函式的識別碼
            "output": eval(tool_info) # 叫用函式取得結果
        }
    ],
    tools=tools
)

In [None]:
print(response1.output_text)

#### 利用串接回應的方式使用 function calling

In [None]:
response2 = client.responses.create(
    model='gpt-4.1-nano',
    input=[
        # 不用重傳提示內容
        # {"role": "user", "content": query},
        # 傳回 AI 傳給我們的 function calling 指示
        # tool_call,
        {   # 建立可傳回函式執行結果的字典
            "type": "function_call_output", # 以工具角色送出回覆
            "call_id": tool_call.call_id, # 叫用函式的識別碼
            "output": eval(tool_info) # 叫用函式取得結果
        }
    ],
    tools=tools,
    previous_response_id=response.id
)

print(response2.output_text)

In [None]:
pprint(response1.usage.input_tokens)
pprint(response2.usage.input_tokens)

### 建立方便進行 function calling 的輔助函式

In [None]:
# 叫用單一函式並且將函式執行結果組成訊息後傳回
def make_tool_msg(tool_call):
    tool_info = f'{tool_call.name}(**{tool_call.arguments})'
    result = eval(tool_info)
    return {   # 建立可傳回函式執行結果的字典
        "type": "function_call_output", # 以工具角色送出回覆
        "call_id": tool_call.call_id, # 叫用函式的識別碼
        "output": result # 函式傳回值
    }

In [None]:
response = client.responses.create(
    model='gpt-4.1-mini',
    input=query,
    tools=tools
)

# 取得建議叫用函式的資訊
if response.output[0].type == 'function_call':
    tool_call = response.output[0]
    response = client.responses.create(
        model='gpt-4.1-mini',
        input=[
            {"role":"user", "content":query},
            # 傳回 AI 傳給我們的 function calling 指示
            tool_call,
            # 再加上包含函式執行結果的字典
            make_tool_msg(tool_call)
        ],
        tools=tools
    )

In [None]:
print(response.output_text)

### 同時叫用多個函式（parallel function calling）

In [None]:
query = "2025 奧斯卡最佳男主角與 2024 奧斯卡最佳女主角各是誰？"

In [None]:
response = client.responses.create(
    model = "gpt-4.1-mini",
    input = query,
    tools = tools,
)

pprint(response)

In [None]:
def call_tools(tool_calls):
    msgs = []
    for tool_call in tool_calls:
        if tool_call.type == 'function_call':
            msgs.append(make_tool_msg(tool_call))
    return msgs

In [None]:
tool_calls = response.output

response = client.responses.create(
    model='gpt-4.1-mini',
    input=[{"role": "user", "content": query}]
        # 傳回 AI 傳給我們的 function calling 指示
        + tool_calls
        # 我們執行函式的結果
        + call_tools(tool_calls)
)

print(response.output_text)

In [None]:
response = client.responses.create(
    model = "gpt-4.1-mini",
    input = query,
    tools = tools,
    parallel_tool_calls=False
)

pprint(response)

### 串流模式下的 function calling

In [None]:
response = client.responses.create(
    model = "gpt-4.1-mini",
    input = query,
    tools = tools,
    stream=True
)

for chunk in response:
    pprint(chunk)

## 5-3 幫簡易聊天應用程式加入 function calling 功能

### 設計處理自訂函式工具的類別

def make_tool_msg(tool_call):
這裡有更新程式碼，原本初版（2025/6）書上的 `make_tool_msg` 方法的程式碼如下：

```python
    # 叫用單一函式並且將函式執行結果組成訊息後傳回
    def make_tool_msg(self, tool_call):
        tool_info = f'{tool_call.name}(**{tool_call.arguments})'
        if self.verbose: print(f'叫用：{tool_info}')
        result = eval(tool_info)
        return {   # 建立可傳回函式執行結果的字典
            "type": "function_call_output", # 以工具角色送出回覆
            "call_id": tool_call.call_id, # 叫用函式的識別碼
            "output": result # 函式傳回值
        }
```

但由於 `tool_call.arguments` 是 JSON，其中像是 `true/false` 大小寫與 Python 不同，若直接以字串傳入 `eval` 會出錯，雖然本章不會出現有問題的狀況，但在其他應用時就可能會遇到，因此底下 29～31 行改為先轉成 Python 字典的方式：

In [None]:
class FunctionCallingCommand(BaseComand):
    def __init__(self, tools=None, verbose=False):
        super().__init__(
            '/t',
            'function',
            '',
            verbose
        )
        self.tools = tools or []
        self.enabled = False

    def handle_command(self, chat, cmd):
        if not super().handle_command(chat, cmd):
            return False
        if not self.enabled: # 加入自訂函式工具
            chat.tools.extend(self.tools)
        else: # 移除自訂工具函式
            chat.tools = [
                tool for tool in chat.tools
                if tool['type'] != self.tool_name
            ]
        self.enabled = not self.enabled
        return True

    # 叫用單一函式並且將函式執行結果組成訊息後傳回
    def make_tool_msg(self, tool_call):
        tool_info = f'{tool_call.name}(**{tool_call.arguments})'
        if self.verbose: print(f'叫用：{tool_info}')
        func = eval(tool_call.name)
        args = json.loads(tool_call.arguments)
        result = func(**args)
        return {   # 建立可傳回函式執行結果的字典
            "type": "function_call_output", # 以工具角色送出回覆
            "call_id": tool_call.call_id, # 叫用函式的識別碼
            "output": result # 函式傳回值
        }

    def call_tools(self, tool_calls):
        msgs = []
        for tool_call in tool_calls:
            if tool_call.type == 'function_call':
                msgs.append(self.make_tool_msg(tool_call))
        return msgs if msgs else None

    def handle_event(self, chat, stream, event):
        if not self.enabled: return None
        if event.type != 'response.completed':
            return None
        # 呼叫函式
        tool_calls = event.response.output
        tool_results = self.call_tools(tool_calls)
        if tool_results: return tool_calls + tool_results
        return None

### 修改 Chat 類別搭配 function calling 運作

In [None]:
class Chat:
    def __init__(self, client, **kwargs):
        self._client = client
        self._last_id = kwargs.pop('last_id', None)
        # 限制工具執行圈數，避免無窮盡叫用工具
        self._max_tools_rounds = kwargs.pop('max_tools_rounds', 4)
        self.tools = [] # 預設沒有使用工具
        self._commands = kwargs.pop('commands', [])

    def get_reply_text(self, msg, **kwargs) -> str:
        instructions = kwargs.pop('instructions', '使用繁體中文')
        model = kwargs.pop('model', 'gpt-4.1-nano')
        stream = kwargs.pop('stream', False)
        tool_results = [] # 函式叫用的相關資訊
        for command in self._commands:
            kwargs.update(command.extra_args)
        try:
            messages = [{'role': 'user', 'content': msg}]
            for _ in range(self._max_tools_rounds):
                # 方便稍後串接函式叫用資訊
                if tool_results: # 串接叫用函式的資訊
                    messages += tool_results
                response = self._client.responses.create(
                    instructions=instructions,
                    model=model,
                    input=messages,
                    stream=True, # 都以串流方式處理，簡化程式邏輯
                    previous_response_id=self._last_id, # 串接回應
                    **kwargs
                )
                for event in response:
                    for command in self._commands:
                        result = command.handle_event(
                            self, stream, event
                        )
                        if result: break
                    tool_results = []
                    if isinstance(result, list):
                        # 工具要送回給模型的訊息串列
                        tool_results = result
                        break
                    if event.type == 'response.output_text.delta':
                        if stream: yield event.delta
                    elif event.type == (
                        'response.output_text.done'
                    ):
                        # 非串流模式要傳回完整內容
                        if not stream:
                            yield event.text
                    elif event.type == 'response.completed':
                        # 更新最後回應的識別碼
                        self._last_id = event.response.id
                if not tool_results:
                    # 沒有要送回給模型的訊息
                    # 表示已經成功生成內容
                    break
        except openai.APIError as err:
            print(f'Error:{err.body["message"]}', file=sys.stderr)
            return ''

    def find_tool_index(self, tool_type):
        for i, tool in enumerate(self.tools):
            if tool['type'] == tool_type: return i
        return -1

    def _get_prompt(self):
        prompt = ''
        for command in self._commands:
            idx = self.find_tool_index(command.tool_name)
            if idx != -1:
                prompt += f'{command.icon}'
        user_tools_count = len(self.tools) - len(prompt)
        prompt += f'(🛠️{user_tools_count})>>> '
        return prompt

    def _process_command(self, cmd):
        for command in self._commands:
            if command.handle_command(self, cmd):
                return True
        return False

    def loop(self, **kwargs) -> None:
        print("直接按 ↵ 可結束對話")
        while True:
            user_msg = input(self._get_prompt())
            if not user_msg.strip(): break # 直接 ↵ 就結束
            if self._process_command(user_msg):
                continue # 指令不需回覆，回頭讓使用者輸入
            text = ''
            display_handle = display(text, display_id=True)
            for reply in self.get_reply_text(
                user_msg,
                tools=self.tools, # 傳入要使用的工具
                **kwargs
            ):
                text += reply
                display_handle.update(Markdown(text))

    def save(self, filename) -> None:
        with open(filename, 'wb') as f:
            pickle.dump(
                {
                    'last_id': self._last_id,
                    'tools': self.tools,
                    'commands': self._commands
                },
                f
            )

    def load(self, filename) -> None:
        with open(filename, 'rb') as f:
            data = pickle.load(f)
            self._last_id = data['last_id']
            self.tools = data['tools']
            self._commands = data['commands']

    def show_thread(self):
        if not self._last_id: return
        inputs = client.responses.input_items.list(self._last_id)
        response = client.responses.retrieve(self._last_id)
        for item in inputs.data[::-1]:
            # 略過函式叫用指示等非訊息內容
            if item.type != 'message': continue
            prompt = ">>> " if item.role == 'user' else ''
            for content in item.content:
                print(f'{prompt}{content.text}')
        print(response.output_text)

    def delete_thread(self):
        if not self._last_id: return
        last_id = self._last_id
        while last_id:
            response = client.responses.retrieve(last_id)
            last_id, curr_id = (
                response.previous_response_id,
                last_id
            )
            client.responses.delete(curr_id)
        self._last_id = None

### 測試使用自訂工具函式聊天

可用來測試上傳 PDF 的耳機說明書檔案網址：

```
https://coolermaster.egnyte.com/dd/BtL7gG2IW6/
```

In [None]:
file_search_command = FileSearchCommand()

chat = Chat(
    client,
    commands=[
        FunctionCallingCommand(tools=tools),
        file_search_command
    ]
)
chat.loop(
    model='gpt-4.1-mini',
    stream=True,
)

In [None]:
chat.delete_thread()
file_search_command.remove_vector_store(chat)

### 檢視函式叫用的指示

In [None]:
chat = Chat(
    client,
    commands=[
        FunctionCallingCommand(tools=tools, verbose=True),
        FileSearchCommand(),
        WebSearchCommand()
    ]
)

chat.loop(
    model='gpt-4.1-mini',
    stream=True,
)

### 強制使用內建工具

In [None]:
chat.loop(
    model='gpt-4.1-mini',
    stream=True,
    tool_choice={'type': 'web_search_preview'}
)

### 不允許單回叫用多個函式

In [None]:
chat.loop(
    model='gpt-4.1-mini',
    stream=True,
    parallel_tool_calls=False
)

移除剛剛建立的討論串以及向量儲存區：

In [None]:
FileSearchCommand.remove_vector_store(chat)
chat.delete_thread()

## 5-4 讓 AI 長出手腳打造智慧 CLI 指令介面

### 執行 shell 指令的自訂函式工具

- [Warp 終端機軟體](https://on.warp.dev)

In [None]:
import subprocess

def shell_helper(comment, shell_command):
    # 啟動子行程
    process = subprocess.Popen(
        shell_command,
        shell=True,             # 在 shell 中執行
        stdout=subprocess.PIPE, # 擷取標準輸出
        stderr=subprocess.PIPE, # 擷取錯誤輸出
        text=True               # 以文字形式返回
    )

    result = '執行結果：\n\n```\n'

    # 即時讀取輸出
    while True:
        output = process.stdout.readline()
        # 如果沒有輸出且行程結束
        if output == '' and process.poll() is not None:
            break
        if output:
            result += output

    result += "```"

    # 檢查錯誤輸出
    error = process.stderr.read()
    if error:
        result += f"\n\n錯誤: {error}"

    # 等待行程結束並取得返回碼
    return_code = process.wait()
    result += f"\n\n命令執行完成，返回碼: {return_code}\n\n"
    return result

In [None]:
display(Markdown(
    shell_helper('列出檔案', 'ls sample_data')
))

### 提供模型自訂的 shell 指令執行函式

In [None]:
class ShellHelper(BaseModel):
    comment: str = Field(
        description='判斷要執行指定的 shell 指令的原因'
    )
    shell_command: str = Field(
        description='要執行的 shell 指令'
    )

In [None]:
shell_helper_tool = {
    'type': 'function',
    "name": "shell_helper",
    "description": "我可以執行 shell 指令操控電腦",
    "parameters": ShellHelper.model_json_schema(),
}

### 測試用說的就可以操控電腦的樂趣

In [None]:
chat = Chat(
    client,
    commands=[FunctionCallingCommand(
        tools=[shell_helper_tool],
        verbose=True
    )]
)

chat.loop(
    model='gpt-4.1-mini',
)

### 處理 IPython 特有的問題

- [IPython 的奇特 feature](https://dev.to/codemee/ipython-de-qi-te-feature-2fn8)

In [None]:
chat.loop(
    model='gpt-4.1-mini',
    stream=True,
)

In [None]:
class Chat:
    def __init__(self, client, **kwargs):
        self._client = client
        self._last_id = kwargs.pop('last_id', None)
        # 限制工具執行圈數，避免無窮盡叫用工具
        self._max_tools_rounds = kwargs.pop('max_tools_rounds', 4)
        self.tools = [] # 預設沒有使用工具
        self._commands = kwargs.pop('commands', [])

    def get_reply_text(self, msg, **kwargs) -> str:
        instructions = kwargs.pop('instructions', '使用繁體中文')
        model = kwargs.pop('model', 'gpt-4.1-nano')
        stream = kwargs.pop('stream', False)
        tool_results = [] # 函式叫用的相關資訊
        for command in self._commands:
            kwargs.update(command.extra_args)
        try:
            messages = [{'role': 'user', 'content': msg}]
            for _ in range(self._max_tools_rounds):
                # 方便稍後串接函式叫用資訊
                if tool_results: # 串接叫用函式的資訊
                    messages += tool_results
                response = self._client.responses.create(
                    instructions=instructions,
                    model=model,
                    input=messages,
                    stream=True, # 都以串流方式處理，簡化程式邏輯
                    previous_response_id=self._last_id, # 串接回應
                    **kwargs
                )
                for event in response:
                    for command in self._commands:
                        result = command.handle_event(
                            self, stream, event
                        )
                        if result: break
                    tool_results = []
                    if isinstance(result, list):
                        # 工具要送回給模型的訊息串列
                        tool_results = result
                        break
                    if event.type == 'response.output_text.delta':
                        if stream: yield event.delta
                    elif event.type == (
                        'response.output_text.done'
                    ):
                        # 非串流模式要傳回完整內容
                        if not stream:
                            yield event.text
                    elif event.type == 'response.completed':
                        # 更新最後回應的識別碼
                        self._last_id = event.response.id
                if not tool_results:
                    # 沒有要送回給模型的訊息
                    # 表示已經成功生成內容
                    break
        except openai.APIError as err:
            print(f'Error:{err.body["message"]}', file=sys.stderr)
            return ''

    def find_tool_index(self, tool_type):
        for i, tool in enumerate(self.tools):
            if tool['type'] == tool_type: return i
        return -1

    def _get_prompt(self):
        prompt = ''
        for command in self._commands:
            idx = self.find_tool_index(command.tool_name)
            if idx != -1:
                prompt += f'{command.icon}'
        user_tools_count = len(self.tools) - len(prompt)
        prompt += f'(🛠️{user_tools_count})>>> '
        return prompt

    def _process_command(self, cmd):
        for command in self._commands:
            if command.handle_command(self, cmd):
                return True
        return False

    def loop(self, **kwargs) -> None:
        print("直接按 ↵ 可結束對話")
        while True:
            user_msg = input(self._get_prompt())
            if not user_msg.strip(): break # 直接 ↵ 就結束
            if self._process_command(user_msg):
                continue # 指令不需回覆，回頭讓使用者輸入
            text = ''
            display_handle = display(text, display_id=True)
            for reply in self.get_reply_text(
                user_msg,
                tools=self.tools, # 傳入要使用的工具
                **kwargs
            ):
                text += reply
                if os.path.exists(text):
                    display_handle.update(Markdown(f' text'))
                else:
                    display_handle.update(Markdown(text))

    def save(self, filename) -> None:
        with open(filename, 'wb') as f:
            pickle.dump(
                {
                    'last_id': self._last_id,
                    'tools': self.tools,
                    'commands': self._commands
                },
                f
            )

    def load(self, filename) -> None:
        with open(filename, 'rb') as f:
            data = pickle.load(f)
            self._last_id = data['last_id']
            self.tools = data['tools']
            self._commands = data['commands']

    def show_thread(self):
        if not self._last_id: return
        inputs = client.responses.input_items.list(self._last_id)
        response = client.responses.retrieve(self._last_id)
        for item in inputs.data[::-1]:
            # 略過函式叫用指示等非訊息內容
            if item.type != 'message': continue
            prompt = ">>> " if item.role == 'user' else ''
            for content in item.content:
                print(f'{prompt}{content.text}')
        print(response.output_text)

    def delete_thread(self):
        if not self._last_id: return
        last_id = self._last_id
        while last_id:
            response = client.responses.retrieve(last_id)
            last_id, curr_id = (
                response.previous_response_id,
                last_id
            )
            client.responses.delete(curr_id)
        self._last_id = None

In [None]:
chat = Chat(
    client,
    commands=[FunctionCallingCommand(
        tools=[shell_helper_tool],
        verbose=True
    )]
)

chat.loop(
    model='gpt-4.1-mini',
    stream=True,
)

### 設立防護機制

In [None]:
class SafeCommandChecker(BaseModel):
    model_config = ConfigDict(extra='forbid')
    safe: bool = Field(
        description='是否安全'
    )

In [None]:
s = SafeCommandChecker.model_validate_json('{"safe": true}')
s.safe

In [None]:
def is_safe_command(shell_command):
    client = openai.OpenAI(
        api_key=userdata.get('OPENAI_API_KEY')
    )
    json_schema = SafeCommandChecker.model_json_schema()
    response = client.responses.create(
        model = "gpt-4.1-mini",
        instructions="判斷以下 shell 指令是否安全",
        input = (
            "執行以下 shell 指令是否安全？n\n"
            "```\n"
            f"{shell_command}\n"
            "```"
        ),
        text = {
            "format": {
                "type": "json_schema",
                "name": json_schema["title"],
                "schema": json_schema
            }
        },
        store=False
    )
    return SafeCommandChecker.model_validate_json(
        response.output_text
    ).safe

In [None]:
print(is_safe_command('rm -f /*'))
print(is_safe_command('rm sample_data/test.txt'))

In [None]:
def shell_helper(comment, shell_command):

    if not is_safe_command(shell_command):
        return '指令不安全，無法執行'

    # 啟動子行程
    process = subprocess.Popen(
        shell_command,
        shell=True,             # 在 shell 中執行
        stdout=subprocess.PIPE, # 擷取標準輸出
        stderr=subprocess.PIPE, # 擷取錯誤輸出
        text=True               # 以文字形式返回
    )

    result = '執行結果：\n\n```\n'

    # 即時讀取輸出
    while True:
        output = process.stdout.readline()
        # 如果沒有輸出且行程結束
        if output == '' and process.poll() is not None:
            break
        if output:
            result += output

    result += "```"

    # 檢查錯誤輸出
    error = process.stderr.read()
    if error:
        result += f"\n\n錯誤: {error}"

    # 等待行程結束並取得返回碼
    return_code = process.wait()
    result += f"\n\n命令執行完成，返回碼: {return_code}\n\n"
    return result

In [None]:
chat = Chat(
    client,
    commands=[FunctionCallingCommand(
        tools=[shell_helper_tool],
        verbose=True
    )]
)

chat.loop(
    model='gpt-4.1-mini',
    stream=True,
)