In [1]:
# Import required libraries
import gymnasium as gym
from stable_baselines3 import SAC, TD3, A2C
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import ipywidgets as widgets
from IPython.display import display, clear_output, HTML
import time
import os
import threading
from collections import deque
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px
from gymnasium.wrappers import RecordVideo
import warnings
warnings.filterwarnings('ignore')

# Global plot styling for every figure drawn later in the notebook:
# apply the 'seaborn-v0_8' style by name through matplotlib, then make "husl"
# the default seaborn color palette.
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")




ModuleNotFoundError: No module named 'seaborn'

In [None]:
class ModelVisualizationDashboard:
    """Interactive ipywidgets dashboard for running trained models in a Gymnasium env.

    The dashboard lets the user pick a saved ``.zip`` checkpoint from
    ``model_dir``, load it with the matching Stable-Baselines3 algorithm
    (SAC, TD3 or A2C, chosen from the filename), run it in a background
    thread, watch live matplotlib metrics, record a video, and finally draw
    a summary report of the recorded episodes.

    Attributes:
        model_dir: Directory scanned for ``.zip`` checkpoints.
        current_model: The loaded model, or ``None`` before a successful load.
        env: The environment used by the simulation loop, or ``None``.
        is_running: Flag polled by the simulation thread; clearing it stops the loop.
        episode_rewards / episode_lengths: Rolling history of the last 100 episodes.
        step_rewards: Rolling history of the last 1000 per-step rewards.
        joint_positions / metrics_data: Initialised empty; not written by this class.

    NOTE(review): the simulation thread prints and plots inside
    ``with self.status_output`` / ``with self.metrics_output``. Whether output
    produced on a background thread is captured by an Output widget depends on
    the ipywidgets/ipykernel versions in use -- confirm in the target setup.
    """

    # Upper bound on the number of steps captured by record_video().
    VIDEO_MAX_STEPS = 1000
    # Live plots are refreshed once every this many simulation steps.
    LIVE_UPDATE_INTERVAL = 50

    def __init__(self, model_dir="models"):
        """Initialise state and build the widgets.

        Args:
            model_dir: Directory that holds the saved ``.zip`` checkpoints.
        """
        self.model_dir = model_dir
        self.current_model = None
        self.env = None
        self.is_running = False
        # Handle of the background simulation thread (None until first start).
        self._sim_thread = None
        self.episode_rewards = deque(maxlen=100)
        self.episode_lengths = deque(maxlen=100)
        self.step_rewards = deque(maxlen=1000)
        self.joint_positions = {}
        self.metrics_data = []
        self.setup_ui()

    def get_available_models(self):
        """Return the sorted ``.zip`` filenames found in ``model_dir``.

        Returns:
            A sorted list of filenames; an empty list when ``model_dir`` does
            not exist, so that building the dashboard never fails just because
            no model has been trained yet.
        """
        if not os.path.isdir(self.model_dir):
            return []
        return sorted(
            name for name in os.listdir(self.model_dir) if name.endswith('.zip')
        )

    def setup_ui(self):
        """Create the interactive UI components and wire up the button callbacks."""
        # Model selection
        self.model_dropdown = widgets.Dropdown(
            options=self.get_available_models(),
            description='Model:',
            style={'description_width': 'initial'},
            layout=widgets.Layout(width='400px')
        )

        # Environment selection
        self.env_dropdown = widgets.Dropdown(
            options=['Humanoid-v4', 'Humanoid-v5'],
            value='Humanoid-v4',
            description='Environment:',
            style={'description_width': 'initial'},
            layout=widgets.Layout(width='300px')
        )

        # Control buttons
        self.load_button = widgets.Button(
            description='🔄 Load Model',
            button_style='primary',
            layout=widgets.Layout(width='150px')
        )

        self.start_button = widgets.Button(
            description='▶️ Start Simulation',
            button_style='success',
            layout=widgets.Layout(width='150px')
        )

        self.stop_button = widgets.Button(
            description='⏹️ Stop Simulation',
            button_style='danger',
            layout=widgets.Layout(width='150px')
        )

        self.record_button = widgets.Button(
            description='🎥 Record Video',
            button_style='warning',
            layout=widgets.Layout(width='150px')
        )

        # Performance parameters
        self.deterministic_checkbox = widgets.Checkbox(
            value=True,
            description='Deterministic Actions',
            style={'description_width': 'initial'}
        )

        self.max_steps_slider = widgets.IntSlider(
            value=1000,
            min=100,
            max=5000,
            step=100,
            description='Max Steps:',
            style={'description_width': 'initial'},
            layout=widgets.Layout(width='400px')
        )

        self.speed_slider = widgets.FloatSlider(
            value=1.0,
            min=0.1,
            max=3.0,
            step=0.1,
            description='Speed:',
            style={'description_width': 'initial'},
            layout=widgets.Layout(width='300px')
        )

        # Status and info
        self.status_output = widgets.Output(layout={'border': '1px solid black'})
        self.metrics_output = widgets.Output(layout={'border': '1px solid blue'})

        # Bind events
        self.load_button.on_click(self.load_model)
        self.start_button.on_click(self.start_simulation)
        self.stop_button.on_click(self.stop_simulation)
        self.record_button.on_click(self.record_video)

    def display_ui(self):
        """Display the complete UI: header, selection row, controls, parameters, outputs."""
        # Header
        display(HTML('<h1 style="color: #2E86AB; text-align: center;">🤖 Humanoid Model Dashboard</h1>'))

        # Model selection row
        model_row = widgets.HBox([
            self.model_dropdown,
            self.env_dropdown,
            self.load_button
        ])

        # Control row
        control_row = widgets.HBox([
            self.start_button,
            self.stop_button,
            self.record_button
        ])

        # Parameters row
        params_row = widgets.HBox([
            self.deterministic_checkbox,
            self.max_steps_slider,
            self.speed_slider
        ])

        # Main layout
        display(widgets.VBox([
            model_row,
            control_row,
            params_row,
            widgets.HTML('<hr>'),
            widgets.HBox([
                widgets.VBox([
                    widgets.HTML('<h3>📊 Status & Metrics</h3>'),
                    self.status_output
                ]),
                widgets.VBox([
                    widgets.HTML('<h3>📈 Live Performance</h3>'),
                    self.metrics_output
                ])
            ])
        ]))

    def load_model(self, button):
        """Load the selected checkpoint and create its environment.

        The algorithm is chosen by looking for 'SAC', 'TD3' or 'A2C' (in that
        order) in the filename. ``current_model`` and ``env`` are replaced
        together, and only after both the model and the new environment were
        created successfully; the previously held environment is then closed.

        Args:
            button: The clicked widget (unused; required by ``on_click``).
        """
        with self.status_output:
            clear_output(wait=True)

            filename = self.model_dropdown.value
            if not filename:
                print(f"❌ No model selected (no .zip files found in '{self.model_dir}')")
                return

            # Swapping the env under a running simulation thread would mix
            # observations from two different environments.
            if self.is_running:
                print("⚠️ Stop the running simulation before loading another model")
                return

            print(f"🔄 Loading model: {filename}")

            try:
                # Determine algorithm from filename (insertion order = match priority)
                loaders = {'SAC': SAC, 'TD3': TD3, 'A2C': A2C}
                algo = next((name for name in loaders if name in filename), None)
                if algo is None:
                    print("❌ Unknown algorithm in filename")
                    return

                model_path = os.path.join(self.model_dir, filename)
                model = loaders[algo].load(model_path)

                # Create environment
                env = gym.make(self.env_dropdown.value, render_mode='human')

                # Commit both together, then release the environment we replaced.
                previous_env = self.env
                self.current_model = model
                self.env = env
                if previous_env is not None:
                    previous_env.close()

                print(f"✅ Model loaded successfully!")
                print(f"📋 Algorithm: {algo}")
                print(f"🎮 Environment: {self.env_dropdown.value}")
                print(f"📁 File: {filename}")

                # Extract training steps from filename
                steps = filename.split('_')[-1].replace('.zip', '')
                print(f"🎯 Training Steps: {steps}")

            except Exception as e:
                print(f"❌ Error loading model: {str(e)}")

    def start_simulation(self, button):
        """Start the simulation loop on a background thread.

        Does nothing (apart from printing a warning) when no model/environment
        is loaded or when a previous simulation thread is still alive, so that
        repeated clicks cannot drive one environment from several threads.

        Args:
            button: The clicked widget (unused; required by ``on_click``).
        """
        if self.current_model is None or self.env is None:
            with self.status_output:
                print("⚠️ Please load a model first!")
            return

        if self._sim_thread is not None and self._sim_thread.is_alive():
            with self.status_output:
                print("⚠️ A simulation is already running (or still shutting down)")
            return

        self.is_running = True
        # Daemon thread: a forgotten simulation must not keep the kernel alive.
        self._sim_thread = threading.Thread(target=self._run_simulation, daemon=True)
        self._sim_thread.start()

    def stop_simulation(self, button):
        """Ask the simulation loop to stop after its current step.

        Args:
            button: The clicked widget (unused; required by ``on_click``).
        """
        self.is_running = False
        with self.status_output:
            print("⏹️ Simulation stopped")

    def _run_simulation(self):
        """Run episodes back to back until ``is_running`` is cleared.

        Each episode ends on termination, truncation, a stop request, or when
        the step count reaches the current value of the max-steps slider.
        Errors are reported in the status output instead of dying silently on
        the worker thread, and ``is_running`` is always cleared on exit.
        """
        # Snapshot so the loop keeps a consistent model/env pair for its lifetime.
        env = self.env
        model = self.current_model
        episode_count = 0

        try:
            while self.is_running:
                episode_count += 1
                obs, _ = env.reset()
                done = False
                truncated = False
                episode_reward = 0
                episode_length = 0

                with self.status_output:
                    clear_output(wait=True)
                    print(f"🎮 Running Episode {episode_count}")
                    print(f"⚙️ Speed: {self.speed_slider.value}x")
                    print(f"🎯 Max Steps: {self.max_steps_slider.value}")
                    print(f"🤖 Deterministic: {self.deterministic_checkbox.value}")

                while (
                    not (done or truncated)
                    and self.is_running
                    and episode_length < self.max_steps_slider.value
                ):
                    action, _ = model.predict(
                        obs,
                        deterministic=self.deterministic_checkbox.value
                    )
                    obs, reward, done, truncated, info = env.step(action)

                    episode_reward += reward
                    episode_length += 1

                    # Store step reward for real-time plotting
                    self.step_rewards.append(reward)

                    # Update live metrics
                    if episode_length % self.LIVE_UPDATE_INTERVAL == 0:
                        self.update_live_metrics(
                            episode_count, episode_length, episode_reward, reward
                        )

                    # Control simulation speed (slider minimum is 0.1, so never zero)
                    time.sleep(0.01 / self.speed_slider.value)

                # A stop request can arrive before the first step; such an
                # empty episode carries no information and is not recorded.
                if episode_length == 0:
                    continue

                # Store episode metrics
                self.episode_rewards.append(episode_reward)
                self.episode_lengths.append(episode_length)

                # Final episode update
                self.update_episode_summary(episode_count, episode_reward, episode_length)
        except Exception as e:
            with self.status_output:
                print(f"❌ Simulation error: {str(e)}")
        finally:
            # Keep the flag truthful even when the loop ended because of an error.
            self.is_running = False

    def update_live_metrics(self, episode, step, episode_reward, step_reward):
        """Redraw the live 2x2 metrics figure and print the current numbers.

        Args:
            episode: 1-based index of the running episode.
            step: Number of steps taken so far in this episode.
            episode_reward: Reward accumulated so far in this episode.
            step_reward: Reward returned by the most recent step.
        """
        with self.metrics_output:
            clear_output(wait=True)

            # Create simple matplotlib plots for better compatibility
            fig, axes = plt.subplots(2, 2, figsize=(12, 8))
            fig.suptitle(f'Live Metrics - Episode {episode}, Step {step}')

            # Episode rewards plot
            if len(self.episode_rewards) > 0:
                axes[0,0].plot(list(self.episode_rewards), 'b-', marker='o')
                axes[0,0].set_title('Episode Rewards')
                axes[0,0].set_xlabel('Episode')
                axes[0,0].set_ylabel('Total Reward')
                axes[0,0].grid(True, alpha=0.3)

            # Episode lengths plot
            if len(self.episode_lengths) > 0:
                axes[0,1].plot(list(self.episode_lengths), 'g-', marker='s')
                axes[0,1].set_title('Episode Lengths')
                axes[0,1].set_xlabel('Episode')
                axes[0,1].set_ylabel('Steps')
                axes[0,1].grid(True, alpha=0.3)

            # Recent step rewards
            if len(self.step_rewards) > 0:
                recent_rewards = list(self.step_rewards)[-200:]  # Last 200 steps
                axes[1,0].plot(recent_rewards, 'r-', alpha=0.7)
                axes[1,0].set_title('Recent Step Rewards')
                axes[1,0].set_xlabel('Recent Steps')
                axes[1,0].set_ylabel('Step Reward')
                axes[1,0].grid(True, alpha=0.3)

            # Current episode progress; clamped because the slider may be
            # lowered below the current step while the episode is running.
            progress = min((step / self.max_steps_slider.value) * 100, 100)
            axes[1,1].bar(['Progress'], [progress], color='skyblue')
            axes[1,1].set_title('Episode Progress')
            axes[1,1].set_ylabel('Percentage')
            axes[1,1].set_ylim(0, 100)
            axes[1,1].grid(True, alpha=0.3)

            plt.tight_layout()
            plt.show()
            # This runs every LIVE_UPDATE_INTERVAL steps; release the figure so
            # they do not pile up in pyplot's figure registry.
            plt.close(fig)

            # Display numeric metrics
            print(f"📊 Current Metrics:")
            print(f"   Episode: {episode}")
            print(f"   Step: {step}")
            print(f"   Episode Reward: {episode_reward:.2f}")
            print(f"   Last Step Reward: {step_reward:.4f}")
            if len(self.episode_rewards) > 0:
                print(f"   Avg Episode Reward: {np.mean(self.episode_rewards):.2f}")
                print(f"   Best Episode Reward: {np.max(self.episode_rewards):.2f}")

    def update_episode_summary(self, episode, reward, length):
        """Print the summary of a finished episode to the status output.

        Args:
            episode: 1-based episode index.
            reward: Total reward collected in the episode.
            length: Number of steps taken; may be 0, in which case the
                per-step average is reported as not available.
        """
        with self.status_output:
            print(f"\n📋 Episode {episode} Complete:")
            print(f"   Reward: {reward:.2f}")
            print(f"   Length: {length} steps")
            if length > 0:
                print(f"   Avg Reward: {reward/length:.4f} per step")
            else:
                print("   Avg Reward: n/a (no steps taken)")

    def record_video(self, button):
        """Record one deterministic episode (at most VIDEO_MAX_STEPS steps) to 'videos/'.

        A separate ``rgb_array`` environment wrapped in ``RecordVideo`` is used,
        and it is always closed, whether the rollout succeeds or fails.

        Args:
            button: The clicked widget (unused; required by ``on_click``).
        """
        if self.current_model is None:
            with self.status_output:
                print("⚠️ Please load a model first!")
            return

        with self.status_output:
            print("🎥 Recording video...")

        video_env = None
        try:
            try:
                # Create video environment
                video_env = gym.make(self.env_dropdown.value, render_mode="rgb_array")
                video_env = RecordVideo(
                    video_env,
                    video_folder="videos",
                    episode_trigger=lambda x: True,
                    name_prefix=f"{self.model_dropdown.value.replace('.zip', '')}"
                )

                obs, _ = video_env.reset()
                done = False
                truncated = False
                step_count = 0

                while not (done or truncated) and step_count < self.VIDEO_MAX_STEPS:
                    action, _ = self.current_model.predict(obs, deterministic=True)
                    obs, _, done, truncated, _ = video_env.step(action)
                    step_count += 1
            finally:
                # Close before reporting, as the original did, so a failure
                # while closing is reported as an error rather than a success.
                if video_env is not None:
                    video_env.close()

            with self.status_output:
                print(f"✅ Video recorded successfully!")
                print(f"📁 Check the 'videos' folder for the recording")

        except Exception as e:
            with self.status_output:
                print(f"❌ Error recording video: {str(e)}")

    def generate_performance_report(self):
        """Draw a 2x2 report (rewards, lengths, reward histogram, statistics).

        Prints a warning and returns without plotting when no episode has been
        recorded yet.
        """
        if len(self.episode_rewards) == 0:
            print("⚠️ No data available. Run some episodes first!")
            return

        # Plain-list snapshots: same form the live plots use, and unaffected by
        # a simulation thread that may still be appending.
        rewards = list(self.episode_rewards)
        lengths = list(self.episode_lengths)

        # Create comprehensive report
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        fig.suptitle(f'Performance Report - {self.model_dropdown.value}', fontsize=16)

        # Episode rewards over time
        axes[0,0].plot(rewards, 'b-', alpha=0.7)
        axes[0,0].set_title('Episode Rewards Over Time')
        axes[0,0].set_xlabel('Episode')
        axes[0,0].set_ylabel('Total Reward')
        axes[0,0].grid(True, alpha=0.3)

        # Episode lengths
        axes[0,1].plot(lengths, 'g-', alpha=0.7)
        axes[0,1].set_title('Episode Lengths Over Time')
        axes[0,1].set_xlabel('Episode')
        axes[0,1].set_ylabel('Episode Length')
        axes[0,1].grid(True, alpha=0.3)

        # Reward distribution
        axes[1,0].hist(rewards, bins=20, alpha=0.7, color='skyblue', edgecolor='black')
        axes[1,0].set_title('Reward Distribution')
        axes[1,0].set_xlabel('Episode Reward')
        axes[1,0].set_ylabel('Frequency')
        axes[1,0].grid(True, alpha=0.3)

        # Performance statistics
        stats_text = f"""
        Performance Statistics:
        
        Episodes: {len(rewards)}
        
        Rewards:
        • Mean: {np.mean(rewards):.2f}
        • Std: {np.std(rewards):.2f}
        • Min: {np.min(rewards):.2f}
        • Max: {np.max(rewards):.2f}
        
        Episode Lengths:
        • Mean: {np.mean(lengths):.1f}
        • Std: {np.std(lengths):.1f}
        • Min: {np.min(lengths)}
        • Max: {np.max(lengths)}
        """

        axes[1,1].text(0.1, 0.5, stats_text, transform=axes[1,1].transAxes,
                      fontsize=10, verticalalignment='center',
                      bbox=dict(boxstyle='round', facecolor='lightblue', alpha=0.8))
        axes[1,1].set_xlim(0, 1)
        axes[1,1].set_ylim(0, 1)
        axes[1,1].axis('off')

        plt.tight_layout()
        plt.show()


