# PandaReach-v3 Visual Training

This notebook runs a training session with visual rendering, so you can see the robot moving as it learns.

In [None]:
import numpy as np
import gymnasium as gym
import panda_gym
import torch
import matplotlib.pyplot as plt
import time
from IPython import display

# Import project modules
from sac import SAC
from replay_buffer import ReplayBuffer

# Pick the compute device: the GPU when PyTorch reports CUDA, else the CPU
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
print(f"Using device: {device}")

In [None]:
# Two instances of the same task: `env` is created with render_mode="human"
# and is the one stepped during training; `eval_env` is created without a
# render mode and is used for the periodic evaluations.
env_name = "PandaReach-v3"
env = gym.make(env_name, render_mode="human")
eval_env = gym.make(env_name)

# Report the spaces the agent will read from and act in
print("Environment Information:")
for label, value in (
    ("Observation Space", env.observation_space),
    ("Action Space", env.action_space),
    ("Action Space High", env.action_space.high),
    ("Action Space Low", env.action_space.low),
):
    print(f"{label}: {value}")

In [None]:
# Function to evaluate a policy
def evaluate_policy(policy, env, eval_episodes=5):
    """Run the policy for several episodes and report average performance.

    Actions are requested with ``noise=0``. An episode ends when the
    environment reports termination or truncation, or as soon as a step's
    ``info`` reports ``is_success == 1.0`` (each episode is counted as a
    success at most once). This function never requests rendering itself;
    whether anything is drawn depends on how ``env`` was created.

    Args:
        policy: Agent exposing ``select_action(obs, noise=0)``.
        env: Environment whose ``reset()`` returns ``(obs_dict, info)`` and
            whose ``step(action)`` returns
            ``(obs_dict, reward, done, truncated, info)``; ``obs_dict`` must
            have an ``'observation'`` entry.
        eval_episodes: Number of episodes to run; must be at least 1.

    Returns:
        Tuple ``(avg_reward, success_rate)``: the summed reward divided by
        the number of episodes, and the fraction of episodes that succeeded.

    Raises:
        ValueError: If ``eval_episodes`` is less than 1.
    """
    # Fail early with a clear message instead of a ZeroDivisionError (or a
    # meaningless negative average) when computing the statistics below.
    if eval_episodes < 1:
        raise ValueError(
            f"eval_episodes must be at least 1, got {eval_episodes}"
        )

    avg_reward = 0.
    successes = 0

    for _ in range(eval_episodes):
        obs_dict, _ = env.reset()
        obs = obs_dict['observation']
        done = False
        truncated = False

        while not (done or truncated):
            action = policy.select_action(np.array(obs), noise=0)
            obs_dict, reward, done, truncated, info = env.step(action)
            obs = obs_dict['observation']
            avg_reward += reward

            # Stop at the first successful step so an episode contributes
            # at most one success to the count.
            if info.get('is_success') == 1.0:
                successes += 1
                break

    avg_reward /= eval_episodes
    success_rate = successes / eval_episodes

    print(f"Evaluation over {eval_episodes} episodes: {avg_reward:.3f}, Success rate: {success_rate:.3f}")
    return avg_reward, success_rate

In [None]:
# Agent setup: seed the NumPy and PyTorch RNGs, then build the SAC agent
# and its replay buffer.
seed = 0
np.random.seed(seed)
torch.manual_seed(seed)

# Input/output sizes are read from the environment's spaces: the first
# dimension of the 'observation' entry and of the action space.
state_dim = env.observation_space['observation'].shape[0]
action_dim = env.action_space.shape[0]

# SAC hyperparameters, kept together so they are easy to tweak
sac_hyperparameters = {
    "hidden_dim": 256,
    "lr": 3e-4,
    "gamma": 0.99,
    "tau": 0.005,
    "alpha": 0.2,
    "automatic_entropy_tuning": True,
}

# Build the agent
policy = SAC(
    state_dim=state_dim,
    action_dim=action_dim,
    action_space=env.action_space,
    device=device,
    **sac_hyperparameters,
)

# Build the replay buffer (capacity: one million entries)
replay_buffer = ReplayBuffer(
    state_dim=state_dim,
    action_dim=action_dim,
    max_size=1_000_000,
    device=device,
)

In [None]:
# Visual training loop - watch the robot learn!
#
# Each episode steps `env` for at most `max_steps_per_episode` steps, stores
# every transition in `replay_buffer`, and calls `policy.update_parameters`
# after every step. Every `eval_interval` episodes the policy is evaluated on
# `eval_env`, progress is plotted, and a checkpoint is saved when the
# evaluation success rate is at least 0.8.
import os

