## 1. Simulator

In [None]:
# Automatically detect the best renderer
def detect_best_renderer():
    system_name = platform.system()
    print(f"Detected system type: {system_name}")
    
    # Order of renderers to try
    renderers_to_try = []
    
    if system_name == "Linux":
        # Linux prioritizes EGL, then GLFW and OSMesa
        renderers_to_try = ["glfw", "egl", "osmesa"]
    elif system_name == "Windows":
        # Windows prioritizes GLFW
        renderers_to_try = ["glfw", "osmesa"]
    elif system_name == "Darwin":  # macOS
        # macOS prioritizes GLFW
        renderers_to_try = ["glfw", "osmesa"]
    else:
        # Default order
        renderers_to_try = ["glfw", "egl", "osmesa"]
    
    # Try each renderer
    for renderer in renderers_to_try:
        try:
            print(f"Trying {renderer} renderer...")
            
            # Set environment variables
            os.environ['PYOPENGL_PLATFORM'] = renderer
            os.environ['MUJOCO_GL'] = renderer
            
            # Try creating a simple model and renderer to test
            simple_xml = """
            <mujoco>
                <visual>
                    <global offwidth="4096" offheight="3072"/>
                </visual>
                <worldbody>
                    <light diffuse=".5 .5 .5" pos="0 0 3" dir="0 0 -1"/>
                    <geom type="plane" size="1 1 0.1" rgba=".9 .9 .9 1"/>
                </worldbody>
            </mujoco>
            """
            model = mujoco.MjModel.from_xml_string(simple_xml)
            data = mujoco.MjData(model)
            # Just create a renderer to test
            renderer = mujoco.Renderer(model, width=4096, height=3072)
            
            # If we reach here without exceptions, this renderer is available
            print(f"Successfully using {renderer} renderer")
            return renderer
        except Exception as e:
            print(f"{renderer} renderer test failed: {e}")
    
    # If all attempts fail, use the default
    print("All renderers failed, using default GLFW")
    os.environ['PYOPENGL_PLATFORM'] = 'glfw'
    os.environ['MUJOCO_GL'] = 'glfw'
    return 'glfw'

In [None]:
def create_simple_model():
    """Create a simple model with 4K framebuffer settings"""
    xml = """
    <mujoco>
        <visual>
            <global offwidth="4096" offheight="3072"/>
        </visual>
        
        <worldbody>
            <light diffuse=".5 .5 .5" pos="0 0 3" dir="0 0 -1"/>
            <geom type="plane" size="2 2 0.1" rgba=".9 .9 .9 1"/>
            <body name="robot" pos="0 0 1">
                <joint type="free"/>
                <geom type="box" size=".05 .05 .2" rgba="0.3 0.3 0.7 1"/>
                <site name="chalk_tip" pos="0.05 0 0" size="0.01"/>
            </body>
        </worldbody>
    </mujoco>
    """
    temp_path = "simple_model.xml"
    with open(temp_path, "w", encoding="utf-8") as f:
        f.write(xml)
    return temp_path

### 1.2 A example for renderer object

In [None]:
import mujoco
import mujoco.viewer
import numpy as np
import imageio

# 加载模型
model = mujoco.MjModel.from_xml_path("humanoid.xml")
data = mujoco.MjData(model)

# 创建 renderer
renderer = mujoco.Renderer(model, height=1080, width=1920)

# 运行一步仿真并渲染
mujoco.mj_step(model, data)
renderer.update_scene(data)
image = renderer.render()

# 保存图像
imageio.imwrite("output.png", image)

# 释放资源
renderer.free()

