In [None]:
# Clear GPU memory: when CUDA is available, empty PyTorch's CUDA cache on the
# currently selected device.
import torch
if torch.cuda.is_available():
    # Index of the currently selected CUDA device.
    device = torch.cuda.current_device()
    print(f"Emptying gpu cache {device}...")
    # Run both cleanup calls with that device selected.
    with torch.cuda.device(device):
        torch.cuda.empty_cache()
        torch.cuda.ipc_collect()

In [None]:
import os
from getpass import getpass

import langchain

# Enable LangChain's global debug flag.
langchain.debug = True

# Secrets must not be committed inside the notebook. The SerpAPI key is taken
# from the environment; if it is missing, ask for it interactively so the value
# never ends up in the saved file.
# NOTE(review): a literal key used to be stored here — it should be treated as
# leaked and revoked.
if not os.environ.get("SERPAPI_API_KEY"):
    os.environ["SERPAPI_API_KEY"] = getpass("SERPAPI_API_KEY: ")

# OpenAI-compatible endpoint served on localhost.
# NOTE(review): the key value is presumably a placeholder for the local
# server — confirm.
os.environ["OPENAI_API_BASE"] = "http://localhost:8123/v1"
os.environ["OPENAI_API_KEY"] = "iwillbeback"

https://python.langchain.com/docs/modules/agents/how_to/custom_llm_agent

In [None]:
from langchain.agents import Tool, AgentExecutor, LLMSingleActionAgent, AgentOutputParser
from langchain.prompts import StringPromptTemplate
from langchain import SerpAPIWrapper, LLMChain
from typing import List, Union
from langchain.schema import AgentAction, AgentFinish, OutputParserException
from pprint import pprint
import re


加载 ChatGLM2 模型

In [None]:
from transformers import AutoTokenizer, AutoModel

# The latest model revision (162b620e) fails to load locally; the previous
# revision loads fine.
# Models downloaded automatically from Hugging Face are stored under
# ~/.cache/huggingface/ and can be copied to other machines.
model_id = "THUDM/chatglm2-6b"
tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
# Load the weights and switch the module to evaluation mode in one expression.
model = AutoModel.from_pretrained(
    model_id,
    trust_remote_code=True,
    device='cuda',
).eval()

In [None]:
# https://github.com/hwchase17/langchain/discussions/6969
from typing import Any, List
from langchain.callbacks.manager import CallbackManagerForLLMRun
from langchain.llms.base import LLM
class Glm2LLM(LLM):
    """LangChain ``LLM`` adapter around an object exposing a ChatGLM2-style ``chat`` method.

    Every call forwards the prompt together with the accumulated ``history``
    to ``model.chat`` and appends the new ``[prompt, response]`` pair to
    ``history``, so the history keeps growing for the lifetime of the instance.
    """
    # Object providing ``chat(tokenizer, prompt, history=..., ...)``.
    model: object = None
    # Tokenizer passed as the first argument of ``model.chat``.
    tokenizer: object = None
    # Forwarded to ``model.chat`` as ``max_length``.
    max_token: int = 10000
    # Sampling temperature forwarded to ``model.chat``.
    temperature: float = 0.01
    # Nucleus-sampling threshold forwarded to ``model.chat``.
    top_p: float = 0.9
    # List of ``[prompt, response]`` pairs from earlier calls.
    history: list = []

    @property
    def _llm_type(self) -> str:
        """Return the identifier string of this LLM implementation."""
        return "ChatGLM2"

    @staticmethod
    def _truncate_at_stop(text: str, stop: List[str]) -> str:
        """Cut ``text`` right before the earliest occurrence of any stop sequence."""
        cut = len(text)
        for token in stop:
            if not token:
                # An empty stop string would match at index 0 and erase everything.
                continue
            position = text.find(token)
            if position != -1:
                cut = min(cut, position)
        return text[:cut]

    def _call(self, prompt: str, stop: List[str] | None = None, run_manager: CallbackManagerForLLMRun | None = None, **kwargs: Any) -> str:
        """Generate a completion for ``prompt``.

        Args:
            prompt: Text sent to the model.
            stop: Optional stop sequences; the response is truncated right
                before the first one that appears in it.
            run_manager: Accepted for interface compatibility; not used.
            **kwargs: Accepted for interface compatibility; not used.

        Returns:
            The model response, truncated at the first stop sequence if any.
        """
        response, _ = self.model.chat(
            self.tokenizer,
            prompt,
            history=self.history,
            max_length=self.max_token,
            top_p=self.top_p,
            temperature=self.temperature,
        )
        # ``model.chat`` is not given the stop sequences, so enforce them here;
        # otherwise text generated after a stop marker reaches the caller.
        if stop:
            response = self._truncate_at_stop(response, stop)
        # Build a new list instead of mutating the existing one in place.
        self.history = self.history + [[prompt, response]]
        return response

