参考内容：https://liduos.com/the-langgraph-build-ai-programer-agent.html#LangGraph-%E4%BB%8B%E7%BB%8D

In [1]:
import os

# 定义搜索路径，即app目录的绝对路径
search_path = os.path.join(os.getcwd(), "app")

# 定义crud.py文件的路径，该文件位于search_path/src目录下
code_file = os.path.join(search_path, "src/crud.py")

# 定义测试文件test_crud.py的路径，该文件位于search_path/test目录下
test_file = os.path.join(search_path, "test/test_crud.py")

# 检查search_path路径是否存在，如果不存在则创建
if not os.path.exists(search_path):
    os.mkdir(search_path)  # 创建search_path目录
    os.mkdir(os.path.join(search_path, "src"))  # 在search_path下创建src目录
    os.mkdir(os.path.join(search_path, "test"))  # 在search_path下创建test目录

In [2]:
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(base_url="https://api.deepseek.com/v1",
    api_key="sk-02f0c28749534d369c47776c4081fd96",
    # api_key=os.getenv("DEEPSEEK_KEY"),
    model="deepseek-chat")

# Define StateGraph

In [3]:
from typing import TypedDict, List
from langgraph.graph import StateGraph, END

class AgentState(TypedDict):  # 定义AgentState类型，用于存储代理的状态
    class_source: str
    class_methods: List[str]
    tests_source: str

# 创建StateGraph
workflow = StateGraph(AgentState)

# Define Nodes

### 定义一些全图信息

In [None]:
def extract_code_from_message(message):
    lines = message.split("\n")  # 按行分割消息
    code = ""
    in_code = False  # 标记是否在代码块中
    for line in lines:
        if "```" in line:  # 检查是否是代码块的开始或结束
            in_code = not in_code
        elif in_code:  # 如果在代码块中，则累加代码
            code += line + "\n"
    return code  # 返回提取的代码

# 系统消息模板
system_message_template = """你是一个聪明的开发者，你将使用pytest编写高质量的单元测试。
仅用源代码回复测试。不要在你的回复中包含类。我会自己添加导入语句。
如果没有要写的测试，请回复“# 没有要写的测试”且不要包含类。
示例：
def test_function():
    ...
请务必遵循指令并编写高质量的测试，不要写测试类，只写方法。
"""

## Discover Node

获取需要生成测试用例的函数，生成 import 代码

In [5]:
from langchain_core.messages import HumanMessage, SystemMessage
import colorama

import_prompt_template = """
这是一条包含代码文件路径的信息：{code_file}。
这是一条包含测试文件路径的信息：{test_file}。
请为文件中的类编写正确的导入语句。
"""

# 发现类及其方法
def discover_function(state: AgentState):
    assert os.path.exists(code_file)  # 确保代码文件存在
    with open(code_file, "r") as f:  # 打开代码文件进行读取
        source = f.read()  # 读取文件内容
    state["class_source"] = source  # 将源代码存储在状态中

    # 获取方法
    methods = []
    for line in source.split("\n"):
        if "def " in line:  # 如果行中包含def，表示这是一个方法定义
            methods.append(line.split("def ")[1].split("(")[0])
    state["class_methods"] = methods  # 将方法名存储在状态中

    # 生成导入语句并启动代码
    import_prompt = import_prompt_template.format(
        code_file=code_file,  # 格式化导入提示模板
        test_file=test_file
    )
    message = llm.invoke([HumanMessage(content=import_prompt)]).content  # 调用模型生成消息
    code = extract_code_from_message(message)  # 提取消息中的代码
    state["tests_source"] = code + "\n\n"  # 将测试源代码存储在状态中

    return state  # 返回更新后的状态

# 将节点添加到工作流中
workflow.add_node(
    "discover",  # 节点名称
    discover_function  # 节点对应的函数
)

## Generate Test Node

In [6]:
# 编写测试模板
generate_test_template = """这里是类：
'''
{class_source}
'''
为方法“{class_method}”实现一个测试。
"""

