# LangChain自定义Agent开发

## 功能介绍
本文档展示了使用LangChain框架开发自定义智能体(Agent)的关键步骤和技术。该Agent能够通过使用工具来回答用户问题，并保持对话上下文。

## 核心组件

1. **工具定义**
   - 单词计数工具：计算文本中的单词数量
   - 天气查询工具：基于心知天气API获取实时天气

2. **提示词模板**
   - 使用JSON格式化输出结构
   - 包含工具使用和直接回答两种输出模式

3. **Agent结构**
   - 使用`RunnablePassthrough.assign`组合工作流
   - `MessagesPlaceholder`维护对话历史和工作记忆
   - 基于JSON格式的输出解析器

4. **交互式界面**
   - 用户输入处理
   - 聊天历史记录维护
   - 异常处理和错误恢复

## 常见问题及解决方案

1. **JSON解析错误**
   - 增强解析器识别能力，支持多种格式文本
   - 添加错误捕获和回退机制

2. **上下文维护问题**
   - 每次查询重新创建AgentExecutor
   - 避免传递重复参数

3. **聊天历史管理**
   - 正确使用全局变量
   - 在错误情况下清理历史记录

## 最佳实践

1. 明确的提示词格式化
2. 健壮的错误处理
3. 干净的状态管理
4. 有效的聊天历史维护

这个实现展示了如何将LLM、工具使用和交互式界面结合，创建功能完备的聊天智能体。

In [38]:

import os
from typing import List
import requests
from langchain.agents import tool,AgentExecutor
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate,MessagesPlaceholder
from langchain.tools.render import render_text_description
from langchain.agents.json_chat.prompt import TEMPLATE_TOOL_RESPONSE
from langchain_core.messages import AIMessage,BaseMessage,HumanMessage
from langchain.schema.runnable import RunnablePassthrough
from langchain.agents.agent import AgentOutputParser
from langchain_core.output_parsers.json import parse_json_markdown
from langchain_core.exceptions import OutputParserException
from langchain_core.agents import AgentAction,AgentFinish

# 初始化大模型
llm = ChatOpenAI(
    model= "qwen-turbo",
    api_key= os.getenv("API_KEY"),
    base_url= os.getenv("API_BASEURL")
)

# 定义需要使用的工具
# 工具一,计算输入文本的单词数量
@tool
def get_word_count(text: str) -> int:
    """计算输入文本的单词数量"""
    return len(text.split())

# 工具二,获取天气数据
@tool
def get_weather(location):
        """根据城市获取天气数据"""
        api_key = "SqRLyH57b4IUtiH1W"
        url = f"https://api.seniverse.com/v3/weather/now.json?key={api_key}&location={location}&language=zh-Hans&unit=c"
        response = requests.get(url)
        if response.status_code == 200:
            data = response.json()
            print(f"API返回的结果：{data}")
            # 这里定义返回结构
            weather = {
                "text" : data["results"][0]["now"]["text"],
                "temperature" :data["results"][0]["now"]["temperature"]
            }
            return weather
        else:
            raise Exception(f"天气API调用失败：{response.status_code}")

# 定义工具列表
tools = [get_word_count,get_weather]

# 定义提示词模板
promptTemplete = """
你是一个智能助手，可以回答用户的问题，也可以执行一些任务。
您可以使用以下工具来帮忙解决问题,如果已经知道了答案,也可以直接回答:
{tools}
回复格式说明
--------------------------------------
必须使用以下JSON格式之一作为回复:

选项1:如果您希望使用工具,请使用此选项。
```json
{{
"reason": "使用工具的原因",
"action": "工具名称",  // 必须是{tool_names}之一
"action_input": "工具的输入"
}}
```

选项2:如果您已经知道答案,请使用此选项。
```json
{{
"action": "Final Answer",
"answer": "您的最终答案"
}}
```

用户的输入
--------------------------------------
这是用户的输入(请记住通过单个选项,以JSON模式格式化的回复内容,不要回复其他内容):
{input}

"""

# 历史记录存储列表
chat_history = []

# 创建提示词
prompt = ChatPromptTemplate.from_messages(
    [
        ("system","你是智能助手，可以使用工具来完成任务和回答问题"),
        # 保存用户的输入和AI的输出存入历史记录
        MessagesPlaceholder(variable_name="chat_history"),
        ("user", promptTemplete),
        # 记录Agent工作流程中的思维链
        MessagesPlaceholder(variable_name="agent_scratchpad")
    ]
)

# 将工具列表和工具名称添加到提示词中
prompt = prompt.partial(
    tools=render_text_description(tools),
    tool_names=", ".join([t.name for t in tools])
)

# 最终提示词
print(f"\n\n\n最终提示词: {prompt} \n\n\n")