max_episodes = 1000
max_steps_per_episode = 50

# For collecting statistics
all_rewards = []
all_lengths = []
success_rates = []
eval_interval = 10  # Evaluate every 10 episodes

# Checkpoints are written under this directory; create it up front in case
# policy.save() does not create missing directories itself.
checkpoint_dir = "./results"
os.makedirs(checkpoint_dir, exist_ok=True)

# try/finally guarantees both environments are closed even when training is
# interrupted (e.g. by stopping the notebook cell) or raises.
try:
    for episode in range(1, max_episodes + 1):
        print(f"\nEpisode {episode}/{max_episodes}")

        # Reset environment (a different seed per episode)
        state_dict, _ = env.reset(seed=seed+episode)
        state = state_dict['observation']

        episode_reward = 0
        episode_steps = 0
        done = False
        truncated = False

        # Run one episode
        while not (done or truncated) and episode_steps < max_steps_per_episode:
            # Select action
            action = policy.select_action(state)

            # Execute action
            next_state_dict, reward, done, truncated, info = env.step(action)
            next_state = next_state_dict['observation']

            # Save in replay buffer; only `done` is stored as the terminal
            # flag, `truncated` is not.
            replay_buffer.add(state, action, next_state, reward, done)

            # Update state and counters
            state = next_state
            episode_reward += reward
            episode_steps += 1

            # Train agent (learning happens here)
            policy.update_parameters(replay_buffer)

            # Check for success
            if 'is_success' in info and info['is_success'] == 1.0:
                print(f"Success! Goal reached at step {episode_steps}")
                break

        # End of episode stats
        print(f"Episode {episode}: Reward = {episode_reward:.3f}, Steps = {episode_steps}")
        all_rewards.append(episode_reward)
        all_lengths.append(episode_steps)

        # Evaluate periodically
        if episode % eval_interval == 0:
            avg_reward, success_rate = evaluate_policy(policy, eval_env)
            success_rates.append(success_rate)

            # Plot progress
            fig = plt.figure(figsize=(15, 5))

            # Plot rewards
            plt.subplot(1, 3, 1)
            plt.plot(all_rewards)
            plt.title('Episode Rewards')
            plt.xlabel('Episode')
            plt.ylabel('Reward')

            # Plot lengths
            plt.subplot(1, 3, 2)
            plt.plot(all_lengths)
            plt.title('Episode Lengths')
            plt.xlabel('Episode')
            plt.ylabel('Steps')

            # Plot success rates (one point per evaluation, placed at the
            # episode index where that evaluation ran)
            plt.subplot(1, 3, 3)
            plt.plot(np.arange(eval_interval, episode+1, eval_interval), success_rates)
            plt.title('Success Rate')
            plt.xlabel('Episode')
            plt.ylabel('Success Rate')

            plt.tight_layout()
            plt.show()
            # Release the figure so one figure per evaluation does not pile
            # up in memory over a long run.
            plt.close(fig)

            # Save model checkpoint
            if success_rate >= 0.8:  # Save when we reach 80% success
                print("Saving model checkpoint - good performance!")
                policy.save(f"{checkpoint_dir}/sac_checkpoint_episode_{episode}")
finally:
    # Close environments when done (or on failure)
    env.close()
    eval_env.close()

In [None]:
# Test the final policy
#
# Runs `test_episodes` episodes in a freshly created rendering environment,
# requesting actions with noise=0, and prints the reward and step count of
# each episode.
test_env = gym.make(env_name, render_mode="human")
test_episodes = 5

# try/finally guarantees the rendering environment is closed even if the
# loop is interrupted or raises.
try:
    for episode in range(test_episodes):
        obs_dict, _ = test_env.reset()
        obs = obs_dict['observation']
        done = False
        truncated = False
        episode_reward = 0
        steps = 0

        print(f"\nTesting Episode {episode+1}/{test_episodes}")

        while not (done or truncated):
            # Choose action without exploration noise
            action = policy.select_action(obs, noise=0)

            # Execute action
            obs_dict, reward, done, truncated, info = test_env.step(action)
            obs = obs_dict['observation']
            episode_reward += reward
            steps += 1

            # Check for success
            if 'is_success' in info and info['is_success'] == 1.0:
                print(f"Success! Goal reached at step {steps}")
                time.sleep(1)  # Pause for a moment to see the success
                break

        print(f"Episode {episode+1}: Reward = {episode_reward:.3f}, Steps = {steps}")
finally:
    test_env.close()