<a href="https://colab.research.google.com/github/Huangjian2013/ai-demo/blob/main/code/02-swarm-be.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [10]:
!pip install openai --quiet


In [11]:
from openai import OpenAI
from pydantic import BaseModel
from typing import Optional
import json
from google.colab import userdata

client = OpenAI(api_key = userdata.get('REAL_OPENAI_KEY'))

In [13]:
#########################
#### 第1次迭代
#########################
# Customer Service Routine

system_message = (
    "你是我们公司的的客户支持代理。"
    "始终在一句话内作答。"
    "按照以下流程与用户互动："
    "1. 首先，提问以更深入了解用户的问题。"
    " - 除非用户已经提供了解释。"
    "2. 提出解决方法（自行编造）。"
    "3. 仅在用户不满意的情况下，提供退款选项。"
    "4. 如果用户接受，搜索ID并执行退款。"
    ""
)

def run_full_turn(system_message, messages):
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "system", "content": system_message}] + messages,
    )
    message = response.choices[0].message
    messages.append(message)

    if message.content: print("Assistant:", message.content)

    return message


messages = []
while True:
    user = input("User: ")
    if (user == "exit"): break
    messages.append({"role": "user", "content": user})

    run_full_turn(system_message, messages)


User: 我的杯子坏了
Assistant: 您能告诉我杯子坏了的具体情况吗？
User: 碎了
Assistant: 您是否能告诉我杯子是如何破损的？是否是在运输过程中，还是使用时损坏的？
User: 我要退款
Assistant: 我可以为您处理退款，您能提供与订单相关的ID吗？
User: 8978
Assistant: 请稍等，我正在处理您的退款请求。
User: exit


In [None]:
#########################
#### 第2次迭代调用tools
#########################

In [None]:
# 将函数转换成可以传给LLM的方法定义
import inspect

def function_to_schema(func) -> dict:
    type_map = {
        str: "string",
        int: "integer",
        float: "number",
        bool: "boolean",
        list: "array",
        dict: "object",
        type(None): "null",
    }

    try:
        signature = inspect.signature(func)
    except ValueError as e:
        raise ValueError(
            f"Failed to get signature for function {func.__name__}: {str(e)}"
        )

    parameters = {}
    for param in signature.parameters.values():
        try:
            param_type = type_map.get(param.annotation, "string")
        except KeyError as e:
            raise KeyError(
                f"Unknown type annotation {param.annotation} for parameter {param.name}: {str(e)}"
            )
        parameters[param.name] = {"type": param_type}

    required = [
        param.name
        for param in signature.parameters.values()
        if param.default == inspect._empty
    ]

    return {
        "type": "function",
        "function": {
            "name": func.__name__,
            "description": (func.__doc__ or "").strip(),
            "parameters": {
                "type": "object",
                "properties": parameters,
                "required": required,
            },
        },
    }


In [14]:
# 测试function_to_schema的输出
def sample_function(param_1, param_2, the_third_one: int, some_optional="John Doe"):
    """
    This is my docstring. Call this function when you want.
    """
    print("Hello, world")

schema =  function_to_schema(sample_function)
print(json.dumps(schema, indent=2))

{
  "type": "function",
  "function": {
    "name": "sample_function",
    "description": "This is my docstring. Call this function when you want.",
    "parameters": {
      "type": "object",
      "properties": {
        "param_1": {
          "type": "string"
        },
        "param_2": {
          "type": "string"
        },
        "the_third_one": {
          "type": "integer"
        },
        "some_optional": {
          "type": "string"
        }
      },
      "required": [
        "param_1",
        "param_2",
        "the_third_one"
      ]
    }
  }
}


In [15]:
def look_up_item(search_query):
    """用来查找Item ID
    可以与描述或者关键词查找."""

    # return hard-coded item ID - in reality would be a lookup
    print("执行查找Item ID操作")
    return "item_132612938"


def execute_refund(item_id, reason="not provided"):
    """用来退款.
    """
    print("执行退款操作：Summary:", item_id, reason) # lazy summary
    return "success"

tools = [execute_refund, look_up_item]


def run_full_turn(system_message, tools, messages):

    num_init_messages = len(messages)
    messages = messages.copy()

    while True:

        # turn python functions into tools and save a reverse map
        tool_schemas = [function_to_schema(tool) for tool in tools]
        tools_map = {tool.__name__: tool for tool in tools}

        # === 1. get openai completion ===
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "system", "content": system_message}] + messages,
            tools=tool_schemas or None,
        )
        message = response.choices[0].message
        messages.append(message)

        if message.content:  # print assistant response
            print("Assistant:", message.content)

        if not message.tool_calls:  # if finished handling tool calls, break
            break

        # === 2. handle tool calls ===

        for tool_call in message.tool_calls:
            result = execute_tool_call(tool_call, tools_map)

            result_message = {
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result,
            }
            messages.append(result_message)

    # ==== 3. return new messages =====
    return messages[num_init_messages:]


