# Form Agents


## Setup

I will use different models to create different agents. The models are:
- `gpt-4o-mini` run in OpenAI
- `qwen2.5:7b` run in ollama

In [1]:
import os
from dotenv import load_dotenv
from getpass import getpass

# Load environment variables from .env file
load_dotenv()

# Get API base URL and model with default values
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
OLLAMA_API_BASE = os.getenv("OLLAMA_API_BASE", "http://localhost:11434/v1")
OLLAMA_API_KEY = os.getenv("OLLAMA_API_KEY", "ollama")
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen2.5:7b")

# Set up OpenAI API configuration
# Try to get API key from environment variables
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# If not found, ask user to input it
if not OPENAI_API_KEY:
    OPENAI_API_KEY = getpass("Enter your OpenAI API key:")

from agents import OpenAIChatCompletionsModel, AsyncOpenAI

gpt4om_model= OpenAIChatCompletionsModel(
    model="gpt-4o-mini",
    openai_client=AsyncOpenAI(api_key=OPENAI_API_KEY),
)

qwen_model= OpenAIChatCompletionsModel(
    model="qwen2.5:7b",
    openai_client=AsyncOpenAI(base_url=OLLAMA_API_BASE,api_key=OLLAMA_API_KEY),
)


In [2]:
import json
from agents import function_tool

def get_form_schema() -> list:
    """获取表单数据的结构定义"""
    return [
        {
            "name": "username",
            "type": "string",
            "required": True,
            "description": "用户名，长度需在3-20个字符之间",
            "value": None,
        },
        {
            "name": "age",
            "type": "number",
            "required": False,
            "description": "年龄，请输入正整数",
            "value": None,
        },
        {
            "name": "email",
            "type": "string",
            "required": True,
            "description": "电子邮箱地址，用于接收通知",
            "value": None,
        }
    ]

def ensure_form_file_exists(file_path: str) -> bool:
    """
    确保表单数据文件存在，如果不存在则创建
    """
    try:
        if not os.path.exists(file_path):
            print(f"表单数据文件不存在: {file_path}，创建新文件")
            # 确保目录存在
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            # 创建空的表单数据
            empty_form_data = get_form_schema()
            # 写入空数据到文件
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(empty_form_data, f, ensure_ascii=False, indent=2)
        return True
    except Exception as e:
        print(f"创建表单数据文件出错: {e}")
        return False

@function_tool
def get_form_data_tool() -> str:
    """
    读取用户已填写的表单数据
    """
    file_path = "./form_data.json"
    try:
        if not ensure_form_file_exists(file_path):
            return get_form_schema()

        with open(file_path, 'r', encoding='utf-8') as f:
            form_data = json.load(f)
            # return json.dumps(form_data, indent=2)
            return form_data
    except json.JSONDecodeError:
        return(f"JSON格式错误: {file_path}")
    except Exception as e:
        return(f"读取表单数据出错: {e}")

@function_tool
def update_form_field_tool(field_name:str, field_value:str) -> str:
    """
    更新表单数据中的一个字段数据

    Args:
        field_name (str): 表单字段名称
        field_value (str): 表单字段值
    
    Returns:
        更新后的表单数据
    """
    file_path = "./form_data.json"
    try:
        # 确保文件存在
        if not ensure_form_file_exists(file_path):
            return
            
        with open(file_path, 'r+', encoding='utf-8') as f:
            form_data = json.load(f)
            # 查找要更新的字段
            for field in form_data:
                if field['name'] == field_name:
                    # 根据字段类型进行适当的转换
                    if field['type'] == 'number':
                        try:
                            field['value'] = int(field_value)
                        except ValueError:
                            try:
                                field['value'] = float(field_value)
                            except ValueError:
                                return(f"警告: 无法将 '{field_value}' 转换为数字类型，保持原始字符串")
                                # field['value'] = field_value
                    else:  # 字符串或其他类型
                        field['value'] = field_value
                    break
            f.seek(0)
            json.dump(form_data, f, ensure_ascii=False, indent=2)
            f.truncate()
        return f"更新表单数据成功: {field_name} = {field_value}"
    except Exception as e:
        return(f"更新表单数据出错: {e}")

In [3]:
from agents import Agent, handoff
from agents.extensions.handoff_prompt import prompt_with_handoff_instructions

