In [3]:
import os
import numpy as np
import csv
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.callbacks import EvalCallback, BaseCallback
from stable_baselines3.common.base_class import BaseAlgorithm
from gymnasium.utils.save_video import save_video
from sai_rl import SAIClient
import gymnasium as gym

In [5]:
# -------- CONFIG --------
# Step budget constant; not referenced in the cells visible here
# (presumably the training length -- confirm against the training cell).
TOTAL_TIMESTEPS = 1_000_000

# Output locations. Both directories are created up front; an already
# existing directory is not an error (exist_ok=True).
VIDEO_DIR = "./ppo_videos/"
LOG_DIR = "./ppo_logs/"
os.makedirs(VIDEO_DIR, exist_ok=True)
os.makedirs(LOG_DIR, exist_ok=True)

# Curriculum table handed to CurriculumWrapper by make_env(). Each entry is a
# (difficulty, reward_bonus) pair: "difficulty" is forwarded to the env's
# set_difficulty when available, "reward_bonus" is added to every step reward.
CURRICULUM_STAGES = [
    {"difficulty": level, "reward_bonus": bonus}
    for level, bonus in ((0.1, 0.0), (0.3, 0.2), (0.6, 0.4), (1.0, 0.6))
]

In [4]:
def evaluate(
    model: "BaseAlgorithm",
    num_episodes: int = 10,
    deterministic: bool = True,
) -> float:
    """
    Evaluate an RL agent for `num_episodes` using its VecEnv.

    Only the first sub-environment (index 0) of the VecEnv is tracked, so the
    function is meant for single-env vectorized wrappers.

    NOTE(review): rewards are summed exactly as returned by ``vec_env.step``.
    If the model's env applies reward normalization (e.g. VecNormalize), the
    reported value would be in normalized units -- confirm against the
    training setup.

    :param model: the RL Agent
    :param num_episodes: number of episodes to evaluate it (must be >= 1)
    :param deterministic: Whether to use deterministic or stochastic actions
    :return: Mean reward across episodes, as a plain Python float
    :raises ValueError: if `num_episodes` is not positive, or if the model has
        no environment attached
    """
    if num_episodes <= 0:
        # An empty sample would make np.mean return NaN with a RuntimeWarning.
        raise ValueError(f"num_episodes must be a positive integer, got {num_episodes}")

    vec_env = model.get_env()
    if vec_env is None:
        # get_env() may return None when the model was created or loaded without an env.
        raise ValueError("model has no environment attached; set one before calling evaluate()")

    obs = vec_env.reset()

    all_episode_rewards = []
    episode_rewards = []

    while len(all_episode_rewards) < num_episodes:
        action, _ = model.predict(obs, deterministic=deterministic)
        obs, rewards, dones, _ = vec_env.step(action)

        episode_rewards.append(rewards[0])  # Only the first env is tracked

        if dones[0]:
            # Episode finished: record its return and start a fresh accumulator.
            all_episode_rewards.append(sum(episode_rewards))
            episode_rewards = []

    # Cast to built-in float so the return value matches the annotation.
    mean_reward = float(np.mean(all_episode_rewards))
    std_reward = float(np.std(all_episode_rewards))
    print(f"✅ Evaluation complete: Mean reward = {mean_reward:.2f} ± {std_reward:.2f} over {num_episodes} episodes")

    return mean_reward

In [6]:
class CurriculumWrapper(gym.Wrapper):
    """Gymnasium wrapper that steps an environment through curriculum stages.

    Each stage is a dict with two keys: ``"difficulty"``, which is forwarded to
    the wrapped env's ``set_difficulty`` when that attribute is reachable, and
    ``"reward_bonus"``, which is added to every step reward while the stage is
    active.

    The stage index only moves forward. It advances by one whenever the
    wrapped env reports a ``success_rate`` above ``success_threshold``;
    independently of that, a step schedule (one stage per ``steps_per_stage``
    wrapper steps) acts as a floor so the curriculum still progresses when no
    success metric is available.

    :param env: environment to wrap
    :param stages: sequence of stage dicts with "difficulty" and "reward_bonus"
    :param steps_per_stage: number of wrapper steps per stage in the step schedule
    :param success_threshold: success rate that must be exceeded to advance early
    """

    def __init__(self, env, stages, steps_per_stage=250_000, success_threshold=0.7):
        super().__init__(env)
        self.stages = stages
        self.steps_per_stage = steps_per_stage
        self.success_threshold = success_threshold
        self.current_stage = 0
        self.total_steps = 0

    def reset(self, **kwargs):
        """Reset the wrapped env, then re-apply the current stage's difficulty."""
        obs, info = self.env.reset(**kwargs)
        self._apply_difficulty()
        return obs, info

    def step(self, action):
        """Step the wrapped env, add the stage's reward bonus, update the stage."""
        obs, reward, terminated, truncated, info = self.env.step(action)
        # The bonus of the stage active *before* this step's stage update is used.
        reward += self.stages[self.current_stage]["reward_bonus"]

        self.total_steps += 1
        self._update_stage()  # Check every step for smooth curriculum

        return obs, reward, terminated, truncated, info

    def _update_stage(self):
        """Advance the curriculum stage if warranted; never move it backwards."""
        last_stage = len(self.stages) - 1

        # The success metric is only read from envs exposing ``get_attr``
        # (presumably an SB3 VecEnv-style accessor -- confirm); otherwise it is
        # treated as unavailable.
        success = self.env.get_attr("success_rate")[0] if hasattr(self.env, "get_attr") else None

        # NOTE(review): success_rate is re-read on every step, so a value that
        # stays above the threshold advances one stage per step -- confirm
        # whether the env resets it when the difficulty changes.
        if success and success > self.success_threshold and self.current_stage < last_stage:
            self.current_stage += 1
            self._apply_difficulty()
            print(f"✅ Success {success:.2f}, advancing to stage {self.current_stage}")
            return

        # Step-based schedule. It is only allowed to raise the stage: comparing
        # with ``!=`` here would undo a success-based advance on the very next
        # step by snapping back to the (lower) scheduled stage.
        scheduled_stage = min(self.total_steps // self.steps_per_stage, last_stage)
        if scheduled_stage > self.current_stage:
            self.current_stage = scheduled_stage
            self._apply_difficulty()
            print(
                f"➡ Curriculum: Stage {self.current_stage} | Difficulty={self.stages[self.current_stage]['difficulty']}"
            )

    def _apply_difficulty(self):
        """Push the current stage's difficulty to the wrapped env, if supported.

        Silently does nothing when ``self.env`` exposes no ``set_difficulty``.
        NOTE(review): whether a method on the innermost env is visible through
        intermediate wrappers depends on the gymnasium version -- confirm the
        difficulty is actually being applied.
        """
        if hasattr(self.env, "set_difficulty"):
            self.env.set_difficulty(self.stages[self.current_stage]["difficulty"])


def make_env():
    """Build the training environment.

    Creates an SAIClient for the "franka-ml-hiring" competition, asks it for
    an env rendering to "rgb_array", wraps that env in Monitor and finally in
    CurriculumWrapper configured with CURRICULUM_STAGES.

    :return: the CurriculumWrapper-wrapped environment
    """
    client = SAIClient(comp_id="franka-ml-hiring")
    monitored_env = Monitor(client.make_env(render_mode="rgb_array"))
    return CurriculumWrapper(monitored_env, CURRICULUM_STAGES)