In [None]:
import asyncio
import json
import os
import venv
import sys
from pathlib import Path
from typing import List, Dict, Optional, Any, Union, Tuple
import re
import time
import openai  # 或其他支持的AI模型API

In [None]:
class CancellationToken:
    """简单的取消令牌实现"""
    def __init__(self):
        self._cancelled = False
        
    def cancel(self):
        self._cancelled = True
        
    @property
    def cancelled(self):
        return self._cancelled


class CodeBlock:
    """代码块类，包含代码内容和语言"""
    def __init__(self, code: str, language: str):
        self.code = code
        self.language = language.lower()
        
    def __repr__(self):
        return f"CodeBlock(language={self.language}, code={self.code[:20]}...)"


class ExecutionResult:
    """执行结果类"""
    def __init__(self, exit_code: int, output: str, error: str):
        self.exit_code = exit_code
        self.output = output
        self.error = error
        
    def is_success(self) -> bool:
        return self.exit_code == 0
    
    def __repr__(self):
        if self.is_success():
            return f"执行成功:\n{self.output}"
        else:
            return f"执行失败 (代码: {self.exit_code}):\n{self.error}"


class LocalCommandLineCodeExecutor:
    """本地命令行代码执行器"""
    
    def __init__(self, work_dir: Union[str, Path], virtual_env_context: Optional[Any] = None):
        """
        初始化本地命令行代码执行器
        
        Args:
            work_dir: 工作目录
            virtual_env_context: 虚拟环境上下文
        """
        self.work_dir = Path(work_dir)
        self.virtual_env_context = virtual_env_context
        self.work_dir.mkdir(exist_ok=True, parents=True)
    
    async def execute_code_blocks(self, code_blocks: List[CodeBlock], cancellation_token: CancellationToken) -> Dict[str, Any]:
        """
        执行一系列代码块
        
        Args:
            code_blocks: 代码块列表
            cancellation_token: 取消令牌
            
        Returns:
            执行结果字典
        """
        results = {}
        
        for i, block in enumerate(code_blocks):
            if cancellation_token.cancelled:
                results[f"block_{i}"] = {"status": "cancelled"}
                continue
                
            try:
                if block.language in ["python", "py"]:
                    result = await self._execute_python(block.code)
                elif block.language in ["bash", "shell", "sh"]:
                    result = await self._execute_shell(block.code)
                else:
                    result = ExecutionResult(1, "", f"不支持的语言: {block.language}")
                    
                results[f"block_{i}"] = {
                    "language": block.language,
                    "exit_code": result.exit_code,
                    "output": result.output,
                    "error": result.error,
                    "status": "success" if result.is_success() else "error"
                }
            except Exception as e:
                results[f"block_{i}"] = {
                    "language": block.language,
                    "status": "error",
                    "error": str(e)
                }
        
        return results
    
    async def _execute_python(self, code: str) -> ExecutionResult:
        """执行Python代码"""
        # 写入临时文件
        temp_file = self.work_dir / "temp_script.py"
        with open(temp_file, "w", encoding="utf-8") as f:
            f.write(code)
        
        # 确定Python解释器路径
        if self.virtual_env_context:
            if sys.platform == "win32":
                python_executable = Path(self.virtual_env_context.env_exe)
            else:
                python_executable = Path(self.virtual_env_context.env_dir) / "bin" / "python"
        else:
            python_executable = Path(sys.executable)
        
        # 执行命令
        process = await asyncio.create_subprocess_exec(
            str(python_executable),
            str(temp_file),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=str(self.work_dir)
        )
        
        stdout, stderr = await process.communicate()
        
        return ExecutionResult(
            process.returncode,
            stdout.decode("utf-8", errors="replace"),
            stderr.decode("utf-8", errors="replace")
        )
    
    async def _execute_shell(self, code: str) -> ExecutionResult:
        """执行Shell代码"""
        # 写入临时文件
        temp_file = self.work_dir / "temp_script.sh"
        with open(temp_file, "w", encoding="utf-8") as f:
            f.write(code)
        
        # 设置为可执行
        temp_file.chmod(temp_file.stat().st_mode | 0o111)
        
        # 确定Shell路径
        shell = "/bin/bash" if os.path.exists("/bin/bash") else "/bin/sh"
        
        # 执行命令
        process = await asyncio.create_subprocess_exec(
            shell,
            str(temp_file),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=str(self.work_dir),
            env=self._get_environment_variables()
        )
        
        stdout, stderr = await process.communicate()
        
        return ExecutionResult(
            process.returncode,
            stdout.decode("utf-8", errors="replace"),
            stderr.decode("utf-8", errors="replace")
        )
    
    def _get_environment_variables(self) -> Dict[str, str]:
        """获取环境变量"""
        env = os.environ.copy()
        
        # 如果有虚拟环境，添加相关环境变量
        if self.virtual_env_context:
            # 将虚拟环境的bin目录添加到PATH
            if sys.platform == "win32":
                bin_dir = Path(self.virtual_env_context.env_dir) / "Scripts"
            else:
                bin_dir = Path(self.virtual_env_context.env_dir) / "bin"
            
            path_sep = os.pathsep
            env["PATH"] = f"{bin_dir}{path_sep}{env.get('PATH', '')}"
            env["VIRTUAL_ENV"] = str(self.virtual_env_context.env_dir)
            
        return env


