# Sokoban RL with PPO

**CS 175 Final Project** - Training a PPO agent for Sokoban-small-v0

## Quick Start

1. **Run cells 1-9** (setup and class definitions)
2. **Run Cell 10 (DEMO)** - Quick demonstration with pre-trained agent
3. Optionally view Cell 11 for comprehensive evaluation

**Note:** Training from scratch (Cell 12) takes several hours.


## 1. Setup: Import Dependencies and Initialize Environment

This cell imports all required libraries and sets up the Sokoban environment:
- **gym & gym_sokoban**: Sokoban puzzle environment
- **torch**: Deep learning framework for PPO agent
- **matplotlib**: For visualization

**CHECKPOINT_PATH**: Pre-trained model location (trained for 3000 episodes)


In [40]:
import gym
import gym_sokoban
import pygame
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import matplotlib.pyplot as plt
import os

CHECKPOINT_PATH='google_colab_checkpoints/sokoban-small-v0/ppo_sokoban_ep3000.pth'

# NumPy 2.x compatibility patch
if not hasattr(np, 'bool8'):
    np.bool8 = np.bool_
print(f"NumPy version: {np.__version__}")
print("Compatibility patch applied ✓")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")


class SokobanRewardShaper(gym.Wrapper):
    def __init__(self, env):
        super().__init__(env)
        self.previous_min_distance = None
        self.boxes_on_target = 0
        self.max_steps = 150

    def reset(self, **kwargs):
        obs = self.env.reset(**kwargs)
        self.previous_min_distance = self._compute_min_box_target_distance()
        self.boxes_on_target = self._count_boxes_on_target()
        return obs

    def _compute_min_box_target_distance(self):
        room = self.env.unwrapped.room_state

        boxes = np.argwhere((room == 3) | (room == 4))

        targets = np.argwhere((room == 2) | (room == 3))

        if len(boxes) == 0 or len(targets) == 0:
            return 0

        #manhattan distance
        total_min_dist = 0
        for box in boxes:
            distances = np.abs(targets - box).sum(axis=1)
            min_dist = distances.min()
            total_min_dist += min_dist

        return total_min_dist

    def _count_boxes_on_target(self):
        room = self.env.unwrapped.room_state
        return np.sum(room == 3)

    def step(self, action):
        obs, reward, done, info = self.env.step(action)

        shaped_reward = reward

        current_min_distance = self._compute_min_box_target_distance()
        if self.previous_min_distance is not None:
            distance_change = self.previous_min_distance - current_min_distance
            if distance_change > 0:  # Moved closer
                shaped_reward += 0.1 * distance_change
            elif distance_change < 0:  # Moved away
                shaped_reward += 0.1 * distance_change  # Small penalty

        self.previous_min_distance = current_min_distance

        current_boxes_on_target = self._count_boxes_on_target()
        if current_boxes_on_target > self.boxes_on_target:
            shaped_reward += 0.5 * (current_boxes_on_target - self.boxes_on_target)

        self.boxes_on_target = current_boxes_on_target
        
        if done and not info.get('all_boxes_on_target', False):
            if hasattr(self.env.unwrapped, 'num_env_steps'):
                if self.env.unwrapped.num_env_steps >= self.max_steps - 1:
                    shaped_reward -= 3.0  # Timeout penalty

        return obs, shaped_reward, done, info

NumPy version: 1.26.4
Compatibility patch applied ✓
Using device: cpu


  if not hasattr(np, 'bool8'):


## 2. Actor-Critic Neural Network Architecture

CNN-based architecture for PPO agent:
- **Actor head**: Outputs action probabilities (policy)
- **Critic head**: Outputs state value estimates

Features:
- 3 convolutional layers with layer normalization
- Orthogonal weight initialization for stable training
- Separate heads for policy and value function


