# Reinforcement Learning 2025 - Final Assignment

**Authors:** Amit Ezer, Gal Yaacov Noy.

In [25]:
!pip install minigrid gymnasium matplotlib



In [26]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import matplotlib.pyplot as plt
import gymnasium as gym
import minigrid
from minigrid.wrappers import ImgObsWrapper, RGBImgPartialObsWrapper
from collections import deque
from abc import ABC, abstractmethod
from datetime import datetime

SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

MODELS_DIR = "./models"
os.makedirs(MODELS_DIR, exist_ok=True)
print(f"Models will be saved to: {MODELS_DIR}")

Using device: cuda
Models will be saved to: ./models


In [27]:
class MiniGridCNN(nn.Module):
    def __init__(self, output_dim=128, input_channels=3, input_size=84):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(input_channels, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Flatten()
        )

        # Compute the output size after the conv layers by passing a dummy input.
        with torch.no_grad():
            dummy_input = torch.zeros(1, input_channels, input_size, input_size)
            conv_out_size = self.conv(dummy_input).shape[1]

        self.fc = nn.Sequential(
            nn.Linear(conv_out_size, output_dim),
            nn.ReLU()
        )

    def forward(self, x):
        return self.fc(self.conv(x))

In [28]:
class BaseAgent(ABC):
    def __init__(self, env_name, num_episodes=1000, max_steps=500):
        self.env_name = env_name
        self.env = self._create_env(env_name)
        self.num_actions = self.env.action_space.n
        self.obs_shape = self.env.reset()[0].shape
        self.num_episodes = num_episodes
        self.max_steps = max_steps

        self.episode_rewards = []
        self.episode_lengths = []
        self.losses = []

    def _preprocess(self, obs):
        """
        Convert RGB observation to tensor and normalize to [0,1] range.
        Input: RGB image array (H, W, 3)
        Output: tensor (C, H, W) normalized to [0,1]
        """
        return torch.tensor(obs, dtype=torch.float32).permute(2, 0, 1) / 255.0

    def _create_env(self, env_name):
        """Create and return a MiniGrid environment with RGB observations."""
        env = gym.make(env_name)
        env = RGBImgPartialObsWrapper(env)
        env = ImgObsWrapper(env)
        return env

    @abstractmethod
    def _get_algorithm_name(self):
        """Return string identifier of the algorithm."""
        pass

    @abstractmethod
    def _save_model(self, save_dir: str):
        """Save model to the specified directory."""
        pass

    def save_model(self, models_dir=MODELS_DIR):
        """Save trained model specified directory."""
        env_clean = self.env_name.replace("MiniGrid-", "").replace("-v0", "")
        algorithm_name = self._get_algorithm_name()
        
        save_dir = os.path.join(models_dir, f"{algorithm_name}_{env_clean}")
        os.makedirs(save_dir, exist_ok=True)
        
        self._save_model(save_dir)
        return save_dir
    
    def _get_additional_logs_str(self):
        """Return additional logs as a formatted string for printing."""
        return ""
    
    def get_additional_logs(self):
        """Return any additional logs to be printed during training."""
        return {}

    
    @abstractmethod
    def _train_episode(self, episode: int):
        """Runs and trains a single episode. Should return (reward, steps, loss)."""
        pass
    
    def train(self):
        print(f"\n{'='*60}")
        print(f"TRAINING {self._get_algorithm_name().upper()} ON {self.env.spec.id.upper()}")
        print(f"{'='*60}")
        print(f"Episodes: {self.num_episodes} | Max Steps: {self.max_steps}")
        print(f"Observation Shape: {self.obs_shape} | Action Space: {self.num_actions}")
        print(f"{'='*60}")
        
        for episode in range(self.num_episodes):
            reward, steps, loss = self._train_episode(episode)
            self.episode_rewards.append(reward)
            self.episode_lengths.append(steps)
            self.losses.append(loss)

            if episode % 100 == 0:
                recent_rewards = np.mean(self.episode_rewards[-50:]) if len(self.episode_rewards) >= 50 else np.mean(self.episode_rewards)
                recent_steps = np.mean(self.episode_lengths[-50:]) if len(self.episode_lengths) >= 50 else np.mean(self.episode_lengths)
                recent_loss = np.mean(self.losses[-100:]) if len(self.losses) >= 100 else (np.mean(self.losses) if self.losses else 0)
                print(f"[{self._get_algorithm_name()}] Ep {episode:4d} | Reward: {reward:6.2f} | Avg Reward: {recent_rewards:6.2f} | Steps: {steps:3d} | Avg Steps: {recent_steps:5.1f} | Loss: {recent_loss:.4f}{self._get_additional_logs_str()}")

        print(f"\n{'='*60}")
        print(f"TRAINING COMPLETED!")
        final_rewards = np.mean(self.episode_rewards[-100:]) if len(self.episode_rewards) >= 100 else np.mean(self.episode_rewards)
        final_steps = np.mean(self.episode_lengths[-100:]) if len(self.episode_lengths) >= 100 else np.mean(self.episode_lengths)
        print(f"Final Average Reward (last 100 episodes): {final_rewards:.3f}")
        print(f"Final Average Steps (last 100 episodes): {final_steps:.1f}")
        print(f"Total Training Episodes: {len(self.episode_rewards)}")
        print(f"{'='*60}")
        self.env.close()
        
        return {
            "name": self._get_algorithm_name(),
            "rewards": self.episode_rewards,
            "steps": self.episode_lengths,
            "losses": self.losses,
            **self.get_additional_logs()
        }