def execute_tool_call(tool_call, tools_map):
    name = tool_call.function.name
    args = json.loads(tool_call.function.arguments)

    print(f"Assistant: {name}({args})")

    # call corresponding function with provided arguments
    return tools_map[name](**args)


messages = []
while True:
    user = input("User: ")
    if (user == "exit"): break
    messages.append({"role": "user", "content": user})

    new_messages = run_full_turn(system_message, tools, messages)
    messages.extend(new_messages)

User: 我的杯子坏了
Assistant: 请问杯子是在哪种情况下坏的？
User: 破裂了
Assistant: 我们可以为您提供一个替换方案，或者如果您需要，可以选择退款。您更倾向于哪种解决方法？
User: 我要退款
Assistant: 请您提供杯子的具体名称或描述，以便我查找并处理退款。
User: 9878
Assistant: execute_refund({'item_id': '9878', 'reason': '破裂'})
执行退款操作：Summary: 9878 破裂
Assistant: 您的退款已成功处理。如有其他问题，请随时告诉我！
User: exit


In [None]:
#########################
#### 第3次迭代手工执行agent
#########################

In [16]:
class Agent(BaseModel):
    name: str = "Agent"
    model: str = "gpt-4o-mini"
    instructions: str = "你是一个乐于助人的智能体"
    tools: list = []

def run_full_turn_V1(agent, messages):

    num_init_messages = len(messages)
    messages = messages.copy()

    while True:

        # turn python functions into tools and save a reverse map
        tool_schemas = [function_to_schema(tool) for tool in agent.tools]
        tools_map = {tool.__name__: tool for tool in agent.tools}

        # === 1. get openai completion ===
        response = client.chat.completions.create(
            model=agent.model,
            messages=[{"role": "system", "content": agent.instructions}] + messages,
            tools=tool_schemas or None,
        )
        message = response.choices[0].message
        messages.append(message)

        if message.content:  # print assistant response
            print("Assistant:", message.content)

        if not message.tool_calls:  # if finished handling tool calls, break
            break

        # === 2. handle tool calls ===

        for tool_call in message.tool_calls:
            result = execute_tool_call_v1(tool_call, tools_map)

            result_message = {
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result,
            }
            messages.append(result_message)

    # ==== 3. return new messages =====
    return messages[num_init_messages:]


def execute_tool_call_v1(tool_call, tools_map):
    name = tool_call.function.name
    args = json.loads(tool_call.function.arguments)

    print(f"Assistant: {name}({args})")

    # call corresponding function with provided arguments
    return tools_map[name](**args)

def execute_refund_V1(item_name):
    print("开始执行execute_refund_V1 " + item_name)
    return "退款成功"

refund_agent = Agent(
    name="Refund Agent",
    instructions="你是一个退款智能体，直接进行退款.",
    tools=[execute_refund_V1],
)

def custom_service_V1(item_name):
    return "客户服务成功"

custom_service_agent = Agent(
    name="Custom Service Assistant",
    instructions="""
    您是我们公司的的客户支持代理。
    始终在一句话内作答。
    按照以下流程与用户互动：
    1. 首先，提问以更深入了解用户的问题。
     - 除非用户已经提供了解释。
    2. 提出解决方法（自行编造）。
    3. 仅在用户不满意的情况下，提供退款选项。
    4. 如果用户接受，直接进行退款。
    """,
    tools=[custom_service_V1],
)


messages = []
user_query = "我的杯子坏了."
print("User:", user_query)
messages.append({"role": "user", "content": user_query})

response = run_full_turn_V1(custom_service_agent, messages) # sales assistant
messages.extend(response)


user_query = "我想要退款." # implitly refers to the last item
print("User:", user_query)
messages.append({"role": "user", "content": user_query})
response = run_full_turn_V1(refund_agent, messages) # refund agent



User: 我的杯子坏了.
Assistant: 请问杯子是如何坏的？
User: 我想要退款.
Assistant: execute_refund_V1({'item_name': '杯子'})
开始执行execute_refund_V1 杯子
Assistant: 您的退款已成功处理。请查收相关金额。


In [None]:
#########################
#### 第4次迭代自动执行Agent
#########################

In [17]:
def execute_refund_V2(item_name):
    print("开始执行execute_refund_V2 " + item_name)
    return "退款成功"

refund_agent_v2 = Agent(
    name="Refund Agent",
    instructions="你是一个退款智能体，直接进行退款.",
    tools=[execute_refund_V2],
)

def transfer_to_refunds_v2():
    print("开始执行transfer_to_refunds_v2")
    return refund_agent_v2

def custom_service_V2(item_name):
    return "客户服务成功"

