# Maze Robot PPO Simulation

This notebook runs the Proximal Policy Optimization (PPO) simulation for maze robot navigation in a headless environment and captures video for visualization.

In [None]:
# Import necessary libraries
import numpy as np
import matplotlib.pyplot as plt
import pygame
import os
import sys
import time
import cv2
import torch
from IPython.display import HTML, display
from base64 import b64encode
from IPython.display import Video

# Import project modules
from Robot_config import Robot
from Environment import Environment
from ppo_agent import PPO
from reward_model import RewardModel

In [2]:
# Setup headless pygame
os.environ["SDL_VIDEODRIVER"] = "dummy"
pygame.init()

# Configure video recording
def setup_video_writer(filename, fps=30, size=(900, 900)):
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # or 'XVID'
    return cv2.VideoWriter(filename, fourcc, fps, size)

In [3]:
# Helper function to convert pygame surface to numpy array for OpenCV
def surface_to_array(surface):
    return np.transpose(np.array(pygame.surfarray.pixels3d(surface)), (1, 0, 2))

In [4]:
# Function to display video in the notebook
def show_video(file_path, width=640):
    return Video(file_path, width=width)

In [5]:
def run_ppo_simulation(max_episodes=50, video_filename="ppo_simulation.mp4", record_interval=2):
    # Record only certain episodes to keep video size manageable
    record_episodes = set([0, max_episodes//4, max_episodes//2, max_episodes-1])
    
    # Load map
    map = pygame.image.load("./maze.jpg")
    map_copy = map.copy()
    
    # Start position & size maze
    start_position = (450, 100)  # origin 450, 100
    end_position = (450, 900, -np.pi/2)
    size = (900, 900)
    
    environment = Environment(size, "./maze.jpg")
    
    # Robot setup
    robot = Robot(start_position, "./car.png", 2, 2)
    
    # Setup PPO agent
    state_dim = 11  # 8 sensors + x, y, theta
    action_dim = 2  # vx, vtheta
    ppo_agent = PPO(state_dim=state_dim, action_dim=action_dim, hidden_dim=128,
                    lr=3e-4, gamma=0.99, clip_ratio=0.2, batch_size=32, epochs=5)
    
    # Setup reward model
    reward_model = RewardModel(target_position=end_position[:2], target_orientation=end_position[2])
    
    # Video writer
    video_writer = setup_video_writer(video_filename)
    
    # Metrics to track
    episode_rewards = []
    distances_to_goal = []
    best_reward = -float('inf')
    best_distance = float('inf')
    
    # Training loop
    episode = 0
    dt = 0.05  # Fixed dt for simulation stability
    
    try:
        while episode < max_episodes:
            print(f"Processing episode {episode+1}/{max_episodes}")
            
            # Record this episode?
            record_this_episode = episode in record_episodes
            frame_counter = 0
            
            # Reset environment
            robot.reset(start_position)
            
            # Reset reward model
            reward_model.reset((robot.x, robot.y))
            
            # Initial state
            robot.update_sensor_data(map_copy, (0, 0, 0))
            state = robot.get_state()
            
            episode_reward = 0
            step = 0
            done = False
            max_steps_per_episode = 500
            
            while not done and step < max_steps_per_episode:
                # Clear display surface for this frame
                if record_this_episode and frame_counter % record_interval == 0:
                    environment.map.blit(map, (0, 0))
                
                # Get action from policy
                action, log_prob = ppo_agent.select_action(state)
                
                # Scale action
                robot.vx = action[0] * 150  # Scale to reasonable range
                robot.vtheta = action[1] * 1.0  # Scale to reasonable range
                
                # Execute action
                robot.move(dt)
                robot.time += dt
                robot.update_sensor_data(map_copy, (0, 0, 0))
                robot.check_crash(map_copy, (0, 0, 0))
                
                # Get new state
                next_state = robot.get_state()
                
                # Calculate reward and check if done
                robot_state = {
                    'position': (robot.x, robot.y),
                    'orientation': robot.theta,
                    'velocity': (robot.x_dot, robot.y_dot),
                    'sensor_data': robot.sensor_data,
                    'crash': robot.crash,
                    'time': robot.time
                }
                reward, done = reward_model.calculate_reward(robot_state)
                
                # Store transition in PPO memory
                ppo_agent.store_transition(state, action, reward, next_state, done, log_prob)
                
                # Update state and accumulate reward
                state = next_state
                episode_reward += reward
                step += 1
                
                # Visualization - only record certain frames to keep video size manageable
                if record_this_episode and frame_counter % record_interval == 0:
                    robot.draw(environment.map)
                    environment.robot_frames([robot.x, robot.y], robot.theta)
                    environment.robot_sensor([robot.x, robot.y], robot.points)
                    environment.trail((robot.x, robot.y))
                    
                    # Add episode info to frame
                    font = pygame.font.Font('freesansbold.ttf', 24)
                    text = font.render(f"Episode: {episode+1}/{max_episodes}, Reward: {episode_reward:.2f}", True, (255, 255, 255), (0, 0, 0))
                    environment.map.blit(text, (10, 10))
                    
                    # Record frame
                    frame = surface_to_array(environment.map)
                    # Convert to BGR for OpenCV
                    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
                    video_writer.write(frame)
                
                frame_counter += 1
            
            # Update policy every 5 episodes
            if (episode + 1) % 5 == 0:
                ppo_agent.update()
            
            # Record stats
            episode_rewards.append(episode_reward)
            distance_to_goal = math.sqrt((robot.x - end_position[0])**2 + (robot.y - end_position[1])**2)
            distances_to_goal.append(distance_to_goal)
            
            # Save best model
            if episode_reward > best_reward:
                best_reward = episode_reward
                torch.save(ppo_agent.actor_critic.state_dict(), "ppo_best_model.pt")
            
            if distance_to_goal < best_distance:
                best_distance = distance_to_goal
            
            print(f"Episode: {episode+1}, Reward: {episode_reward:.2f}, Distance: {distance_to_goal:.2f}, Best Distance: {best_distance:.2f}")
            episode += 1
    
    except KeyboardInterrupt:
        print("Simulation interrupted by user")
    
    finally:
        # Save final model
        torch.save(ppo_agent.actor_critic.state_dict(), "ppo_final_model.pt")
        # Release video writer
        video_writer.release()
    
    return episode_rewards, distances_to_goal, video_filename

In [None]:
# Import math for distance calculation
import math

# Run simulation
rewards, distances, video_file = run_ppo_simulation(max_episodes=10000)

In [None]:
# Plot metrics
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))