In [29]:
def plot_training_results(agent, window=50):
    """Plot training results with moving averages."""
    def moving_average(data, window):
        if len(data) < window:
            return data
        return np.convolve(data, np.ones(window) / window, mode="valid")

    rewards = agent.episode_rewards
    steps = agent.episode_lengths
    losses = agent.losses
    
    # Get additional logs through the proper inheritance method
    additional_logs = agent.get_additional_logs()
    epsilons = additional_logs.get("epsilons", [])

    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    fig.suptitle(f"{agent.__class__.__name__} Training Results", fontsize=16, fontweight='bold')

    # Episode Rewards
    axes[0, 0].plot(rewards, alpha=0.3, label="Raw", color='blue')
    if len(rewards) >= window:
        smooth_rewards = moving_average(rewards, window)
        axes[0, 0].plot(range(window-1, len(rewards)), smooth_rewards,
                        label=f"Moving Avg ({window})", linewidth=2, color='red')
    axes[0, 0].set_title("Episode Rewards")
    axes[0, 0].set_xlabel("Episode")
    axes[0, 0].set_ylabel("Reward")
    axes[0, 0].legend()
    axes[0, 0].grid(alpha=0.3)

    # Episode Steps
    axes[0, 1].plot(steps, alpha=0.3, label="Raw", color='green')
    if len(steps) >= window:
        smooth_steps = moving_average(steps, window)
        axes[0, 1].plot(range(window-1, len(steps)), smooth_steps,
                        label=f"Moving Avg ({window})", linewidth=2, color='orange')
    axes[0, 1].set_title("Episode Length")
    axes[0, 1].set_xlabel("Episode")
    axes[0, 1].set_ylabel("Steps")
    axes[0, 1].legend()
    axes[0, 1].grid(alpha=0.3)

    # Training Loss
    if losses:
        axes[1, 0].plot(losses, alpha=0.6, color='purple')
        if len(losses) >= window:
            smooth_loss = moving_average(losses, window)
            axes[1, 0].plot(range(window-1, len(losses)), smooth_loss,
                            label=f"Moving Avg ({window})", linewidth=2, color='red')
            axes[1, 0].legend()
    axes[1, 0].set_title("Training Loss")
    axes[1, 0].set_xlabel("Episode")
    axes[1, 0].set_ylabel("Loss")
    axes[1, 0].grid(alpha=0.3)

    # Epsilon Decay (for DQN agents)
    if epsilons:
        axes[1, 1].plot(epsilons, color='brown', linewidth=2)
        axes[1, 1].set_title("Exploration Rate (Epsilon)")
        axes[1, 1].set_xlabel("Episode")
        axes[1, 1].set_ylabel("Epsilon")
        axes[1, 1].grid(alpha=0.3)
    else:
        axes[1, 1].axis("off")
        axes[1, 1].text(0.5, 0.5, "No Epsilon Data\n(Non-DQN Agent)", 
                       ha='center', va='center', transform=axes[1, 1].transAxes,
                       fontsize=12, style='italic')

    plt.tight_layout()
    plt.show()