In [None]:
# Wrap the loaded model and tokenizer in the LangChain-compatible LLM class.
llm = Glm2LLM(model=model, tokenizer=tokenizer)

使用 SerpAPI 进行 Google 搜索

In [None]:
# Tools the agent may call when answering a query: a single SerpAPI-backed search.
search = SerpAPIWrapper()
tools = [Tool(func=search.run, name="搜索", description="执行搜索")]

这是官方示例, 主要针对 GPT: GPT 可以理解并很好地执行, 而 GLM 无法理解该格式, 会自行编造结果
详细见: https://zhuanlan.zhihu.com/p/635724707

In [None]:
# https://python.langchain.com/docs/modules/agents/how_to/custom_llm_agent

# Agent prompt (in Chinese) with question / thought / action / input /
# observation / answer sections. Placeholders filled at format time:
# {tools}, {tool_names}, {input} and {agent_scratchpad}.
template = """请你回答问题. 你可以使用以下工具:

{tools}

使用以下格式输出:

问题: 你必须回答的问题
思考: 你的思考过程
动作: 从工具 [{tool_names}] 中选取一项执行动作
输入: 执行动作的输入参数
观察: 执行动作的结果
... (思考/动作/输入/观察 可以重复多次)
思考: 我得到最终答案了
答案: 问题的最终答案

现在开始回答问题.

问题: {input}
{agent_scratchpad}"""

class CustomPromptTemplate(StringPromptTemplate):
    """Prompt template that renders the tool list and the agent scratchpad.

    ``format`` expects an ``intermediate_steps`` keyword holding
    (action, observation) pairs and turns it into the ``agent_scratchpad``,
    ``tools`` and ``tool_names`` template variables.
    """
    # Template text containing the placeholders filled in by ``format``.
    template: str
    # Tools advertised to the model.
    tools: List[Tool]

    def format(self, **kwargs) -> str:
        """Render the prompt, print it, and return it."""
        steps = kwargs.pop("intermediate_steps")
        # One scratchpad entry per step: the action log followed by its
        # observation and a fresh "thought" cue.
        scratchpad = "".join(
            action.log + f"\n观察: {observation}\n思考: "
            for action, observation in steps
        )
        tool_lines = [f"{tool.name}: {tool.description}" for tool in self.tools]
        names = [tool.name for tool in self.tools]
        kwargs.update(
            agent_scratchpad=scratchpad,
            tools="\n".join(tool_lines),
            tool_names=", ".join(names),
        )
        rendered = self.template.format(**kwargs)
        print(f"Template: {rendered}")
        return rendered
    
# `agent_scratchpad`, `tools` and `tool_names` are produced inside `format`, so
# they are not declared as inputs; `intermediate_steps` is declared because
# `format` consumes it.
prompt = CustomPromptTemplate(
    input_variables=["input", "intermediate_steps"],
    tools=tools,
    template=template,
)

In [None]:
class CustomOutputParser(AgentOutputParser):
    """Turn raw model output into an ``AgentAction`` or an ``AgentFinish``."""

    def parse(self, llm_output: str) -> Union[AgentAction, AgentFinish]:
        """Parse ``llm_output``.

        Returns an ``AgentFinish`` when the answer marker is present or when no
        action/input pair can be found; otherwise an ``AgentAction`` naming the
        tool and its input.
        """
        print(f"LLM output: {llm_output}")

        # A final-answer marker ends the run; everything after its last
        # occurrence becomes the output.
        answer_marker = "答案:"
        if answer_marker in llm_output:
            final_answer = llm_output.split(answer_marker)[-1].strip()
            return AgentFinish(return_values={"output": final_answer}, log=llm_output)

        # Group 1 is the tool name, group 2 the tool input.
        action_pattern = r"动作\s*\d*\s*:(.*?)\n输入:[\s]*(.*)"
        found = re.search(action_pattern, llm_output, re.DOTALL)
        pprint(found)

        # Unparseable output is returned as the final answer instead of raising.
        if found is None:
            return AgentFinish(
                return_values={"output": llm_output.strip()},
                log=llm_output,
            )

        tool_name, raw_input = found.group(1), found.group(2)
        return AgentAction(
            tool=tool_name.strip(),
            tool_input=raw_input.strip(" ").strip('"'),
            log=llm_output,
        )
    
# Parser instance used when building the agent.
output_parser = CustomOutputParser()

In [None]:
from langchain.memory import ConversationBufferWindowMemory
# Names of the tools the agent is allowed to call.
tool_names = [tool.name for tool in tools]
# Chain that pairs the LLM with the prompt.
llm_chain = LLMChain(prompt=prompt, llm=llm)
agent = LLMSingleActionAgent(
    llm_chain=llm_chain,
    output_parser=output_parser,
    # Stop sequence handed to the agent.
    stop=["\n观察:"],
    allowed_tools=tool_names,
)
# Conversation memory is currently disabled:
# memory = ConversationBufferWindowMemory(k=2)
agent_executor = AgentExecutor.from_agent_and_tools(
    agent=agent,
    tools=tools,
    verbose=True,
)