In [None]:
    def render_frame(self):
        """Render a frame"""
        try:
            # Ensure we have a renderer instance
            if self.current_renderer is None:
                self.current_renderer = self.create_renderer()
                if self.current_renderer is None:
                    raise Exception("Unable to create renderer")
            
            # Update scene and render
            mujoco.mj_forward(self.model, self.data)
            self.current_renderer.update_scene(self.data)
            img = self.current_renderer.render()
            img = self.crop_black_borders(img)
            # Add performance information
            resolution_info = f"{self.width}x{self.height}"
            cv2.putText(
                img, 
                f"FPS: {self.fps:.1f} | {resolution_info}", 
                (10, 30), 
                cv2.FONT_HERSHEY_SIMPLEX, 
                0.7, 
                (255, 255, 255), 
                2
            )
            
            # Cache this frame
            self.last_rendered_frame = img
            return img
        except Exception as e:
            print(f"Rendering failed: {e}")
            # Return an error image (smaller size to avoid memory issues)
            error_img = np.zeros((480, 640, 3), dtype=np.uint8)
            error_msg = str(e)
            # For long error messages, display on multiple lines
            y_pos = 240
            cv2.putText(
                error_img,
                f"Render Error:", 
                (10, y_pos - 20),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.7,
                (0, 0, 255),
                2
            )
            
            # Split error message into multiple lines
            max_chars = 40  # Maximum characters per line
            words = error_msg.split()
            line = ""
            for word in words:
                if len(line + " " + word) > max_chars:
                    cv2.putText(
                        error_img,
                        line,
                        (10, y_pos),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.5,
                        (255, 255, 255),
                        1
                    )
                    y_pos += 20
                    line = word
                else:
                    if line:
                        line += " " + word
                    else:
                        line = word
            
            # Output last line
            if line:
                cv2.putText(
                    error_img,
                    line,
                    (10, y_pos),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.5,
                    (255, 255, 255),
                    1
                )
                    
            self.last_rendered_frame = error_img
            return error_img

In [None]:
    def process_blackboard_words(self, words, socket_emit_func):
        """Process a list of words to write on the blackboard"""
        if not words:
            return
            
        print(f"Processing blackboard words: {words}")
        
        # Process each word in the list
        for word in words:
            if not self.simulation_running:
                break
                
            # Process each letter in the word
            for letter in word:
                if not self.simulation_running:
                    break
                
                # Keep trying to write the letter until successful or max retries
                max_retries = 3
                retries = 0
                success = False
                
                while not success and retries < max_retries and self.simulation_running:
                    # If writing is already in progress, wait and retry
                    if self.writing_in_progress:
                        print(f"Writing in progress, waiting before trying letter '{letter}' (retry {retries+1}/{max_retries})")
                        socket_emit_func("writing_status", {
                            "status": "waiting", 
                            "letter": letter, 
                            "retry": retries+1
                        })
                        time.sleep(1.5)  # Wait 1.5 seconds before retry
                        retries += 1
                        continue
                    
                    print(f"Writing letter: {letter}")
                    success = self.writing_simulation(letter, socket_emit_func)
                    
                    if not success:
                        print(f"Failed to start writing for letter '{letter}' (retry {retries+1}/{max_retries})")
                        retries += 1
                        time.sleep(0.5)  # Short pause before retry
                        
                # Wait for writing to complete if successful
                if success:
                    while self.writing_in_progress and self.simulation_running:
                        time.sleep(0.1)
                        
                # Pause between letters
                time.sleep(0.5)
                
            # Add space between words
            time.sleep(1)  # Longer pause between words
        
        socket_emit_func("writing_status", {"status": "all_completed", "words": words})

In [None]:
    def writing_simulation(self, letter, socket_emit_func, retry_count=0, max_retries=3):
        """Start writing simulation for a specific letter with retry logic"""
        # Return immediately if simulation not running
        if not self.simulation_running:
            print("Cannot start writing - simulation not running")
            socket_emit_func("simulation_error", {"error": "Simulation not running"})
            return False
        
        # Add retry logic for writing in progress
        if self.writing_in_progress:
            if retry_count < max_retries:
                print(f"Writing in progress, retrying in 1 second (attempt {retry_count+1}/{max_retries})")
                # Schedule a retry after 1 second
                threading.Timer(1.0, lambda: self.writing_simulation(
                    letter, socket_emit_func, retry_count+1, max_retries)).start()
                return True  # Return True to indicate retry is scheduled
            else:
                print("Writing already in progress, max retries exceeded")
                socket_emit_func("simulation_error", 
                            {"error": "Writing already in progress, please try again later"})
                return False
        # Clean and validate letter
        if not letter or not isinstance(letter, str) or len(letter.strip()) == 0:
            letter = "A"  # Default to A if no valid letter
        else:
            letter = letter.strip().upper()
        print(f"Starting writing simulation for letter: {letter}")
        socket_emit_func("writing_status", {"status": "starting", "letter": letter})
        # Set flag BEFORE creating thread to prevent race conditions
        self.writing_in_progress = True
        try:
            # Create a new thread for the writing process
            writing_thread = threading.Thread(
                target=self._run_writing_thread,
                args=(letter, socket_emit_func)
            )
            writing_thread.daemon = True
            writing_thread.start()
            socket_emit_func("writing_status", {"status": "started", "letter": letter})
            return True
        except Exception as e:
            # Reset flag if thread creation fails
            self.writing_in_progress = False
            print(f"Failed to start writing simulation: {e}")
            socket_emit_func("simulation_error", {"error": f"Writing error: {str(e)}"})
            return False