async def LocalCodeExecutor(codeblock_list, env=None, filedir='/oper/ch/autogen'):
    """
    执行代码块列表，可在指定环境中运行
    
    Args:
        codeblock_list: 代码块列表，每个元素应为CodeBlock实例
        env: 虚拟环境路径，如果为None则创建新环境
        filedir: 工作目录路径
        
    Returns:
        执行结果
    """
    work_dir = Path(filedir)
    work_dir.mkdir(exist_ok=True, parents=True)
    
    # 处理虚拟环境
    try:
        if not env:
            venv_dir = work_dir / ".venv"
            if venv_dir.exists():
                # 如果已存在，使用现有的
                venv_context = venv.EnvBuilder(with_pip=True).ensure_directories(venv_dir)
            else:
                # 否则创建新的
                venv_builder = venv.EnvBuilder(with_pip=True)
                venv_builder.create(venv_dir)
                venv_context = venv_builder.ensure_directories(venv_dir)
        else:
            env_path = Path(env)
            if not env_path.exists():
                # 如果指定的环境不存在，创建它
                venv_builder = venv.EnvBuilder(with_pip=True)
                venv_builder.create(env_path)
            venv_context = venv.EnvBuilder(with_pip=True).ensure_directories(env_path)
        
        # 创建执行器并执行代码
        local_executor = LocalCommandLineCodeExecutor(work_dir=work_dir, virtual_env_context=venv_context)
        try:
            result = await local_executor.execute_code_blocks(
                code_blocks=codeblock_list,
                cancellation_token=CancellationToken(),
            )
            return result
        except Exception as e:
            return f"执行错误: {e}"
    except Exception as e:
        return f"环境设置错误: {e}"


