In [7]:
from typing import List, Dict, Any, Tuple, Union
from openai import OpenAI 
import re
import json
import traceback

client = OpenAI(
    api_key="43aa6570992d01fab078e157e07b4f52.ocKHwNGxhIBrDZD4",
    base_url="https://open.bigmodel.cn/api/paas/v4/"
)

In [14]:
class CodeAgent:
    # 初始化
    def __init__(self):
        self.history_question = [] # 历史的提问
        self.history_code = [] # 提问提问的对应的代码
        
        self._planer_prompt = """你是一个代码执行专家擅长将自然语言转换为Python的执行程序，现在请规划一下面的应用程序需要调什么函数并且大致需要什么步骤，不需要输出代码。需要执行的任务：{#task#} """
        self._code_gen_prompt = """你是一个代码执行专家擅长将自然语言转换为Python的执行程序，将给定的任务写为python程序：{#task#}

实现步骤参考：{#thought#}
需要输出一段python代码，包含完整的环境依赖的引用，先写出代码，并加上调用案例。
"""
        self._code_reflection_prompt = """上述代码执行存在错误，错误信息为{#error#}，请在原有代码基础上进行改进，先写出代码，并加上调用案例。

```python
{#code#}
```
"""
        self._summary_prompt = """你是答案汇总专家，将用户的提问{#task#} 和 执行结果 {#result#} 汇总为自然语言回答。"""
        
        self.retry_time = 10

    # 智能体入口
    def action(self, question) -> Any:
        # 类似rag过程，找一下历史最接近的提问和代码
        
        # 思维链 -》 思考完成特定的步骤 需要 做什么
        init_thought = self.llm(
            messages=[    
                {"role": "user", "content": self._planer_prompt.replace("{#task#}", question)} 
            ],
        )

        # 生成代码 -》 借助思考过程生成代码
        messages=[    
            {"role": "user", "content": self._code_gen_prompt.replace("{#task#}", question).replace("{#thought#}", init_thought)} 
        ]
        init_code = self.llm(
            messages=messages, model="glm-4"
        )
        
        messages.append({
            "role": "system", "content": init_code
        })
        init_code = self.extract_code_from_llm(init_code) # 先抽取代码
        for retry_idx in range(self.retry_time):
            if init_code == "":
                messages.append({
                    "role": "user", "content": "the output do not contain any pythnon code using ```python ```, please generate."
                })
                init_code = self.llm(messages=messages, model="glm-4")
                init_code = self.extract_code_from_llm(init_code)
            else: # 如果之前抽取了代码
                execute_issucess, execute_result, code, msg = self.execute(init_code)
                if execute_issucess:
                    # messages = [{
                    #     "role": "user", "content": self._summary_prompt.replace("{#task#}", question).replace("{#result#}", execute_result)
                    # }]
                    # final_answer = self.llm(messages=messages, model="glm-4")
                    # return final_answer
                    
                    # 记忆成功提问和代码
                    return code
                    

                messages.append({
                    "role": "user", "content": self._code_reflection_prompt.replace("{#error#}", msg).replace("{#code#}", init_code)
                })
                init_code = self.llm(messages=messages, model="glm-4")
                init_code = self.extract_code_from_llm(init_code)

        print("生成失败")
        return None

    # 调用大模型
    def llm(self, messages, model="glm-4-flashx", top_p=0.7, temperature=0.9):
        print(json.dumps(messages, indent=4, ensure_ascii=False))
        try:
            completion = client.chat.completions.create(
                model=model,  
                messages=messages,
                top_p=top_p,
                temperature=temperature
            ) 
            return completion.choices[0].message.content
        except:
            print('调用大模型失败')
            return None

    # 执行代码
    def execute(self, code) -> Union[bool, str, str, str]:
        # 超时机制
        try:
            print('---')
            print(code)
            print('---')
            result = exec(code) # 执行代码， 或 生成代码写为文件单独执行 或 在容器里面执行
            return True, result, code, ""
        except Exception as e:
            error_message = traceback.format_exc()
            return False, "", code, error_message

    # 从大模型回答抽取代码，markdown 抽取 代码
    def extract_code_from_llm(self, text) -> str:
        pattern = '```python\n(.*?)```'
        try:
            matches = re.findall(pattern, text, re.DOTALL)
            return matches[0]
        except:
            print(traceback.format_exc())
            return ""

In [None]:
CodeAgent().action("获取一下当前代码的目录，并统计目录的层级的层级，几级目录。")

[
    {
        "role": "user",
        "content": "你是一个代码执行专家擅长将自然语言转换为Python的执行程序，现在请规划一下面的应用程序需要调什么函数并且大致需要什么步骤，不需要输出代码。需要执行的任务：获取一下当前代码的目录，并统计目录的层级的层级，几级目录。 "
    }
]
[
    {
        "role": "user",
        "content": "你是一个代码执行专家擅长将自然语言转换为Python的执行程序，将给定的任务写为python程序：获取一下当前代码的目录，并统计目录的层级的层级，几级目录。\n\n实现步骤参考：为了执行这个任务，我们需要规划以下步骤和函数调用：\n\n1. 使用一个函数来获取当前代码的目录。在Python中，我们可以使用`os`模块中的`os.getcwd()`函数来获取当前工作目录。\n\n2. 使用一个函数来遍历目录，并计算目录的层级。这可以通过递归遍历目录树来实现。\n\n3. 创建一个递归函数，该函数接受当前目录路径作为参数。\n\n4. 在递归函数中，检查当前目录是否包含子目录。如果包含，递归调用自身，每次递归调用时增加层级计数。\n\n5. 如果当前目录不包含子目录，则返回层级计数。\n\n6. 将步骤1和步骤4结合，获取当前目录并计算其层级。\n\n以下是大致的步骤总结：\n\n- 获取当前工作目录。\n- 遍历目录并计算层级。\n  - 递归遍历目录树。\n  - 每进入一层目录，增加层级计数。\n- 返回目录层级总数。\n需要输出一段python代码，包含完整的环境依赖的引用，先写出代码，并加上调用案例。\n"
    }
]
---
import os

def get_directory_level(path):
    """
    递归函数，计算目录的层级数。
    :param path: 要检查的目录路径
    :return: 目录的层级数
    """
    level = 0
    # 获取当前目录下的所有子目录和文件
    with os.scandir(path) as it:
        for entry in it:
            # 如果是目录，递归调用，并增加层级计数