## 使用langgraph构建代码助手
******
- 针对特定问题参考材料迭代式的生成代码
- 从用户指定的一组文档开始
- 使用长上下文 LLM 来提取它并执行 RAG 来回答基于它的问题
- 调用一个工具来生成结构化输出
- 将解决方案返回给用户之前，将执行两个单元测试（检查导入和代码执行）
![](langgraph4.png)

In [6]:
! pip install -U langchain_community langchain-openai langchain-deepseek langchain langgraph bs4 langchain-anthropic

Collecting langchain-anthropic
  Downloading langchain_anthropic-0.3.10-py3-none-any.whl.metadata (1.9 kB)
Collecting anthropic<1,>=0.49.0 (from langchain-anthropic)
  Using cached anthropic-0.49.0-py3-none-any.whl.metadata (24 kB)
Downloading langchain_anthropic-0.3.10-py3-none-any.whl (25 kB)
Using cached anthropic-0.49.0-py3-none-any.whl (243 kB)
Installing collected packages: anthropic, langchain-anthropic
Successfully installed anthropic-0.49.0 langchain-anthropic-0.3.10

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


#### 加载LCEL文档
****

In [18]:
from bs4 import BeautifulSoup as Soup
from langchain_community.document_loaders.recursive_url_loader import RecursiveUrlLoader

# LCEL docs
url = "https://python.langchain.com/docs/concepts/lcel/"
loader = RecursiveUrlLoader(
    url=url, max_depth=20, extractor=lambda x: Soup(x, "html.parser").text
)
docs = loader.load()

d_sorted = sorted(docs, key=lambda x: x.metadata["source"])
d_reversed = list(reversed(d_sorted))
concatenated_content = "\n\n\n --- \n\n\n".join(
    [doc.page_content for doc in d_reversed]
)

设置大模型

In [19]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_deepseek import ChatDeepSeek
from pydantic import BaseModel, Field
import os

### OpenAI

# 评分提示
code_gen_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """你是一位精通LCEL（LangChain表达式语言）的编程助手。\n 
    这里是LCEL文档的完整集合：\n ------- \n  {context} \n ------- \n 请根据
    上述提供的文档回答用户问题。确保你提供的任何代码都可以执行，\n 
    包含所有必要的导入和已定义的变量。请按照以下结构组织你的回答：首先描述代码解决方案，\n
    然后列出导入语句，最后给出功能完整的代码块。以下是用户问题：""",
        ),
        ("placeholder", "{messages}"),
    ]
)


# 数据模型
class code(BaseModel):
    """LCEL问题代码解决方案的模式。"""

    prefix: str = Field(description="问题和解决方案的描述")
    imports: str = Field(description="代码块导入语句")
    code: str = Field(description="不包括导入语句的代码块")


llm = ChatDeepSeek(
    model="Pro/deepseek-ai/DeepSeek-V3",
    temperature=0,
    api_key=os.environ.get("DEEPSEEK_API_KEY"),
    base_url=os.environ.get("DEEPSEEK_API_BASE"),
)
code_gen_chain_oai = code_gen_prompt | llm.with_structured_output(code)
question = "如何在LCEL中构建RAG链？"
solution = code_gen_chain_oai.invoke(
    {"context": concatenated_content, "messages": [("user", question)]}
)
solution


code(prefix='在LCEL中构建RAG（Retrieval Augmented Generation）链通常涉及以下步骤：1) 使用检索器从向量存储中获取相关文档，2) 将这些文档与用户输入结合生成提示，3) 将提示传递给LLM生成最终答案。以下是一个完整的示例代码，展示了如何构建一个简单的RAG链。', imports='from langchain_core.runnables import RunnablePassthrough\nfrom langchain_core.prompts import ChatPromptTemplate\nfrom langchain_core.output_parsers import StrOutputParser\nfrom langchain_community.vectorstores import FAISS\nfrom langchain_community.embeddings import OpenAIEmbeddings\nfrom langchain_community.llms import OpenAI', code='# 假设已经有一个向量存储（FAISS）\nvectorstore = FAISS.from_texts(\n    ["LangChain is a framework for building applications with LLMs.", "RAG stands for Retrieval Augmented Generation."],\n    embedding=OpenAIEmbeddings()\n)\nretriever = vectorstore.as_retriever()\n\n# 定义提示模板\nprompt = ChatPromptTemplate.from_template(\n    """Answer the question based only on the following context:\n    {context}\n\n    Question: {question}\n    """\n)\n\n# 定义LLM\nllm = OpenAI()\n\n# 构建RAG链\nrag_chain = (\n    {"conte

In [20]:
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate

### Anthropic

# 强制使用工具的提示
code_gen_prompt_claude = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """<instructions> 你是一位精通LCEL（LangChain表达式语言）的编程助手。\n 
    这里是LCEL文档：\n ------- \n  {context} \n ------- \n 请根据上述提供的文档回答用户问题。\n 
    确保你提供的任何代码都可以执行，包含所有必要的导入和已定义的变量。\n
    按以下结构组织你的回答：1) 描述代码解决方案的前言，2) 导入语句，3) 功能完整的代码块。\n
    调用code工具来正确构建输出。</instructions> \n 以下是用户问题：""",
        ),
        ("placeholder", "{messages}"),
    ]
)