def print_performance_summary(agent):
    """Print performance summary for a trained agent."""
    rewards = agent.episode_rewards
    steps = agent.episode_lengths
    losses = agent.losses

    print(f"\n{'='*60}")
    print(f"PERFORMANCE SUMMARY - {agent.__class__.__name__.upper()}")
    print(f"{'='*60}")

    final_rewards = np.mean(rewards[-100:]) if len(rewards) >= 100 else np.mean(rewards)
    best_rewards = max([np.mean(rewards[i:i+100]) for i in range(len(rewards)-99)]) if len(rewards) >= 100 else max(rewards)
    final_steps = np.mean(steps[-100:]) if len(steps) >= 100 else np.mean(steps)
    final_loss = np.mean(losses[-100:]) if len(losses) >= 100 else (np.mean(losses) if losses else 0)

    print(f"Final Performance (last 100 episodes):")
    print(f"  Average Reward: {final_rewards:.3f}")
    print(f"  Average Steps: {final_steps:.1f}")
    print(f"  Training Loss: {final_loss:.4f}")
    print(f"\nBest Performance:")
    print(f"  Best 100-episode Average Reward: {best_rewards:.3f}")
    print(f"  Maximum Single Episode Reward: {max(rewards):.3f}")
    print(f"  Total Episodes: {len(rewards)}")
    print(f"  Total Training Steps: {sum(steps):,}")
        
    print(f"{'='*60}")


def run_agent(agent, save_model=True):
    """Train the agent, analyze results, and optionally save the model."""
    agent.train()
    print_performance_summary(agent)
    plot_training_results(agent)
    
    if save_model:
        save_path = agent.save_model()
        print(f"\nModel saved successfully!")
    
    return agent

## DoubleDQN

In [30]:
class QNetwork(nn.Module):
    def __init__(self, num_actions, feature_dim=128, input_size=84):
        super().__init__()
        self.encoder = MiniGridCNN(output_dim=feature_dim, input_size=input_size)
        self.q_head = nn.Linear(feature_dim, num_actions)

    def forward(self, x):
        features = self.encoder(x)
        return self.q_head(features)

In [31]:
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, transition):
        self.buffer.append(transition)

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return (
            torch.stack(states).to(device),
            torch.stack(actions).to(device),
            torch.stack(rewards).to(device),
            torch.stack(next_states).to(device),
            torch.stack(dones).to(device),
        )

    def __len__(self):
        return len(self.buffer)