def generate_tests_function(state: AgentState):

    # 获取下一个要编写测试的方法
    class_method = state["class_methods"].pop(0)
    print(f"为{class_method}编写测试。")

    # 获取源代码
    class_source = state["class_source"]

    # 创建提示
    generate_test_prompt = generate_test_template.format(
        class_source=class_source,
        class_method=class_method
    )
    print(colorama.Fore.CYAN + generate_test_prompt + colorama.Style.RESET_ALL)  # 打印提示信息

    # 获取测试源代码
    system_message = SystemMessage(system_message_template)  # 创建系统消息
    human_message = HumanMessage(generate_test_prompt)  # 创建人类消息
    test_source = llm.invoke([system_message, human_message]).content  # 调用模型生成测试代码
    test_source = extract_code_from_message(test_source)  # 提取消息中的测试代码
    print(colorama.Fore.GREEN + test_source + colorama.Style.RESET_ALL)  # 打印测试代码
    state["tests_source"] += test_source + "\n\n"  # 将测试源代码添加到状态中

    return state  # 返回更新后的状态

# 将编写测试节点添加到工作流中
workflow.add_node(
    "generate_tests",
    generate_tests_function
)

## Write File Node

In [7]:

# 编写文件
def write_file(state: AgentState):
    with open(test_file, "w") as f:  # 打开测试文件进行写入
        f.write(state["tests_source"])  # 写入测试源代码
    return state  # 返回状态

# 将写文件节点添加到工作流中
workflow.add_node(
    "write_file",
    write_file
)

# Define Edges of Graph

In [8]:
# 定义入口点，这是流程开始的地方
workflow.set_entry_point("discover")

# 总是从discover跳转到generate_tests
workflow.add_edge("discover", "generate_tests")

In [9]:
# 判断是否完成
def should_continue(state: AgentState):
    if len(state["class_methods"]) == 0:  # 如果没有更多的方法要测试
        return "end"  # 结束流程
    else:
        return "continue"  # 继续流程

# 添加条件边
workflow.add_conditional_edges(
    "generate_tests",  # 条件边的起始节点
    should_continue,  # 条件函数
    {
        "continue": "generate_tests",  # 如果应该继续，则再次执行generate_tests节点
        "end": "write_file"  # 如果结束，则跳转到write_file节点
    }
)

# 总是从write_file跳转到END
workflow.add_edge("write_file", END)


# Compile and Run

In [10]:
from langchain_core.runnables import RunnableConfig
from langgraph.errors import GraphRecursionError

app = workflow.compile()  # 编译工作流
inputs = {}  # 输入参数
config = RunnableConfig(recursion_limit=100)  # 设置递归限制
try:
    result = app.invoke(inputs, config)  # 运行应用
    print(result)  # 打印结果
except GraphRecursionError:  # 如果达到递归限制
    print("达到图递归限制。")  # 打印错误信息

为__init__编写测试。
[36m这里是类：
'''

class Item:
    def __init__(self, id, name, description=None):
        self.id = id  # 初始化Item对象的id属性
        self.name = name  # 初始化Item对象的name属性
        self.description = description  # 初始化Item对象的description属性，可省略

    def __repr__(self):
        # 定义对象的字符串表示方法，便于打印和调试
        return f"Item(id={self.id}, name={self.name}, description={self.description})"

class CRUDApp:
    def __init__(self):
        self.items = []  # 初始化一个空列表，用于存储Item对象

    def create_item(self, id, name, description=None):
        # 创建一个Item对象，并将其添加到items列表中
        item = Item(id, name, description)
        self.items.append(item)
        return item  # 返回创建的Item对象

    def read_item(self, id):
        # 根据id读取Item对象
        for item in self.items:
            if item.id == id:
                return item  # 如果找到匹配的id，返回Item对象
        return None  # 如果没有找到匹配的id，返回None

    def update_item(self, id, name=None, description=None):
        # 根据id更新Item对象的name和/或description属性
       