# 实现一个AutoGPT

## 实现思路

### 构建智能体的一般性思路

![](./images/agent-overview.png)

### 主流程运行逻辑

![](./images/agent-flowchart.png)

## 框架准备

### 加载环境变量

In [39]:
import os
from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv(), override=True)

True

### 着色打印工具

In [2]:
from colorama import init, Fore, Back, Style
import sys

THOUGHT_COLOR = Fore.GREEN
OBSERVATION_COLOR = Fore.YELLOW
ROUND_COLOR = Fore.RED
CODE_COLOR = Fore.BLUE

def color_print(text, color=None, end="\n"):
    if color is not None:
        content = color + text + Style.RESET_ALL + end
    else:
        content = text + end
    sys.stdout.write(content)
    sys.stdout.flush()

In [3]:
# 使用示例
color_print("我现在开始思考...", color = THOUGHT_COLOR)
color_print("第一轮开始了", color = ROUND_COLOR)
color_print("def hello(): \n    print('hi')", color = CODE_COLOR)

[32m我现在开始思考...[0m
[31m第一轮开始了[0m
[34mdef hello(): 
    print('hi')[0m


## tools 定义

In [4]:
from langchain.agents import Tool
from langchain.tools import StructuredTool

### 文件处理

#### 列举文件

In [5]:
import os

def list_files_in_directory(path: str) -> str:
    """List all file names in the directory"""
    file_names = os.listdir(path)

    return "\n".join(file_names)

#### 定义 tool 函数

In [6]:
# 输出工具
directory_inspection_tool = StructuredTool.from_function(
    func=list_files_in_directory,
    name="ListDirectory",
    description="探查文件夹的内容和结构，展示它的文件名和文件夹名",
)

#### 使用示例

In [8]:
print(list_files_in_directory("./data/autogpt-demo"))
print(directory_inspection_tool)

.DS_Store
2023年8月-9月销售记录.xlsx
供应商名录.xlsx
求职简历.docx
供应商资格要求.pdf
name='ListDirectory' description='ListDirectory(path: str) -> str - 探查文件夹的内容和结构，展示它的文件名和文件夹名' args_schema=<class 'pydantic.v1.main.ListDirectorySchemaSchema'> func=<function list_files_in_directory at 0x1117e31c0>


### 查询文档

实现一个简单的RAG，以便从PDF文档或word文档中查询文本内容。

#### 准备

In [47]:
from typing import List
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from langchain.document_loaders import PyPDFLoader
from langchain_community.document_loaders import Docx2txtLoader
from langchain.chains import RetrievalQA
from langchain_openai import OpenAI, OpenAIEmbeddings

In [48]:
def get_file_extension(filename: str) -> str:
    return filename.split(".")[-1]

In [49]:
class FileLoadFactory:
    @staticmethod
    def get_loader(filename: str):
        ext = get_file_extension(filename)
        if ext == "pdf":
            return PyPDFLoader(filename)
        elif ext == "docx" or ext == "doc":
            return Docx2txtLoader(filename)
        else:
            raise NotImplementedError(f"File extension {ext} not supported.")

In [50]:
def load_docs(filename: str) -> List[Document]:
    file_loader = FileLoadFactory.get_loader(filename)
    pages = file_loader.load_and_split()
    return pages

#### 使用 RAG 查询文档

In [51]:
def ask_docment(
        filename: str,
        query: str,
) -> str:
    """根据一个PDF文档的内容，回答一个问题"""

    raw_docs = load_docs(filename)
    if len(raw_docs) == 0:
        return "抱歉，文档内容为空"
    text_splitter = RecursiveCharacterTextSplitter(
                        chunk_size=1000,
                        chunk_overlap=200,
                        length_function=len,
                        add_start_index=True,
                    )
    documents = text_splitter.split_documents(raw_docs)
    if documents is None or len(documents) == 0:
        return "无法读取文档内容"
    db = Chroma.from_documents(documents, OpenAIEmbeddings(model="text-embedding-ada-002"))
    qa_chain = RetrievalQA.from_chain_type(
        llm=OpenAI(
            temperature=0,
            model_kwargs={
                "seed": 42
            },
        ),  # 语言模型
        chain_type="stuff",  # prompt的组织方式
        retriever=db.as_retriever()  # 检索器
    )
    response = qa_chain.run(query+"(请用中文回答)")
    return response

#### 定义 tool 函数

In [52]:
# 输出工具
document_qa_tool = StructuredTool.from_function(
    func=ask_docment,
    name="AskDocument",
    description="根据一个Word或PDF文档的内容，回答一个问题。考虑上下文信息，确保问题对相关概念的定义表述完整。",
)

#### 使用示例

In [53]:
# 使用示例
filename = "./data/autogpt-demo/供应商资格要求.pdf"
query = "销售额达标的标准是多少？"
print(ask_docment(filename, query))

 月销售额达到人民币3万元。


In [323]:
# 使用示例
filename = "./data/autogpt-demo/求职简历.docx"
query = "工作经历如何？"
print(ask_docment(filename, query))

 工作经历包括先后负责蜜林等项目，并取得了阶段性成功。本人性格开朗，热情随和，做事严谨，具有较强的抗压能力，能够从容应对突发状况，推动工作有条不紊地进行。


### Excel结构探查

#### 探查Excel的sheet、列名和前N行数据

In [55]:
import pandas as pd

In [56]:
def get_sheet_names(
        filename : str
) -> str :
    """获取 Excel 文件的工作表名称"""
    excel_file = pd.ExcelFile(filename)
    sheet_names = excel_file.sheet_names
    return f"这是 '{filename}' 文件的工作表名称：\n\n{sheet_names}"

In [57]:
def get_column_names(
        filename : str
) -> str:
    """获取 Excel 文件的列名"""

    # 读取 Excel 文件的第一个工作表
    df = pd.read_excel(filename, sheet_name=0)  # sheet_name=0 表示第一个工作表
    column_names = '\n'.join(
        df.columns.to_list()
    )

    result = f"这是 '{filename}' 文件第一个工作表的列名：\n\n{column_names}"
    return result

In [58]:
def get_first_n_rows(
        filename : str,
        n : int = 3
) -> str :
    """获取 Excel 文件的前 n 行"""

    result = get_sheet_names(filename)+"\n\n"
    result += get_column_names(filename)+"\n\n"

    # 读取 Excel 文件的第一个工作表
    df = pd.read_excel(filename, sheet_name=0)  # sheet_name=0 表示第一个工作表
    n_lines = '\n'.join(
        df.head(n).to_string(index=False, header=True).split('\n')
    )

    result += f"这是 '{filename}' 文件第一个工作表的前{n}行样例：\n\n{n_lines}"
    return result

#### 定义 tool 函数

In [59]:
# 输出工具
excel_inspection_tool = StructuredTool.from_function(
    func=get_first_n_rows,
    name="InspectExcel",
    description="""
    探查表格文件的内容和结构，展示它的列名和前n行，n默认为3。
    
    使用该函数时应当准备提供filename和n两个参数，其中：
    
    - filename：要探查的Excel文件名
    - n: 默认的行数
    
    """,
)

#### 使用示例

In [60]:
print(get_first_n_rows("./data/autogpt-demo/供应商名录.xlsx"))

这是 './data/autogpt-demo/供应商名录.xlsx' 文件的工作表名称：

['Sheet1']

这是 './data/autogpt-demo/供应商名录.xlsx' 文件第一个工作表的列名：

供应商
联系人
联系人邮箱
内部对接人

这是 './data/autogpt-demo/供应商名录.xlsx' 文件第一个工作表的前3行样例：

       供应商 联系人                   联系人邮箱 内部对接人
  北京科技有限公司  张伟 xiaoming123@example.com    李华
  上海音响有限公司  李芳    lihua456@example.org    张伟
深圳创新科技有限公司  陈瑶   wangli789@example.net    王芳


### Excel数据分析

#### 准备

In [61]:
import re
from langchain.tools import StructuredTool
from langchain_core.output_parsers import BaseOutputParser

# from Utils.PythonExecUtil import execute_python_code
from langchain_openai import ChatOpenAI
from langchain_experimental.utilities import PythonREPL

#### 自定义一个OutputParse

In [62]:
class PythonCodeParser(BaseOutputParser):
    """从OpenAI返回的文本中提取Python代码。"""

    def _remove_marked_lines(self, input_str: str) -> str:
        lines = input_str.strip().split('\n')
        if lines and lines[0].strip().startswith('```'):
            del lines[0]
        if lines and lines[-1].strip().startswith('```'):
            del lines[-1]

        ans = '\n'.join(lines)
        return ans

    def parse(self, text: str) -> str:
        # 使用正则表达式找到所有的Python代码块
        python_code_blocks = re.findall(r'```python\n(.*?)\n```', text, re.DOTALL)
        # 从re返回结果提取出Python代码文本
        python_code = None
        if len(python_code_blocks) > 0:
            python_code = python_code_blocks[0]
            python_code = self._remove_marked_lines(python_code)
        return python_code

#### 定义提示语模板

In [63]:
from langchain.prompts import PromptTemplate

excel_analyser_prompt = PromptTemplate.from_template("""
你的任务是先分析，再生成代码。

请根据用户的输入，一步步分析：
（1）用户的输入是否依赖某个条件，而这个条件没有明确赋值？
（2）我是否需要对某个变量的值做假设？

如果我需要对某个变量的值做假设，请直接输出：
```python
print("我需要知道____的值，才能生成代码。请完善你的查询。") # 请将____替换为需要假设的的条件
```

否则，生成一段Python代码，分析指定文件的内容。

你可以使用的库只包括：pandas, re, math, datetime, openpyxl
确保你的代码只使用上述库，否则你的代码将无法运行。

给定文件为：
{filename}

文件内容样例：
{inspections}

你输出的Python代码前后必须有markdown标识符，如下所示：
```python
# example code
print('hello world')
```

确保你的代码是可以运行的，文件名直接写死在代码里即可。
你生成代码中所有的常量都必须来自我给你的信息或来自文件本身。不要编造任何常量。
如果常量缺失，你的代码将无法运行。你可以拒绝生成代码，但是不要生成编造的代码。
确保你生成的代码最终以print的方式输出结果(回答用户的问题)。

用户输入：
{query}
""")

#### 定义执行链

In [64]:
llm = ChatOpenAI(
        model="gpt-4-1106-preview",
        temperature=0,
        model_kwargs={"seed": 42},
    )
analysis_chain = excel_analyser_prompt | llm | PythonCodeParser()

#### 生成 python 代码并执行

In [65]:
from langchain_experimental.utilities import PythonREPL
def excel_analyse(query, filename):
    """分析一个结构化文件（例如excel文件）的内容。"""

    # columns = get_column_names(filename)
    inspections = get_first_n_rows(filename, 3)

    code = ""

    ## 打印详细信息
    color_print("\n#!/usr/bin/env python", CODE_COLOR, end="\n")

    for c in analysis_chain.stream({
        "query": query,
        "filename": filename,
        "inspections": inspections
    }):
        ## 打印详细信息
        color_print(c, CODE_COLOR, end="")
        ## 收集代码成果
        code += c

    if code:
        ans = PythonREPL().run(code)
        return ans
    else:
        return "没有找到可执行的Python代码"

#### 定义 tool 函数

In [66]:
# 输出工具
excel_analysis_tool = StructuredTool.from_function(
    func=excel_analyse,
    name="AnalyseExcel",
    description="""
        通过pandas数据处理脚本分析一个结构化文件（例如excel文件）的内容。
        输人中必须包含文件的完整路径和具体分析方式和分析依据，阈值常量等。
        如果输入信息不完整，你可以拒绝回答。
    """,
)

#### 使用示例

In [392]:
excel_analyse(query="8月销售额", filename="./data/autogpt-demo/2023年8月-9月销售记录.xlsx")

[34m
#!/usr/bin/env python[0m
[34mimport pandas as pd

# 读取Excel文件
df = pd.read_excel('./data/autogpt-demo/2023年8月-9月销售记录.xlsx')

# 确保销售日期列是日期格式
df['销售日期'] = pd.to_datetime(df['销售日期'])

# 计算销售额
df['销售额'] = df['单价(元)'] * df['销售量']

# 设置月份过滤条件为8月
start_date = pd.Timestamp(year=2023, month=8, day=1)
end_date = pd.Timestamp(year=2023, month=8, day=31)

# 过滤8月份的数据并计算总销售额
sales_august = df[(df['销售日期'] >= start_date) & (df['销售日期'] <= end_date)]['销售额'].sum()

# 打印8月份的总销售额
print(f"8月销售额为: {sales_august}元")[0m

'8月销售额为: 2605636元\n'

### 生成文档

#### 准备

In [67]:
from langchain.prompts import ChatPromptTemplate
from langchain.prompts.chat import SystemMessagePromptTemplate, HumanMessagePromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI

#### 使用 LLM 生成文档

In [68]:
def write(query: str):
    """按用户要求生成文章"""
    template = ChatPromptTemplate.from_messages(
        [
            SystemMessagePromptTemplate.from_template("你是专业的文档写手。你根据客户的要求，写一份文档。输出中文。"),
            HumanMessagePromptTemplate.from_template("{query}"),
        ]
    )

    chain = {"query": RunnablePassthrough()} | template | ChatOpenAI() | StrOutputParser()

    return chain.invoke(query)

#### 定义 tool 函数

In [69]:
# 输出工具
document_generation_tool = StructuredTool.from_function(
    func=write,
    name="GenerateDocument",
    description="根据需求描述生成一篇正式文档",
)

#### 使用示例

In [27]:
# 示例
print(write("写一封邮件给张三，内容是：你好，我是李四。"))

亲爱的张三，

我是李四。希望你一切都好。我写这封邮件是想和你打个招呼，同时也想了解一下你最近的状况。

祝好，

李四


### 发送Email

#### 准备

In [70]:
import webbrowser
import urllib.parse
import re

#### 检查email格式是否合法

In [71]:
def _is_valid_email(email: str) -> bool:
    receivers = email.split(';')
    # 正则表达式匹配电子邮件
    pattern = r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$'
    for receiver in receivers:
        if not bool(re.match(pattern, receiver.strip())):
            return False
    return True

#### 触发系统调用发送邮件

In [72]:
def send_email(
        to: str,
        subject: str,
        body: str,
        cc: str = None,
        bcc: str = None,
) -> str:
    """给指定的邮箱发送邮件"""

    if not _is_valid_email(to):
        return f"电子邮件地址 {to} 不合法"

    # 对邮件的主题和正文进行URL编码
    subject_code = urllib.parse.quote(subject)
    body_code = urllib.parse.quote(body)

    # 构造mailto链接
    mailto_url = f'mailto:{to}?subject={subject_code}&body={body_code}'
    if cc is not None:
        cc = urllib.parse.quote(cc)
        mailto_url += f'&cc={cc}'
    if bcc is not None:
        bcc = urllib.parse.quote(bcc)
        mailto_url += f'&bcc={bcc}'

    webbrowser.open(mailto_url)

    return f"状态: 成功\n备注: 已发送邮件给 {to}, 标题: {subject}"

#### 定义 tool 函数

In [73]:
# 发送邮件
email_tool = StructuredTool.from_function(
    func=send_email,
    name="SendEmail",
    description="给指定的邮箱发送邮件。确保邮箱地址是xxx@xxx.xxx的格式。多个邮箱地址以';'分割。",
)

#### 使用示例

In [370]:
send_email("43801@qq.com", "hello", "happy new year!")

'状态: 成功\n备注: 已发送邮件给 43801@qq.com, 标题: hello'

### FINISH

In [74]:
finish_placeholder = StructuredTool.from_function(
    func=lambda: None,
    name="FINISH",
    description="用于表示任务完成的占位符工具"
)

## toolkit 定义

In [75]:
# 自定义工具集
tools = [
    directory_inspection_tool,
    document_qa_tool,
    document_generation_tool,
    email_tool,
    excel_inspection_tool,
    excel_analysis_tool,
    finish_placeholder,
]

In [76]:
from langchain.tools.render import render_text_description

print(render_text_description(tools))

ListDirectory: ListDirectory(path: str) -> str - 探查文件夹的内容和结构，展示它的文件名和文件夹名
AskDocument: AskDocument(filename: str, query: str) -> str - 根据一个Word或PDF文档的内容，回答一个问题。考虑上下文信息，确保问题对相关概念的定义表述完整。
GenerateDocument: GenerateDocument(query: str) - 根据需求描述生成一篇正式文档
SendEmail: SendEmail(to: str, subject: str, body: str, cc: str = None, bcc: str = None) -> str - 给指定的邮箱发送邮件。确保邮箱地址是xxx@xxx.xxx的格式。多个邮箱地址以';'分割。
InspectExcel: InspectExcel(filename: str, n: int = 3) -> str - 探查表格文件的内容和结构，展示它的列名和前n行，n默认为3。
    
    使用该函数时应当准备提供filename和n两个参数，其中：
    
    - filename：要探查的Excel文件名
    - n: 默认的行数
AnalyseExcel: AnalyseExcel(query, filename) - 通过pandas数据处理脚本分析一个结构化文件（例如excel文件）的内容。
        输人中必须包含文件的完整路径和具体分析方式和分析依据，阈值常量等。
        如果输入信息不完整，你可以拒绝回答。
FINISH: FINISH() - 用于表示任务完成的占位符工具


## 智能体核心Prompt定义

### main_prompt

主流程prompt较为复杂，可以结合局部模板或pipeline分层构建，以方便维护。<br>
下面是分布构建的过程，最后合并为最终Prompt。

#### 约束条件

In [77]:
constraints = """
1. 每次你的决策只使用一种工具，你可以使用任意多次。
2. 确保你调用的指令或使用的工具在下述给定的工具列表中。
3. 确保你的回答不会包含违法或有侵犯性的信息。
4. 如果你已经完成所有任务，确保以"FINISH"指令结束。
5. 用中文思考和输出。
6. 如果执行某个指令或工具失败，尝试改变参数或参数格式再次调用。
7. 你生成的回复必须遵循上文中给定的事实信息。不可以编造信息。DO NOT MAKE UP INFORMATION.
8. 如果得到的结果不正确，尝试更换表达方式。
9. 已经得到的信息，不要反复查询。
10. 确保你生成的动作是可以精确执行的。动作做中可以包括具体方法和目标输出。
11. 看到一个概念时尝试获取它的准确定义，并分析从哪些输入可以得到它的具体取值。
12. 生成一个自然语言查询时，请在查询中包含全部的已知信息。
13. 在执行分析或计算动作前，确保该分析或计算中涉及的所有子概念都已经得到了定义。
14. 你不可以打印一个文件的全部内容，这样的操作代价太大，且会造成不可预期的后果，是被严格禁止的。
15. 不要向用户提问。
"""

#### 任务描述

`task_description`应当是智能体每次收到的任务目标，在调用智能体循环思考和行动前动态填入。

#### 工作目录

`work_dir`是智能体的工作文件夹，所有数据、文件都会放入这个位置。

In [78]:
work_dir = "./data/autogpt-demo"

#### 工具清单

`tools`是为智能体定义的Tool列表，应当由代码自动生成并作为变量动态填入。

#### 限定资源

In [79]:
resources = """
1. 你可以查阅本地文件列表。
2. 你可以读取本地文件。
3. 你可以通过代码操作本地文件。
4. 你有非常优秀的逻辑分析能力，可以通过因果关系找到最优的解决方案。
"""

#### 表现评估

In [80]:
performance_evaluation = """
1. 尽你最大的努力，用你最好的水平，通过分析和检查，做出最好的决定。
2. 带着全局观，自我反思你计划与动作。
3. 考虑你之前的策略与决策来改善的你的计划。
4. 如果你反复得到相同的结果，修改你的计划和决策，避免死循环。
5. 如果你当前的动作无法获取到需要的信息，尝试展开关键概念的定义，再重新推理。
"""

#### 长时记忆

`long_term_memory`是智能体完成了一次任务之后的最终输出，应当由代码自动生成并作为变量动态填入。

#### 短时记忆

`short_term_memory`是智能体在经过一次思考和行动后产生的输出，应当由代码自动生成并作为变量动态填入。

#### 思考过程

In [81]:
thought_instructions = """
关键概念: 任务中涉及的组合型概念或实体。已经明确获得取值的关键概念，将其取值完整备注在概念后。
概念拆解: 将任务中的关键概念拆解为一系列待查询的子要素。每个关键概念一行，后接这个概念的子要素，每个子要素一行，行前以' -'开始。
反思:
    自我反思，观察以前的执行记录，思考概念拆解是否完整、准确。
    一步步思考是否每一个的关键概念或要素的查询都得到了准确的结果。
    反思你已经得到哪个要素/概念。你得到的要素/概念取值是否正确。从当前的信息中还不能得到哪些要素/概念。
    每个反思一行，行前以' -'开始。
思考: 观察执行记录和你的自我反思，并一步步思考
  （1）分析要素间的依赖关系，例如：
    i. 我是否需要先获得A的值/定义，才能通过A来获得B？
    ii. 如果我先获得A，是否可以通过A筛选B，减少穷举每个B的代价？
    iii. A和B是否存在在同一数据源中，我能否在获取A的同时获取B？
    iv. 是否还有更高效或更聪明的办法来查询一个概念或要素？
    v. 如果上一次尝试查询一个概念或要素时失败了，我是否可以尝试从另一个资源中再次查询？
    vi. 诸如此类，你可以扩展更多的思考 ...
  （2）根据以上分析，排列子要素间的查询优先级
  （3）找出当前需要获得取值的子要素
  注意，不要对要素的取值/定义做任何假设，确保你的信息来自给定的数据源！
推理: 根据你的反思与思考，一步步推理被选择的子要素取值的获取方式。如果前一次的计划失败了，请检查输入中是否包含每个概念/要素的明确定义，并尝试细化你的查询描述。
计划: 严格遵守以下规则，计划你的当前动作。
  （1）详细列出当前动作的执行计划。只计划一步的动作。PLAN ONE STEP ONLY!
  （2）一步步分析，包括数据源，对数据源的操作方式，对数据的分析方法。有哪些已知常量可以直接代入此次分析。
  （3）不要尝试计算文件的每一个元素，这种计算代价太高，是严格禁止的。你可以通过分析找到更有效的方法，比如条件筛选。
  （4）上述分析是否依赖某个要素的取值/定义，且该要素的取值/定义尚未获得。若果是，重新规划当前动作，确保所有依赖的要素的取值/定义都已经获得。
  （5）不要对要素的取值/定义做任何假设，确保你的信息来自给定的数据源。不要编造信息。DO NOT MAKE UP ANY INFORMATION!!!
  （6）确保你执行的动作涉及的所有要素都已获得确切的取值/定义。
  （7）如果全部子任务已完成，请用FINISH动作结束任务。
"""

#### 行动选择

`format_instructions`对LLM提出要求：按照已经定义的 Action 格式要求生成。

#### 主模板结构

In [82]:
from langchain.prompts import PromptTemplate
prompt = PromptTemplate.from_template("""
你是强大的AI助手，可以使用工具与指令自动化解决问题。

## 你必须遵循以下约束来完成任务:
{constraints}

## 你的任务是:
{task_description}
如果此任务显示“无”、“没有了”、“已完成”或类似表达，你直接输出下述工具中的FINISH即可。

## 你需要的所有文件资料都在以下目录:
dir_path={work_dir}

## 你可以使用以下工具或指令，它们又称为动作或actions:
{tools}

## 你可以使用的资源包括:
{resources}

## 你需要评估你的表现:
{performance_evaluation}

## 相关的历史记录:
{long_term_memory}

## 当前的任务执行记录:
{short_term_memory}

## 输出形式：
###（1）首先，根据以下格式说明，输出你的思考过程:
{thought_instructions}

###（2）然后，根据以下格式说明，输出你选择执行的动作/工具:
{format_instructions}
""")

In [83]:
print(prompt)

input_variables=['constraints', 'format_instructions', 'long_term_memory', 'performance_evaluation', 'resources', 'short_term_memory', 'task_description', 'thought_instructions', 'tools', 'work_dir'] template='\n你是强大的AI助手，可以使用工具与指令自动化解决问题。\n\n## 你必须遵循以下约束来完成任务:\n{constraints}\n\n## 你的任务是:\n{task_description}\n如果此任务显示“无”、“没有了”、“已完成”或类似表达，你直接输出下述工具中的FINISH即可。\n\n## 你需要的所有文件资料都在以下目录:\ndir_path={work_dir}\n\n## 你可以使用以下工具或指令，它们又称为动作或actions:\n{tools}\n\n## 你可以使用的资源包括:\n{resources}\n\n## 你需要评估你的表现:\n{performance_evaluation}\n\n## 相关的历史记录:\n{long_term_memory}\n\n## 当前的任务执行记录:\n{short_term_memory}\n\n## 输出形式：\n###（1）首先，根据以下格式说明，输出你的思考过程:\n{thought_instructions}\n\n###（2）然后，根据以下格式说明，输出你选择执行的动作/工具:\n{format_instructions}\n'


#### 构造主模板

In [84]:
main_prompt = prompt.partial(
    task_description="请帮我做一首诗",
    constraints=constraints,
    work_dir=work_dir,
    resources=resources,
    performance_evaluation=performance_evaluation,
    thought_instructions=thought_instructions,
)
print(main_prompt.input_variables)
print(main_prompt.partial_variables['constraints'])
print(main_prompt.template)

['format_instructions', 'long_term_memory', 'short_term_memory', 'tools']

1. 每次你的决策只使用一种工具，你可以使用任意多次。
2. 确保你调用的指令或使用的工具在下述给定的工具列表中。
3. 确保你的回答不会包含违法或有侵犯性的信息。
4. 如果你已经完成所有任务，确保以"FINISH"指令结束。
5. 用中文思考和输出。
6. 如果执行某个指令或工具失败，尝试改变参数或参数格式再次调用。
7. 你生成的回复必须遵循上文中给定的事实信息。不可以编造信息。DO NOT MAKE UP INFORMATION.
8. 如果得到的结果不正确，尝试更换表达方式。
9. 已经得到的信息，不要反复查询。
10. 确保你生成的动作是可以精确执行的。动作做中可以包括具体方法和目标输出。
11. 看到一个概念时尝试获取它的准确定义，并分析从哪些输入可以得到它的具体取值。
12. 生成一个自然语言查询时，请在查询中包含全部的已知信息。
13. 在执行分析或计算动作前，确保该分析或计算中涉及的所有子概念都已经得到了定义。
14. 你不可以打印一个文件的全部内容，这样的操作代价太大，且会造成不可预期的后果，是被严格禁止的。
15. 不要向用户提问。


你是强大的AI助手，可以使用工具与指令自动化解决问题。

## 你必须遵循以下约束来完成任务:
{constraints}

## 你的任务是:
{task_description}
如果此任务显示“无”、“没有了”、“已完成”或类似表达，你直接输出下述工具中的FINISH即可。

## 你需要的所有文件资料都在以下目录:
dir_path={work_dir}

## 你可以使用以下工具或指令，它们又称为动作或actions:
{tools}

## 你可以使用的资源包括:
{resources}

## 你需要评估你的表现:
{performance_evaluation}

## 相关的历史记录:
{long_term_memory}

## 当前的任务执行记录:
{short_term_memory}

## 输出形式：
###（1）首先，根据以下格式说明，输出你的思考过程:
{thought_instructions}

###（2）然后，根据以下格式

#### 使用示例

In [85]:
print(main_prompt
      .partial(task_description="请帮我找出业绩最佳的合作单位")
      .format(
          long_term_memory="",
          short_term_memory="",
          tools="",
          format_instructions="",
      )
     )


你是强大的AI助手，可以使用工具与指令自动化解决问题。

## 你必须遵循以下约束来完成任务:

1. 每次你的决策只使用一种工具，你可以使用任意多次。
2. 确保你调用的指令或使用的工具在下述给定的工具列表中。
3. 确保你的回答不会包含违法或有侵犯性的信息。
4. 如果你已经完成所有任务，确保以"FINISH"指令结束。
5. 用中文思考和输出。
6. 如果执行某个指令或工具失败，尝试改变参数或参数格式再次调用。
7. 你生成的回复必须遵循上文中给定的事实信息。不可以编造信息。DO NOT MAKE UP INFORMATION.
8. 如果得到的结果不正确，尝试更换表达方式。
9. 已经得到的信息，不要反复查询。
10. 确保你生成的动作是可以精确执行的。动作做中可以包括具体方法和目标输出。
11. 看到一个概念时尝试获取它的准确定义，并分析从哪些输入可以得到它的具体取值。
12. 生成一个自然语言查询时，请在查询中包含全部的已知信息。
13. 在执行分析或计算动作前，确保该分析或计算中涉及的所有子概念都已经得到了定义。
14. 你不可以打印一个文件的全部内容，这样的操作代价太大，且会造成不可预期的后果，是被严格禁止的。
15. 不要向用户提问。


## 你的任务是:
请帮我找出业绩最佳的合作单位
如果此任务显示“无”、“没有了”、“已完成”或类似表达，你直接输出下述工具中的FINISH即可。

## 你需要的所有文件资料都在以下目录:
dir_path=./data/autogpt-demo

## 你可以使用以下工具或指令，它们又称为动作或actions:


## 你可以使用的资源包括:

1. 你可以查阅本地文件列表。
2. 你可以读取本地文件。
3. 你可以通过代码操作本地文件。
4. 你有非常优秀的逻辑分析能力，可以通过因果关系找到最优的解决方案。


## 你需要评估你的表现:

1. 尽你最大的努力，用你最好的水平，通过分析和检查，做出最好的决定。
2. 带着全局观，自我反思你计划与动作。
3. 考虑你之前的策略与决策来改善的你的计划。
4. 如果你反复得到相同的结果，修改你的计划和决策，避免死循环。
5. 如果你当前的动作无法获取到需要的信息，尝试展开关键概念的定义，再重新推理。


## 相关的历史记录:


## 当前的任务执行记录:

### final_prompt

In [86]:
final_prompt = PromptTemplate.from_template("""
你的任务是:
{task_description}

经过以下的思考过程，你已经完成任务:
{short_term_memory}

现在请详细给出你的最终答案:
""")

In [87]:
print(final_prompt.template)


你的任务是:
{task_description}

经过以下的思考过程，你已经完成任务:
{short_term_memory}

现在请详细给出你的最终答案:



## 分步骤运行

### 确定示例任务

In [88]:
task = "9月份的销售额是多少"

### 准备执行方法

#### 智能体在每个步骤执行上下文中共享的短时记忆

In [89]:
from langchain.memory.chat_memory import BaseChatMemory
from langchain.memory import ConversationTokenBufferMemory

In [90]:
def _format_short_term_memory(memory: BaseChatMemory) -> str:
    messages = memory.chat_memory.messages
    string_messages = [messages[i].content for i in range(1,len(messages))]
    return "\n".join(string_messages)

In [91]:
# 构造一个基于Token缓存的记忆体
short_term_memory = ConversationTokenBufferMemory(
    llm=llm,
    max_token_limit=4000,
)

# 初始化短时记忆
short_term_memory.save_context(
    {"input": "\n初始化"},
    {"output": "\n开始"}
)

#### 定义Action输出解析，提取工具函数名和参数

In [92]:
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any

class Action(BaseModel):
    name: str = Field(description="工具或指令名称")
    args: Optional[Dict[str,Any]] = Field(description="工具或指令参数，由参数名称和参数值组成")

In [93]:
from langchain.output_parsers import PydanticOutputParser, OutputFixingParser

# 解析Action
action_output_parser = PydanticOutputParser(pydantic_object=Action)

# 实现自动纠错
robust_parser = OutputFixingParser.from_llm(parser=action_output_parser, llm=llm)

#### 定义步骤执行函数

In [94]:
def _step(short_term_memory):
    # 输出LLM结果
    response = ""
    for s in reason_chain.stream({
        "short_term_memory": _format_short_term_memory(short_term_memory),
        "long_term_memory": "",
        "tools": render_text_description(tools)
    }):
        color_print(s, THOUGHT_COLOR, end="")
        response += s

    # 输出Action
    action = action_output_parser.parse(response)
    
    return action, response

#### 定义结束处理函数

In [95]:
def _final_step(short_term_memory) -> str:
    """最后一步, 生成最终的输出"""
    response = final_chain.invoke({
        "short_term_memory": _format_short_term_memory(short_term_memory)
    })
    return response

#### 执行获得的Action

In [96]:
from langchain.tools.base import BaseTool

def _find_tool(tool_name: str) -> Optional[BaseTool]:
    for tool in tools:
        if tool.name == tool_name:
            return tool
    return None

In [97]:
from pydantic import ValidationError

def _exec_action(action: Action) -> str:
    # 查找工具
    tool = _find_tool(action.name)
    # action_expr = format_action(action)
    if tool is None:
        observation = (
            f"Error: 找不到工具或指令 '{action.name}'. "
            f"请从提供的工具/指令列表中选择，请确保按对顶格式输出。"
        )

    else:
        try:
            # 执行工具
            observation = tool.run(action.args)
        except ValidationError as e:
            # 工具的入参异常
            observation = (
                f"Validation Error in args: {str(e)}, args: {action.args}"
            )
        except Exception as e:
            # 工具执行异常
            observation = f"Error: {str(e)}, {type(e).__name__}, args: {action.args}"

    return observation

### 构造主流程中使用的chain

#### reason_chain

In [98]:
from langchain_openai import ChatOpenAI
from langchain.schema.output_parser import StrOutputParser
from langchain.tools.render import render_text_description

In [99]:
llm = ChatOpenAI(
    model="gpt-4-1106-preview",
    temperature=0,
    model_kwargs={
        "seed": 42
    },
)

In [100]:
def _chinese_friendly(string) -> str:
    lines = string.split('\n')
    for i, line in enumerate(lines):
        if line.startswith('{') and line.endswith('}'):
            try:
                lines[i] = json.dumps(json.loads(line), ensure_ascii=False)
            except:
                pass
    return '\n'.join(lines)

In [101]:
action_parser = _chinese_friendly(action_output_parser.get_format_instructions())

In [102]:
reason_chain = main_prompt.partial(task_description=task, format_instructions=action_parser)|llm |StrOutputParser()
reason_chain.input_schema()

PromptInput(long_term_memory=None, short_term_memory=None, tools=None)

#### final_chain

In [103]:
final_chain = final_prompt.partial(task_description=task) | llm | StrOutputParser()
final_chain.input_schema()

PromptInput(short_term_memory=None)

### 尝试运行

<div class="alert alert-info">
<b>一步步探索：</b><br/>
    下面一步步探查智能体的每一个执行细节。
</div>

#### step1：规划行动

In [104]:
action, response = _step(short_term_memory)
print("\n\n - - - - - - ")
print(action)

[32m[0m[32m###[0m[32m 思[0m[32m考[0m[32m过[0m[32m程[0m[32m：

[0m[32m关[0m[32m键[0m[32m概[0m[32m念[0m[32m:[0m[32m 销[0m[32m售[0m[32m额[0m[32m -[0m[32m [0m[32m一个[0m[32m时间[0m[32m段[0m[32m内[0m[32m销[0m[32m售[0m[32m产品[0m[32m或[0m[32m服务[0m[32m所[0m[32m获[0m[32m得[0m[32m的[0m[32m总[0m[32m收[0m[32m入[0m[32m。

[0m[32m概[0m[32m念[0m[32m拆[0m[32m解[0m[32m:
[0m[32m-[0m[32m 销[0m[32m售[0m[32m额[0m[32m
[0m[32m [0m[32m -[0m[32m 时间[0m[32m段[0m[32m:[0m[32m 需[0m[32m要[0m[32m确定[0m[32m具[0m[32m体[0m[32m的[0m[32m时间[0m[32m范[0m[32m围[0m[32m，[0m[32m本[0m[32m任务[0m[32m中[0m[32m为[0m[32m9[0m[32m月[0m[32m份[0m[32m。
[0m[32m [0m[32m -[0m[32m 数据[0m[32m来源[0m[32m:[0m[32m 需[0m[32m要[0m[32m确定[0m[32m销[0m[32m售[0m[32m额[0m[32m数据[0m[32m存[0m[32m储[0m[32m在[0m[32m哪[0m[32m个[0m[32m文件[0m[32m或[0m[32m文件[0m[32m类型[0m[32m中[0m[32m。

[0m[32m反[0m[32m思[0m[32m:
[0m[32m-[0m[32m [0m[32m之[0m[32m

#### step1：执行动作

In [67]:
observation = _exec_action(action)

In [68]:
color_print(f"\n----\n结果:\n{observation}", OBSERVATION_COLOR)

[33m
----
结果:
.DS_Store
2023年8月-9月销售记录.xlsx
供应商名录.xlsx
求职简历.docx
供应商资格要求.pdf[0m


#### step1：记录结果

In [69]:
short_term_memory.save_context(
    {"input": response},
    {"output": "返回结果:\n" + observation}
)

#### step2：规划行动

In [70]:
action, response = _step(short_term_memory)

[32m[0m[32m###[0m[32m 思[0m[32m考[0m[32m过[0m[32m程[0m[32m：

[0m[32m关[0m[32m键[0m[32m概[0m[32m念[0m[32m:[0m[32m [0m[32m9[0m[32m月[0m[32m份[0m[32m的[0m[32m销[0m[32m售[0m[32m额[0m[32m

[0m[32m概[0m[32m念[0m[32m拆[0m[32m解[0m[32m:
[0m[32m-[0m[32m 销[0m[32m售[0m[32m额[0m[32m:[0m[32m 某[0m[32m个[0m[32m时间[0m[32m段[0m[32m内[0m[32m完成[0m[32m销[0m[32m售[0m[32m的[0m[32m总[0m[32m金额[0m[32m
[0m[32m-[0m[32m [0m[32m9[0m[32m月[0m[32m份[0m[32m:[0m[32m 表[0m[32m示[0m[32m查询[0m[32m的[0m[32m特[0m[32m定[0m[32m时间[0m[32m段[0m[32m范[0m[32m围[0m[32m

[0m[32m反[0m[32m思[0m[32m:
[0m[32m-[0m[32m 文件[0m[32m列表[0m[32m中[0m[32m有[0m[32m一个[0m[32m名[0m[32m为[0m[32m“[0m[32m202[0m[32m3[0m[32m年[0m[32m8[0m[32m月[0m[32m-[0m[32m9[0m[32m月[0m[32m销[0m[32m售[0m[32m记录[0m[32m.xlsx[0m[32m”的[0m[32m文件[0m[32m，[0m[32m这[0m[32m个[0m[32m文件[0m[32m很[0m[32m可能[0m[32m包[0m[32m含[0m[32m了[0m[32m9[0m[32m月[0

#### step2：执行动作

In [71]:
print(action)

name='InspectExcel' args={'filename': './data/autogpt-demo/2023年8月-9月销售记录.xlsx'}


In [72]:
observation = _exec_action(action)
color_print(f"\n----\n结果:\n{observation}", OBSERVATION_COLOR)

[33m
----
结果:
这是 './data/autogpt-demo/2023年8月-9月销售记录.xlsx' 文件的工作表名称：

['2023年8月-9月销售记录']

这是 './data/autogpt-demo/2023年8月-9月销售记录.xlsx' 文件第一个工作表的列名：

品类
产品名
单价(元)
销售量
销售日期
供应商

这是 './data/autogpt-demo/2023年8月-9月销售记录.xlsx' 文件第一个工作表的前3行样例：

   品类                产品名  单价(元)  销售量       销售日期        供应商
   手机       Xiaomi Mi 11   4999   20 2023-08-02   北京科技有限公司
   耳机    Sony WH-1000XM4   2999   15 2023-08-03   上海音响有限公司
笔记本电脑 Lenovo ThinkPad X1   8999   10 2023-08-05 深圳创新科技有限公司[0m


<div class="alert alert-warning">
<b>参数解析：</b><br/>
    如果想要参数解析这一步准确，就要提供较详细的工具函数参数指引，否则LLM会根据惯用的方法来推测参数名称。
</div>

#### step2：记录结果

In [73]:
short_term_memory.save_context(
    {"input": response},
    {"output": "返回结果:\n" + observation}
)

#### step3：规划行动

In [74]:
action, response = _step(short_term_memory)

[32m[0m[32m###[0m[32m 思[0m[32m考[0m[32m过[0m[32m程[0m[32m：

[0m[32m关[0m[32m键[0m[32m概[0m[32m念[0m[32m:[0m[32m [0m[32m9[0m[32m月[0m[32m份[0m[32m的[0m[32m销[0m[32m售[0m[32m额[0m[32m

[0m[32m概[0m[32m念[0m[32m拆[0m[32m解[0m[32m:
[0m[32m-[0m[32m 销[0m[32m售[0m[32m额[0m[32m:[0m[32m 某[0m[32m个[0m[32m时间[0m[32m段[0m[32m内[0m[32m的[0m[32m销[0m[32m售[0m[32m总[0m[32m金额[0m[32m
[0m[32m-[0m[32m [0m[32m9[0m[32m月[0m[32m份[0m[32m:[0m[32m 表[0m[32m示[0m[32m查询[0m[32m的[0m[32m特[0m[32m定[0m[32m时间[0m[32m段[0m[32m范[0m[32m围[0m[32m

[0m[32m反[0m[32m思[0m[32m:
[0m[32m-[0m[32m 根[0m[32m据[0m[32m文件[0m[32m探[0m[32m查[0m[32m结果[0m[32m得[0m[32m知[0m[32m，“[0m[32m202[0m[32m3[0m[32m年[0m[32m8[0m[32m月[0m[32m-[0m[32m9[0m[32m月[0m[32m销[0m[32m售[0m[32m记录[0m[32m.xlsx[0m[32m”[0m[32m文件[0m[32m包[0m[32m含[0m[32m了[0m[32m销[0m[32m售[0m[32m日期[0m[32m这[0m[32m一[0m[32m列[0m[32m，[0m[32m这[0m[

#### step3：执行动作

In [75]:
observation = _exec_action(action)
color_print(f"\n----\n结果:\n{observation}", OBSERVATION_COLOR)

[34m
#!/usr/bin/env python[0m
[34mimport pandas as pd

# 读取Excel文件
df = pd.read_excel('./data/autogpt-demo/2023年8月-9月销售记录.xlsx', sheet_name='2023年8月-9月销售记录')

# 确保销售日期为datetime类型
df['销售日期'] = pd.to_datetime(df['销售日期'])

# 设置日期范围
start_date = pd.Timestamp('2023-09-01')
end_date = pd.Timestamp('2023-09-30')

# 过滤出符合日期范围的数据
date_filtered_df = df[(df['销售日期'] >= start_date) & (df['销售日期'] <= end_date)]

# 计算总销售额
sales_sum = (date_filtered_df['单价(元)'] * date_filtered_df['销售量']).sum()

# 输出结果
print(f"在2023年9月的销售总额是：{sales_sum}元")[0m

Python REPL can execute arbitrary code. Use with caution.


[33m
----
结果:
在2023年9月的销售总额是：2851099元
[0m


#### step3：记录结果

In [76]:
short_term_memory.save_context(
    {"input": response},
    {"output": "返回结果:\n" + observation}
)

#### step4：规划行动

In [77]:
action, response = _step(short_term_memory)

[32m[0m[32m###[0m[32m 思[0m[32m考[0m[32m过[0m[32m程[0m[32m：

[0m[32m关[0m[32m键[0m[32m概[0m[32m念[0m[32m:[0m[32m [0m[32m9[0m[32m月[0m[32m份[0m[32m的[0m[32m销[0m[32m售[0m[32m额[0m[32m -[0m[32m [0m[32m285[0m[32m109[0m[32m9[0m[32m元[0m[32m

[0m[32m概[0m[32m念[0m[32m拆[0m[32m解[0m[32m:
[0m[32m-[0m[32m 销[0m[32m售[0m[32m额[0m[32m:[0m[32m 某[0m[32m个[0m[32m时间[0m[32m段[0m[32m内[0m[32m完成[0m[32m销[0m[32m售[0m[32m的[0m[32m总[0m[32m金额[0m[32m
[0m[32m-[0m[32m [0m[32m9[0m[32m月[0m[32m份[0m[32m:[0m[32m 表[0m[32m示[0m[32m查询[0m[32m的[0m[32m特[0m[32m定[0m[32m时间[0m[32m段[0m[32m范[0m[32m围[0m[32m

[0m[32m反[0m[32m思[0m[32m:
[0m[32m-[0m[32m 我[0m[32m已[0m[32m经[0m[32m使用[0m[32mAnaly[0m[32mse[0m[32mExcel[0m[32m工[0m[32m具[0m[32m计[0m[32m算[0m[32m出[0m[32m了[0m[32m9[0m[32m月[0m[32m份[0m[32m的[0m[32m销[0m[32m售[0m[32m总[0m[32m额[0m[32m。
[0m[32m-[0m[32m 结[0m[32m果[0m[32m显示[0m[32m

#### step4：捕获结果

In [82]:
print(action)

name='FINISH' args=None


In [83]:
reply = ""
if action.name == "FINISH":
    color_print(f"\n----\nFINISH", OBSERVATION_COLOR)
    reply = _final_step(short_term_memory)

[33m
----
FINISH[0m


In [84]:
print(reply)

完成任务:

9月份的销售额是 2,851,099 元。


## 在自定义循环中运行

### 实现循环框架

In [143]:
class AutoGPT:
    def run(self, task_description):
        print("等我询问GPT...")
        print("假装我经过复杂的计算过程...")
        print("FINISH")
        return "这是我的结论"

In [144]:
def launch_agent():
    human_icon = "\U0001F468"
    ai_icon = "\U0001F916"

    while True:
        task = input(f"{ai_icon}：有什么可以帮您？\n{human_icon}：")
        if task.strip().lower() == "quit":
            break
        agent = AutoGPT()
        reply = agent.run(task)
        print(f"{ai_icon}：{reply}\n")

In [145]:
agent = AutoGPT()

In [142]:
launch_agent()

🤖：有什么可以帮您？
👨： hi


等我询问GPT...
假装我经过复杂的计算过程...
FINISH
🤖：这是我的结论



🤖：有什么可以帮您？
👨： 乐乐


等我询问GPT...
假装我经过复杂的计算过程...
FINISH
🤖：这是我的结论



🤖：有什么可以帮您？
👨： quit


### 构建智能体

#### 执行链

In [146]:
from langchain_openai import ChatOpenAI
from langchain.schema.output_parser import StrOutputParser
from langchain.tools.render import render_text_description

In [147]:
class AgentChain:
    action_parser = _chinese_friendly(action_output_parser.get_format_instructions())

    def __init__(self):
        self.llm = ChatOpenAI(
            model="gpt-4-1106-preview",
            temperature=0,
            model_kwargs={
                "seed": 42
            },
        )

    def get_reason_chain(self, task):
        prompt = main_prompt.partial(task_description=task, format_instructions=action_parser)
        reason_chain = prompt | self.llm | StrOutputParser()
        return reason_chain

    def get_final_chain(self, task):
        prompt = final_prompt.partial(task_description=task)
        final_chain = prompt | self.llm | StrOutputParser()
        return final_chain

#### 记忆体

In [148]:
from langchain.memory.chat_memory import BaseChatMemory
from langchain.memory import ConversationTokenBufferMemory

class AgentMemory:
    # AgentMemory所需要的 LLM 仅用于计算token，并不用于生成
    def __init__(self, llm = ChatOpenAI()):        
        # 构造一个基于Token缓存的记忆体
        self.memory = ConversationTokenBufferMemory(
            llm=llm,
            max_token_limit=4000,
        )
        
        # 初始化短时记忆
        self.memory.save_context(
            {"input": "\n初始化"},
            {"output": "\n开始"}
        )

    # 保存记忆
    def save(self, input, output):
        self.memory.save_context(input, output)

    # 提取记忆
    def load(self) -> str:
        messages = self.memory.chat_memory.messages
        string_messages = [messages[i].content for i in range(1,len(messages))]
        return "\n".join(string_messages)

#### 智能体

In [149]:
class AutoGPT():
    def __init__(self):
        # 最大思考步骤数
        self.max_thought_steps = 10
        # 链
        self.chain = AgentChain()
        # 记忆体
        self.short_memory = AgentMemory()

    def _step(self, task):
        # 输出LLM结果
        response = ""
        chain = self.chain.get_reason_chain(task)
        for s in chain.stream({
            "short_term_memory": self.short_memory.load(),
            "long_term_memory": "",
            "tools": render_text_description(tools)
        }):
            color_print(s, THOUGHT_COLOR, end="")
            response += s
    
        # 输出Action
        action = action_output_parser.parse(response)
        
        return action, response

    def _final_step(self, task) -> str:
        """最后一步, 生成最终的输出"""
        chain = self.chain.get_final_chain(task)
        response = chain.invoke({
            "short_term_memory": self.short_memory.load()
        })
        return response
        
    def run(self, task_description):
        # 实际思考步骤数
        thought_step_count = 0

        # 结论
        reply = ""

        # 思考循环
        while thought_step_count < self.max_thought_steps:
            # 规划行动
            action, response = self._step(task_description)

            # 捕获结果
            if action.name == "FINISH":
                # 如果返回的动作是 FINISH 就中断循环
                color_print(f"\n----\nFINISH", OBSERVATION_COLOR)
                reply = self._final_step(task_description)
                break
            else:
                # 否则执行动作中指定的工具
                observation = _exec_action(action)
                color_print(f"\n----\n结果:\n{observation}", OBSERVATION_COLOR)
                
                # 记录结果
                self.short_memory.save(
                    {"input": response},
                    {"output": "返回结果:\n" + observation}
                )

                # 累加思考步骤数
                thought_step_count += 1

        # 处理无法得出结论的情况
        if not reply:
            reply = "抱歉，我没能完成您的任务。"
            
        # 返回结论
        return reply

#### 运行

In [151]:
launch_agent()

🤖：有什么可以帮您？
👨： hi


[32m[0m[32m###[0m[32m 思[0m[32m考[0m[32m过[0m[32m程[0m[32m:

[0m[32m关[0m[32m键[0m[32m概[0m[32m念[0m[32m:[0m[32m [0m[32m任务[0m[32m状态[0m[32m -[0m[32m [0m[32m无[0m[32m明[0m[32m确[0m[32m任务[0m[32m描述[0m[32m，[0m[32m需要[0m[32m确认[0m[32m是否[0m[32m有[0m[32m任务[0m[32m需要[0m[32m执行[0m[32m。

[0m[32m概[0m[32m念[0m[32m拆[0m[32m解[0m[32m:
[0m[32m-[0m[32m [0m[32m任务[0m[32m状态[0m[32m:[0m[32m 是否[0m[32m存在[0m[32m未[0m[32m完成[0m[32m的[0m[32m任务[0m[32m。

[0m[32m反[0m[32m思[0m[32m:
[0m[32m-[0m[32m [0m[32m之[0m[32m前[0m[32m的[0m[32m执行[0m[32m记录[0m[32m显示[0m[32m任务[0m[32m开始[0m[32m，[0m[32m但[0m[32m没有[0m[32m具[0m[32m体[0m[32m的[0m[32m任务[0m[32m描述[0m[32m。
[0m[32m-[0m[32m 目[0m[32m前[0m[32m没有[0m[32m得[0m[32m到[0m[32m任[0m[32m何[0m[32m具[0m[32m体[0m[32m任务[0m[32m的[0m[32m信息[0m[32m。
[0m[32m-[0m[32m [0m[32m从[0m[32m当前[0m[32m的[0m[32m信息[0m[32m中[0m[32m不能[0m[32m确定[0m[32m是否[0m[32m存在[0m[

🤖：有什么可以帮您？
👨： quit


### 长期记忆

In [None]:
from langchain.memory.chat_memory import BaseChatMemory
def _format_long_term_memory(task_description: str, memory: BaseChatMemory) -> str:
    return memory.load_memory_variables(
        {"prompt": task_description}
    )["history"]

In [141]:
# 连接向量数据库
from langchain.schema import Document
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.memory import VectorStoreRetrieverMemory

In [157]:
db = Chroma.from_documents([Document(page_content="")], OpenAIEmbeddings(model="text-embedding-ada-002"))

In [158]:
# 清理langchain集合中的数据
def clear_db():
    for c in db._client.list_collections():
        if(c.name == "langchain"):
            db._client.delete_collection("langchain")
# clear_db()

In [159]:
# 将向量库转换为langchain检索器
db_retriever = db.as_retriever(
    search_kwargs={"k": 2}
)
db._client.list_collections()

[Collection(name=langchain)]

In [160]:
# 定义向量库记忆
my_term_memory = VectorStoreRetrieverMemory(
    retriever=db_retriever
)

In [161]:
# 保存到长时记忆
my_term_memory.save_context(
    {"input": "你是谁？"},
    {"output": "我的名字是乐乐"}
)

In [162]:
# 保存到长时记忆
my_term_memory.save_context(
    {"input": "你的全名是？"},
    {"output": "我叫薛一乐"}
)

In [163]:
my_term_memory.save_context(
    {"input": "你在哪里上班？"},
    {"output": "我还在上莞英实验小学读书。"}
)

In [164]:
# 提取记忆
my_term_memory.load_memory_variables({"prompt": "名字"})

{'history': 'input: 你的全名是？\noutput: 我叫薛一乐\ninput: 你是谁？\noutput: 我的名字是乐乐'}

In [105]:
db_retriever.get_relevant_documents("名字", return_docs = True)

[Document(page_content='input: 你的全名是？\noutput: 我叫薛一乐'),
 Document(page_content='input: 你是谁？\noutput: 我的名字是乐乐')]

## 在langchain智能体架构中运行

### Tools

In [105]:
# 自定义工具集
tools = [
    directory_inspection_tool,
    document_qa_tool,
    document_generation_tool,
    email_tool,
    excel_inspection_tool,
    excel_analysis_tool,
    finish_placeholder,
]

### 输出解析

In [106]:
# from pydantic import BaseModel, Field
# from typing import List, Optional, Dict, Any

from langchain.agents.agent import AgentOutputParser

class Action(AgentOutputParser):
    name: str = Field(description="工具或指令名称")
    args: Optional[Dict[str,Any]] = Field(description="工具或指令参数，由参数名称和参数值组成")

In [107]:
from langchain.output_parsers import PydanticOutputParser, OutputFixingParser

# 解析Action
action_output_parser = PydanticOutputParser(pydantic_object=Action)

# 实现自动纠错
robust_parser = OutputFixingParser.from_llm(parser=action_output_parser, llm=llm)

### 提示语

In [108]:
from langchain.prompts import PromptTemplate
prompt = PromptTemplate.from_template("""
你是强大的AI助手，可以使用工具与指令自动化解决问题。

## 你必须遵循以下约束来完成任务:
{constraints}

## 你的任务是:
{task_description}
如果此任务显示“无”、“没有了”、“已完成”或类似表达，你直接输出下述工具中的FINISH即可。

## 你需要的所有文件资料都在以下目录:
dir_path={work_dir}

## 你可以使用以下工具或指令，它们又称为动作或actions:
{tools}

## 你可以使用的资源包括:
{resources}

## 你需要评估你的表现:
{performance_evaluation}

## 相关的历史记录:
{long_term_memory}

## 当前的任务执行记录:
{agent_scratchpad}

## 输出形式：
###（1）首先，根据以下格式说明，输出你的思考过程:
{thought_instructions}

###（2）然后，根据以下格式说明，输出你选择执行的动作/工具:
{format_instructions}
""")

In [109]:
main_prompt = prompt.partial(
    long_term_memory="",
    constraints=constraints,
    work_dir=work_dir,
    resources=resources,
    performance_evaluation=performance_evaluation,
    thought_instructions=thought_instructions,
    tools=render_text_description(tools),
    format_instructions=_chinese_friendly(action_output_parser.get_format_instructions())
)

In [110]:
print(main_prompt.input_variables)

['agent_scratchpad', 'task_description']


### 定义Agent链

In [111]:
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
from langchain.schema.output_parser import StrOutputParser
from langchain.tools.render import render_text_description
from langchain.agents.format_scratchpad import format_log_to_str

In [112]:
llm = ChatOpenAI(
    model="gpt-4-1106-preview",
    temperature=0,
    model_kwargs={
        "seed": 42
    },
)

In [113]:
main_prompt.input_variables

['agent_scratchpad', 'task_description']

In [114]:
RunnablePassthrough.assign(
    task_description=lambda x: x["input"],
    agent_scratchpad=lambda x: format_log_to_str.parse(x["intermediate_steps"])
)

RunnableAssign(mapper={
  task_description: RunnableLambda(lambda x: x['input']),
  agent_scratchpad: RunnableLambda(lambda x: format_log_to_str.parse(x['intermediate_steps']))
})

In [125]:
agent = (
    RunnablePassthrough.assign(
        task_description=lambda x: x["input"],
        agent_scratchpad=lambda x: (x["intermediate_steps"]) # intermediate_steps是agent_executor要求的参数
    )
    | main_prompt
    | llm
    | robust_parser
)

In [126]:
# 看看应该准备什么样的输入
agent.first.input_schema()

RunnableParallel<task_description,agent_scratchpad>Input(input=None, intermediate_steps=None)

In [118]:
agent.invoke({"input": "9月份销售额", "intermediate_steps": ""})

Action(name='ListDirectory', args={'path': './data/autogpt-demo'})

In [127]:
from langchain.agents import AgentExecutor

agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)

In [None]:
agent_executor.invoke({"input": "9月份销售额"})

## 提问示例

- 9月份的销售额是多少
- 销售总额最大的产品是什么
- 帮我找出销售额不达标的供应商
- 给这两家供应商发一封邮件通知此事
- 对比8月和9月销售情况，写一份报告