# 将日志转换为消息
def format_log_to_messages(query, intermediate_steps, template_tool_response):
    """将日志转换为消息"""
    thoughts: List[BaseMessage] = []
    for action, observation in intermediate_steps:
        print(f"\n\n\n处理中间步骤 - Action类型: {type(action)}\n\n\n")
        # 检查action是什么类型的对象，并相应地处理
        if hasattr(action, "log"):
            # 如果action有log属性
            action_content = action.log
        elif hasattr(action, "tool_input"):
            # 如果是AgentAction类型
            action_content = f"Tool: {action.tool}\nInput: {action.tool_input}"
        elif isinstance(action, dict):
            # 如果是字典
            action_content = str(action)
        else:
            # 如果是字符串或其他类型
            action_content = str(action)
            
        thoughts.append(AIMessage(content=action_content))
        thoughts.append(HumanMessage(content=template_tool_response.format(input=query, observation=observation)))
    return thoughts


# 定义JSON格式化输出解析器
class JsonAgentOutputParser(AgentOutputParser):
    """JSON格式化输出解析器"""
    def parse(self,text):
        try:
            # 如果文本包含```json，提取JSON部分
            if "```json" in text:
                import re
                pattern = r"```json\s*([\s\S]*?)\s*```"
                match = re.search(pattern, text)
                if match:
                    text = match.group(1)
            
            # 尝试处理直接生成的最终答案（没有JSON格式但直接回答了问题）
            if not text.strip().startswith("{") and not "```" in text:
                return AgentFinish(
                    return_values={"output": text.strip()},
                    log=text
                )
            
            response = parse_json_markdown(text)
            
            if isinstance(response,list):
                print(f"大模型返回多个结果{response},只取第一个结果: {response[0]}")
                response = response[0]
            if response.get("action") == "Final Answer":
                return AgentFinish(
                    return_values={"output": response["answer"]},
                    log=text
                )
            else:
                return AgentAction(response.get("action",""),response.get("action_input",{}),log=text)
        except OutputParserException as e:
            print(f"解析错误: {str(e)}\n收到的文本: '{text}'")
            # 如果文本看起来像是直接的回答而不是JSON，就直接返回
            if len(text.strip()) > 10 and not text.strip().startswith("{"):
                return AgentFinish(
                    return_values={"output": text.strip()},
                    log=text
                )
            return AgentFinish(
                return_values={"output": "无法解析模型响应，请重试。"},
                log=f"解析错误: {str(e)}\n文本: {text}"
                )
    
    @property
    def _type(self) -> str:
        return "json_agent_output_parser"

# 定义Agent
agent = (
    RunnablePassthrough.assign(agent_scratchpad= lambda x: format_log_to_messages(x["input"],x.get("intermediate_steps",[]),TEMPLATE_TOOL_RESPONSE))
    | prompt
    | llm
    | JsonAgentOutputParser()
)

# 添加用户输入方法
def get_user_input():
    """获取用户输入并处理"""
    global chat_history  # 确保使用全局变量
    print("\n智能助手已准备就绪。输入'exit'退出。")
    
    while True:
        user_input = input("\n请输入您的问题: ")
        if user_input.lower() == 'exit':
            print("感谢使用，再见！")
            break
            
        try:
            # 添加用户消息到历史记录
            chat_history.append(HumanMessage(content=user_input))
            
            # 每次查询创建一个全新的AgentExecutor，确保状态干净
            agent_executor = AgentExecutor(
                agent=agent, 
                tools=tools, 
                verbose=True,
                max_iterations=4,  # 限制最大迭代次数
                early_stopping_method="generate"  # 使用生成式早停方法
            )
            
            # 调用agent_executor处理用户输入
            result = agent_executor.invoke({
                "input": user_input,
                "chat_history": chat_history
            })
            
            print(f"\n\n\nResult: {result}\n\n\n")
            
            # 检查结果是否有效
            if "output" in result and result["output"]:
                # 输出结果
                print(f"\n助手回答: {result['output']}")
                
                # 添加AI回复到历史记录
                chat_history.append(AIMessage(content=result['output']))
            else:
                # 处理空结果情况
                print("\n助手未能生成回答，请重试。")
                # 移除最后添加的用户消息，避免重复
                if chat_history:
                    chat_history.pop()
                                
        except Exception as e:
            print(f"发生错误: {str(e)}")

# 调用用户输入方法
if __name__ == "__main__":
    get_user_input()





最终提示词: input_variables=['agent_scratchpad', 'chat_history', 'input'] input_types={'chat_history': list[typing.Annotated[typing.Union[typing.Annotated[langchain_core.messages.ai.AIMessage, Tag(tag='ai')], typing.Annotated[langchain_core.messages.human.HumanMessage, Tag(tag='human')], typing.Annotated[langchain_core.messages.chat.ChatMessage, Tag(tag='chat')], typing.Annotated[langchain_core.messages.system.SystemMessage, Tag(tag='system')], typing.Annotated[langchain_core.messages.function.FunctionMessage, Tag(tag='function')], typing.Annotated[langchain_core.messages.tool.ToolMessage, Tag(tag='tool')], typing.Annotated[langchain_core.messages.ai.AIMessageChunk, Tag(tag='AIMessageChunk')], typing.Annotated[langchain_core.messages.human.HumanMessageChunk, Tag(tag='HumanMessageChunk')], typing.Annotated[langchain_core.messages.chat.ChatMessageChunk, Tag(tag='ChatMessageChunk')], typing.Annotated[langchain_core.messages.system.SystemMessageChunk, Tag(tag='SystemMessageChunk')], typing.A