In [None]:

# https://github.com/jayli/langchain-GLM_Agent/tree/main
# Prompt skeleton for the GLM-oriented agent. Placeholders filled at format
# time: {role}, {background_infomation}, {question_guide}, {input} and
# {answer_format}.
glm_template = """
你现在是一个{role}。这里是一些已知信息：
{background_infomation}
{question_guide}：{input}

{answer_format}
"""

# Single search tool; its name matches the "DeepSearch" call the GLM prompt
# asks the model to emit.
glm_tools = [
    Tool.from_function(name="DeepSearch", func=search.run, description=""),
]

class GlmCustomPromptTemplate(StringPromptTemplate):
    """Prompt template that switches wording depending on whether a search has run.

    With no intermediate steps the model is asked to either answer directly or
    reply with a ``DeepSearch('...')`` call. Once steps exist, the observation
    of the first step is supplied as background information and the model is
    asked to answer from it.
    """
    # Template text with the placeholders filled in by ``format``.
    template: str
    # Tools associated with this prompt (not read by ``format``).
    tools: List[Tool]

    def format(self, **kwargs) -> str:
        """Render the template from ``input`` and ``intermediate_steps``."""
        steps = kwargs.pop("intermediate_steps")

        if len(steps) > 0:
            # Background information is available: only the first step's
            # observation is used.
            _, observation = steps[0]
            slots = {
                "background_infomation": "\n\n你还有这些已知信息作为参考：\n\n" + f"{observation}\n",
                "role": "聪明的 AI 助手",
                "question_guide": "请根据这些已知信息回答我的问题",
                "answer_format": "",
            }
        else:
            # Nothing has been searched yet.
            slots = {
                "background_infomation": "\n",
                "role": "傻瓜机器人",
                "question_guide": "我现在有一个问题",
                "answer_format": "如果你知道答案，请直接给出你的回答！如果你不知道答案，请你只回答\"DeepSearch('搜索词')\"，并将'搜索词'替换为你认为需要搜索的关键词，除此之外不要回答其他任何内容。\n\n下面请回答我上面提出的问题！",
            }

        kwargs.update(slots)
        return self.template.format(**kwargs)
    
# Prompt instance for the GLM agent; `intermediate_steps` is consumed by `format`.
glm_cpt = GlmCustomPromptTemplate(
    input_variables=["input", "intermediate_steps"],
    tools=glm_tools,
    template=glm_template,
)

class GlmCustomOutputParser(AgentOutputParser):
    """Interpret model output as either a ``DeepSearch(...)`` call or a final answer."""

    def parse(self, llm_output: str) -> Union[AgentAction, AgentFinish]:
        """Parse ``llm_output``.

        Returns:
            An ``AgentAction`` for the ``DeepSearch`` tool when the output
            starts (after optional whitespace/word characters) with
            ``DeepSearch(<argument>)``; otherwise an ``AgentFinish`` carrying
            the stripped output as the answer.
        """
        # group 1 = name of the function to call
        # group 2 = argument passed to it
        match = re.match(r'^[\s\w]*(DeepSearch)\(([^\)]+)\)', llm_output, re.DOTALL)

        # No DeepSearch() call in the output: treat it as the final answer.
        if not match:
            return AgentFinish(
                return_values={"output": llm_output.strip()},
                log=llm_output,
            )

        # Otherwise a tool call is required.
        action = match.group(1).strip()
        # The prompt asks for DeepSearch('keyword') with single quotes, so strip
        # single as well as double quotes (ASCII and curly) around the query,
        # then any whitespace that was inside the quotes.
        action_input = match.group(2).strip().strip("\"'“”‘’").strip()
        return AgentAction(tool=action, tool_input=action_input, log=llm_output)

# Assemble the GLM-specific agent from its parser, chain and tool list.
glm_cop = GlmCustomOutputParser()
glm_chain = LLMChain(prompt=glm_cpt, llm=llm)
glm_agent = LLMSingleActionAgent(
    llm_chain=glm_chain,
    output_parser=glm_cop,
    stop=["\nObservation:"],
    allowed_tools=[tool.name for tool in glm_tools],
)

# Rebinds `agent_executor` to the GLM agent.
agent_executor = AgentExecutor.from_agent_and_tools(
    agent=glm_agent,
    tools=glm_tools,
    verbose=True,
)
# print(agent_executor.run(related_content="", input="请问近期携程有什么大的新闻", tool_name="DeepSearch"))


In [None]:
# Query the agent: market capitalisation of Kingsoft Office as of July 2023.
agent_executor.run("截至2023年7月, 金山办公的市值是多少?")

In [None]:
# Query the agent: today's weather in Beijing.
agent_executor.run("今天北京的天气怎样?")