getdata_agent = Agent(
    name="查询表单数据专员",
    instructions="""
    你是一个表单查询专跋山涉水。你帮助客户查询他们已填写的表单数据。
    如果客户询问表单数据，请直接返回表单数据。
    如果客户想了解还需要填写哪些字段，请从查询表单数据中没有完成的字段并询问客户想要填写哪个字段。
    """,
    tools=[get_form_data_tool],
    model=qwen_model,
)

update_agent= Agent(
    name="更新表单数据专员",
    instructions="""
    你是一个表单更新专员。你帮助客户更新他们已填写的表单数据。
    如果更新成功，告诉客户更新成功，并询问客户是否需要更新其他字段。
    如果更新失败，告诉客户失败原因，并提供正确的填写指导。
    """,
    tools=[update_form_field_tool, get_form_data_tool],
    model=qwen_model,
)

# 设置交接
transfer_to_getdata_specialist= handoff(
    agent=getdata_agent,
    tool_name_override="transfer_to_getdata_specialist",
    tool_description_override="当客户需要查询或了解表单数据时使用些工具。例如：'开始填写表单'、'我想填写表单'、'请给我表单'、'表单在哪里'、'能告诉我xxx是什么吗？'",
)

transfer_to_update_specialist= handoff(
    agent=update_agent,
    tool_name_override="transfer_to_update_specialist",
    tool_description_override="当客户需要更新表单数据时使用些工具。例如：'更新xxx为yyy'、'我想把xxx改成yyy'、'请帮我修改xxx为yyy'",
)
    

main_agent = Agent(
    name="客服前台",
    instructions=prompt_with_handoff_instructions("""
    你是一个智能客服助手，你的工作是了解客户需求并将他们引导到合适的专业客服。请根据以下明确的指引决定如何处理客户的请求：
                                                  
    1. 查询表单类问题：
      - 示例：'开始填写表单'、'我想填写表单'、'请给我表单'、'表单在哪里'、'能告诉我xxx是什么吗？'
      - 操作：使用transfer_to_getdata_specialist工具
                                                  
    2. 更新表单类问题：
      - 示例："更新xxx为yyy"、"我想把xxx改成yyy"、"请帮我修改xxx为yyy"
      - 操作：使用transfer_to_update_specialist工具
                                                  
    重要规则：
    - 请严格按照上述分类选择适当的交接工具
    - 不要过度解读客户意图，根据客户明确表达的需求选择工具
    - 如果不确定，先询问更多信息，而不是急于交接
    - 首次交流时，除非客户明确表达了投诉或退款需求，否则应该先用get_form_data_tool处理
    - 如果客户的请求不符合上述分类，请礼貌地告知客户并结束对话
""")
)

In [None]:
from agents import Runner, RunResultStreaming
from openai.types.responses import (
    ResponseTextDeltaEvent,
    ResponseFunctionCallArgumentsDeltaEvent,
)

async def show_streaming_response(
    response: RunResultStreaming,
) -> None:
    async for event in response.stream_events():
        if event.type == "raw_response_event": 
            if isinstance(event.data, ResponseFunctionCallArgumentsDeltaEvent):
                # this is streamed parameters for our tool call
                print(event.data.delta, end="", flush=True)
            elif isinstance(event.data, ResponseTextDeltaEvent):
                # this is streamed final answer tokens
                print(event.data.delta, end="", flush=True)
        elif event.type == "agent_updated_stream_event":
            # this tells us which agent is currently in use
            print(f"> Current Agent: {event.new_agent.name}")
        elif event.type == "run_item_stream_event":
            # these are events containing info that we'd typically
            # stream out to a user or some downstream process
            if event.name == "tool_called":
                # this is the collection of our _full_ tool call after our tool
                # tokens have all been streamed
                print()
                print(f"> Tool Called, name: {event.item.raw_item.name}")
                print(f"> Tool Called, args: {event.item.raw_item.arguments}")
            elif event.name == "tool_output":
                # this is the response from our tool execution
                print(f"> Tool Output: {event.item.raw_item['output']}")
        else:
            print(event)

async def handle_customer_query(query: str) -> None:
    response = Runner.run_streamed(main_agent,query)
    await show_streaming_response(response)

In [6]:

await handle_customer_query("我想填写表单")

> Current Agent: 客服前台
我将为您转接到相关的专业客服，稍等一下。