# Plot rewards over episodes
ax1.plot(range(1, len(rewards) + 1), rewards, 'b-')
ax1.set_xlabel('Episode')
ax1.set_ylabel('Episode Reward')
ax1.set_title('Reward Evolution During Training')
ax1.grid(True)

# Calculate moving average for reward trend line
window_size = 5
if len(rewards) > window_size:
    moving_avg = [np.mean(rewards[i:i+window_size]) for i in range(len(rewards)-window_size+1)]
    ax1.plot(range(window_size, len(rewards) + 1), moving_avg, 'r-', label=f'{window_size}-Episode Moving Average')
    ax1.legend()

# Plot distance to goal over episodes
ax2.plot(range(1, len(distances) + 1), distances, 'g-')
ax2.set_xlabel('Episode')
ax2.set_ylabel('Distance to Goal (Pixels)')
ax2.set_title('Distance to Goal at End of Episode')
ax2.grid(True)

plt.tight_layout()
plt.savefig('ppo_metrics.png')
plt.show()

In [None]:
# Display the video
show_video(video_file, width=720)

## Evaluation of Trained Model

After training, we can evaluate the best model to see how well it performs.

In [9]:
def evaluate_ppo_model(model_path="ppo_best_model.pt", episodes=5, video_filename="ppo_evaluation.mp4"):
    # Load map
    map = pygame.image.load("./maze.jpg")
    map_copy = map.copy()
    
    # Start position & size maze
    start_position = (450, 100)
    end_position = (450, 900, -np.pi/2)
    size = (900, 900)
    
    # Environment setup
    environment = Environment(size, "./maze.jpg")
    
    # Robot setup
    robot = Robot(start_position, "./car.png", 2, 2)
    
    # Setup PPO agent for evaluation
    state_dim = 11  # 8 sensors + x, y, theta
    action_dim = 2  # vx, vtheta
    
    # Create model and load weights
    actor_critic = ActorCritic(state_dim, action_dim)
    actor_critic.load_state_dict(torch.load(model_path))
    actor_critic.eval()  # Set to evaluation mode
    
    # Create PPO agent for evaluation only
    ppo_agent = PPO(state_dim, action_dim)
    ppo_agent.actor_critic = actor_critic
    
    # Video writer
    video_writer = setup_video_writer(video_filename)
    
    # Stats collection
    success_count = 0
    distances = []
    times = []
    
    dt = 0.05  # Fixed dt for simulation stability
    
    for episode in range(episodes):
        # Reset robot
        robot.reset(start_position)
        
        # Initial state
        robot.update_sensor_data(map_copy, (0, 0, 0))
        state = robot.get_state()
        
        running = True
        frame_counter = 0
        
        while running:
            # Clear display surface
            environment.map.blit(map, (0, 0))
            
            # Get action from policy (deterministic for evaluation)
            action = ppo_agent.select_action(state, deterministic=True)
            
            # Scale and execute action
            robot.vx = action[0] * 150
            robot.vtheta = action[1] * 1.0
            
            # Execute action
            robot.move(dt)
            robot.time += dt
            robot.update_sensor_data(map_copy, (0, 0, 0))
            robot.check_crash(map_copy, (0, 0, 0))
            
            # Get new state
            state = robot.get_state()
            
            # Check if episode is done
            distance_to_goal = math.sqrt((robot.x - end_position[0])**2 + (robot.y - end_position[1])**2)
            orientation_diff = abs(((robot.theta - end_position[2] + np.pi) % (2 * np.pi)) - np.pi)
            
            # Check success conditions
            success = distance_to_goal < 30 and orientation_diff < 0.5
            done = robot.crash or robot.time > 30 or success
            
            if done:
                distances.append(distance_to_goal)
                times.append(robot.time)
                if success:
                    success_count += 1
                running = False
            
            # Visualization
            robot.draw(environment.map)
            environment.robot_frames([robot.x, robot.y], robot.theta)
            environment.robot_sensor([robot.x, robot.y], robot.points)
            environment.trail((robot.x, robot.y))
            
            # Show additional info
            font = pygame.font.Font('freesansbold.ttf', 20)
            info_text = f"Evaluation Episode: {episode+1}/{episodes}, Time: {robot.time:.1f}s, Distance: {distance_to_goal:.1f}"
            text = font.render(info_text, True, (255, 255, 255), (0, 0, 0))
            environment.map.blit(text, (10, 10))
            
            # Record frame
            if frame_counter % 2 == 0:  # Record every other frame to save space
                frame = surface_to_array(environment.map)
                frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
                video_writer.write(frame)
            
            frame_counter += 1
            
            # Limit evaluation to reasonable number of frames
            if frame_counter > 1000:
                running = False
                distances.append(distance_to_goal)
                times.append(robot.time)
        
        print(f"Episode {episode+1}: {'Success' if success else 'Failed'}, Distance: {distance_to_goal:.2f}, Time: {robot.time:.2f}s")
    
    # Print summary statistics
    success_rate = success_count / episodes * 100
    avg_distance = sum(distances) / len(distances)
    avg_time = sum(times) / len(times)
    
    print(f"\nEvaluation Results:")
    print(f"Success Rate: {success_rate:.1f}%")
    print(f"Average Distance to Goal: {avg_distance:.2f}")
    print(f"Average Episode Time: {avg_time:.2f}s")
    
    # Release video writer
    video_writer.release()
    
    return video_filename

In [None]:
# Import ActorCritic for evaluation
from ppo_agent import ActorCritic

# Evaluate the trained model
eval_video = evaluate_ppo_model(episodes=3)
show_video(eval_video, width=720)