class AICodeAssistant:
    """AI代码助手类，用于处理AI代码生成与优化的反馈循环"""
    
    def __init__(self, api_key: str, model: str = "gpt-4", work_dir: str = "/oper/ch/autogen", 
                 env: Optional[str] = None, max_iterations: int = 5):
        """
        初始化AI代码助手
        
        Args:
            api_key: AI服务的API密钥
            model: 使用的模型名称
            work_dir: 工作目录路径
            env: 虚拟环境路径，如果为None则创建新环境
            max_iterations: 最大迭代次数，防止无限循环
        """
        self.api_key = api_key
        self.model = model
        self.work_dir = work_dir
        self.env = env
        self.max_iterations = max_iterations
        
        # 初始化AI客户端
        if "gpt" in model.lower():
            openai.api_key = api_key
        else:
            # 根据需要添加其他模型的初始化代码
            pass
    
    async def solve_coding_task(self, task_description: str, test_criteria: Optional[str] = None, 
                               language: str = "python") -> Dict[str, Any]:
        """
        解决编码任务，通过反馈循环直到任务完成或达到最大迭代次数
        
        Args:
            task_description: 任务描述
            test_criteria: 测试标准，用于验证代码是否完成任务
            language: 编程语言
            
        Returns:
            包含最终代码、执行结果和迭代历史的字典
        """
        iteration_history = []
        
        # 初始提示，包含任务描述
        prompt = f"""
        任务: {task_description}
        
        请编写{language}代码来解决上述任务。代码应该是完整可执行的。
        如果有测试标准，代码应该满足以下测试标准：
        {test_criteria if test_criteria else '无特定测试标准，请确保代码能正确运行并解决任务。'}
        """
        
        for iteration in range(1, self.max_iterations + 1):
            print(f"\n===== 迭代 {iteration}/{self.max_iterations} =====")
            
            # 向AI请求代码
            code = await self._generate_code_from_ai(prompt, language)
            
            # 创建代码块并执行
            code_block = CodeBlock(code, language)
            execution_result = await LocalCodeExecutor([code_block], env=self.env, filedir=self.work_dir)
            
            # 保存当前迭代结果
            current_iteration = {
                "iteration": iteration,
                "code": code,
                "execution_result": execution_result
            }
            iteration_history.append(current_iteration)
            
            # 分析执行结果
            success, evaluation = self._evaluate_execution_result(execution_result, test_criteria)
            current_iteration["evaluation"] = evaluation
            
            if success:
                print(f"✅ 任务成功完成! (迭代 {iteration}/{self.max_iterations})")
                break
            
            # 准备下一次迭代的提示
            prompt = f"""
            任务: {task_description}
            
            测试标准: {test_criteria if test_criteria else '无特定测试标准，请确保代码能正确运行并解决任务。'}
            
            你之前的代码:
            ```{language}
            {code}
            ```
            
            执行结果:
            {self._format_execution_result(execution_result)}
            
            问题评估:
            {evaluation}
            
            请修复上述问题并提供改进的代码。提供完整的代码，不要省略任何部分。
            """
            
            print(f"❌ 代码执行有问题，准备下一次迭代...")
            
        return {
            "final_code": iteration_history[-1]["code"],
            "final_result": iteration_history[-1]["execution_result"],
            "success": success if 'success' in locals() else False,
            "iterations": len(iteration_history),
            "iteration_history": iteration_history
        }
    
    async def _generate_code_from_ai(self, prompt: str, language: str) -> str:
        """从AI生成代码"""
        print("🤖 正在从AI获取代码...")
        
        try:
            if "gpt" in self.model.lower():
                response = await self._call_openai_api(prompt)
                return self._extract_code_from_response(response, language)
            else:
                # 为其他模型API添加适当的调用代码
                raise NotImplementedError(f"未实现对模型 {self.model} 的支持")
        except Exception as e:
            print(f"❗ 从AI获取代码时出错: {e}")
            return f"# 生成代码时出错: {e}\n# 这是一个占位符代码\nprint('发生错误，无法生成有效代码')"
    
    async def _call_openai_api(self, prompt: str) -> Any:
        """调用OpenAI API"""
        # 这个示例使用异步调用，如果你使用的是同步API，需要调整
        try:
            # 根据你使用的OpenAI库版本调整这个调用
            # 这是针对较新版本的openai库
            response = await openai.ChatCompletion.acreate(
                model=self.model,
                messages=[
                    {"role": "system", "content": "你是一个专业的代码助手，擅长编写高质量的代码并解决编程问题。"},
                    {"role": "user", "content": prompt}
                ]
            )
            return response
        except Exception as e:
            print(f"OpenAI API调用失败: {e}")
            raise
    
    def _extract_code_from_response(self, response: Any, language: str) -> str:
        """从AI响应中提取代码"""
        try:
            # 假设使用的是OpenAI的ChatCompletion API
            full_text = response.choices[0].message.content
            
            # 尝试提取代码块
            code_pattern = rf"```(?:{language})?\s*([\s\S]*?)\s*```"
            code_matches = re.findall(code_pattern, full_text)
            
            if code_matches:
                # 返回找到的第一个代码块
                return code_matches[0].strip()
            else:
                # 如果没有代码块，返回整个回复（可能需要进一步清理）
                return self._clean_non_code_text(full_text, language)
        except Exception as e:
            print(f"提取代码时出错: {e}")
            return full_text
    
    def _clean_non_code_text(self, text: str, language: str) -> str:
        """清理非代码文本，尝试保留代码部分"""
        # 删除明显的非代码行，如"让我们编写代码..."等
        lines = text.split('\n')
        code_lines = []
        in_code = False
        
        for line in lines:
            line = line.strip()
            # 跳过空行和明显是说明的行
            if not line or line.startswith(('Here is', 'Let me', 'I will', 'Now,', 'First,')):
                continue
                
            # 假设第一行如果包含某些关键字可能是代码开始
            if not in_code and any(keyword in line for keyword in ['import', 'def ', 'class ', 'function', '#', '//']):
                in_code = True
                
            if in_code:
                code_lines.append(line)
        
        return '\n'.join(code_lines)
    
    def _format_execution_result(self, result: Dict[str, Any]) -> str:
        """格式化执行结果为可读字符串"""
        formatted_result = []
        
        for block_id, block_result in result.items():
            if isinstance(block_result, dict):
                formatted_result.append(f"--- {block_id} ({block_result.get('language', 'unknown')}) ---")
                formatted_result.append(f"状态: {block_result.get('status', 'unknown')}")
                
                if block_result.get('status') == 'success':
                    formatted_result.append(f"输出:\n{block_result.get('output', '')}")
                else:
                    formatted_result.append(f"错误:\n{block_result.get('error', '')}")
            else:
                # 处理非字典结果
                formatted_result.append(f"执行结果: {str(block_result)}")
        
        return '\n'.join(formatted_result)
    
    def _evaluate_execution_result(self, execution_result: Dict[str, Any], 
                                 test_criteria: Optional[str]) -> Tuple[bool, str]:
        """
        评估执行结果是否满足测试标准
        
        Returns:
            (成功与否, 评估描述)
        """
        # 默认评估
        if not execution_result or not isinstance(execution_result, dict):
            return False, "执行结果无效或不完整"
        
        # 检查是否有错误状态
        has_error = False
        error_messages = []
        
        for block_id, result in execution_result.items():
            if isinstance(result, dict):
                if result.get('status') == 'error':
                    has_error = True
                    error_messages.append(f"{block_id}: {result.get('error', '未知错误')}")
            elif isinstance(result, str) and ('错误' in result or 'error' in result.lower()):
                has_error = True
                error_messages.append(result)
        
        if has_error:
            return False, f"代码执行出错:\n" + "\n".join(error_messages)
        
        # 如果没有明确的测试标准，只要没有错误就认为成功
        if not test_criteria:
            return True, "代码执行成功，无错误"
        
        # 如果有测试标准，需要进一步验证输出是否符合预期
        # 这里可以实现更复杂的测试标准验证逻辑
        # 目前简单实现：检查输出中是否包含测试标准中的关键词
        
        all_output = ""
        for block_id, result in execution_result.items():
            if isinstance(result, dict) and 'output' in result:
                all_output += result['output'] + "\n"
        
        # 简单的关键词检查（可以根据需要扩展为更复杂的逻辑）
        keywords = [kw.strip().lower() for kw in test_criteria.split(',')]
        for keyword in keywords:
            if keyword and keyword not in all_output.lower():
                return False, f"测试未通过: 输出中未找到关键词 '{keyword}'"
        
        return True, "代码执行成功，并且满足所有测试标准"


