# In [2]:  (notebook cell marker left over from the Jupyter export)
import os, json, time, socket, threading
from tkinter import Tk, Text, Scrollbar, Entry, Button, END, BOTH, RIGHT, LEFT, Y
from dotenv import load_dotenv
from openai import OpenAI
from datetime import datetime
import re
import tempfile
import base64


# Server information: TCP address of the Blender command server that
# send_command() connects to (loopback only, so Blender must run on this machine).
HOST = "127.0.0.1"
PORT = 5000


# Load environment variables (e.g. from a .env file) and build the shared OpenAI
# client from OPENAI_API_KEY. os.getenv returns None when the variable is unset.
load_dotenv()
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))


# The list and definitions of actions the agent can select.
# This text is interpolated verbatim into the LLM system prompt built in
# AgentSession.ask_llm_for_commands, so any edit here changes what the model is
# told it may do.
# NOTE(review): the numbering jumps from 5 to 7 (there is no item 6) -- confirm
# whether an action was dropped or the list only needs renumbering.
# NOTE: print_objects is handled locally in AgentSession.process_instruction and
# is not forwarded to the Blender server.

ACTION_DOC = """
Here are the allowed actions with parameters and descriptions:

1. add_cube: size (float), location (x,y,z), rotation (x,y,z)
   - Adds a cube mesh to the scene.

2. add_sphere: radius (float), location (x,y,z)
   - Adds a UV sphere.

3. add_cylinder: radius (float), depth (float), location (x,y,z)
   - Adds a cylinder mesh.

4. add_cone: radius1 (float), radius2 (float), depth (float), location (x,y,z)
   - Adds a cone mesh.

5. add_plane: size (float), location (x,y,z)
   - Adds a flat plane for ground or walls.

7. move_object: object_name (str), location (x,y,z)
   - Moves an existing object.

8. rotate_object: object_name (str), rotation (x,y,z)
   - Rotates an existing object.

9. scale_object: object_name (str), scale (x,y,z)
   - Scales an existing object.

10. add_camera: name (str), location (x,y,z)
    - Adds a camera to the scene.

11. add_point_light: name (str), location (x,y,z), energy (float)
    - Adds a point light.

12. add_sun_light: name (str), rotation (x,y,z), strength (float)
    - Adds a directional sun light.

13. set_material_color: object_name (str), color (r,g,b,a)
    - Sets the base color of an object's material.

14. render: filepath (str)
    - Renders the current scene to the given file path.

15. list_objects: no parameters
    - Returns a list of all objects currently in the scene.

16. delete_object: object_name (str)
    - Deletes the specified object.

17. clear_scene: no parameters
    - Removes all objects and resets the scene.
    
18. print_objects: no parameters
    - Prints the current scene objects to the app log.

"""


# Send the selected command to the blender server
def send_command(command):
    """Send one JSON command to the Blender server and return its decoded reply.

    A new TCP connection to ``(HOST, PORT)`` is opened for every command. The
    command is serialised with ``json.dumps`` and the reply is parsed with
    ``json.loads``.

    TCP is a byte stream, so one ``recv`` call may return only part of the
    reply. The bytes are therefore accumulated and re-parsed after each chunk;
    reading stops as soon as the buffer is a complete JSON document or the
    server closes the connection.

    Args:
        command: JSON-serialisable object, normally
            ``{"action": ..., "params": {...}}``.

    Returns:
        The decoded JSON reply (whatever type the server sent).

    Raises:
        OSError: if the connection or transfer fails.
        ValueError: (``json.JSONDecodeError`` / ``UnicodeDecodeError``) if the
            server closes the connection before a complete JSON document has
            been received.
    """
    received = []
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((HOST, PORT))
        s.sendall(json.dumps(command).encode())
        while True:
            chunk = s.recv(65536)
            if not chunk:
                # Peer closed the connection: parse whatever we have below.
                break
            received.append(chunk)
            try:
                return json.loads(b"".join(received).decode())
            except (json.JSONDecodeError, UnicodeDecodeError):
                # Incomplete document (or a multi-byte character split across
                # chunks): keep reading.
                continue
    return json.loads(b"".join(received).decode())