In [37]:
class DoubleDQNAgent(BaseAgent):
    def __init__(
        self,
        env_name,
        num_episodes=1000,
        max_steps=500,
        gamma=0.99,
        lr=1e-3,
        batch_size=256,
        target_update_freq=100,
        replay_capacity=10_000,
        epsilon_start=1.0,
        epsilon_end=0.01,
        epsilon_decay=0.995,
    ):
        super().__init__(env_name, num_episodes, max_steps)
        self.gamma = gamma
        self.lr = lr
        self.batch_size = batch_size
        self.target_update_freq = target_update_freq
        self.replay_buffer = ReplayBuffer(capacity=replay_capacity)

        self.epsilon = epsilon_start
        self.epsilon_end = epsilon_end
        self.epsilon_decay = epsilon_decay
        self.epsilons = []

        self.policy_net = QNetwork(self.num_actions, input_size=self.obs_shape[0]).to(device)
        self.target_net = QNetwork(self.num_actions, input_size=self.obs_shape[0]).to(device)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()

        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=self.lr)
        self.global_step = 0

    def _get_algorithm_name(self):
        return "DoubleDQN"

    def _save_model(self, save_dir):
        torch.save(self.policy_net.state_dict(), os.path.join(save_dir, "policy_net.pt"))
        torch.save(self.target_net.state_dict(), os.path.join(save_dir, "target_net.pt"))

    def get_additional_logs(self):
        return {"epsilons": self.epsilons}
    
    def _get_additional_logs_str(self):
        """Return additional logs as a formatted string for printing."""
        return f" | Epsilon: {self.epsilon:.4f}"
    
    def _select_action(self, state):
        if random.random() < self.epsilon:
            return torch.tensor([random.randint(0, self.num_actions - 1)], device=device)
        with torch.no_grad():
            q_values = self.policy_net(state.unsqueeze(0))
            return q_values.argmax(dim=1)

    def _train_episode(self, episode):
        obs, _ = self.env.reset()
        state = self._preprocess(obs).to(device)
        total_reward, steps = 0, 0
        episode_loss = 0.0
        loss_count = 0

        for _ in range(self.max_steps):
            action = self._select_action(state)
            next_obs, reward, terminated, truncated, _ = self.env.step(action.item())
            next_state = self._preprocess(next_obs).to(device)
            done = terminated or truncated

            self.replay_buffer.push((
                state,
                torch.tensor([action.item()], dtype=torch.long, device=device),
                torch.tensor([reward], dtype=torch.float32, device=device),
                next_state,
                torch.tensor([done], dtype=torch.float32, device=device),
            ))

            state = next_state
            total_reward += reward
            steps += 1
            self.global_step += 1

            if len(self.replay_buffer) >= self.batch_size:
                states, actions, rewards, next_states, dones = self.replay_buffer.sample(self.batch_size)

                q_values = self.policy_net(states).gather(1, actions.view(-1, 1)).squeeze()
                with torch.no_grad():
                    next_actions = self.policy_net(next_states).argmax(dim=1, keepdim=True)
                    next_q_values = self.target_net(next_states).gather(1, next_actions).squeeze()
                    targets = rewards.squeeze() + self.gamma * next_q_values * (1 - dones.squeeze())

                loss = F.mse_loss(q_values, targets)
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                episode_loss += loss.item()
                loss_count += 1

            if self.global_step % self.target_update_freq == 0:
                self.target_net.load_state_dict(self.policy_net.state_dict())

            if done:
                break

        self.epsilons.append(self.epsilon)
        self.epsilon = max(self.epsilon_end, self.epsilon * self.epsilon_decay)
        avg_loss = episode_loss / loss_count if loss_count > 0 else 0.0
                
        return total_reward, steps, avg_loss

In [38]:
agent = DoubleDQNAgent(env_name="MiniGrid-Dynamic-Obstacles-8x8-v0")
run_agent(agent)


