# Curriculum Learning - Self-Contained

Automatic curriculum learning for MiniGrid environments.

In [1]:
import os, warnings
os.environ["PYTHONWARNINGS"] = "ignore"
warnings.filterwarnings("ignore")

from typing import List, Dict
import numpy as np
import time
import torch

from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.vec_env import SubprocVecEnv

from src.environment import make_fixed_mixed_vec_env
from src.evaluation import evaluate
from src.cnn import get_policy_kwargs
from src.filemanager import FileManager

# Result directory

In [2]:
# Module-level file manager for this experiment. Code further down in this
# notebook uses it to write evaluation rows (dump_eval_to_csv), to locate the
# visualization directory (get_visualization_dir) and to save checkpoints
# (save_checkpoint).
fm: FileManager = FileManager("02_curriculum_learning_catastrophic_learning_test", output_dir="results")

Experiment directory initialized: results/02_curriculum_learning_catastrophic_learning_test_20251031_222928


## Curriculum Teacher


In [3]:
class CurriculumTeacher:
    """
    Simple sequential curriculum.

    - Train stage ``i`` until the mean of the most recent ``window`` recorded
      success rates is at least ``target``.
    - Then move to the next stage; the final stage is never left.

    Success rates are stored per stage name in ``performance``. The stage
    pointer only moves when ``get_env_composition`` is called.
    """

    # History is trimmed to the newest _HISTORY_KEEP entries once it grows
    # past _HISTORY_MAX entries.
    _HISTORY_MAX = 2000
    _HISTORY_KEEP = 1000

    def __init__(
        self,
        stages: List[str],
        target: float = 0.90,
        window: int = 5,
        n_envs: int = 8,
    ):
        """
        Args:
            stages: Environment ids in training order; must not be empty.
            target: Mean success rate required to leave a stage.
            window: Number of most recent evaluations averaged for the
                progression rule.
            n_envs: Number of environments requested for the current stage
                by ``get_env_composition``.

        Raises:
            ValueError: If ``stages`` is empty or ``n_envs`` is less than 1.
        """
        if not stages:
            raise ValueError("stages must contain at least one environment id")
        if n_envs < 1:
            raise ValueError("n_envs must be at least 1")

        self.stages = stages
        self.target = target
        self.window = window
        self.n_envs = n_envs
        self.idx = 0
        self.performance: Dict[str, List[float]] = {s: [] for s in stages}
        # Set once the progression rule fires while already on the last stage.
        self._done = False

    def get_env_composition(self) -> Dict[str, int]:
        """
        Return ``{current_stage_name: n_envs}``.

        Side effect: if the progression rule is satisfied, the stage pointer
        is advanced before the composition is built.
        """
        if self._should_advance():
            self._advance()

        return {self.stages[self.idx]: self.n_envs}

    def record(self, success_rate: float) -> None:
        """Append ``success_rate`` to the history of the current stage."""
        stage = self.stages[self.idx]
        history = self.performance[stage]
        history.append(success_rate)
        # Bound memory use: keep only the newest entries once the list is long.
        if len(history) > self._HISTORY_MAX:
            self.performance[stage] = history[-self._HISTORY_KEEP:]

    # stats
    def _mean_recent(self, s: str) -> float:
        """Mean of the last ``window`` success rates of stage ``s`` (0.0 if none)."""
        history = self.performance[s]
        if not history:
            return 0.0
        span = min(self.window, len(history))
        return float(np.mean(history[-span:]))

    # progression rule
    def _should_advance(self) -> bool:
        """True when the current stage has at least ``window`` evaluations whose mean reaches ``target``."""
        stage = self.stages[self.idx]
        enough_evals = len(self.performance[stage]) >= self.window
        return self._mean_recent(stage) >= self.target and enough_evals

    def _advance(self) -> None:
        """Move to the next stage, or flag completion when already on the last one."""
        if self.idx < len(self.stages) - 1:
            self.idx += 1
        else:
            self._done = True

    # queries
    def current_stage(self) -> str:
        """Name of the stage currently being trained."""
        return self.stages[self.idx]

    # stopping
    def has_converged_hardest(self, thresh: float = 0.95, evals: int = 3) -> bool:
        """
        True when the last stage has at least ``evals`` recorded success
        rates and the mean of the newest ``evals`` is at least ``thresh``.
        """
        history = self.performance[self.stages[-1]]
        if len(history) < evals:
            return False
        return float(np.mean(history[-evals:])) >= thresh

    def catastrophic_learning(self, under_target: float = 0.1, evals: int = 3) -> bool:
        """
        True when the last stage has at least ``evals`` recorded success
        rates and the mean of the newest ``evals`` is below
        ``target - under_target``.
        """
        history = self.performance[self.stages[-1]]
        if len(history) < evals:
            return False
        recent = history[-evals:]
        return float(np.mean(recent)) < (self.target - under_target)


## Step Callback