async def main():
    """示例用法"""
    # 设置你的API密钥
    api_key = "your_api_key_here"  # 替换为实际的API密钥
    
    # 创建AI代码助手
    assistant = AICodeAssistant(
        api_key=api_key,
        model="gpt-4",  # 或其他支持的模型
        work_dir="./ai_code_workspace",
        max_iterations=3
    )
    
    # 定义编码任务
    task_description = """
    创建一个简单的Python函数，用于计算斐波那契数列的第n个数字。
    函数应该接受一个参数n，并返回对应的斐波那契数。
    请确保处理边界情况（n=0和n=1）。
    函数应该能高效地处理大的n值。
    """
    
    # 定义测试标准
    test_criteria = """
    函数应该正确计算斐波那契数列：f(0)=0, f(1)=1, f(2)=1, f(3)=2, f(10)=55
    """
    
    # 解决任务
    result = await assistant.solve_coding_task(
        task_description=task_description,
        test_criteria=test_criteria,
        language="python"
    )
    
    # 输出结果
    print("\n===== 最终结果 =====")
    print(f"成功: {'是' if result['success'] else '否'}")
    print(f"迭代次数: {result['iterations']}")
    print("\n最终代码:")
    print(result['final_code'])


if __name__ == "__main__":
    asyncio.run(main())