custom_service_agent_v2 = Agent(
    name="Custom Service Assistant",
    instructions="""
    您是我们公司的的客户支持代理。
    始终在一句话内作答。
    按照以下流程与用户互动：
    1. 首先，提问以更深入了解用户的问题。
     - 除非用户已经提供了解释。
    2. 提出解决方法（自行编造）。
    3. 仅在用户不满意的情况下，提供退款选项。
    4. 如果用户接受，直接进行退款。
    """,
    tools=[custom_service_V2, transfer_to_refunds_v2],
)

class Response(BaseModel):
    agent: Optional[Agent]
    messages: list

def run_full_turn_v2(agent, messages):

    current_agent = agent
    num_init_messages = len(messages)
    messages = messages.copy()

    while True:

        # turn python functions into tools and save a reverse map
        tool_schemas = [function_to_schema(tool) for tool in current_agent.tools]
        tools = {tool.__name__: tool for tool in current_agent.tools}

        # === 1. get openai completion ===
        response = client.chat.completions.create(
            model=agent.model,
            messages=[{"role": "system", "content": current_agent.instructions}]
            + messages,
            tools=tool_schemas or None,
        )
        message = response.choices[0].message
        messages.append(message)

        if message.content:  # print agent response
            print(f"{current_agent.name}:", message.content)

        if not message.tool_calls:  # if finished handling tool calls, break
            break

        # === 2. handle tool calls ===

        for tool_call in message.tool_calls:
            result = execute_tool_call_v2(tool_call, tools, current_agent.name)

            if type(result) is Agent:  # if agent transfer, update current agent
                print (f"从 {current_agent.name} 转移到 {result.name}")
                current_agent = result
                result = (
                    f"Transfered to {current_agent.name}. Adopt persona immediately."
                )

            result_message = {
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result,
            }
            messages.append(result_message)

    # ==== 3. return last agent used and new messages =====
    return messages[num_init_messages:]


def execute_tool_call_v2(tool_call, tools, agent_name):
    name = tool_call.function.name
    args = json.loads(tool_call.function.arguments)

    print(f"{agent_name}:", f"{name}({args})")

    return tools[name](**args)  # call corresponding function with provided arguments


messages = []
while True:
    user = input("User: ")
    if (user == "exit"): break
    messages.append({"role": "user", "content": user})

    new_messages = run_full_turn_v2(custom_service_agent_v2, messages)
    messages.extend(new_messages)


User: 我的杯子破了
Custom Service Assistant: 您能告诉我杯子破裂的具体情况吗？是运输过程中损坏还是其他原因？
User: 我要退款
Custom Service Assistant: transfer_to_refunds_v2({})
开始执行transfer_to_refunds_v2
从 Custom Service Assistant 转移到 Refund Agent
Refund Agent: execute_refund_V2({'item_name': '杯子'})
开始执行execute_refund_V2 杯子
Refund Agent: 您的退款申请已经成功处理。请注意查看您的账户，以确认款项已到账。如果还有其他问题，请随时告诉我！
User: exit


In [None]:
#########################
#### 第5次迭代SWarm
#########################

In [None]:
!pip install git+https://github.com/openai/swarm.git --quiet


In [23]:
from swarm import Swarm, Agent
import openai
openai.api_key = userdata.get('REAL_OPENAI_KEY')


def execute_refund_V3(item_name):
    print("开始执行execute_refund_V3 " + item_name)
    return "退款成功"

refund_agent_v3 = Agent(
    name="Refund Agent",
    instructions="你是一个退款智能体，直接进行退款.",
    functions=[execute_refund_V3],
)

def transfer_to_refunds_v3():
    print("开始执行transfer_to_refunds_v3")
    return refund_agent_v3

def custom_service_V2(item_name):
    return "客户服务成功"

custom_service_agent_v3 = Agent(
    name="Custom Service Assistant",
    instructions="""
    您是我们公司的的客户支持代理。
    始终在一句话内作答。
    按照以下流程与用户互动：
    1. 首先，提问以更深入了解用户的问题。
     - 除非用户已经提供了解释。
    2. 提出解决方法（自行编造）。
    3. 仅在用户不满意的情况下，提供退款选项。
    4. 如果用户接受，直接进行退款。
    """,
    functions=[custom_service_V2, transfer_to_refunds_v3],
)


client = Swarm(client=openai)

messages = []
while True:
    user = input("User: ")
    if (user == "exit"): break
    messages.append({"role": "user", "content": user})

    response = client.run(
        agent=custom_service_agent_v3,
        messages=messages,
    )

    print(response.messages[-1]["content"])

User: 我的杯子坏了
请问您希望我们如何帮助您处理这个问题？
User: 破裂了
很抱歉听到您的杯子破裂了。您是否愿意使用我们的定制服务来修复您的杯子，或者选择退款？
User: 退款
开始执行transfer_to_refunds_v3
开始执行execute_refund_V3 杯子
您的杯子退款已成功处理。
User: exit
