In [1]:
import numpy as np
import gymnasium as gym
import gymnasium_robotics

from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecTransposeImage
from stable_baselines3.common.monitor import Monitor

# Register the environments shipped with gymnasium_robotics with gymnasium.
# Presumably this is what lets gym.make() resolve the "FetchReach-v4" ID used
# in later cells -- confirm against the installed gymnasium version.
gym.register_envs(gymnasium_robotics)

In [2]:
class PixelObservationWrapper(gym.ObservationWrapper):
    """Replace an environment's observations with its rendered RGB frames.

    The wrapped environment's own observation is discarded; every observation
    returned by this wrapper is the result of ``env.render()``, resampled to
    ``(height, width, 3)`` so that it matches the declared observation space.

    Args:
        env: Environment to wrap. Its ``render()`` must return an image array
            (assumed to be laid out as ``(rows, cols, 3)`` -- TODO confirm for
            the environments in use), which normally requires creating it with
            ``render_mode="rgb_array"``.
        width: Width in pixels of the returned observation.
        height: Height in pixels of the returned observation.
        camera_id: Stored on the instance only. This wrapper does not forward
            it to the renderer, so it currently has no effect on the frames.
    """

    def __init__(self, env, width=84, height=84, camera_id=0):
        super().__init__(env)
        self.width = width
        self.height = height
        self.camera_id = camera_id
        # The observation space is now a pure uint8 image.
        self.observation_space = gym.spaces.Box(
            low=0, high=255, shape=(height, width, 3), dtype=np.uint8
        )

    def observation(self, obs):
        """Return the current rendered frame, sized to the observation space.

        Args:
            obs: Observation produced by the wrapped environment (ignored).

        Returns:
            A ``np.uint8`` array of shape ``(height, width, 3)``.

        Raises:
            TypeError: If the wrapped environment's ``render()`` returns
                ``None`` instead of an image.
        """
        frame = self.env.render()
        if frame is None:
            raise TypeError(
                "env.render() returned None; create the environment with "
                "render_mode='rgb_array' to use PixelObservationWrapper"
            )
        # np.array (not asarray) keeps the original's defensive copy.
        frame = np.array(frame, dtype=np.uint8)
        return self._resize_nearest(frame)

    def _resize_nearest(self, frame):
        """Resample ``frame`` to ``(height, width)`` by nearest-neighbour lookup."""
        src_h, src_w = frame.shape[:2]
        if (src_h, src_w) == (self.height, self.width):
            return frame
        # Map every target pixel back to the source pixel it falls on.
        rows = np.arange(self.height) * src_h // self.height
        cols = np.arange(self.width) * src_w // self.width
        return frame[rows[:, None], cols]

# Example: build FetchReach with off-screen RGB rendering and wrap it so that
# observations are the rendered frames.
env = PixelObservationWrapper(
    gym.make("FetchReach-v4", render_mode="rgb_array", max_episode_steps=50),
    width=84,
    height=84,
    camera_id=0,
)

In [11]:

from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.callbacks import CheckpointCallback
import time

# --- 1. Configuration ---

ENV_ID = "FetchReach-v4"   # A simpler Fetch task to start with
N_ENVS = 8                 # Use multiple environments for PPO
SEED = 0                   # Shared by the environments and the PPO model
TOTAL_TIMESTEPS = 200_000  # Short run; Fetch envs can need many more steps (e.g., 1M+)
MODEL_PATH = "ppo_fetch_reach"
TENSORBOARD_LOG = "./ppo_fetch_tensorboard/"

# --- 2. Create the Vectorized Environment ---

# make_vec_env handles creating multiple parallel envs
print("Creating environments...")
env = make_vec_env(
    ENV_ID,
    n_envs=N_ENVS,
    seed=SEED,
    env_kwargs=dict(reward_type="dense", max_episode_steps=50),
)