In [None]:
# Create and display the dashboard.
# With no argument the dashboard lists the .zip checkpoints found in "models".
# The instance is kept in the notebook-level name `dashboard` because the
# performance-report cell calls a method on it.
dashboard = ModelVisualizationDashboard()
dashboard.display_ui()


In [None]:
# Generate performance report (run this after running some episodes).
# If no episode has been recorded yet, the method only prints a warning.
dashboard.generate_performance_report()


In [None]:
class ModelComparison:
    """Evaluate several saved checkpoints and plot their results side by side.

    Attributes:
        model_dir: Directory that holds the ``.zip`` checkpoints.
        comparison_data: Maps each evaluated filename to a dict with the keys
            'rewards', 'lengths', 'mean_reward', 'std_reward', 'mean_length'
            and 'std_length'.
    """

    # Algorithm tags looked up in the filename, in match-priority order.
    _ALGO_TAGS = ('SAC', 'TD3', 'A2C')

    def __init__(self, model_dir="models"):
        """Remember the checkpoint directory and start with no results."""
        self.model_dir = model_dir
        self.comparison_data = {}

    @staticmethod
    def _display_label(model_name):
        """Return a plot label for a checkpoint: its base name without '.zip'."""
        label = os.path.basename(model_name)
        if label.endswith('.zip'):
            label = label[:-len('.zip')]
        return label

    def evaluate_model(self, model_path, episodes=5, env_id="Humanoid-v4", max_steps=1000):
        """Evaluate a model for multiple episodes and store the statistics.

        Args:
            model_path: Checkpoint filename relative to ``model_dir``. It must
                contain 'SAC', 'TD3' or 'A2C' so the loader can be chosen.
            episodes: Number of evaluation episodes.
            env_id: Gymnasium environment id to evaluate in.
            max_steps: Per-episode step cap.

        Raises:
            ValueError: If no known algorithm tag appears in ``model_path``.
        """
        print(f"🔄 Evaluating {model_path}...")

        # Resolve the algorithm before touching any loader, so an unknown
        # filename fails with a clear message instead of an unbound `model`.
        algo = next((tag for tag in self._ALGO_TAGS if tag in model_path), None)
        if algo is None:
            raise ValueError(
                f"Cannot determine algorithm for '{model_path}': "
                f"expected one of {', '.join(self._ALGO_TAGS)} in the filename"
            )

        # Load model
        full_path = os.path.join(self.model_dir, model_path)
        if algo == 'SAC':
            model = SAC.load(full_path)
        elif algo == 'TD3':
            model = TD3.load(full_path)
        else:
            model = A2C.load(full_path)

        env = gym.make(env_id, render_mode=None)

        rewards = []
        lengths = []

        try:
            for episode in range(episodes):
                obs, _ = env.reset()
                done = False
                truncated = False
                episode_reward = 0
                episode_length = 0

                while not (done or truncated) and episode_length < max_steps:
                    action, _ = model.predict(obs, deterministic=True)
                    obs, reward, done, truncated, _ = env.step(action)
                    episode_reward += reward
                    episode_length += 1

                rewards.append(episode_reward)
                lengths.append(episode_length)
                print(f"  Episode {episode+1}: Reward={episode_reward:.2f}, Length={episode_length}")
        finally:
            # Release the environment even when a rollout raises.
            env.close()

        self.comparison_data[model_path] = {
            'rewards': rewards,
            'lengths': lengths,
            'mean_reward': np.mean(rewards),
            'std_reward': np.std(rewards),
            'mean_length': np.mean(lengths),
            'std_length': np.std(lengths)
        }

        print(f"✅ {model_path} - Mean Reward: {np.mean(rewards):.2f} ± {np.std(rewards):.2f}")

    def compare_models(self, model_list, episodes=5, env_id="Humanoid-v4"):
        """Evaluate every model in ``model_list`` and plot the comparison.

        Args:
            model_list: Checkpoint filenames relative to ``model_dir``.
            episodes: Number of evaluation episodes per model.
            env_id: Gymnasium environment id used for every evaluation.
        """
        for model in model_list:
            self.evaluate_model(model, episodes, env_id=env_id)

        self.plot_comparison()

    def plot_comparison(self):
        """Plot comparison results as a 2x2 figure.

        Prints a message and returns when nothing has been evaluated yet.
        """
        if not self.comparison_data:
            print("No comparison data available")
            return

        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        fig.suptitle('Model Comparison Results', fontsize=16)

        models = list(self.comparison_data.keys())
        # Full checkpoint stem as the label: a fragment of the name would be
        # identical for e.g. SAC_100000.zip and TD3_100000.zip.
        labels = [self._display_label(m) for m in models]
        mean_rewards = [self.comparison_data[m]['mean_reward'] for m in models]
        std_rewards = [self.comparison_data[m]['std_reward'] for m in models]
        mean_lengths = [self.comparison_data[m]['mean_length'] for m in models]

        # Mean rewards comparison
        x_pos = range(len(models))
        axes[0,0].bar(x_pos, mean_rewards, yerr=std_rewards, capsize=5, alpha=0.7)
        axes[0,0].set_title('Mean Episode Rewards')
        axes[0,0].set_ylabel('Reward')
        axes[0,0].set_xticks(x_pos)
        axes[0,0].set_xticklabels(labels, rotation=45)
        axes[0,0].grid(True, alpha=0.3)

        # Episode lengths comparison
        axes[0,1].bar(x_pos, mean_lengths, alpha=0.7, color='green')
        axes[0,1].set_title('Mean Episode Lengths')
        axes[0,1].set_ylabel('Steps')
        axes[0,1].set_xticks(x_pos)
        axes[0,1].set_xticklabels(labels, rotation=45)
        axes[0,1].grid(True, alpha=0.3)

        # Reward distributions
        for model, label in zip(models, labels):
            axes[1,0].hist(self.comparison_data[model]['rewards'],
                          alpha=0.5, label=label, bins=10)
        axes[1,0].set_title('Reward Distributions')
        axes[1,0].set_xlabel('Episode Reward')
        axes[1,0].set_ylabel('Frequency')
        axes[1,0].legend()
        axes[1,0].grid(True, alpha=0.3)

        # Summary table
        summary_text = "Model Performance Summary:\n\n"
        for model, label in zip(models, labels):
            data = self.comparison_data[model]
            summary_text += f"{label}:\n"
            summary_text += f"  Reward: {data['mean_reward']:.2f} ± {data['std_reward']:.2f}\n"
            summary_text += f"  Length: {data['mean_length']:.1f} ± {data['std_length']:.1f}\n\n"

        axes[1,1].text(0.1, 0.5, summary_text, transform=axes[1,1].transAxes,
                      fontsize=10, verticalalignment='center',
                      bbox=dict(boxstyle='round', facecolor='lightgreen', alpha=0.8))
        axes[1,1].set_xlim(0, 1)
        axes[1,1].set_ylim(0, 1)
        axes[1,1].axis('off')

        plt.tight_layout()
        plt.show()


