In [1]:
import asyncio
import websockets
import json
from typing import Dict, List, Any
from autogen import AssistantAgent, UserProxyAgent, config_list_from_json

In [2]:
# configure llm model
config_list = config_list_from_json(
    "OAI_CONFIG_LIST.json",
    file_location="./config/",
    # filter_dict={
    #     "model": ["gpt-4-1106-preview"] 
    # }
)
print("Successfully configure the llm model:", config_list)

Successfully configure the llm model: [{'model': 'gemini-2.0-pro-latest', 'api_key': 'AIzaSyAb7sjA83pc8dpL5cpRQQZfXyW4ACoGciI', 'api_type': 'google'}]


In [4]:
import time
from subprocess import Popen
from autogen_ext.tools.mcp.client import MCPClient
from autogen_ext.tools.mcp import McpWorkbench, StdioServerParams

class BlenderController:
    def __init__(self):
        self.client = None
        self.blender_process = None
        self._start_blender()
        self._connect_mcp()

    def _start_blender(self):
        """自动启动Blender实例"""
        try:
            self.blender_process = Popen([
                "blender", 
                "--background", 
                "--python-expr", 
                "import bpy; bpy.ops.wm.mcp_start()"
            ])
            print("正在启动Blender进程...")
            time.sleep(5)  # 等待初始化完成
        except Exception as e:
            print(f"启动失败: {str(e)}")
            raise RuntimeError("请确保Blender已安装并加入系统PATH")

    def _connect_mcp(self):
        """连接MCP服务器"""
        self.client = MCPClient(
            host="localhost",
            port=8000,
            auth_token="autogen",
            auto_reconnect=True
        )
        
        try:
            self.client.connect()
            print("成功连接到Blender MCP服务器")
        except ConnectionRefusedError:
            print("连接被拒绝，请检查：")
            print("1. Blender是否已安装MCP插件")
            print("2. 端口8000是否可用")
            raise

    def execute_command(self, command: str):
        """执行MCP命令（带重试机制）"""
        max_retries = 3
        for attempt in range(max_retries):
            try:
                response = self.client.send(command)
                return {
                    "status": "success",
                    "data": response.data,
                    "metadata": response.metadata
                }
            except Exception as e:
                if attempt == max_retries -1:
                    return {
                        "status": "error",
                        "message": f"最终失败: {str(e)}"
                    }
                print(f"尝试 {attempt+1} 失败，正在重试...")
                time.sleep(1)
                self._connect_mcp()

# 初始化控制器
blender_controller = BlenderController()

ModuleNotFoundError: No module named 'autogen_ext.tools.mcp.client'

In [8]:
# 步骤4：构建AutoGen Agent系统
def termination_msg(x):
    return isinstance(x, dict) and "TERMINATE" == x.get("content", "")

# 创建用户代理
user_proxy = UserProxyAgent(
    name="User_Proxy",
    human_input_mode="NEVER",
    max_consecutive_auto_reply=5,
    code_execution_config={
        "work_dir": "blender_workspace",
        "use_docker": False,
    },
    system_message="负责接收用户需求并与场景设计师协调。当收到完成通知时回复TERMINATE",
    default_auto_reply="继续执行下一步操作",
    is_termination_msg=termination_msg
)

# 创建场景设计师Agent
scene_designer = AssistantAgent(
    name="3D_Scene_Designer",
    llm_config={
        "config_list": config_list,
        "temperature": 0.3,
        "timeout": 120,
        "functions": [
            {
                "name": "execute_blender_command",
                "description": "执行Blender操作命令",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "command": {
                            "type": "string",
                            "description": "符合BlenderMCP协议的指令"
                        }
                    },
                    "required": ["command"]
                }
            }
        ]
    },
    system_message=
    """你是一个专业的3D场景设计师，擅长将自然语言需求转换为BlenderMCP指令。
    1. 分析用户需求，分解为Blender操作步骤
    2. 生成精确的BlenderMCP命令
    3. 通过execute_blender_command函数执行命令
    4. 处理执行结果并反馈
    5. 可以使用中英文交流"""
)

# 注册Blender执行函数
@user_proxy.register_for_execution()
@scene_designer.register_for_llm(description="执行Blender命令")
def execute_blender_command(command: str):
    return blender_controller.execute_command(command)

ImportError: Please install `google-genai` and 'vertexai' to use Google's API.