In [41]:
class ActorCritic(nn.Module):
    def __init__(self, input_shape, n_actions):
        super(ActorCritic, self).__init__()
        
        # Convolutional layers with Layer Normalization for stability
        self.conv1 = nn.Conv2d(input_shape[0], 32, kernel_size=3, stride=2, padding=1)
        self.ln1 = nn.LayerNorm([32, input_shape[1]//2, input_shape[2]//2])
        
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1)
        self.ln2 = nn.LayerNorm([64, input_shape[1]//4, input_shape[2]//4])
        
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=2, padding=1)
        self.ln3 = nn.LayerNorm([64, input_shape[1]//8, input_shape[2]//8])
        
        conv_out_size = self._get_conv_out(input_shape)
        
        # Actor head (policy) with smaller hidden layer
        self.actor = nn.Sequential(
            nn.Linear(conv_out_size, 128),  # Reduced from 256
            nn.ReLU(),
            nn.Linear(128, n_actions)
        )
        
        # Critic head (value function) with smaller hidden layer
        self.critic = nn.Sequential(
            nn.Linear(conv_out_size, 128),  # Reduced from 256
            nn.ReLU(),
            nn.Linear(128, 1)
        )
        
        # Orthogonal initialization for better gradient flow
        self._initialize_weights()
    
    def _initialize_weights(self):
        """Initialize weights using orthogonal initialization"""
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.orthogonal_(m.weight, gain=np.sqrt(2))
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.orthogonal_(m.weight, gain=np.sqrt(2))
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
    
    def _get_conv_out(self, shape):
        o = self._forward_conv(torch.zeros(1, *shape))
        return int(np.prod(o.size()))
    
    def _forward_conv(self, x):
        """Forward pass through convolutional layers"""
        x = torch.relu(self.ln1(self.conv1(x)))
        x = torch.relu(self.ln2(self.conv2(x)))
        x = torch.relu(self.ln3(self.conv3(x)))
        return x
    
    def forward(self, x):
        conv_out = self._forward_conv(x).view(x.size()[0], -1)
        return self.actor(conv_out), self.critic(conv_out)
    
    def get_action_probs(self, x):
        logits, _ = self.forward(x)
        return torch.softmax(logits, dim=-1)
    
    def get_value(self, x):
        _, value = self.forward(x)
        return value

## 3. PPO (Proximal Policy Optimization) Agent

Main reinforcement learning agent implementing PPO algorithm.

**Key features:**
- Generalized Advantage Estimation (GAE)
- Clipped surrogate objective
- Value function clipping
- Learning rate warmup
- Entropy regularization for exploration

**Hyperparameters:** lr=3e-4, gamma=0.99, eps_clip=0.2, K_epochs=4


In [42]:
class PPOAgent:
    def __init__(self, env, lr=3e-4, gamma=0.99, eps_clip=0.2, K_epochs=4, gae_lambda=0.95, 
                 entropy_coef=0.05, value_clip=0.2, warmup_steps=10, advantage_clip=10.0):
        self.env = env
        self.gamma = gamma
        self.eps_clip = eps_clip
        self.K_epochs = K_epochs
        self.gae_lambda = gae_lambda
        self.entropy_coef = entropy_coef
        self.value_clip = value_clip
        self.advantage_clip = advantage_clip
        
        # Learning rate warmup
        self.base_lr = lr
        self.warmup_steps = warmup_steps
        self.current_update = 0
        
        # Get observation shape
        obs = env.reset()
        if len(obs.shape) == 3:
            obs = np.transpose(obs, (2, 0, 1))
        
        self.input_shape = obs.shape
        self.n_actions = env.action_space.n
        
        self.device = torch.device("cpu")
        self.policy = ActorCritic(self.input_shape, self.n_actions).to(self.device)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
        
        self.policy_old = ActorCritic(self.input_shape, self.n_actions).to(self.device)
        self.policy_old.load_state_dict(self.policy.state_dict())
        
        self.MseLoss = nn.MSELoss()
    
    def _get_current_lr(self):
        """Get learning rate with warmup schedule"""
        if self.current_update < self.warmup_steps:
            # Linear warmup from 0 to base_lr
            return self.base_lr * (self.current_update + 1) / self.warmup_steps
        else:
            return self.base_lr
    
    def select_action(self, state):
        state = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        with torch.no_grad():
            action_probs = self.policy_old.get_action_probs(state)
        
        dist = torch.distributions.Categorical(action_probs)
        action = dist.sample()
        action_logprob = dist.log_prob(action)
        
        return action.item(), action_logprob.item()
    
    def compute_gae(self, rewards, values, dones):
        advantages = []
        gae = 0
        
        for t in reversed(range(len(rewards))):
            if t == len(rewards) - 1:
                next_value = 0
            else:
                next_value = values[t + 1]
            
            delta = rewards[t] + self.gamma * next_value * (1 - dones[t]) - values[t]
            gae = delta + self.gamma * self.gae_lambda * (1 - dones[t]) * gae
            advantages.insert(0, gae)
        
        return advantages
    
    def update(self, memory):
        self.current_update += 1
        current_lr = self._get_current_lr()
        
        # Update learning rate
        for param_group in self.optimizer.param_groups:
            param_group['lr'] = current_lr
        
        states = torch.FloatTensor(np.array(memory['states'])).to(self.device)
        actions = torch.LongTensor(memory['actions']).to(self.device)
        old_logprobs = torch.FloatTensor(memory['logprobs']).to(self.device)
        
        rewards = memory['rewards']
        dones = memory['dones']
        
        # Compute values and advantages
        with torch.no_grad():
            old_values = self.policy_old.get_value(states).squeeze().cpu().numpy()
        
        advantages = self.compute_gae(rewards, old_values, dones)
        advantages_tensor = torch.FloatTensor(advantages).to(self.device)
        
        # Store raw advantage statistics before clipping/normalization
        raw_adv_mean = advantages_tensor.mean().item()
        raw_adv_std = advantages_tensor.std().item()
        raw_adv_max = advantages_tensor.max().item()
        raw_adv_min = advantages_tensor.min().item()
        
        # CLIP advantages to prevent extreme values
        advantages_tensor = torch.clamp(advantages_tensor, -self.advantage_clip, self.advantage_clip)
        
        # Normalize advantages
        advantages_tensor = (advantages_tensor - advantages_tensor.mean()) / (advantages_tensor.std() + 1e-8)
        
        returns = advantages_tensor + torch.FloatTensor(old_values).to(self.device)
        old_values_tensor = torch.FloatTensor(old_values).to(self.device)
        
        # Optimize policy for K epochs
        total_grad_norm = 0.0
        for _ in range(self.K_epochs):
            logits, state_values = self.policy(states)
            dist = torch.distributions.Categorical(logits=logits)
            action_logprobs = dist.log_prob(actions)
            dist_entropy = dist.entropy()
            
            ratios = torch.exp(action_logprobs - old_logprobs)
            
            surr1 = ratios * advantages_tensor
            surr2 = torch.clamp(ratios, 1 - self.eps_clip, 1 + self.eps_clip) * advantages_tensor
            
            actor_loss = -torch.min(surr1, surr2).mean()
            
            # Value function loss with clipping
            state_values_squeeze = state_values.squeeze()
            value_pred_clipped = old_values_tensor + torch.clamp(
                state_values_squeeze - old_values_tensor,
                -self.value_clip,
                self.value_clip
            )
            value_loss1 = self.MseLoss(state_values_squeeze, returns)
            value_loss2 = self.MseLoss(value_pred_clipped, returns)
            critic_loss = torch.max(value_loss1, value_loss2)
            
            entropy_loss = -self.entropy_coef * dist_entropy.mean()
            
            loss = actor_loss + 0.5 * critic_loss + entropy_loss
            
            self.optimizer.zero_grad()
            loss.backward()
            grad_norm = torch.nn.utils.clip_grad_norm_(self.policy.parameters(), 0.5)
            total_grad_norm += grad_norm.item()
            self.optimizer.step()
        
        avg_grad_norm = total_grad_norm / self.K_epochs
        
        # Calculate policy ratio statistics
        with torch.no_grad():
            final_logits, _ = self.policy(states)
            final_dist = torch.distributions.Categorical(logits=final_logits)
            final_logprobs = final_dist.log_prob(actions)
            final_ratios = torch.exp(final_logprobs - old_logprobs)
            
            ratio_mean = final_ratios.mean().item()
            ratio_std = final_ratios.std().item()
            ratio_max = final_ratios.max().item()
            ratio_min = final_ratios.min().item()
        
        self.policy_old.load_state_dict(self.policy.state_dict())
        
        # Return comprehensive metrics
        metrics = {
            'actor_loss': actor_loss.item(),
            'critic_loss': critic_loss.item(),
            'entropy': -entropy_loss.item() / self.entropy_coef,
            'grad_norm': avg_grad_norm,
            'advantage_mean': raw_adv_mean,
            'advantage_std': raw_adv_std,
            'advantage_max': raw_adv_max,
            'advantage_min': raw_adv_min,
            'ratio_mean': ratio_mean,
            'ratio_std': ratio_std,
            'ratio_max': ratio_max,
            'ratio_min': ratio_min,
            'value_mean': np.mean(old_values),
            'value_std': np.std(old_values),
            'learning_rate': current_lr,
        }
        
        return metrics
    
    def save(self, path):
        torch.save(self.policy.state_dict(), path)
    
    def load(self, path):
        self.policy.load_state_dict(torch.load(path, weights_only=False))
        self.policy_old.load_state_dict(torch.load(path, weights_only=False))

## 4. Training Function (For Training from Scratch)

Training features:
- Reward shaping for sparse reward environments
- Checkpoint saving every 100 episodes
- Detailed logging to logs/ directory
- Resume training from checkpoint support


In [43]:
def train(env_name='Sokoban-v0', max_episodes=10000, max_timesteps=300, update_timestep=2048, save_freq=100, 
          resume_from_checkpoint=None, start_episode=1):
    import datetime
    
    # Enable reward shaping for sparse reward exploration
    env = gym.make(env_name)
    env = SokobanRewardShaper(env)  # Enable reward shaping
    
    agent = PPOAgent(env)
    
    # Load checkpoint if provided
    if resume_from_checkpoint is not None:
        print(f"\n{'='*100}")
        print(f"RESUMING FROM CHECKPOINT: {resume_from_checkpoint}")
        print(f"Starting from episode: {start_episode}")
        print(f"{'='*100}\n")
        agent.load(resume_from_checkpoint)
    
    os.makedirs('checkpoints', exist_ok=True)
    os.makedirs('logs', exist_ok=True)
    
    # Create log file with timestamp
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    if resume_from_checkpoint:
        log_file = f'logs/training_log_resumed_{timestamp}.txt'
    else:
        log_file = f'logs/training_log_{timestamp}.txt'
    
    # Write header to log file
    with open(log_file, 'w') as f:
        f.write("=" * 100 + "\n")
        if resume_from_checkpoint:
            f.write(f"SOKOBAN PPO TRAINING LOG (RESUMED) - Started at {datetime.datetime.now()}\n")
            f.write(f"Resumed from checkpoint: {resume_from_checkpoint}\n")
            f.write(f"Starting episode: {start_episode}\n")
        else:
            f.write(f"SOKOBAN PPO TRAINING LOG (WITH REWARD SHAPING) - Started at {datetime.datetime.now()}\n")
        f.write("=" * 100 + "\n")
        f.write(f"Environment: {env_name} (WITH REWARD SHAPING)\n")
        f.write(f"Max Episodes: {max_episodes}\n")
        f.write(f"Max Timesteps per Episode: {max_timesteps}\n")
        f.write(f"Update Timestep: {update_timestep}\n")
        f.write(f"Save Frequency: {save_freq}\n")
        f.write("=" * 100 + "\n\n")
        f.write("HYPERPARAMETER IMPROVEMENTS + REWARD SHAPING:\n")
        f.write("1. REWARD SHAPING ENABLED: Intermediate rewards for box movements\n")
        f.write("   - +0.5 for each unit a box moves closer to targets\n")
        f.write("   - +2.0 for placing a box on a target\n")
        f.write("   - -0.2 for moving a box away from targets\n")
        f.write("2. Layer Normalization: Stabilizes network activations\n")
        f.write("3. Orthogonal Initialization: Better gradient flow\n")
        f.write("4. Value Function Clipping: Prevents critic divergence\n")
        f.write("5. Learning Rate Warmup: Gradual increase (10 updates)\n")
        f.write("6. Advantage Clipping: Prevents extreme advantage values\n")
        f.write("7. IMPROVED Hyperparameters:\n")
        f.write("   - lr=3e-4 (standard PPO learning rate)\n")
        f.write("   - entropy_coef=0.05 (exploration)\n")
        f.write("   - grad_clip=0.5 (stable gradients)\n")
        f.write("   - K_epochs=4\n")
        f.write("   - warmup_steps=10\n")
        f.write("=" * 100 + "\n\n")
        f.write("METRICS EXPLANATION:\n")
        f.write("- Episode: Episode number\n")
        f.write("- Reward: Total reward for this episode (WITH SHAPING BONUSES)\n")
        f.write("- Running Reward: Exponential moving average of rewards\n")
        f.write("- Steps: Number of steps taken in this episode\n")
        f.write("- Timestep: Total timesteps so far\n")
        f.write("- Actor Loss: Policy improvement metric\n")
        f.write("- Critic Loss: Value estimation error\n")
        f.write("- Entropy: Action randomness (target: 0.5-2.0)\n")
        f.write("- Grad Norm: Gradient magnitude\n")
        f.write("- Learning Rate: Current LR with warmup\n")
        f.write("- Ratio Mean: Policy change (should stay near 1.0)\n")
        f.write("=" * 100 + "\n\n")
    
    print(f"Logging to: {log_file}\n")
    print("REWARD SHAPING ENABLED:")
    print("  - +0.5 per unit boxes move closer to targets")
    print("  - +2.0 for placing box on target")
    print("  - -0.2 per unit boxes move away from targets")
    print("\nIMPROVED HYPERPARAMETERS ACTIVE:")
    print("  - Layer normalization + Orthogonal init")
    print("  - Value function clipping + Advantage clipping (±10)")
    print("  - Learning rate warmup (10 updates)")
    print("  - lr=3e-4, entropy=0.05, grad_clip=0.5, K_epochs=4\n")
    
    episode_rewards = []
    episode_steps = []
    running_reward = 0
    timestep = 0
    
    # Track latest update metrics
    latest_metrics = None
    
    memory = {
        'states': [],
        'actions': [],
        'logprobs': [],
        'rewards': [],
        'dones': []
    }
    
    for episode in range(start_episode, max_episodes + 1):
        state = env.reset()
        if len(state.shape) == 3:
            state = np.transpose(state, (2, 0, 1))
        
        episode_reward = 0
        
        for t in range(max_timesteps):
            timestep += 1
            
            action, action_logprob = agent.select_action(state)
            next_state, reward, done, info = env.step(action)
            
            if len(next_state.shape) == 3:
                next_state = np.transpose(next_state, (2, 0, 1))
            
            memory['states'].append(state)
            memory['actions'].append(action)
            memory['logprobs'].append(action_logprob)
            memory['rewards'].append(reward)
            memory['dones'].append(done)
            
            state = next_state
            episode_reward += reward
            
            if timestep % update_timestep == 0:
                latest_metrics = agent.update(memory)
                memory = {
                    'states': [],
                    'actions': [],
                    'logprobs': [],
                    'rewards': [],
                    'dones': []
                }
                print(f"[UPDATE {agent.current_update}] Timestep {timestep} - "
                      f"LR: {latest_metrics['learning_rate']:.2e}, "
                      f"Actor: {latest_metrics['actor_loss']:.4f}, "
                      f"Critic: {latest_metrics['critic_loss']:.4f}, "
                      f"Entropy: {latest_metrics['entropy']:.4f}, "
                      f"GradNorm: {latest_metrics['grad_norm']:.4f}")
            
            if done:
                break
        
        episode_rewards.append(episode_reward)
        episode_steps.append(t + 1)
        running_reward = 0.05 * episode_reward + (1 - 0.05) * running_reward
        
        # Console output
        print(f"Episode {episode:5d} | Reward: {episode_reward:7.2f} | "
              f"Running: {running_reward:7.2f} | Steps: {t+1:3d}")
        
        # Write to log file after EVERY episode
        with open(log_file, 'a') as f:
            f.write(f"\n{'='*100}\n")
            f.write(f"EPISODE {episode} (Timestep: {timestep})\n")
            f.write(f"{'='*100}\n")
            f.write(f"  Reward:          {episode_reward:10.4f}\n")
            f.write(f"  Running Reward:  {running_reward:10.4f}\n")
            f.write(f"  Steps:           {t+1:10d}\n")
            f.write(f"  Total Timestep:  {timestep:10d}\n")
            
            # Add update metrics if available
            if latest_metrics is not None:
                f.write(f"\n  --- Latest Update Metrics (Update #{agent.current_update}) ---\n")
                f.write(f"  Learning Rate:   {latest_metrics['learning_rate']:10.8f}  (with warmup)\n")
                f.write(f"  Actor Loss:      {latest_metrics['actor_loss']:10.6f}\n")
                f.write(f"  Critic Loss:     {latest_metrics['critic_loss']:10.6f}\n")
                f.write(f"  Entropy:         {latest_metrics['entropy']:10.6f}\n")
                f.write(f"  Grad Norm:       {latest_metrics['grad_norm']:10.6f}\n")
                f.write(f"  \n")
                f.write(f"  Advantage Mean:  {latest_metrics['advantage_mean']:10.6f}\n")
                f.write(f"  Advantage Std:   {latest_metrics['advantage_std']:10.6f}\n")
                f.write(f"  Advantage Max:   {latest_metrics['advantage_max']:10.6f}\n")
                f.write(f"  Advantage Min:   {latest_metrics['advantage_min']:10.6f}\n")
                f.write(f"  \n")
                f.write(f"  Ratio Mean:      {latest_metrics['ratio_mean']:10.6f}\n")
                f.write(f"  Ratio Std:       {latest_metrics['ratio_std']:10.6f}\n")
                f.write(f"  Ratio Max:       {latest_metrics['ratio_max']:10.6f}\n")
                f.write(f"  Ratio Min:       {latest_metrics['ratio_min']:10.6f}\n")
                f.write(f"  \n")
                f.write(f"  Value Mean:      {latest_metrics['value_mean']:10.6f}\n")
                f.write(f"  Value Std:       {latest_metrics['value_std']:10.6f}\n")
        
        # Save checkpoints
        if episode % save_freq == 0:
            agent.save(f'checkpoints/ppo_sokoban_ep{episode}.pth')
            print(f"[CHECKPOINT] Model saved at episode {episode}")
            
            with open(log_file, 'a') as f:
                f.write(f"\n  >>> CHECKPOINT SAVED: checkpoints/ppo_sokoban_ep{episode}.pth\n")
    
    env.close()
    
    # Final summary
    with open(log_file, 'a') as f:
        f.write(f"\n\n{'='*100}\n")
        f.write(f"TRAINING COMPLETED - {datetime.datetime.now()}\n")
        f.write(f"{'='*100}\n")
        f.write(f"Total Episodes:       {max_episodes - start_episode + 1}\n")
        f.write(f"Total Timesteps:      {timestep}\n")
        f.write(f"Final Running Reward: {running_reward:.4f}\n")
        if episode_rewards:
            f.write(f"Best Episode Reward:  {max(episode_rewards):.4f} (Episode {episode_rewards.index(max(episode_rewards)) + start_episode})\n")
            f.write(f"Worst Episode Reward: {min(episode_rewards):.4f} (Episode {episode_rewards.index(min(episode_rewards)) + start_episode})\n")
            f.write(f"Average Reward:       {np.mean(episode_rewards):.4f}\n")
            f.write(f"Average Steps:        {np.mean(episode_steps):.2f}\n")
        f.write(f"{'='*100}\n")
    
    print(f"\nTraining complete! Log saved to: {log_file}")
    
    return episode_rewards

## 5.Resume Training from Checkpoint

This cell will take hours to run.

For quick evaluation, skip to **Section 7** instead.


In [44]:
# Resume training from ep3000 checkpoint on Sokoban-small-v0
# Remove optional path for training from scratch

episode_rewards = train(
    env_name='Sokoban-small-v0',
    max_episodes=10000,              # Continue training until episode 10000
    max_timesteps=150,               # Shorter episodes for smaller puzzles
    update_timestep=2048,
    save_freq=100,                   # Save checkpoint every 100 episodes
    resume_from_checkpoint=CHECKPOINT_PATH,  # Path to uploaded checkpoint
    start_episode=3001               # Continue from episode 3001
)

  logger.warn(
  logger.warn(
  logger.warn(
  logger.warn(



RESUMING FROM CHECKPOINT: google_colab_checkpoints/sokoban-small-v0/ppo_sokoban_ep3000.pth
Starting from episode: 3001

Logging to: logs/training_log_resumed_20251212_144114.txt

REWARD SHAPING ENABLED:
  - +0.5 per unit boxes move closer to targets
  - +2.0 for placing box on target
  - -0.2 per unit boxes move away from targets

IMPROVED HYPERPARAMETERS ACTIVE:
  - Layer normalization + Orthogonal init
  - Value function clipping + Advantage clipping (±10)
  - Learning rate warmup (10 updates)
  - lr=3e-4, entropy=0.05, grad_clip=0.5, K_epochs=4



  logger.deprecation(
  if not isinstance(done, (bool, np.bool8)):


Episode  3001 | Reward:  -14.50 | Running:   -0.72 | Steps: 150


KeyboardInterrupt: 

## 6. Evaluation Functions

Multiple evaluation options with different levels of detail:
- **quick_evaluation**: 10 episodes, ~30 seconds (used in demo)
- **test_agent**: 100 episodes, ~5 minutes
- **comprehensive_evaluation**: 1000 episodes with graphs, ~30 minutes

### Test Agent Function (100 Episodes)


In [47]:
def test_agent(checkpoint_path, env_name='Sokoban-small-v0', num_episodes=100, max_steps=200, render=False):
    """
    Test a trained agent without reward shaping.
    
    Args:
        checkpoint_path: Path to the checkpoint file
        env_name: Environment to test on
        num_episodes: Number of episodes to test
        max_steps: Maximum steps per episode (default: 200)
        render: Whether to render the environment
    
    Returns:
        List of episode rewards (base Sokoban rewards without shaping)
    """
    # Create environment WITHOUT reward shaping for true performance
    env = gym.make(env_name)
    agent = PPOAgent(env)
    agent.load(checkpoint_path)
    
    total_rewards = []
    success_count = 0
    timeout_count = 0
    
    print(f"Testing agent for {num_episodes} episodes on {env_name} (WITHOUT reward shaping)...\n")
    
    for episode in range(num_episodes):
        state = env.reset()
        if len(state.shape) == 3:
            state = np.transpose(state, (2, 0, 1))
        
        episode_reward = 0
        done = False
        steps = 0
        
        while not done and steps < max_steps:
            if render:
                env.render()
            
            action, _ = agent.select_action(state)
            next_state, reward, done, info = env.step(action)
            
            if len(next_state.shape) == 3:
                next_state = np.transpose(next_state, (2, 0, 1))
            
            state = next_state
            episode_reward += reward
            steps += 1
        
        # Check success/failure status
        success = done and info.get('all_boxes_on_target', False)
        timeout = steps >= max_steps
        
        if success:
            success_count += 1
            status = "✓ PASS"
        else:
            status = "✗ FAIL"
        
        if timeout:
            timeout_count += 1
        
        total_rewards.append(episode_reward)
        
        # Print every episode with pass/fail status
        print(f"Episode {episode + 1:3d}/{num_episodes} | Reward: {episode_reward:7.2f} | Steps: {steps:3d} | {status} | {'TIMEOUT' if timeout else ''}")
    
    env.close()
    
    success_rate = (success_count / num_episodes) * 100
    timeout_rate = (timeout_count / num_episodes) * 100
    
    print(f"\n{'='*80}")
    print(f"FINAL RESULTS:")
    print(f"  Total Episodes:  {num_episodes}")
    print(f"  Passed:          {success_count} ({success_rate:.1f}%)")
    print(f"  Failed:          {num_episodes - success_count} ({100 - success_rate:.1f}%)")
    print(f"  Timeouts:        {timeout_count} ({timeout_rate:.1f}%)")
    print(f"  Average Reward:  {np.mean(total_rewards):.2f}")
    print(f"{'='*80}")
    
    return total_rewards

### Comprehensive Evaluation (1000 Episodes with Graphs)

Takes a while to run (1000 episodes). For quick demo, use quick_evaluation instead.


In [60]:
import time
import matplotlib.pyplot as plt
import numpy as np

def comprehensive_evaluation(checkpoint_path, env_name='Sokoban-small-v0', num_episodes=1000, max_steps=200, verbose=True):
    """
    Comprehensive evaluation with detailed metrics and timing.
    
    Args:
        checkpoint_path: Path to checkpoint file
        env_name: Environment name
        num_episodes: Number of test episodes
        max_steps: Max steps per episode
        verbose: If True, print each episode result
    """
    # Create environment WITHOUT reward shaping for true performance
    env = gym.make(env_name)
    agent = PPOAgent(env)
    agent.load(checkpoint_path)
    
    # Metrics storage
    episode_rewards = []
    episode_steps = []
    episode_times = []
    success_flags = []
    timeout_flags = []
    
    print(f"{'='*80}")
    print(f"COMPREHENSIVE EVALUATION")
    print(f"{'='*80}")
    print(f"Checkpoint:    {checkpoint_path}")
    print(f"Environment:   {env_name}")
    print(f"Episodes:      {num_episodes}")
    print(f"Max Steps:     {max_steps}")
    print(f"{'='*80}\n")
    
    start_total = time.time()
    
    for episode in range(num_episodes):
        state = env.reset()
        if len(state.shape) == 3:
            state = np.transpose(state, (2, 0, 1))
        
        episode_reward = 0
        done = False
        steps = 0
        
        episode_start = time.time()
        
        while not done and steps < max_steps:
            action, _ = agent.select_action(state)
            next_state, reward, done, info = env.step(action)
            
            if len(next_state.shape) == 3:
                next_state = np.transpose(next_state, (2, 0, 1))
            
            state = next_state
            episode_reward += reward
            steps += 1
        
        episode_time = time.time() - episode_start
        
        # Check success/failure
        success = done and info.get('all_boxes_on_target', False)
        timeout = steps >= max_steps
        
        # Record metrics
        episode_rewards.append(episode_reward)
        episode_steps.append(steps)
        episode_times.append(episode_time)
        success_flags.append(1 if success else 0)
        timeout_flags.append(1 if timeout else 0)
        
        # Print individual episode result if verbose
        if verbose:
            status = "✓ PASS" if success else "✗ FAIL"
            timeout_marker = "TIMEOUT" if timeout else ""
            print(f"Episode {episode + 1:4d}/{num_episodes} | "
                  f"Reward: {episode_reward:7.2f} | "
                  f"Steps: {steps:3d} | "
                  f"{status:7s} | "
                  f"{timeout_marker:7s}")
        
        # Progress summary every 100 episodes (even if not verbose)
        if (episode + 1) % 100 == 0:
            current_success_rate = sum(success_flags) / len(success_flags) * 100
            avg_reward = np.mean(episode_rewards)
            avg_steps = np.mean(episode_steps)
            avg_time = np.mean(episode_times)
            
            print(f"\n{'='*80}")
            print(f"PROGRESS UPDATE - Episode {episode + 1}/{num_episodes}")
            print(f"{'='*80}")
            print(f"  Success Rate:       {current_success_rate:6.2f}%")
            print(f"  Avg Reward:         {avg_reward:7.2f}")
            print(f"  Avg Steps:          {avg_steps:6.1f}")
            print(f"  Avg Episode Time:   {avg_time:7.4f}s")
            print(f"{'='*80}\n")
    
    total_time = time.time() - start_total
    env.close()
    
    # Calculate comprehensive statistics
    success_rate = sum(success_flags) / num_episodes * 100
    timeout_rate = sum(timeout_flags) / num_episodes * 100
    
    success_episodes = [i for i in range(num_episodes) if success_flags[i] == 1]
    failure_episodes = [i for i in range(num_episodes) if success_flags[i] == 0]
    
    success_rewards = [episode_rewards[i] for i in success_episodes]
    failure_rewards = [episode_rewards[i] for i in failure_episodes]
    
    success_steps_list = [episode_steps[i] for i in success_episodes]
    failure_steps_list = [episode_steps[i] for i in failure_episodes]
    
    # Print comprehensive data overview
    print("\n" + "=" * 80)
    print("FINAL EVALUATION SUMMARY")
    print("=" * 80)
    print(f"\n{'OVERALL PERFORMANCE':^80}")
    print("-" * 80)
    print(f"  Total Episodes:          {num_episodes}")
    print(f"  Successful Episodes:     {len(success_episodes)} ({success_rate:.2f}%)")
    print(f"  Failed Episodes:         {len(failure_episodes)} ({100-success_rate:.2f}%)")
    print(f"  Timeout Episodes:        {sum(timeout_flags)} ({timeout_rate:.2f}%)")
    
    print(f"\n{'REWARD STATISTICS':^80}")
    print("-" * 80)
    print(f"  Overall:")
    print(f"    Mean:                  {np.mean(episode_rewards):7.2f}")
    print(f"    Median:                {np.median(episode_rewards):7.2f}")
    print(f"    Std Dev:               {np.std(episode_rewards):7.2f}")
    print(f"    Min:                   {min(episode_rewards):7.2f}")
    print(f"    Max:                   {max(episode_rewards):7.2f}")
    
    if success_rewards:
        print(f"  Success Episodes:")
        print(f"    Mean:                  {np.mean(success_rewards):7.2f}")
        print(f"    Median:                {np.median(success_rewards):7.2f}")
        print(f"    Std Dev:               {np.std(success_rewards):7.2f}")
    
    if failure_rewards:
        print(f"  Failure Episodes:")
        print(f"    Mean:                  {np.mean(failure_rewards):7.2f}")
        print(f"    Median:                {np.median(failure_rewards):7.2f}")
        print(f"    Std Dev:               {np.std(failure_rewards):7.2f}")
    
    print(f"\n{'EPISODE DURATION (STEPS)':^80}")
    print("-" * 80)
    print(f"  Overall:")
    print(f"    Mean:                  {np.mean(episode_steps):7.1f}")
    print(f"    Median:                {np.median(episode_steps):7.1f}")
    print(f"    Min:                   {min(episode_steps):7d}")
    print(f"    Max:                   {max(episode_steps):7d}")
    
    if success_steps_list:
        print(f"  Success Episodes:")
        print(f"    Mean:                  {np.mean(success_steps_list):7.1f}")
        print(f"    Median:                {np.median(success_steps_list):7.1f}")
        print(f"    Min:                   {min(success_steps_list):7d}")
        print(f"    Max:                   {max(success_steps_list):7d}")
    
    if failure_steps_list:
        print(f"  Failure Episodes:")
        print(f"    Mean:                  {np.mean(failure_steps_list):7.1f}")
        print(f"    Median:                {np.median(failure_steps_list):7.1f}")
    
    print(f"\n{'TIMING STATISTICS':^80}")
    print("-" * 80)
    print(f"  Avg Episode Time:        {np.mean(episode_times):7.4f}s")
    print(f"  Total Evaluation Time:   {total_time:7.2f}s")
    print(f"  Episodes per Second:     {num_episodes/total_time:7.2f}")
    
    print(f"\n{'QUARTILE ANALYSIS':^80}")
    print("-" * 80)
    q1, q2, q3 = np.percentile(episode_rewards, [25, 50, 75])
    print(f"  25th Percentile (Q1):    {q1:7.2f}")
    print(f"  50th Percentile (Q2):    {q2:7.2f}")
    print(f"  75th Percentile (Q3):    {q3:7.2f}")
    print(f"  IQR (Q3 - Q1):           {q3-q1:7.2f}")
    
    print("=" * 80)
    
    return {
        'rewards': episode_rewards,
        'steps': episode_steps,
        'times': episode_times,
        'success': success_flags,
        'timeout': timeout_flags,
        'success_rate': success_rate,
        'total_time': total_time
    }


def plot_individual_graphs(results, checkpoint_name='ep3000'):
    """
    Create 5 separate, presentation-quality graphs for Google Slides.
    Each graph is displayed individually for easy copying.
    """
    rewards = results['rewards']
    steps = results['steps']
    success = results['success']
    times = results['times']
    
    window = 50
    
    print("\n" + "="*80)
    print("GENERATING PRESENTATION GRAPHS")
    print("="*80)
    print("Each graph will be displayed separately for easy copying to slides.")
    print("="*80 + "\n")
    
    # ========== GRAPH 1: Average Reward & Success Rate vs Episodes (NEW!) ==========
    print("Graph 1/5: Average Reward & Success Rate vs Episodes")
    
    fig, ax1 = plt.subplots(figsize=(12, 7))
    
    # Calculate rolling averages
    rolling_rewards = np.convolve(rewards, np.ones(window)/window, mode='valid')
    rolling_success = np.convolve(success, np.ones(window)/window, mode='valid') * 100
    
    # Plot reward curve
    color1 = 'tab:blue'
    ax1.set_xlabel('Episode', fontsize=14, fontweight='bold')
    ax1.set_ylabel('Average Reward (50-ep window)', color=color1, fontsize=14, fontweight='bold')
    line1 = ax1.plot(range(window-1, len(rewards)), rolling_rewards, color=color1, 
                     linewidth=2.5, label='Avg Reward', alpha=0.9)
    ax1.tick_params(axis='y', labelcolor=color1, labelsize=12)
    ax1.grid(True, alpha=0.4, linestyle='--')
    ax1.set_xlim([0, len(rewards)])
    
    # Plot success rate on secondary axis
    ax2 = ax1.twinx()
    color2 = 'tab:orange'
    ax2.set_ylabel('Success Rate % (50-ep window)', color=color2, fontsize=14, fontweight='bold')
    line2 = ax2.plot(range(window-1, len(success)), rolling_success, color=color2, 
                     linewidth=2.5, linestyle='--', label='Success Rate', alpha=0.9)
    ax2.tick_params(axis='y', labelcolor=color2, labelsize=12)
    ax2.set_xlim([0, len(success)])
    
    # Combined legend
    lines = line1 + line2
    labels = [l.get_label() for l in lines]
    ax1.legend(lines, labels, loc='upper left', fontsize=12, framealpha=0.9)
    
    plt.title(f'Agent Performance: Reward & Success Rate - {checkpoint_name}', 
              fontsize=16, fontweight='bold', pad=20)
    plt.tight_layout()
    plt.show()
    print()
    
    # ========== GRAPH 2: Average Reward Curve (50-ep Rolling Window) ==========
    print("Graph 2/5: Average Reward Curve")
    
    fig, ax = plt.subplots(figsize=(12, 7))
    
    rolling_rewards = np.convolve(rewards, np.ones(window)/window, mode='valid')
    
    ax.plot(range(window-1, len(rewards)), rolling_rewards, color='tab:blue', 
            linewidth=2.5, label=f'{window}-Episode Moving Average')
    ax.axhline(y=np.mean(rewards), color='red', linestyle='--', linewidth=2, 
               label=f'Overall Mean: {np.mean(rewards):.2f}', alpha=0.8)
    
    ax.set_xlabel('Episode', fontsize=14, fontweight='bold')
    ax.set_ylabel('Average Reward', fontsize=14, fontweight='bold')
    ax.set_title(f'Average Reward Over Time - {checkpoint_name}', 
                 fontsize=16, fontweight='bold', pad=20)
    ax.legend(fontsize=12, framealpha=0.9)
    ax.grid(True, alpha=0.4, linestyle='--')
    ax.tick_params(labelsize=12)
    
    plt.tight_layout()
    plt.show()
    print()
    
    # ========== GRAPH 3: Cumulative Success Rate ==========
    print("Graph 3/5: Cumulative Success Rate")
    
    fig, ax = plt.subplots(figsize=(12, 7))
    
    cumulative_success = np.cumsum(success) / np.arange(1, len(success) + 1) * 100
    
    ax.plot(cumulative_success, color='green', linewidth=3, alpha=0.9)
    ax.axhline(y=results['success_rate'], color='red', linestyle='--', linewidth=2.5, 
               label=f"Final Success Rate: {results['success_rate']:.1f}%", alpha=0.8)
    
    ax.set_xlabel('Episode', fontsize=14, fontweight='bold')
    ax.set_ylabel('Cumulative Success Rate (%)', fontsize=14, fontweight='bold')
    ax.set_title(f'Cumulative Success Rate - {checkpoint_name}', 
                 fontsize=16, fontweight='bold', pad=20)
    ax.grid(True, alpha=0.4, linestyle='--')
    ax.legend(fontsize=12, framealpha=0.9)
    ax.set_ylim([0, max(cumulative_success) * 1.1])
    ax.tick_params(labelsize=12)
    
    plt.tight_layout()
    plt.show()
    print()
    
    # ========== GRAPH 4: Average Episode Duration (Steps) ==========
    print("Graph 4/5: Average Episode Duration")
    
    fig, ax = plt.subplots(figsize=(12, 7))
    
    rolling_steps = np.convolve(steps, np.ones(window)/window, mode='valid')
    
    ax.plot(range(window-1, len(steps)), rolling_steps, color='purple', 
            linewidth=2.5, label=f'{window}-Episode Moving Average')
    ax.axhline(y=np.mean(steps), color='red', linestyle='--', linewidth=2, 
               label=f'Overall Mean: {np.mean(steps):.1f} steps', alpha=0.8)
    
    ax.set_xlabel('Episode', fontsize=14, fontweight='bold')
    ax.set_ylabel('Steps per Episode', fontsize=14, fontweight='bold')
    ax.set_title(f'Average Episode Duration - {checkpoint_name}', 
                 fontsize=16, fontweight='bold', pad=20)
    ax.grid(True, alpha=0.4, linestyle='--')
    ax.legend(fontsize=12, framealpha=0.9)
    ax.tick_params(labelsize=12)
    
    plt.tight_layout()
    plt.show()
    print()
    
    # ========== GRAPH 5: Performance Metrics Dashboard ==========
    print("Graph 5/5: Performance Metrics Dashboard")
    
    fig, ax = plt.subplots(figsize=(10, 8))
    ax.axis('off')
    
    # Create comprehensive metrics text
    metrics_text = f"""
    PERFORMANCE METRICS SUMMARY
    {'=' * 50}
    
    Checkpoint:             {checkpoint_name}
    Episodes Tested:        {len(rewards)}
    
    {'SUCCESS METRICS':^50}
    {'-' * 50}
    Success Rate:           {results['success_rate']:.2f}%
    Timeout Rate:           {sum(results['timeout'])/len(results['timeout'])*100:.2f}%
    
    {'REWARD STATISTICS':^50}
    {'-' * 50}
    Mean Reward:            {np.mean(rewards):7.2f}
    Median Reward:          {np.median(rewards):7.2f}
    Std Deviation:          {np.std(rewards):7.2f}
    Min Reward:             {min(rewards):7.2f}
    Max Reward:             {max(rewards):7.2f}
    
    {'EPISODE DURATION':^50}
    {'-' * 50}
    Mean Steps:             {np.mean(steps):7.1f}
    Median Steps:           {np.median(steps):7.1f}
    Min Steps:              {min(steps):7d}
    Max Steps:              {max(steps):7d}
    
    {'TIMING ANALYSIS':^50}
    {'-' * 50}
    Avg Episode Time:       {np.mean(times):7.4f}s
    Total Eval Time:        {results['total_time']:7.2f}s
    Episodes/Second:        {len(rewards)/results['total_time']:7.2f}
    
    {'QUARTILE ANALYSIS':^50}
    {'-' * 50}
    Q1 (25th percentile):   {np.percentile(rewards, 25):7.2f}
    Q2 (50th percentile):   {np.percentile(rewards, 50):7.2f}
    Q3 (75th percentile):   {np.percentile(rewards, 75):7.2f}
    IQR (Q3 - Q1):          {np.percentile(rewards, 75) - np.percentile(rewards, 25):7.2f}
    """
    
    ax.text(0.05, 0.95, metrics_text, transform=ax.transAxes,
            fontsize=13, verticalalignment='top', fontfamily='monospace',
            bbox=dict(boxstyle='round', facecolor='lightblue', alpha=0.8, pad=1.5))
    
    plt.title(f'Evaluation Summary - {checkpoint_name}', 
              fontsize=16, fontweight='bold', pad=20)
    plt.tight_layout()
    plt.show()
    print()
    
    print("="*80)
    print("ALL GRAPHS GENERATED SUCCESSFULLY!")
    print("="*80)
    print("\nYou can now right-click each graph above to copy/save individually.")
    print("Perfect for adding to your Google Slides presentation!")
    print("="*80)


# Run comprehensive evaluation
print("Starting comprehensive evaluation...")
print("This will take several minutes for 1000 episodes.\n")

results = comprehensive_evaluation(
    CHECKPOINT_PATH, 
    env_name='Sokoban-small-v0', 
    num_episodes=1000, 
    max_steps=200,
    verbose=True  # Set to False to hide individual episode output
)

print("\n" + "="*80)
print("Generating presentation-ready graphs...")
print("="*80 + "\n")

plot_individual_graphs(results, checkpoint_name='ep3000')

Starting comprehensive evaluation...
This will take several minutes for 1000 episodes.

COMPREHENSIVE EVALUATION
Checkpoint:    google_colab_checkpoints/sokoban-small-v0/ppo_sokoban_ep3000.pth
Environment:   Sokoban-small-v0
Episodes:      1000
Max Steps:     200

Episode    1/1000 | Reward:  -19.00 | Steps: 200 | ✗ FAIL  | TIMEOUT
Episode    2/1000 | Reward:    4.60 | Steps:  74 | ✓ PASS  |        


KeyboardInterrupt: 

### Quick Evaluation Function (10 Episodes)

Lightweight evaluation for quick testing:
- Runs 10 episodes (~30 seconds)
- No graphs (print output only)
- Shows success rate and performance metrics


In [63]:
import time
import matplotlib.pyplot as plt
import numpy as np

def quick_evaluation(checkpoint_path, env_name='Sokoban-small-v0', num_episodes=10, max_steps=200, verbose=True):
    """
    Comprehensive evaluation with detailed metrics and timing.
    
    Args:
        checkpoint_path: Path to checkpoint file
        env_name: Environment name
        num_episodes: Number of test episodes
        max_steps: Max steps per episode
        verbose: If True, print each episode result
    """
    # Create environment WITHOUT reward shaping for true performance
    env = gym.make(env_name)
    agent = PPOAgent(env)
    agent.load(checkpoint_path)
    
    # Metrics storage
    episode_rewards = []
    episode_steps = []
    episode_times = []
    success_flags = []
    timeout_flags = []
    
    print(f"{'='*80}")
    print(f"QUICK EVALUATION (10 Episodes)")
    print(f"{'='*80}")
    print(f"Checkpoint:    {checkpoint_path}")
    print(f"Environment:   {env_name}")
    print(f"Episodes:      {num_episodes}")
    print(f"Max Steps:     {max_steps}")
    print(f"{'='*80}\n")
    
    start_total = time.time()
    
    for episode in range(num_episodes):
        state = env.reset()
        if len(state.shape) == 3:
            state = np.transpose(state, (2, 0, 1))
        
        episode_reward = 0
        done = False
        steps = 0
        
        episode_start = time.time()
        
        while not done and steps < max_steps:
            action, _ = agent.select_action(state)
            next_state, reward, done, info = env.step(action)
            
            if len(next_state.shape) == 3:
                next_state = np.transpose(next_state, (2, 0, 1))
            
            state = next_state
            episode_reward += reward
            steps += 1
        
        episode_time = time.time() - episode_start
        
        # Check success/failure
        success = done and info.get('all_boxes_on_target', False)
        timeout = steps >= max_steps
        
        # Record metrics
        episode_rewards.append(episode_reward)
        episode_steps.append(steps)
        episode_times.append(episode_time)
        success_flags.append(1 if success else 0)
        timeout_flags.append(1 if timeout else 0)
        
        # Print individual episode result if verbose
        if verbose:
            status = "✓ PASS" if success else "✗ FAIL"
            timeout_marker = "TIMEOUT" if timeout else ""
            print(f"Episode {episode + 1:4d}/{num_episodes} | "
                  f"Reward: {episode_reward:7.2f} | "
                  f"Steps: {steps:3d} | "
                  f"{status:7s} | "
                  f"{timeout_marker:7s}")
        
        # Progress summary every 100 episodes (even if not verbose)
        if (episode + 1) % 100 == 0:
            current_success_rate = sum(success_flags) / len(success_flags) * 100
            avg_reward = np.mean(episode_rewards)
            avg_steps = np.mean(episode_steps)
            avg_time = np.mean(episode_times)
            
            print(f"\n{'='*80}")
            print(f"PROGRESS UPDATE - Episode {episode + 1}/{num_episodes}")
            print(f"{'='*80}")
            print(f"  Success Rate:       {current_success_rate:6.2f}%")
            print(f"  Avg Reward:         {avg_reward:7.2f}")
            print(f"  Avg Steps:          {avg_steps:6.1f}")
            print(f"  Avg Episode Time:   {avg_time:7.4f}s")
            print(f"{'='*80}\n")
    
    total_time = time.time() - start_total
    env.close()
    
    # Calculate comprehensive statistics
    success_rate = sum(success_flags) / num_episodes * 100
    timeout_rate = sum(timeout_flags) / num_episodes * 100
    
    success_episodes = [i for i in range(num_episodes) if success_flags[i] == 1]
    failure_episodes = [i for i in range(num_episodes) if success_flags[i] == 0]
    
    success_rewards = [episode_rewards[i] for i in success_episodes]
    failure_rewards = [episode_rewards[i] for i in failure_episodes]
    
    success_steps_list = [episode_steps[i] for i in success_episodes]
    failure_steps_list = [episode_steps[i] for i in failure_episodes]
    
    # Print comprehensive data overview
    print("\n" + "=" * 80)
    print("FINAL EVALUATION SUMMARY")
    print("=" * 80)
    print(f"\n{'OVERALL PERFORMANCE':^80}")
    print("-" * 80)
    print(f"  Total Episodes:          {num_episodes}")
    print(f"  Successful Episodes:     {len(success_episodes)} ({success_rate:.2f}%)")
    print(f"  Failed Episodes:         {len(failure_episodes)} ({100-success_rate:.2f}%)")
    print(f"  Timeout Episodes:        {sum(timeout_flags)} ({timeout_rate:.2f}%)")
    
    print(f"\n{'REWARD STATISTICS':^80}")
    print("-" * 80)
    print(f"  Overall:")
    print(f"    Mean:                  {np.mean(episode_rewards):7.2f}")
    print(f"    Median:                {np.median(episode_rewards):7.2f}")
    print(f"    Std Dev:               {np.std(episode_rewards):7.2f}")
    print(f"    Min:                   {min(episode_rewards):7.2f}")
    print(f"    Max:                   {max(episode_rewards):7.2f}")
    
    if success_rewards:
        print(f"  Success Episodes:")
        print(f"    Mean:                  {np.mean(success_rewards):7.2f}")
        print(f"    Median:                {np.median(success_rewards):7.2f}")
        print(f"    Std Dev:               {np.std(success_rewards):7.2f}")
    
    if failure_rewards:
        print(f"  Failure Episodes:")
        print(f"    Mean:                  {np.mean(failure_rewards):7.2f}")
        print(f"    Median:                {np.median(failure_rewards):7.2f}")
        print(f"    Std Dev:               {np.std(failure_rewards):7.2f}")
    
    print(f"\n{'EPISODE DURATION (STEPS)':^80}")
    print("-" * 80)
    print(f"  Overall:")
    print(f"    Mean:                  {np.mean(episode_steps):7.1f}")
    print(f"    Median:                {np.median(episode_steps):7.1f}")
    print(f"    Min:                   {min(episode_steps):7d}")
    print(f"    Max:                   {max(episode_steps):7d}")
    
    if success_steps_list:
        print(f"  Success Episodes:")
        print(f"    Mean:                  {np.mean(success_steps_list):7.1f}")
        print(f"    Median:                {np.median(success_steps_list):7.1f}")
        print(f"    Min:                   {min(success_steps_list):7d}")
        print(f"    Max:                   {max(success_steps_list):7d}")
    
    if failure_steps_list:
        print(f"  Failure Episodes:")
        print(f"    Mean:                  {np.mean(failure_steps_list):7.1f}")
        print(f"    Median:                {np.median(failure_steps_list):7.1f}")
    
    print(f"\n{'TIMING STATISTICS':^80}")
    print("-" * 80)
    print(f"  Avg Episode Time:        {np.mean(episode_times):7.4f}s")
    print(f"  Total Evaluation Time:   {total_time:7.2f}s")
    print(f"  Episodes per Second:     {num_episodes/total_time:7.2f}")
    
    print("=" * 80)

## 7. DEMO


In [64]:
# Run quick evaluation with 10 episodes
quick_evaluation(
    checkpoint_path=CHECKPOINT_PATH,
    env_name='Sokoban-small-v0',
    num_episodes=10,
    max_steps=200,
    verbose=True
)

QUICK EVALUATION (10 Episodes)
Checkpoint:    google_colab_checkpoints/sokoban-small-v0/ppo_sokoban_ep3000.pth
Environment:   Sokoban-small-v0
Episodes:      10
Max Steps:     200

Episode    1/10 | Reward:  -19.00 | Steps: 200 | ✗ FAIL  | TIMEOUT
Episode    2/10 | Reward:    2.80 | Steps:  92 | ✓ PASS  |        
Episode    3/10 | Reward:  -20.00 | Steps: 200 | ✗ FAIL  | TIMEOUT
Episode    4/10 | Reward:  -19.00 | Steps: 200 | ✗ FAIL  | TIMEOUT
Episode    5/10 | Reward:  -20.00 | Steps: 200 | ✗ FAIL  | TIMEOUT
Episode    6/10 | Reward:  -19.00 | Steps: 200 | ✗ FAIL  | TIMEOUT
Episode    7/10 | Reward:  -20.00 | Steps: 200 | ✗ FAIL  | TIMEOUT
Episode    8/10 | Reward:  -20.00 | Steps: 200 | ✗ FAIL  | TIMEOUT
Episode    9/10 | Reward:  -20.00 | Steps: 200 | ✗ FAIL  | TIMEOUT
Episode   10/10 | Reward:  -20.00 | Steps: 200 | ✗ FAIL  | TIMEOUT

FINAL EVALUATION SUMMARY

                              OVERALL PERFORMANCE                               
-----------------------------------------

In [65]:
def visualize_agent_pygame(checkpoint_path, env_name='Sokoban-small-v0', max_steps=200, fps=5, save_videos=True, output_dir='puzzle_videos', min_success_reward=10.0):
    """
    Visualize trained agent solving Sokoban puzzles using Pygame.
    Shows 2 episodes: 1 failure and 1 success (with reward > min_success_reward).
    Optionally saves videos to files.

    Args:
        checkpoint_path: Path to trained agent checkpoint
        env_name: Sokoban environment name
        max_steps: Maximum steps per episode
        fps: Frames per second for visualization and video
        save_videos: If True, save episodes as video files (MP4)
        output_dir: Directory to save videos
        min_success_reward: Minimum reward required for success episode (default: 10.0)
    """
    import pygame
    import time
    import imageio
    import os
    from datetime import datetime

    # Create output directory if saving videos
    if save_videos:
        os.makedirs(output_dir, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        print(f"Videos will be saved to: {output_dir}/")

    # Create environment WITHOUT reward shaping for true performance
    env = gym.make(env_name)
    agent = PPOAgent(env)
    agent.load(checkpoint_path)

    # Initialize Pygame
    pygame.init()

    # Get initial render to determine window size
    obs = env.reset()
    img = env.render(mode='rgb_array')
    scale = 3
    screen = pygame.display.set_mode((img.shape[1] * scale, img.shape[0] * scale))
    pygame.display.set_caption('Sokoban Agent Visualization - ep3000')
    clock = pygame.time.Clock()

    # Font for overlay text
    font = pygame.font.Font(None, 32)

    print("="*80)
    print("PYGAME VISUALIZATION STARTED")
    print("="*80)
    print(f"Looking for 1 failure and 1 success (reward > {min_success_reward}) to display...")
    print("Close window or press ESC to exit")
    print("="*80 + "\n")

    episodes_to_show = []
    target_outcomes = ['failure', 'success']
    attempt_count = 0
    max_attempts = 100  # Increased to 100 attempts to find good success episode

    # Collect episodes with desired outcomes
    while len(episodes_to_show) < 2 and attempt_count < max_attempts:
        attempt_count += 1

        state = env.reset()
        if len(state.shape) == 3:
            state = np.transpose(state, (2, 0, 1))

        episode_data = {
            'frames': [],
            'rewards': [],
            'steps': 0,
            'outcome': None,
            'total_reward': 0
        }

        done = False
        steps = 0
        episode_reward = 0

        # Run episode and collect frames
        while not done and steps < max_steps:
            # Capture frame
            frame = env.render(mode='rgb_array')
            episode_data['frames'].append(frame)

            # Agent takes action
            action, _ = agent.select_action(state)
            next_state, reward, done, info = env.step(action)

            if len(next_state.shape) == 3:
                next_state = np.transpose(next_state, (2, 0, 1))

            state = next_state
            episode_reward += reward
            steps += 1

            episode_data['rewards'].append(episode_reward)

        # Capture final frame
        final_frame = env.render(mode='rgb_array')
        episode_data['frames'].append(final_frame)
        episode_data['rewards'].append(episode_reward)

        # Determine outcome
        success = done and info.get('all_boxes_on_target', False)
        timeout = steps >= max_steps

        episode_data['steps'] = steps
        episode_data['total_reward'] = episode_reward
        episode_data['outcome'] = 'success' if success else 'failure'

        # Check if we need this outcome
        needed_outcome = target_outcomes[len(episodes_to_show)]

        # For success episodes, also check reward threshold
        if needed_outcome == 'success':
            if episode_data['outcome'] == 'success' and episode_reward > min_success_reward:
                episodes_to_show.append(episode_data)
                print(f"Found {episode_data['outcome']} episode (attempt #{attempt_count}): "
                      f"Reward={episode_reward:.2f}, Steps={steps} - ACCEPTED (reward > {min_success_reward})")
            elif episode_data['outcome'] == 'success':
                print(f"  Skipping success episode (attempt #{attempt_count}): "
                      f"Reward={episode_reward:.2f} - TOO LOW (need > {min_success_reward})")
        else:
            # For failure episodes, just match the outcome
            if episode_data['outcome'] == needed_outcome:
                episodes_to_show.append(episode_data)
                print(f"Found {episode_data['outcome']} episode (attempt #{attempt_count}): "
                      f"Reward={episode_reward:.2f}, Steps={steps}")

    if len(episodes_to_show) < 2:
        print(f"\nWarning: Could only find {len(episodes_to_show)} episodes after {max_attempts} attempts")
        print("Displaying what we found...")

    print(f"\n{'='*80}")
    print("STARTING VISUALIZATION")
    print(f"{'='*80}\n")

    # Display the collected episodes
    running = True
    for ep_idx, episode_data in enumerate(episodes_to_show):
        if not running:
            break

        episode_num = ep_idx + 1
        outcome = episode_data['outcome']
        total_steps = episode_data['steps']
        total_reward = episode_data['total_reward']

        print(f"Episode {episode_num}/2: {outcome.upper()} "
              f"(Reward: {total_reward:.2f}, Steps: {total_steps})")

        # Prepare video writer if saving
        video_frames = []
        if save_videos:
            video_filename = f"{output_dir}/episode_{episode_num}_{outcome}_{timestamp}.mp4"

        # Play through the episode frames
        for step_idx, frame in enumerate(episode_data['frames']):
            # Check for quit events
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    running = False
                    break
                elif event.type == pygame.KEYDOWN:
                    if event.key == pygame.K_ESCAPE:
                        running = False
                        break

            if not running:
                break

            # Render the game state
            surf = pygame.surfarray.make_surface(np.transpose(frame, (1, 0, 2)))
            surf = pygame.transform.scale(surf, (frame.shape[1] * scale, frame.shape[0] * scale))
            screen.blit(surf, (0, 0))

            # Draw semi-transparent overlay background for text
            overlay = pygame.Surface((screen.get_width(), 100))
            overlay.set_alpha(200)
            overlay.fill((0, 0, 0))
            screen.blit(overlay, (0, 0))

            # Prepare overlay text
            current_reward = episode_data['rewards'][step_idx] if step_idx < len(episode_data['rewards']) else total_reward
            status = "SUCCESS" if outcome == 'success' and step_idx == len(episode_data['frames']) - 1 else                      "FAILED" if outcome == 'failure' and step_idx == len(episode_data['frames']) - 1 else                      "IN PROGRESS"

            # Render text
            text_line1 = font.render(f"Episode: {episode_num}/2  |  Step: {step_idx}/{total_steps}", True, (255, 255, 255))
            text_line2 = font.render(f"Reward: {current_reward:.2f}  |  Status: {status}", True, (255, 255, 0))

            # Blit text
            screen.blit(text_line1, (10, 10))
            screen.blit(text_line2, (10, 50))

            pygame.display.flip()

            # Capture frame for video
            if save_videos:
                # Get the rendered screen as RGB array
                video_frame = pygame.surfarray.array3d(screen)
                video_frame = np.transpose(video_frame, (1, 0, 2))  # Transpose to (height, width, channels)
                video_frames.append(video_frame)

            clock.tick(fps)

        # Save video if enabled
        if save_videos and video_frames:
            print(f"  Saving video: {video_filename}")
            imageio.mimsave(video_filename, video_frames, fps=fps, codec='libx264', quality=8)
            print(f"  Video saved successfully! ({len(video_frames)} frames)")

        # Pause between episodes (2 seconds)
        if running and ep_idx < len(episodes_to_show) - 1:
            print("  (Pausing 2 seconds before next episode...)\n")
            time.sleep(2)

    # Clean up
    pygame.quit()
    env.close()

    print(f"\n{'='*80}")
    print("VISUALIZATION COMPLETE")
    if save_videos:
        print(f"Videos saved to: {os.path.abspath(output_dir)}/")
    print(f"{'='*80}")

## 8. Pygame Visualization

Visual demonstration of agent solving puzzles.

Features:
- Shows 1 failure and 1 success episode visually
- Saves videos to puzzle_videos/ folder
- Requires pygame and imageio libraries


In [66]:
# Run Pygame visualization with ep3000 checkpoint

print("Loading checkpoint and starting Pygame visualization...")
print("This will show 2 episodes: 1 failure and 1 success (reward > 10)")
print("Videos will be saved to puzzle_videos/ folder")
print("Close the Pygame window or press ESC to exit\n")

visualize_agent_pygame(
    checkpoint_path=CHECKPOINT_PATH,
    env_name='Sokoban-small-v0',
    max_steps=200,
    fps=5,  # 5 frames per second for easy viewing
    save_videos=True,  # Set to False to disable video saving
    output_dir='puzzle_videos',  # Folder where videos will be saved
    min_success_reward=9.0  # Only show success episodes with reward > 10
)

Loading checkpoint and starting Pygame visualization...
This will show 2 episodes: 1 failure and 1 success (reward > 10)
Videos will be saved to puzzle_videos/ folder
Close the Pygame window or press ESC to exit

Videos will be saved to: puzzle_videos/


  logger.warn(


PYGAME VISUALIZATION STARTED
Looking for 1 failure and 1 success (reward > 9.0) to display...
Close window or press ESC to exit

Found failure episode (attempt #1): Reward=-20.00, Steps=200


KeyboardInterrupt: 