In [None]:
# Example: Compare different models
# Uncomment and modify the model names according to your available models

# comparator = ModelComparison()
# models_to_compare = [
#     'SAC_100000.zip',
#     'SAC_200000.zip',
#     'TD3_100000.zip',
#     'A2C_100000.zip'
# ]
# comparator.compare_models(models_to_compare, episodes=3)


In [None]:
def analyze_training_progress(model_dir="models"):
    """List the saved checkpoints in ``model_dir`` grouped by algorithm.

    A checkpoint is a ``.zip`` file whose name contains 'SAC', 'TD3' or 'A2C'
    and whose last underscore-separated field (without '.zip') is an integer
    step count. Files that do not fit this pattern are skipped. For every
    algorithm with at least one checkpoint, the first ten (ordered by step
    count) are printed, followed by a count of the remaining ones.

    Args:
        model_dir: Directory to scan. A warning is printed and nothing else
            happens if it does not exist.
    """
    if not os.path.isdir(model_dir):
        print(f"⚠️ Model directory not found: {model_dir}")
        return

    checkpoint_files = [f for f in os.listdir(model_dir) if f.endswith('.zip')]

    # Group by algorithm
    algo_models = {'SAC': [], 'TD3': [], 'A2C': []}

    for filename in checkpoint_files:
        try:
            steps = int(filename.split('_')[-1].replace('.zip', ''))
        except ValueError:
            # No numeric step suffix (e.g. "SAC_final.zip"): not a step checkpoint.
            continue
        for algo, entries in algo_models.items():
            if algo in filename:
                entries.append((steps, filename))

    # Sort by training steps
    for entries in algo_models.values():
        entries.sort()

    # Display available models by algorithm
    for algo, entries in algo_models.items():
        if entries:
            print(f"\n{algo} Models:")
            for steps, model_name in entries[:10]:  # Show first 10
                print(f"  {steps:,} steps: {model_name}")
            if len(entries) > 10:
                print(f"  ... and {len(entries)-10} more")

# Print the checkpoints found in the "models" directory, grouped by algorithm.
analyze_training_progress()
