# 最佳实践-基于Assistant API的Multi-Agent示例


## 1. 简介
本文介绍一个基于Assistant API构建的Multi-Agent应用，针对金融场景检索最新时事，并总结分析相关投资建议。
在构建具体的Agent应用的过程中，用户可以自定义多种AgentModule类来完成一系列子任务，并根据需要实现复杂、灵活的应用编排。以下提供一种示范性的编排实例。

### 1.1. 依赖功能
* Assistant API的基础API参数生成能力。
* Assistant API的Function calling能力。

### 1.2. 前期准备
* 已开通百炼服务：[开通阿里云百炼大模型服务产品](https://help.aliyun.com/document_detail/2586399.html?spm=a2c4g.2784257.0.i6)。
* 已创建API_KEY: [获取API-KEY](https://help.aliyun.com/document_detail/2712195.html)。
* 已安装最新版SDK：[安装SDK](https://help.aliyun.com/document_detail/2712193.html)。

## 2. 实现代码

### 2.1. 设置环境变量
安装环境依赖，设置您的API-KEY，替换YOUR_DASHSCOPE_API_KEY为您自己的API key。

In [1]:
# 安装dashscope SDK。
!pip3 install dashscope



In [2]:
# 通过环境变量设置API-KEY
%env DASHSCOPE_API_KEY=YOUR_API_KEY

env: DASHSCOPE_API_KEY=sk-0MOEpyHAR1


### 2.2. 具体实现

#### 2.2.1. Step 1: 创建Agent类

首先，定义基类AgentModule，及其子类APIAssistantAgent，对各个API接口顺序实现封装。

用户通过name、description、model、instructions以及tools、functions等参数创建APIAssistantAgent。

用户通过functions参数为APIAssistantAgent传入function-call可能调用的函数实体，在APIAssistantAgent.query()中自动识别、调用函数并向AssistantAPI上传结果。

In [4]:
from typing import Dict, List
import json
import dashscope
from http import HTTPStatus
import sys

In [11]:
def verify_status_code(res):
    if res.status_code != HTTPStatus.OK:
        print('Failed: ')
        print(res)
        sys.exit(res.status_code)


class AgentModule:
    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description

    def __call__(self, query: str):
        return self.query(query)

    def query(self, query: str):
        pass


class APIAssistantAgent(AgentModule):
    """
    Assistant with LLM capability, which is built with Assistant API.
    """

    def __init__(self, name: str = "",
                 description: str = "",
                 model: str = "",
                 instructions: str = "",
                 tools: List = [],
                 functions: Dict = {},
                 assistant_id: str = None):
        super().__init__(name, description)
        # create a new assistant
        if not assistant_id:
            self.assistant = dashscope.Assistants.create(
                model=model,
                name=name,
                description=description,
                instructions=instructions,
                tools=tools,
            )
            verify_status_code(self.assistant)
        else:  # reload the assistant via id
            self.assistant = dashscope.Assistants.retrieve(assistant_id=assistant_id)
            self.name, self.description = self.assistant.name, self.assistant.description

        self.assistant_id = self.assistant.id
        self.funcs = functions
        self.thread = dashscope.Threads.create()
        verify_status_code(self.thread)

    def _forward_and_submit_outputs(self, run_status):
        """get the function name, run and return the function output to the server"""
        func_obj = run_status.required_action.submit_tool_outputs.tool_calls[0].function
        func_name = func_obj.name
        arguments = json.loads(func_obj.arguments)
        tool_outputs = [{
            'tool_call_id':
                run_status.required_action.submit_tool_outputs.tool_calls[0].id,
            'output': json.dumps(self.funcs[func_name](**arguments))
        }]
        responses = dashscope.Runs.submit_tool_outputs(run_status.id,
                                                       thread_id=self.thread.id,
                                                       tool_outputs=tool_outputs,
                                                       stream=True)
        content_str = ""
        for event, run in responses:
            if event == "thread.message.delta":
                content_str += run.delta.content.text.value
                print(content_str, flush=True)
        return

    def query(self, query: str):
        """
        query: string, the query string to the assistant. e.g. Who is the Jack Chou ?
        """
        message = dashscope.Messages.create(self.thread.id, content=query)
        response = dashscope.Runs.create(self.thread.id, assistant_id=self.assistant.id, stream=True)

        content_str = ""
        for event, message_run in response:
            if event == "thread.message.delta":
                content_str += message_run.delta.content.text.value
                print(content_str, flush=True)

        run_status = dashscope.Runs.wait(message_run.id, thread_id=self.thread.id)
        if run_status.required_action:
            self._forward_and_submit_outputs(run_status)
            run_status = dashscope.Runs.wait(run_status.id, thread_id=self.thread.id)

        msgs = dashscope.Messages.list(self.thread.id)
        answer = json.loads(json.dumps(msgs, default=lambda o: o.__dict__))['data'][0]['content'][0]['text']['value']
        return answer


#### 2.2.2. Step 2: 编排多个Agent实现特定应用

用户可以将多个Agent进行编排，实现特定的应用。

编排后的应用本身也作为一个Agent类，以供更上层的应用编排。

以下提供一个智能财经助手的例子，通过顺序编排新闻检索专家与新闻分析专家来进行实现。

In [12]:
class SequentialAgent(AgentModule):
    """
    用于顺序编排的Agent Module
    """

    def __init__(self, name: str, description: str, sequential: List, ):
        """
        sequential: List[assistant_object or function]
        """
        super().__init__(name, description)
        self.sequential = sequential

    def query(self, query: str):
        msg = query
        for node in self.sequential:
            msg = node(msg)
        return msg

In [13]:
news_args = {"name": "新闻检索专家",
             "description": "新闻检索专家",
             "instructions": "你是一个新闻检索专家。请根据用户提供的信息，适当使用工具查询与其相关的新闻资讯。请务必查询真实的新闻。",
             "model": "qwen-max",
             "tools": [{
                 'type': 'quark_search'
             }]}
news_assistant = APIAssistantAgent(**news_args)

analytic_args = {"name": "公司新闻分析专家",
                 "description": "一个公司新闻分析专家",
                 "instructions": "你是一个公司新闻分析专家，请根据用户提供的新闻，对相关公司进行分析并给出适当的详细的投资建议。",
                 "model": "qwen-max",
                 }
analytic_assistant = APIAssistantAgent(**analytic_args)

finance_assistant = SequentialAgent(name="财经助手", description="一个财经助手",
                                    sequential=[news_assistant, analytic_assistant])

print(finance_assistant.name)
print(finance_assistant("你对SpaceX怎么看?"))

财经助手
Space
SpaceX
SpaceX是
SpaceX是Space Exploration Technologies Corp.
SpaceX是Space Exploration Technologies Corp.的简称，由企业家埃隆·
SpaceX是Space Exploration Technologies Corp.的简称，由企业家埃隆·马斯克于2002
SpaceX是Space Exploration Technologies Corp.的简称，由企业家埃隆·马斯克于2002年创立，是一家美国航天制造商和
SpaceX是Space Exploration Technologies Corp.的简称，由企业家埃隆·马斯克于2002年创立，是一家美国航天制造商和太空运输公司。这家公司以其创新性和
SpaceX是Space Exploration Technologies Corp.的简称，由企业家埃隆·马斯克于2002年创立，是一家美国航天制造商和太空运输公司。这家公司以其创新性和颠覆性技术闻名，特别是在可重复
SpaceX是Space Exploration Technologies Corp.的简称，由企业家埃隆·马斯克于2002年创立，是一家美国航天制造商和太空运输公司。这家公司以其创新性和颠覆性技术闻名，特别是在可重复使用的运载火箭技术方面，这
SpaceX是Space Exploration Technologies Corp.的简称，由企业家埃隆·马斯克于2002年创立，是一家美国航天制造商和太空运输公司。这家公司以其创新性和颠覆性技术闻名，特别是在可重复使用的运载火箭技术方面，这显著降低了太空发射成本。以下是我
SpaceX是Space Exploration Technologies Corp.的简称，由企业家埃隆·马斯克于2002年创立，是一家美国航天制造商和太空运输公司。这家公司以其创新性和颠覆性技术闻名，特别是在可重复使用的运载火箭技术方面，这显著降低了太空发射成本。以下是我整理的关于SpaceX的一些关键看点
SpaceX是Space Exploration Technologies Corp.的简称，由企业家埃隆·马斯克于2002年创立，是一家美国航天制造商和太空运输公司。这家公司以其创新性和颠覆性技术闻名，特别是在可重复使