In [4]:
class StepCallback(BaseCallback):
    """
    Evaluate the model on the teacher's current stage at a fixed interval.

    Every ``eval_freq`` calls of ``_on_step`` (counted per stage) the model is
    evaluated, the success rate is recorded with the teacher, a summary line
    is printed, the result is written through the module-level ``fm`` file
    manager and, optionally, one episode is visualized. Policy entropy is
    only reported, not modified.
    """

    def __init__(
        self,
        teacher: CurriculumTeacher,
        eval_freq: int = 5_000,
        n_eval: int = 30,
        visualize: bool = True,
    ) -> None:
        super().__init__()

        # Configuration
        self.teacher: CurriculumTeacher = teacher
        self.eval_freq: int = eval_freq
        self.n_eval: int = n_eval
        self.visualize: bool = visualize

        # Step counters: one for the current stage, one for the whole run.
        # NOTE(review): both advance by one per callback invocation; with a
        # vectorized env one invocation may cover several environment
        # timesteps -- confirm this is the intended unit.
        self.stage_steps: int = 0
        self.total_steps: int = 0

        # Wall-clock bookkeeping
        self.training_start_time: float = time.time()
        self.stage_start_time: float = time.time()
        self.total_training_time: float = 0.0

    @staticmethod
    def _mmss(seconds: float) -> str:
        """Format a duration in seconds as zero-padded ``MM:SS``."""
        return f"{int(seconds//60):02d}:{int(seconds%60):02d}"

    def _on_step(self) -> bool:
        """Advance the counters and run an evaluation when one is due; always returns True."""
        self.stage_steps += 1
        self.total_steps += 1

        # Nothing to do unless this step lands on an evaluation boundary.
        if self.stage_steps % self.eval_freq != 0:
            return True

        assert isinstance(self.model, PPO)

        # Evaluate on the stage that is currently being trained
        stage_name = self.teacher.current_stage()
        batch = evaluate(self.model, stage_name, self.n_eval)
        self.teacher.record(batch.success_rate)

        # Timing for the report
        stage_elapsed: float = self.get_stage_elapsed()
        total_elapsed: float = self.get_total_time()

        # Report
        report = " | ".join(
            (
                f"  Eval @ Stage {self.stage_steps:,}",
                f"Total: {self.total_steps:,}",
                f"Success: {batch.success_rate:.1%}",
                f"Reward: {batch.mean_reward:.2f}",
                f"PolicyEnt: {batch.mean_entropy:.3f}",
                f"StageTime: {self._mmss(stage_elapsed)}",
                f"TotalTime: {self._mmss(total_elapsed)}",
            )
        )
        print(report)

        # Persist the evaluation
        fm.dump_eval_to_csv(
            total_step=self.total_steps,
            stage=stage_name,
            stage_step=self.stage_steps,
            batch=batch,
            model=self.model,
            allocation={},
        )

        # Optional picture of the first evaluation episode
        if self.visualize:
            # Imported lazily so the module is only needed when visualizing.
            from src.episode_visualization import visualize_eval_episode

            visualize_eval_episode(
                model=self.model,
                episode=batch.episodes[0],
                timestep=self.total_steps,
                output_dir=fm.get_visualization_dir(),
            )

        return True

    def reset_for_stage(self) -> None:
        """Close out the current stage: bank its elapsed time, then zero the stage counter and timer."""
        self.total_training_time += self.get_stage_elapsed()
        self.stage_steps = 0
        self.stage_start_time = time.time()

    def get_stage_elapsed(self) -> float:
        """Seconds elapsed since the current stage timer was started."""
        return time.time() - self.stage_start_time

    def get_total_time(self) -> float:
        """Seconds banked from completed stages plus the current stage's elapsed seconds."""
        current = self.get_stage_elapsed()
        return self.total_training_time + current

## Training


In [5]:
# Rollout size used to derive the evaluation interval below.
N_ENVS: int = 8
N_STEPS: int = 128
STEPS_PER_ROLLOUT = N_STEPS * N_ENVS

# Evaluate every 3 rollouts; N_EVALS is handed to the callback as n_eval.
EVAL_FREQ: int = 3 * STEPS_PER_ROLLOUT
N_EVALS: int = 100

# Use the GPU when torch reports one, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"  # type: ignore
print(f"Using device: {device}")

# Curriculum progression rule: mean success over the last WINDOW evaluations
# must reach THRESHOLD.
THRESHOLD: float = 0.90
WINDOW: int = 3

# Environment ids, in training order.
# https://minigrid.farama.org/environments/minigrid/
STAGES: List[str] = [
    "MiniGrid-DoorKey-5x5-v0",
    "MiniGrid-DoorKey-6x6-v0",
    "MiniGrid-DoorKey-8x8-v0",
    "MiniGrid-DoorKey-16x16-v0",
]

# Step budget compared against the callback's total step counter.
TOTAL_STEPS: int = 100_000

# PPO
def make_model(env: SubprocVecEnv) -> PPO:
    """Build a PPO model with the "CnnPolicy" policy on ``env`` using this notebook's fixed hyperparameters."""
    hyperparams = {
        "policy_kwargs": get_policy_kwargs(),
        "learning_rate": 3e-4,
        "n_steps": N_STEPS,
        "batch_size": 64,
        "n_epochs": 10,
        "gamma": 0.99,
        "gae_lambda": 0.95,
        "clip_range": 0.2,
        "ent_coef": 0.02,
        "verbose": 0,
        "device": device,
    }
    return PPO("CnnPolicy", env, **hyperparams)  # type: ignore