# Everything that uses the environments runs inside try/finally so that they
# are closed even when model construction or training raises.
try:
    # IMPORTANT:
    # Fetch environments use a `Dict` observation space:
    # { 'observation': array, 'achieved_goal': array, 'desired_goal': array }
    # Because of this, we MUST use the "MultiInputPolicy".
    print(f"Observation space: {env.observation_space}")

    # --- 3. Define the Model ---

    # We use "MultiInputPolicy" to handle the Dict observation space
    print("Initializing PPO model...")
    model = PPO(
        "MultiInputPolicy",
        env,
        n_steps=1024,          # Num steps to collect before updating policy
        batch_size=64,         # Minibatch size
        n_epochs=10,           # Num of epochs when updating policy
        gamma=0.99,            # Discount factor
        gae_lambda=0.95,       # Factor for GAE
        learning_rate=3e-4,    # Learning rate
        seed=SEED,             # Seed the model too, not just the envs
        verbose=1,
        tensorboard_log=TENSORBOARD_LOG,
    )

    # --- 4. Setup Callbacks (Optional but recommended) ---

    # Save a checkpoint roughly every 100,000 total timesteps. The frequency is
    # divided by N_ENVS because each vectorized step advances all envs at once.
    checkpoint_callback = CheckpointCallback(
        save_freq=max(100_000 // N_ENVS, 1),
        save_path=f"./logs/{MODEL_PATH}/",
        name_prefix="checkpoint",
    )

    # --- 5. Train the Model ---

    print(f"Starting training for {TOTAL_TIMESTEPS} steps...")
    # perf_counter is monotonic, so the elapsed time cannot be skewed by
    # system clock adjustments during a long run.
    start_time = time.perf_counter()
    model.learn(
        total_timesteps=TOTAL_TIMESTEPS,
        callback=checkpoint_callback,
        progress_bar=True,
    )
    end_time = time.perf_counter()
    print(f"Training finished in {end_time - start_time:.2f} seconds.")

    # --- 6. Save the Final Model ---

    model.save(MODEL_PATH)
    print(f"Final model saved to {MODEL_PATH}.zip")

    # Note: If you use VecNormalize, you must save its stats too
    # env.save("vec_normalize_fetch.pkl")
finally:
    env.close()

# --- 7. Evaluate the Trained Agent (see the next cell) ---


RecursionError: maximum recursion depth exceeded

In [17]:
import time  # Add this import

print("\n--- Starting Evaluation ---")

N_EVAL_EPISODES = 10

# Load the trained model
model = PPO.load(MODEL_PATH)

# Create a single env for evaluation with "human" render mode
eval_env = gym.make(ENV_ID, render_mode="human")

# Optional: Load VecNormalize stats if used
# eval_env = VecNormalize.load("vec_normalize_fetch.pkl", eval_env)
# eval_env.training = False
# eval_env.norm_reward = False

# try/finally guarantees the render window and simulator are released even if
# prediction or stepping raises part-way through an episode.
try:
    for ep in range(N_EVAL_EPISODES):
        obs, info = eval_env.reset()
        done = False
        print(f"\nEvaluation Episode {ep + 1}")

        while not done:
            # Use deterministic=True for consistent evaluation
            action, _ = model.predict(obs, deterministic=True)
            obs, reward, terminated, truncated, info = eval_env.step(action)
            done = terminated or truncated

        # The loop only exits once the episode has ended, so `info` here is
        # the info dict from the final step.
        print("Episode finished.")
        if info.get("is_success"):
            print(">>> Goal was reached! <<<")
        else:
            print(">>> Goal was NOT reached. <<<")
finally:
    eval_env.close()
print("Evaluation complete.")



--- Starting Evaluation ---

Evaluation Episode 1
Episode finished.
>>> Goal was reached! <<<

Evaluation Episode 2
Episode finished.
>>> Goal was reached! <<<

Evaluation Episode 3
Episode finished.
>>> Goal was reached! <<<

Evaluation Episode 4
Episode finished.
>>> Goal was reached! <<<

Evaluation Episode 5
Episode finished.
>>> Goal was reached! <<<

Evaluation Episode 6
Episode finished.
>>> Goal was reached! <<<

Evaluation Episode 7
Episode finished.
>>> Goal was reached! <<<

Evaluation Episode 8
Episode finished.
>>> Goal was reached! <<<

Evaluation Episode 9
Episode finished.
>>> Goal was reached! <<<

Evaluation Episode 10
Episode finished.
>>> Goal was reached! <<<
Evaluation complete.


In [16]:
# Display the action space of whatever `env` currently refers to. Note that
# `env` is assigned in more than one earlier cell, so the object inspected
# here depends on which of those cells ran last.
env.action_space

Box(-1.0, 1.0, (4,), float32)