TRAINING DOUBLEDQN ON MINIGRID-DYNAMIC-OBSTACLES-8X8-V0
Episodes: 1000 | Max Steps: 500
Observation Shape: (56, 56, 3) | Action Space: 3
[DoubleDQN] Ep    0 | Reward:  -1.00 | Avg Reward:  -1.00 | Steps:  22 | Avg Steps:  22.0 | Loss: 0.0000 | Epsilon: 0.9950
[DoubleDQN] Ep  100 | Reward:  -1.00 | Avg Reward:  -1.00 | Steps:   4 | Avg Steps:  13.1 | Loss: 0.0050 | Epsilon: 0.6027
[DoubleDQN] Ep  200 | Reward:  -1.00 | Avg Reward:  -1.00 | Steps:  87 | Avg Steps:  26.0 | Loss: 0.0001 | Epsilon: 0.3651
[DoubleDQN] Ep  300 | Reward:  -1.00 | Avg Reward:  -1.00 | Steps:  52 | Avg Steps:  42.1 | Loss: 0.0000 | Epsilon: 0.2212
[DoubleDQN] Ep  400 | Reward:  -1.00 | Avg Reward:  -1.00 | Steps:  85 | Avg Steps:  41.5 | Loss: 0.0000 | Epsilon: 0.1340
[DoubleDQN] Ep  500 | Reward:  -1.00 | Avg Reward:  -0.92 | Steps:  67 | Avg Steps: 104.9 | Loss: 0.0000 | Epsilon: 0.0812
[DoubleDQN] Ep  600 | Reward:  -1.00 | Avg Reward:  -0.34 | Steps:  27 | Avg Steps:  78.4 | Loss: 0.0018 | Epsilon: 0.0492


KeyboardInterrupt: 

## REINFORCE

In [34]:
class ReinforcePolicy(nn.Module):
    def __init__(self, num_actions, input_size=84, feature_dim=128):
        super().__init__()
        self.encoder = MiniGridCNN(output_dim=feature_dim, input_size=input_size)
        self.action_head = nn.Linear(feature_dim, num_actions)

    def forward(self, x):
        features = self.encoder(x)
        logits = self.action_head(features)
        return torch.distributions.Categorical(logits=logits)

In [35]:
class REINFORCEAgent(BaseAgent):
    def __init__(
        self,
        env_name,
        num_episodes=1000,
        max_steps=500,
        gamma=0.99,
        lr=1e-3,
        entropy_coeff=0.01,
    ):
        super().__init__(env_name, num_episodes, max_steps)
        self.gamma = gamma
        self.entropy_coeff = entropy_coeff

        self.policy = ReinforcePolicy(self.num_actions, input_size=self.obs_shape[0]).to(device)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)

    def _get_algorithm_name(self):
        return "REINFORCE"

    def _save_model(self, save_dir):
        torch.save(self.policy.state_dict(), os.path.join(save_dir, "policy.pt"))

    def _train_episode(self, episode):
        obs, _ = self.env.reset()
        state = self._preprocess(obs).to(device)

        log_probs, rewards, entropies = [], [], []
        total_reward, steps = 0, 0

        for _ in range(self.max_steps):
            dist = self.policy(state)
            action = dist.sample()
            log_probs.append(dist.log_prob(action))
            entropies.append(dist.entropy())

            next_obs, reward, terminated, truncated, _ = self.env.step(action.item())
            next_state = self._preprocess(next_obs).to(device)

            rewards.append(reward)
            total_reward += reward
            state = next_state
            steps += 1

            if terminated or truncated:
                break

        # Compute returns (discounted sum of rewards)
        returns = []
        G = 0
        for r in reversed(rewards):
            G = r + self.gamma * G
            returns.insert(0, G)

        if not returns:
            return total_reward, steps, 0.0

        returns = torch.tensor(returns, dtype=torch.float32).to(device)
        returns = (returns - returns.mean()) / (returns.std() + 1e-8)

        log_probs = torch.stack(log_probs)
        entropies = torch.stack(entropies)

        loss = -(log_probs * returns).sum() - self.entropy_coeff * entropies.sum()

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return total_reward, steps, loss.item()

In [36]:
# Test REINFORCE agent
reinforce_agent = REINFORCEAgent(env_name="MiniGrid-Dynamic-Obstacles-6x6-v0", num_episodes=500)
run_agent(reinforce_agent)


TRAINING REINFORCE ON MINIGRID-DYNAMIC-OBSTACLES-6X6-V0
Episodes: 500 | Max Steps: 500
Observation Shape: (56, 56, 3) | Action Space: 3


RuntimeError: mat1 and mat2 shapes cannot be multiplied (64x9 and 576x128)