In [None]:
    def write_on_blackboard(self, words, socket_emit_func):
        """Start a thread to write words on the blackboard"""
        if not self.simulation_running:
            return {"success": False, "message": "Simulation is not running"}
            
        if self.writing_in_progress:
            return {"success": False, "message": "Writing already in progress"}
            
        # Start writing thread
        thread = threading.Thread(
            target=self.process_blackboard_words, 
            args=(words, socket_emit_func)
        )
        thread.daemon = True
        thread.start()
        return {"success": True, "message": f"Writing {len(words)} words to blackboard"}

In [None]:
    def process_blackboard_words(self, words, socket_emit_func):
        """Process a list of words to write on the blackboard"""
        if not words:
            return
            
        print(f"Processing blackboard words: {words}")
        
        # Clear trajectory points before writing new words
        with self.trajectory_lock:
            self.trajectory_points.clear()
        
        # Process each word in the list
        for word in words:
            if not self.simulation_running:
                break
                
            # Process each letter in the word
            for letter in word:
                if not self.simulation_running:
                    break
                
                # Keep trying to write the letter until successful or max retries
                max_retries = 3
                retries = 0
                success = False
                
                while not success and retries < max_retries and self.simulation_running:
                    # If writing is already in progress, wait and retry
                    if self.writing_in_progress:
                        print(f"Writing in progress, waiting before trying letter '{letter}' (retry {retries+1}/{max_retries})")
                        socket_emit_func("writing_status", {
                            "status": "waiting", 
                            "letter": letter, 
                            "retry": retries+1
                        })
                        time.sleep(1.5)  # Wait 1.5 seconds before retry
                        retries += 1
                        continue
                    
                    print(f"Writing letter: {letter}")
                    success = self.writing_simulation(letter, socket_emit_func)
                    
                    if not success:
                        print(f"Failed to start writing for letter '{letter}' (retry {retries+1}/{max_retries})")
                        retries += 1
                        time.sleep(0.5)  # Short pause before retry
                        
                # Wait for writing to complete if successful
                if success:
                    while self.writing_in_progress and self.simulation_running:
                        time.sleep(0.1)
                        
                # Pause between letters
                time.sleep(0.5)
                
            # Add space between words
            time.sleep(1)  # Longer pause between words
        
        socket_emit_func("writing_status", {"status": "all_completed", "words": words})

In [None]:
def process_blackboard_words(self, words, socket_emit_func):
        """Process a list of words to write on the blackboard"""
        if not words:
            return
            
        print(f"Processing blackboard words: {words}")
        
        # Clear trajectory points before writing new words
        with self.trajectory_lock:
            self.trajectory_points.clear()
        
        # Process each word in the list
        for word in words:
            if not self.simulation_running:
                break
                
            # Process each letter in the word
            for letter in word:
                if not self.simulation_running:
                    break
                
                # Keep trying to write the letter until successful or max retries
                max_retries = 3
                retries = 0
                success = False
                
                while not success and retries < max_retries and self.simulation_running:
                    # If writing is already in progress, wait and retry
                    if self.writing_in_progress:
                        print(f"Writing in progress, waiting before trying letter '{letter}' (retry {retries+1}/{max_retries})")
                        socket_emit_func("writing_status", {
                            "status": "waiting", 
                            "letter": letter, 
                            "retry": retries+1
                        })
                        time.sleep(1.5)  # Wait 1.5 seconds before retry
                        retries += 1
                        continue
                    
                    print(f"Writing letter: {letter}")
                    success = self.writing_simulation(letter, socket_emit_func)
                    
                    if not success:
                        print(f"Failed to start writing for letter '{letter}' (retry {retries+1}/{max_retries})")
                        retries += 1
                        time.sleep(0.5)  # Short pause before retry
                        
                # Wait for writing to complete if successful
                if success:
                    while self.writing_in_progress and self.simulation_running:
                        time.sleep(0.1)
                        
                # Pause between letters
                time.sleep(0.5)
                
            # Add space between words
            time.sleep(1)  # Longer pause between words
            
        socket_emit_func("writing_status", {"status": "all_completed", "words": words})