class AgentSession:
    """Translate user instructions into Blender commands and track the scene.

    A session keeps the latest user instruction, the last object list reported
    by the Blender server, the errors from the last batch of commands, and the
    set of (normalized) object names the session believes it has created.

    The class relies on three module-level names defined elsewhere in this
    file: ``client`` (the OpenAI client), ``send_command`` (one socket
    round-trip to the Blender server) and ``app`` (the Tk window used as the
    log sink).
    """

    # Splits a name into "base" + optional trailing ".<digits>" suffix, so that
    # names such as 'Cube.001' compare equal to 'Cube'.
    _NAME_SUFFIX_RE = re.compile(r"^(.*?)(?:\.\d+)?$")

    def __init__(self):
        self.conversation_history = []     # list[str]; currently only the latest instruction
        self.scene_state = None            # raw list returned from server (list[str] or list[dict])
        self.error_log = []                # [{"command": ..., "error": ...}] from the last batch
        self.expected_objects = set()      # store normalized names

    def log(self, msg):
        """Append ``msg`` to the application log (uses the module-level ``app``)."""
        app.append_log(msg)

    # Get the list of objects from blender
    def get_scene_objects_raw(self):
        """Ask the server for the scene objects and return them as a list.

        The reply shape is not fixed, so a bare list is accepted, as is a dict
        holding the list under a well-known key, or (as a fallback) under any
        key. Returns ``[]`` when the request fails or no list can be found.
        """
        try:
            resp = send_command({"action": "list_objects"})
        except Exception as e:
            self.log(f"Error listing objects: {e}")
            return []

        # If server returned a list directly
        if isinstance(resp, list):
            return resp

        # If it's a dict, try well-known keys first
        if isinstance(resp, dict):
            for key in ("objects", "result", "items", "data", "payload"):
                if isinstance(resp.get(key), list):
                    return resp[key]
            # fallback: find the first list-of-things in values
            for value in resp.values():
                if isinstance(value, list):
                    return value

        # Log and return empty list
        self.log(f"Warning: unexpected list_objects response shape: {resp}")
        return []

    @staticmethod
    def _extract_names_from_list(lst):
        """
        From a list that may contain dicts or strings, return a list of names (strings).
        For dict entries, prefer common keys like 'name', 'object_name', 'id', 'label'.
        """
        names = []
        for item in lst:
            if isinstance(item, dict):
                name = (
                    item.get("name")
                    or item.get("object_name")
                    or item.get("id")
                    or item.get("label")
                )
                # as a last resort stringify the dict (not ideal, but safe)
                names.append(str(name) if name else str(item))
            else:
                # strings pass through unchanged; other types are stringified
                names.append(str(item))
        return names

    # Normalize the name to help with comparison
    @classmethod
    def _normalize_name(cls, name):
        """Strip Blender numeric suffixes like 'Cube.001' -> 'Cube' and trim whitespace."""
        if name is None:
            return ""
        s = str(name).strip()
        m = cls._NAME_SUFFIX_RE.match(s)
        return m.group(1) if m else s

    @staticmethod
    def _strip_code_fences(text):
        """Remove a surrounding Markdown code fence (```json ... ```) if present.

        The prompt asks for raw JSON, but models sometimes wrap the reply in a
        fence anyway; without this the whole reply would be rejected.
        """
        stripped = text.strip()
        if stripped.startswith("```"):
            stripped = re.sub(r"^```[A-Za-z0-9_-]*\s*", "", stripped)
            stripped = re.sub(r"\s*```$", "", stripped)
        return stripped

    def _parse_commands(self, text):
        """Turn the raw LLM reply into a list of command dicts (possibly empty).

        A single JSON object is treated as a one-command list. Anything that is
        not valid JSON, or not a list/object, is logged and yields ``[]``.
        Entries that are not JSON objects are dropped, because every command is
        later accessed with ``.get``.
        """
        try:
            parsed = json.loads(self._strip_code_fences(text or ""))
        except json.JSONDecodeError:
            self.log(f"Invalid JSON from LLM: {text}")
            return []

        if isinstance(parsed, dict):
            parsed = [parsed]
        if not isinstance(parsed, list):
            self.log(f"Invalid JSON from LLM: {text}")
            return []

        commands = [entry for entry in parsed if isinstance(entry, dict)]
        if len(commands) != len(parsed):
            self.log(f"Ignoring non-object entries in LLM reply: {text}")
        return commands

    # =================================================================================
    # Assemble the prompt and send it to the LLM.  This is where the real logic happens
    def ask_llm_for_commands(self):
        """Build the prompts, query the model and return a list of command dicts.

        Returns ``[]`` (after logging) when the request fails or the reply
        cannot be interpreted as commands.
        """
        history = self.conversation_history
        if isinstance(history, str):
            # Tolerate a bare string: joining it directly would put a newline
            # between every character.
            history = [history]
        goal_description = "\n".join(history)

        context = ""
        if self.scene_state:
            context += f"Scene objects currently present: {self.scene_state}\n"
        if self.error_log:
            context += f"Errors from last attempt: {self.error_log}\n"

        # Define the prompt.  Include the goal (describing the agent), the commands,
        # the history, and the format of the expected response (ie. json)

        # System prompt defines the role/goals of the agent.
        system_prompt = f"""
You are an AI that generates JSON commands for Blender.  Your goal is to take in the command given the user and select the appropriate
actions to take.   

Each command MUST be an object with keys:
- "action": (string) the action name, one of the allowed actions
- "params": (object, optional) parameters for the action

Do not put the action name as a key itself.

{ACTION_DOC}

"""

        # User prompt is the most updated information.
        user_prompt = f"""

User instructions so far (latest last):
{goal_description}

{context}

Only consider the last statement in the conversation.  Generate ONLY the new or corrective commands needed.
Return ONLY a raw JSON array, no code fences or markdown.
"""

        # Send prompt to gpt
        try:
            resp = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt},
                ],
                temperature=0
            )
        except Exception as e:
            # This runs on a worker thread; without this the failure would
            # never reach the log window.
            self.log(f"LLM request failed: {e}")
            return []

        return self._parse_commands(resp.choices[0].message.content)

    def _track_expected(self, action, params):
        """Update ``expected_objects`` for actions that add or remove objects."""
        if not isinstance(action, str):
            return
        if action.startswith("add_"):
            # prefer an explicit name param from the LLM; otherwise infer
            name = params.get("name")
            if not name:
                name = action.replace("add_", "").capitalize()
            self.expected_objects.add(self._normalize_name(name))
        elif action == "delete_object":
            name = params.get("object_name")
            if name:
                self.expected_objects.discard(self._normalize_name(name))
        elif action == "clear_scene":
            self.expected_objects.clear()

    # Parse and handle the returned instruction list
    def process_instruction(self, user_text):
        """Run one user instruction end to end.

        Records the instruction, asks the LLM for commands, sends each command
        to Blender (``print_objects`` is handled locally), refreshes
        ``scene_state``, verifies the scene against ``expected_objects`` and
        stores the errors of this batch in ``error_log``.
        """
        ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # Only the latest instruction is kept, but it must stay a *list*:
        # ask_llm_for_commands joins the entries and the GUI reads [-1].
        self.conversation_history = [f"[{ts}] {user_text}"]

        commands = self.ask_llm_for_commands()
        new_errors = []

        for cmd in commands:
            action = cmd.get("action")
            params = cmd.get("params")
            if not isinstance(params, dict):
                # "params" is optional and the model may send null.
                params = {}

            # First check for local commands.  Currently the only one is print_objects
            if action == "print_objects":
                raw = self.get_scene_objects_raw()
                self.scene_state = raw
                self.log("====================")
                self.log(f"Scene objects: {raw}")
                self.log("====================")
                continue

            #  Update expected_objects BEFORE sending (store normalized names)
            self._track_expected(action, params)

            #  Send the command to blender to modify the scene
            self.log(f"Sending: {cmd}")
            try:
                result = send_command(cmd)
            except Exception as e:
                result = {"error": str(e)}
            self.log(f"← Result: {result}")
            if isinstance(result, dict) and "error" in result:
                new_errors.append({"command": cmd, "error": result["error"]})
            time.sleep(0.3)

        # Update raw scene state and log it
        self.scene_state = self.get_scene_objects_raw()
        self.log(f"Scene now contains: {self.scene_state}")

        # Verification step using extracted & normalized names
        self.verify_scene()

        self.error_log = new_errors

    @staticmethod
    def _remove_quietly(path):
        """Best-effort removal of a temporary file; a failure to delete is ignored."""
        try:
            os.remove(path)
        except OSError:
            pass

    # Send a command to render the scene to a png.
    def render_scene(self):
        """Ask the server to render into a fresh temporary .png file.

        Returns the file path on success, or ``None`` when the request raises
        or the server replies with an ``"error"`` entry. The file is not
        deleted on success so the user can inspect it.
        """
        tmpfile = tempfile.NamedTemporaryFile(suffix=".png", delete=False)
        tmpfile.close()
        try:
            result = send_command({"action": "render", "params": {"filepath": tmpfile.name}})
        except Exception as e:
            self.log(f"Render failed: {e}")
            self._remove_quietly(tmpfile.name)
            return None

        # The server reports failures in-band, as process_instruction also assumes.
        if isinstance(result, dict) and "error" in result:
            self.log(f"Render failed: {result['error']}")
            self._remove_quietly(tmpfile.name)
            return None

        self.log(f"Rendered image saved to {tmpfile.name}")
        return tmpfile.name

    #   Compare the expected set of objects against the current list of objects in the scene
    def verify_scene(self):
        """Compare normalized Blender names to our normalized expected set."""
        names = self._extract_names_from_list(self.scene_state or [])
        blender_norm = {self._normalize_name(n) for n in names}
        expected_norm = set(self.expected_objects)  # already normalized on insert

        missing = sorted(expected_norm - blender_norm)
        unexpected = sorted(blender_norm - expected_norm)

        if not missing and not unexpected:
            self.log("SUCCESS:  Verification passed: scene matches expected objects.")
            return
        if missing:
            self.log(f"MISSING OBJECTS: (expected but not in Blender): {missing}")
        if unexpected:
            self.log(f"UNEXPECTED OBJECT (in Blender but not expected): {unexpected}")

    # Conduct a visual rendering of the scene.  Ask an LLM if the png created matches the description
    def evaluate_render(self, user_prompt):
        """
        Ask a vision-capable LLM if the rendered image matches the user prompt.

        The render is sent inline as a base64 ``data:`` URL in an ``image_url``
        content part; the answer (or any failure) is written to the log.
        """
        image_path = self.render_scene()
        if not image_path:
            self.log("Cannot evaluate without a render.")
            return

        try:
            with open(image_path, "rb") as f:
                img_bytes = f.read()
            if not img_bytes:
                self.log("Evaluation failed: rendered image is empty.")
                return
            encoded = base64.b64encode(img_bytes).decode("ascii")

            resp = client.chat.completions.create(
                model="gpt-4o",  # multimodal
                messages=[
                    {
                        "role": "system",
                        "content": (
                            "You are a visual QA assistant. "
                            "Given a prompt and a rendered image, "
                            "state whether the image matches the prompt."
                        ),
                    },
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "text",
                                "text": f"Prompt: {user_prompt}\nDoes the rendered image match this description? Answer 'yes' or 'no' and explain briefly."
                            },
                            {
                                "type": "image_url",
                                "image_url": {"url": f"data:image/png;base64,{encoded}"},
                            },
                        ],
                    }
                ],
                temperature=0,
            )
            answer = resp.choices[0].message.content
            self.log(f"Evaluation result:\n{answer}")
        except Exception as e:
            self.log(f"Evaluation failed: {e}")