# 大语言模型
# 注意选择使用claude-3-opus-20240229模型,因为其上下文窗口更大
expt_llm = "claude-3-opus-20240229"
llm = ChatAnthropic(
    model=expt_llm,
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
    base_url=os.environ.get("ANTHROPIC_BASE_URL"),
    default_headers={"anthropic-beta": "tools-2024-04-04"},
)

structured_llm_claude = llm.with_structured_output(code, include_raw=True)


# 可选：检查工具使用是否出现错误
def check_claude_output(tool_output):
    """检查解析错误或未能调用工具"""

    # 解析错误
    if tool_output["parsing_error"]:
        # 报告输出和解析错误
        print("解析错误！")
        raw_output = str(tool_output["raw"].content)
        error = tool_output["parsing_error"]
        raise ValueError(
            f"解析输出时出错！请确保调用工具。输出：{raw_output}。\n 解析错误：{error}"
        )

    # 未调用工具
    elif not tool_output["parsed"]:
        print("未能调用工具！")
        raise ValueError(
            "你没有使用提供的工具！请确保调用工具来构建输出。"
        )
    return tool_output


# 带输出检查的链
code_chain_claude_raw = (
    code_gen_prompt_claude | structured_llm_claude | check_claude_output
)


def insert_errors(inputs):
    """在消息中插入工具解析错误"""

    # 获取错误
    error = inputs["error"]
    messages = inputs["messages"]
    messages += [
        (
            "assistant",
            f"重试。你需要修复解析错误：{error} \n\n 你必须调用提供的工具。",
        )
    ]
    return {
        "messages": messages,
        "context": inputs["context"],
    }


# 这将作为后备链运行
fallback_chain = insert_errors | code_chain_claude_raw
N = 3  # 最大重试次数
code_gen_chain_re_try = code_chain_claude_raw.with_fallbacks(
    fallbacks=[fallback_chain] * N, exception_key="error"
)


def parse_output(solution):
    """当我们在结构化输出中添加'include_raw=True'时，
    它将返回一个包含'raw'、'parsed'、'parsing_error'的字典。"""

    return solution["parsed"]


# 可选：带重试功能，用于纠正未能调用工具的情况
code_gen_chain = code_gen_chain_re_try | parse_output

# 不重试
code_gen_chain = code_gen_prompt_claude | structured_llm_claude | parse_output


测试

In [21]:
# Test
question = "在LCEL中如何构建一个RAG链?"
solution = code_gen_chain.invoke(
    {"context": concatenated_content, "messages": [("user", question)]}
)
solution