Using device: cuda


In [None]:
def train_curriculum() -> None:
    """
    Train one PPO model through the sequential curriculum.

    The loop trains in chunks of ``EVAL_FREQ`` timesteps and, after each
    chunk, asks the teacher for the env composition. When the composition
    changes, the vec env is rebuilt for the new stage, the callback's
    per-stage counter and timer are reset, and a checkpoint is saved.

    Training stops when the callback's total step counter reaches
    ``TOTAL_STEPS``, when the hardest stage has converged, or when the
    teacher's ``catastrophic_learning`` check fires.
    """
    teacher = CurriculumTeacher(
        STAGES,
        target=THRESHOLD,
        window=WINDOW,
    )

    callback = StepCallback(
        teacher,
        eval_freq=EVAL_FREQ,
        n_eval=N_EVALS,
        visualize=True,
    )

    # Initial allocation + initial env
    allocation = teacher.get_env_composition()
    env = make_fixed_mixed_vec_env(allocation)

    # try/finally so the vec env is closed even if training raises.
    try:
        model = make_model(env)

        print(f"\n=== Initial Allocation: {allocation} ===")

        while (
            callback.total_steps < TOTAL_STEPS
            and not teacher.has_converged_hardest(thresh=0.9, evals=3)
            and not teacher.catastrophic_learning(under_target=0.1, evals=3)
        ):
            # Train for another EVAL_FREQ timesteps (callback does evaluations).
            # reset_num_timesteps=False continues the model's timestep count
            # instead of starting a fresh run on every call.
            model.learn(  # type: ignore
                total_timesteps=EVAL_FREQ,
                callback=callback,
                reset_num_timesteps=False,
            )

            # Recompute curriculum allocation (this may advance the stage)
            new_alloc = teacher.get_env_composition()
            if new_alloc == allocation:
                continue

            # Allocation changed -> recompose VecEnv
            print("\n>>> Curriculum shift detected")
            print(f"    Old: {allocation}")
            print(f"    New: {new_alloc}\n")

            env.close()
            env = make_fixed_mixed_vec_env(new_alloc)
            model.set_env(env)  # type: ignore
            allocation = new_alloc

            # Start the per-stage step counter and timer afresh so the
            # "Stage" figures in logs and CSV rows refer to the new stage.
            callback.reset_for_stage()

            fm.save_checkpoint(
                model=model,
                stage="auto_stage",
                total_step=callback.total_steps,
            )
    finally:
        env.close()

    total_time = time.time() - callback.training_start_time
    print("\n=== TRAINING COMPLETE ===")
    print(f"Time: {int(total_time//60):02d}:{int(total_time%60):02d}")
    print(f"Steps: {callback.total_steps:,}")

In [7]:
# Run the curriculum training defined above; evaluation lines are printed as
# the callback performs them.
train_curriculum()


=== Initial Allocation: {'MiniGrid-DoorKey-5x5-v0': 8} ===
  Eval @ Stage 3,072 | Total: 3,072 | Success: 17.0% | Reward: 0.08 | PolicyEnt: 1.690 | StageTime: 01:41 | TotalTime: 01:41
    → Evaluation saved to: results/02_curriculum_learning_catastrophic_learning_test_20251031_222928/evaluations.csv
    → Saved visualization: results/02_curriculum_learning_catastrophic_learning_test_20251031_222928/visualizations/eval_3072_MiniGrid_DoorKey_5x5_v0.png
  Eval @ Stage 6,144 | Total: 6,144 | Success: 100.0% | Reward: 0.95 | PolicyEnt: 0.673 | StageTime: 02:21 | TotalTime: 02:21
    → Evaluation saved to: results/02_curriculum_learning_catastrophic_learning_test_20251031_222928/evaluations.csv
    → Saved visualization: results/02_curriculum_learning_catastrophic_learning_test_20251031_222928/visualizations/eval_6144_MiniGrid_DoorKey_5x5_v0.png
  Eval @ Stage 9,216 | Total: 9,216 | Success: 100.0% | Reward: 0.96 | PolicyEnt: 0.147 | StageTime: 03:05 | TotalTime: 03:05
    → Evaluation save

# Run

In [None]:
# from src.environment import run_episode
# model_path = "results/01_curriculum_learning_catastrophic_learning_test_20251031_222928/checkpoints/auto_stage_step_33792.zip"
# model = PPO.load(model_path) # type: ignore
# episode_data = run_episode(
#     model=model, 
#     env_name="MiniGrid-DoorKey-16x16-v0", 
#     seed=42, 
#     render_mode="human", 
#     deterministic=True
# )

# print(episode_data)

In [9]:
# from src.episode_visualization import visualize_eval_episode
# visualize_eval_episode(
#     model=model,
#     episode=episode_data,
#     timestep=-1,
#     output_dir="./"
# )