# The blender app.  Allows for a multi-turn interaction.  Also has a panel to display information.  
class BlenderAgentApp(Tk):
    """Main window: a scrolling log, an input line and Send / Evaluate buttons.

    User input is handed to ``agent.process_instruction`` and render evaluation
    to ``agent.evaluate_render``. Both run on daemon threads so the window does
    not block while they wait on the LLM or the Blender server.
    """

    def __init__(self, agent):
        """Build the widgets. ``agent`` is the session that does the real work."""
        super().__init__()
        self.title("Blender Agent")
        self.geometry("700x500")
        self.agent = agent

        # Log pane with its scrollbar
        self.text_area = Text(self, wrap="word")
        self.scrollbar = Scrollbar(self, command=self.text_area.yview)
        self.text_area.configure(yscrollcommand=self.scrollbar.set)
        self.scrollbar.pack(side=RIGHT, fill=Y)
        self.text_area.pack(side=LEFT, fill=BOTH, expand=True)

        # Input line; pressing Return is the same as clicking Send
        self.entry = Entry(self)
        self.entry.pack(fill="x", padx=5, pady=5)
        self.entry.bind("<Return>", lambda e: self.send())

        self.send_button = Button(self, text="Send", command=self.send)
        self.send_button.pack(pady=5)
        self.eval_button = Button(self, text="Evaluate Render",
                                  command=self.evaluate)
        self.eval_button.pack(pady=5)

    def _last_prompt(self):
        """Return the most recent user statement, or "" when there is none.

        The agent's history may be a list of entries or a single string;
        indexing a string with [-1] would return only its last character, so
        the two cases are handled separately.
        """
        history = self.agent.conversation_history
        if isinstance(history, str):
            return history
        return history[-1] if history else ""

    def evaluate(self):
        """Render the scene and have it judged against the last user statement."""
        threading.Thread(
            target=self.agent.evaluate_render,
            args=(self._last_prompt(),),
            daemon=True
        ).start()

    def append_log(self, msg):
        """Append one line to the log pane and scroll it into view."""
        self.text_area.insert(END, f"{msg}\n")
        self.text_area.see(END)

    def send(self):
        """Take the text from the input line and process it on a worker thread."""
        user_text = self.entry.get().strip()
        if not user_text:
            return
        self.entry.delete(0, END)
        self.append_log(f"User: {user_text}")
        threading.Thread(
            target=self._background_process, args=(user_text,), daemon=True
        ).start()

    def _background_process(self, text):
        """Worker-thread entry point for one instruction.

        This is the top of the thread, so any exception is caught here and
        written to the log; otherwise the thread would die and the user would
        see no feedback in the window.
        """
        try:
            self.agent.process_instruction(text)
        except Exception as e:
            self.append_log(f"Error while processing instruction: {e}")


# Go!
if __name__ == "__main__":
    # `agent` and `app` are deliberately module-level names: AgentSession.log
    # looks up `app` as a global when it writes to the log pane.
    agent = AgentSession()
    app = BlenderAgentApp(agent)

    # Greeting shown in the log pane before the first instruction.
    app.append_log(
        "\n".join(
            (
                "Blender Agent Ready. Example:",
                "  'Make a cube at the origin'",
                "  'What objects are currently in the scene?'",
            )
        )
    )

    # Hand control to Tk; returns when the window is closed.
    app.mainloop()