code(prefix='要在LCEL中构建一个基本的RAG（Retrieval Augmented Generation）链，可以这样做：\n\n1. 使用一个向量存储作为检索器(Retriever)，用于检索与查询最相关的文档。\n2. 将检索器与一个文档转换器(DocumentTransformer)结合，将检索到的文档格式化为字符串。 \n3. 使用一个提示模板(PromptTemplate)将查询和格式化后的文档组合成一个提示。\n4. 将提示作为输入传给一个语言模型(LanguageModel)，生成最终的回答。\n\n下面是具体的代码实现：', imports='from langchain_core.runnables import RunnableSequence\nfrom langchain_core.retrievers import VectorStoreRetriever\nfrom langchain_core.transformers import TextSplitter, DocumentTransformer  \nfrom langchain_core.prompts import PromptTemplate\nfrom langchain.chat_models import ChatOpenAI', code='# 假设已经有一个向量存储vs\nretriever = VectorStoreRetriever.from_langchain(vs)\n\n# 文档转换器，将文档分割并格式化为字符串\nsplitter = TextSplitter(chunk_size=1000, chunk_overlap=0) \ndoc_transformer = DocumentTransformer(text_splitter=splitter)\n\n# 定义提示模板\ntemplate = """基于以下背景信息回答问题:\n{context}  \n\n问题: {question}\n"""\nprompt = PromptTemplate.from_format(template)\n\n# 定义语言模型\nllm = ChatOpenAI()\n\n# 组装RAG链\nrag_chain = retriever | do

定义state

In [22]:
from typing import List
from typing_extensions import TypedDict


class GraphState(TypedDict):
    """
    表示我们图的状态。

    属性：
        error : 用于控制流的二进制标志，表示是否触发了测试错误
        messages : 包含用户问题、错误消息、推理过程
        generation : 代码解决方案
        iterations : 尝试次数
    """

    error: str
    messages: List
    generation: str
    iterations: int


创建节点

In [23]:
### 参数

# 最大尝试次数
max_iterations = 3
# 反思
# flag = 'reflect'
flag = "do not reflect"  # 不进行反思

### 节点


def generate(state: GraphState):
    """
    生成代码解决方案

    参数：
        state (dict): 当前图状态

    返回：
        state (dict): 向状态添加新键，generation
    """

    print("---正在生成代码解决方案---")

    # 状态
    messages = state["messages"]
    iterations = state["iterations"]
    error = state["error"]

    # 我们因错误被重新路由到生成
    if error == "yes":
        messages += [
            (
                "user",
                "现在，请重试。调用code工具来构建包含前言、导入和代码块的输出：",
            )
        ]

    # 解决方案
    code_solution = code_gen_chain.invoke(
        {"context": concatenated_content, "messages": messages}
    )
    messages += [
        (
            "assistant",
            f"{code_solution.prefix} \n 导入: {code_solution.imports} \n 代码: {code_solution.code}",
        )
    ]

    # 增加计数
    iterations = iterations + 1
    return {"generation": code_solution, "messages": messages, "iterations": iterations}


def code_check(state: GraphState):
    """
    检查代码

    参数：
        state (dict): 当前图状态

    返回：
        state (dict): 向状态添加新键，error
    """

    print("---正在检查代码---")

    # 状态
    messages = state["messages"]
    code_solution = state["generation"]
    iterations = state["iterations"]

    # 获取解决方案组件
    imports = code_solution.imports
    code = code_solution.code

    # 检查导入
    try:
        exec(imports)
    except Exception as e:
        print("---代码导入检查：失败---")
        error_message = [("user", f"你的解决方案未通过导入测试：{e}")]
        messages += error_message
        return {
            "generation": code_solution,
            "messages": messages,
            "iterations": iterations,
            "error": "yes",
        }

    # 检查执行
    try:
        exec(imports + "\n" + code)
    except Exception as e:
        print("---代码块检查：失败---")
        error_message = [("user", f"你的解决方案未通过代码执行测试：{e}")]
        messages += error_message
        return {
            "generation": code_solution,
            "messages": messages,
            "iterations": iterations,
            "error": "yes",
        }

    # 无错误
    print("---无代码测试失败---")
    return {
        "generation": code_solution,
        "messages": messages,
        "iterations": iterations,
        "error": "no",
    }


def reflect(state: GraphState):
    """
    对错误进行反思

    参数：
        state (dict): 当前图状态

    返回：
        state (dict): 向状态添加新键，generation
    """

    print("---正在生成代码解决方案---")

    # 状态
    messages = state["messages"]
    iterations = state["iterations"]
    code_solution = state["generation"]

    # 提示反思

    # 添加反思
    reflections = code_gen_chain.invoke(
        {"context": concatenated_content, "messages": messages}
    )
    messages += [("assistant", f"以下是对错误的反思：{reflections}")]
    return {"generation": code_solution, "messages": messages, "iterations": iterations}


### 边缘


def decide_to_finish(state: GraphState):
    """
    确定是否结束。

    参数：
        state (dict): 当前图状态

    返回：
        str: 要调用的下一个节点
    """
    error = state["error"]
    iterations = state["iterations"]

    if error == "no" or iterations == max_iterations:
        print("---决定：完成---")
        return "end"
    else:
        print("---决定：重新尝试解决方案---")
        if flag == "reflect":
            return "reflect"
        else:
            return "generate"


创建工作流

In [24]:
from langgraph.graph import END, StateGraph, START

workflow = StateGraph(GraphState)

workflow.add_node("generate", generate)  # generation solution
workflow.add_node("check_code", code_check)  # check code
workflow.add_node("reflect", reflect)  # reflect

# Build graph
workflow.add_edge(START, "generate")
workflow.add_edge("generate", "check_code")
workflow.add_conditional_edges(
    "check_code",
    decide_to_finish,
    {
        "end": END,
        "reflect": "reflect",
        "generate": "generate",
    },
)
workflow.add_edge("reflect", "generate")
app = workflow.compile()

测试

In [25]:
question = "如何在runnable中传递原始的输入?"
solution = app.invoke({"messages": [("user", question)], "iterations": 0, "error": ""})

---正在生成代码解决方案---
---正在检查代码---
---代码导入检查：失败---
---决定：重新尝试解决方案---
---正在生成代码解决方案---
---正在检查代码---
---代码导入检查：失败---
---决定：重新尝试解决方案---
---正在生成代码解决方案---
---正在检查代码---
---代码块检查：失败---
---决定：完成---


In [26]:
solution["generation"]

code(prefix='要在LCEL链中传递原始的输入，可以使用一个lambda函数。lambda函数可以将输入直接传递给后面的Runnable。\n\n使用lambda函数和其他Runnable组合，可以在链中传递原始输入。例如:', imports='from langchain_core.runnables import RunnableSequence, RunnableLambda', code='identity_lambda = RunnableLambda(lambda x: x)\nother_runnable = SomeOtherRunnable()\n\n# 使用pipe操作符组合\nchain = identity_lambda | other_runnable\n\n# 或者显式使用RunnableSequence \n# chain = RunnableSequence([identity_lambda, other_runnable])\n\n# 调用链，raw_input会通过identity_lambda原样传递给other_runnable\noutput = chain.invoke(raw_input)')