# Explore different Gymnasium environments

In the following, we explore different Gymnasium environments (classic control, Atari, and MuJoCo) using random actions and a pre-trained policy. 

**Make sure you load the ``rl_vis`` environment before running this notebook.**

## 1. Imports and Environment Setup

**Explanation:**

- We import Gymnasium (successor to OpenAI Gym), NumPy, imageio for building GIF animations, and the IPython display helpers for showing them inline; stable_baselines3 is imported later, when we load a pretrained policy.
- We define lists of environment IDs for classic control tasks, Box2D tasks, MuJoCo tasks, and Atari games.
- These per-category lists let us loop through environments easily.

In [33]:
# Imports and Definitions

# Set rendering backend for Mujoco 
import os
os.environ['MUJOCO_GL'] = 'egl'
os.environ['PYOPENGL_PLATFORM'] = 'egl'


import gymnasium as gym
import numpy as np
import imageio
import os
import base64
from IPython.display import Image, display, clear_output
from io import BytesIO
from typing import Optional, Dict, Any, List
from io import BytesIO





# Define environment categories
# Each list holds the Gymnasium environment IDs of one task family.
# NOTE: later cells in this notebook reassign these names before using them
# (MuJoCo switches to "-v4" IDs and Atari to "ALE/...-v5" IDs), so the values
# below are only the initial defaults.
control_env_ids = [
    "CartPole-v1",        # Balance a pole on a cart
    "MountainCar-v0",     # Drive up a mountain
    "Acrobot-v1",         # Swing up a double pendulum
    "Pendulum-v1"         # Swing up a pendulum
]

box2d_env_ids = [
    "LunarLander-v2",     # Land a spacecraft
    "BipedalWalker-v3"    # Make a 2D robot walk
]

mujoco_env_ids = [
    "Hopper-v3",          # Make a 2D one-legged robot hop
    "Humanoid-v3"         # Make a 3D humanoid walk
]


atari_env_ids = [
    "PongNoFrameskip-v4",        # Atari Pong
    "BreakoutNoFrameskip-v4"     # Atari Breakout
]






### `print_env_info` Function:

```python
def print_env_info(env_id: str) -> Dict[str, Any]:
```

This function prints and returns key environment information:

1. **Observation Space**:
   - Type (usually Box or Discrete)
   - Shape (dimensions of state space)
   - Bounds (min/max values possible)

2. **Action Space**:
   - Type (Box for continuous, Discrete for discrete actions)
   - For discrete: number of possible actions
   - For continuous: action dimensions and bounds

3. **Time Horizon**:
   - Maximum episode length
   - Some environments have infinite horizons

4. **Reward Range**:
   - Minimum and maximum possible rewards
   - Important for reward scaling in RL algorithms

The function uses `hasattr()` to safely check for properties as different environment types may have different attributes.

Would you like me to elaborate on any specific aspect of these environments or the code?

In [3]:
def print_env_info(env_id: str) -> Dict[str, Any]:
    """
    Print and return key facts about a Gymnasium environment.

    Parameters
    ----------
    env_id : str
        Gymnasium environment ID passed to ``gym.make``.

    Returns
    -------
    dict
        ``'observation_space'`` and ``'action_space'`` (the space objects),
        ``'horizon'`` (maximum episode steps, or ``None`` when no limit can
        be found) and, only when the environment exposes the attribute,
        ``'reward_range'``.
    """
    env = gym.make(env_id)
    info: Dict[str, Any] = {}

    # Close the environment even if one of the lookups below raises.
    try:
        print(f"\n{'='*20} Environment Info: {env_id} {'='*20}")

        # State space
        obs_space = env.observation_space
        print("\nObservation Space:")
        print(f"Type: {type(obs_space)}")
        print(f"Shape: {obs_space.shape}")
        if hasattr(obs_space, 'low'):
            print(f"Bounds: [{obs_space.low.min()}, {obs_space.high.max()}]")
        info['observation_space'] = obs_space

        # Action space
        act_space = env.action_space
        print("\nAction Space:")
        print(f"Type: {type(act_space)}")
        if hasattr(act_space, 'n'):
            print(f"Number of actions: {act_space.n}")
        elif hasattr(act_space, 'shape'):
            print(f"Action shape: {act_space.shape}")
            # Having a shape does not imply having bounds, so check for
            # low/high explicitly instead of assuming they exist.
            if hasattr(act_space, 'low') and hasattr(act_space, 'high'):
                print(f"Bounds: [{act_space.low.min()}, {act_space.high.max()}]")
        info['action_space'] = act_space

        # Time limit: prefer the public spec field; fall back to the private
        # attribute this function read originally.
        spec = getattr(env, 'spec', None)
        horizon = getattr(spec, 'max_episode_steps', None)
        if horizon is None:
            horizon = getattr(env, '_max_episode_steps', None)
        if horizon is not None:
            print(f"\nTime Horizon: {horizon} steps")
        else:
            print("\nTime Horizon: Infinite or Unknown")
        info['horizon'] = horizon

        # Reward range (not every environment object exposes this attribute)
        if hasattr(env, 'reward_range'):
            print(f"\nReward Range: {env.reward_range}")
            info['reward_range'] = env.reward_range
    finally:
        env.close()

    return info


### `collect_trajectory` Function:

```python
def collect_trajectory(env_id: str, policy=None, max_steps: Optional[int] = None) -> tuple:
```

This function:
1. Creates the environment with RGB array rendering mode
2. Initializes lists for frames and rewards
3. Sets the episode length limit (uses the environment's limit or a default of 1000)
4. Runs one episode, using random actions when `policy` is `None` and the policy's predictions otherwise:
   - Samples a random action from the action space (or asks the policy for an action)
   - Takes a step in the environment
   - Collects the reward and the rendered frame
   - Stops if the environment terminates or truncates
5. Returns the frames and episode statistics (length, total/average reward)


In [4]:
def collect_trajectory(env_id: str, policy=None, max_steps: Optional[int] = None) -> tuple:
    """
    Roll out one episode with a random or trained policy and record it.

    Parameters:
    -----------
    env_id : str
        Gymnasium environment ID
    policy : object with ``predict(obs, deterministic=...)`` or None
        If None, actions are sampled from the action space. Otherwise the
        first element returned by ``policy.predict`` is used as the action.
    max_steps : int or None
        Maximum steps for the episode. If None, the environment's own limit
        is used, falling back to 1000 when none is found.

    Returns:
    --------
    frames : list
        One rendered frame per step taken
    episode_stats : dict
        ``'length'``, ``'total_reward'`` and ``'avg_reward'`` of the episode
        (``'avg_reward'`` is 0.0 when no step was taken)
    """
    env = gym.make(env_id, render_mode="rgb_array")
    frames = []
    rewards = []

    # Close the environment even if reset/step/render or the policy raises.
    try:
        if max_steps is None:
            # Prefer the public spec field; keep the private attribute as a
            # fallback, then the original default of 1000.
            spec = getattr(env, 'spec', None)
            max_steps = getattr(spec, 'max_episode_steps', None)
            if max_steps is None:
                max_steps = getattr(env, '_max_episode_steps', None)
            if max_steps is None:
                max_steps = 1000

        obs, _ = env.reset()

        for _step in range(max_steps):
            # Get action from policy or random sampling
            if policy is None:
                action = env.action_space.sample()
            else:
                action, _state = policy.predict(obs, deterministic=True)

            obs, reward, terminated, truncated, _info = env.step(action)
            rewards.append(reward)
            frames.append(env.render())

            if terminated or truncated:
                break
    finally:
        env.close()

    episode_stats = {
        'length': len(frames),
        'total_reward': sum(rewards),
        # np.mean of an empty list is nan (with a warning); report 0.0 instead.
        'avg_reward': np.mean(rewards) if rewards else 0.0,
    }

    return frames, episode_stats



### Convert frames to GIF files and write a function to report the results

In [5]:
def generate_gif(frames, fps=30):
    """
    Encode a list of frames as an animated GIF that loops forever.

    Args:
        frames (list): Frames to encode (NumPy arrays).
        fps (int): Frames per second passed to the GIF writer.

    Returns:
        bytes: The encoded GIF.
    """
    stream = BytesIO()
    try:
        # loop=0 asks the writer for infinite looping
        imageio.mimsave(stream, frames, format='GIF', fps=fps, loop=0)
        return stream.getvalue()
    finally:
        stream.close()

def display_frames_as_gif(frames, fps=30):
    """
    Show a list of frames as an animated GIF in the notebook output.

    Args:
        frames (list): Frames to animate; forwarded to ``generate_gif``.
        fps (int): Frames per second; forwarded to ``generate_gif``.
    """
    display(Image(data=generate_gif(frames, fps)))

    

def analyze_environment_category(env_ids: List[str], category_name: str):
    """
    Report on every environment of one category.

    For each ID this prints the environment description, rolls out one
    episode without a policy, prints the episode statistics and shows the
    rollout as a GIF. If anything fails for an environment, the error is
    printed and the loop moves on to the next ID.
    """
    print(f"\n{'='*30} {category_name} Environments {'='*30}")

    for name in env_ids:
        try:
            print(f"\nAnalyzing {name}...")

            # Describe the environment; the returned dict is not needed here
            print_env_info(name)

            # No policy is given, so collect_trajectory samples actions
            rollout, summary = collect_trajectory(name)

            print("\nEpisode Statistics:")
            print(f"Length: {summary['length']}")
            print(f"Total Reward: {summary['total_reward']:.2f}")
            print(f"Average Reward: {summary['avg_reward']:.2f}")

            print("\nTrajectory visualization:")
            display_frames_as_gif(rollout)

        except Exception as err:
            print(f"Error with environment {name}: {err}")

 

## Classical Control Environments in Gymnasium

Classical control environments implement fundamental problems from control theory and reinforcement learning. These environments are:

1. **CartPole-v1**:
   - A pole is attached to a cart moving along a frictionless track
   - Goal: Apply forces to the cart to keep the pole upright
   - State space: [cart position, cart velocity, pole angle, pole angular velocity]
   - Action space: Discrete(2) - push left (0) or right (1)
   - Terminal conditions: Pole angle beyond ±12°, cart position beyond ±2.4, or episode length > 500

2. **MountainCar-v0**:
   - A car must drive up a mountain
   - Goal: Build enough momentum by driving back and forth to reach the top
   - State space: [position, velocity]
   - Action space: Discrete(3) - push left (0), no push (1), push right (2)
   - Terminal conditions: Reaching the goal or episode length > 200

3. **Acrobot-v1**:
   - A two-link pendulum with one actuator
   - Goal: Swing the end of the lower link above a certain height
   - State space: [cos(θ₁), sin(θ₁), cos(θ₂), sin(θ₂), θ₁_dot, θ₂_dot]
   - Action space: Discrete(3) - apply torque to joint
   - Terminal conditions: End reaches target height or episode length > 500

4. **Pendulum-v1**:
   - An inverted pendulum swingup problem
   - Goal: Apply torques to swing the pendulum upright
   - State space: [cos(θ), sin(θ), angular velocity]
   - Action space: Box(1) - continuous torque value
   - Time limit: 200 steps

### The following code prints the basic information of Classical Control Environments

In [None]:

# Select the classic-control ID list and the label used in the printed banner.
env_ids, category_name = control_env_ids, "Classical_Control"

# Print environment info and show one policy-free rollout per environment.
analyze_environment_category(env_ids, category_name)

## The following code loads trained policies from HuggingFace, which were trained with Stable-Baselines3

**Models**: https://huggingface.co/sb3 

**Reference**: https://huggingface.co/docs/hub/stable-baselines3

In [None]:
import gymnasium as gym
from huggingface_sb3 import load_from_hub
from stable_baselines3 import PPO, DDPG, DQN, SAC
from stable_baselines3.common.utils import set_random_seed
from stable_baselines3.common.base_class import BaseAlgorithm
import numpy as np
from typing import Optional

class GymToGymnasiumWrapper:
    """
    Thin adapter around an SB3 model for use in the rollout helpers.

    It exposes a single ``predict`` method that forwards to the wrapped
    model and returns only the action, with ``None`` in place of the state.
    """

    def __init__(self, model: BaseAlgorithm):
        # Keep a reference to the wrapped model; nothing else is stored.
        self.model = model

    def predict(self, obs, deterministic=True):
        """Return ``(action, None)`` for ``obs``, dropping the model's state output."""
        action, _ = self.model.predict(obs, deterministic=deterministic)
        return (action, None)

def load_and_wrap_model(config: dict) -> GymToGymnasiumWrapper:
    """
    Fetch a checkpoint from the HuggingFace hub and wrap the loaded model.

    ``config`` must provide ``"repo_id"`` and ``"filename"`` (forwarded to
    ``load_from_hub``) and ``"algorithm"``, an object whose ``load`` method
    turns the fetched checkpoint into a model.
    """
    repo, fname = config["repo_id"], config["filename"]
    fetched = load_from_hub(repo_id=repo, filename=fname)

    # The algorithm entry is looked up only after the download, as before.
    algorithm_cls = config["algorithm"]
    trained = algorithm_cls.load(fetched)
    return GymToGymnasiumWrapper(trained)

def collect_trajectory(env_id: str, policy=None, max_steps: Optional[int] = None) -> tuple:
    """
    Collect one episode using either a random or a trained policy.

    Parameters
    ----------
    env_id : str
        Gymnasium environment ID.
    policy : object with ``predict(obs, deterministic=...)`` or None
        If None, actions are sampled from the action space. Otherwise the
        first element returned by ``policy.predict`` is used as the action.
    max_steps : int or None
        Maximum steps for the episode. If None, the environment's own limit
        is used, falling back to 1000 when none is found.

    Returns
    -------
    tuple
        ``(frames, episode_stats)``: one rendered frame per step taken, and
        a dict with ``'length'``, ``'total_reward'`` and ``'avg_reward'``
        (``'avg_reward'`` is 0.0 when no step was taken).
    """
    env = gym.make(env_id, render_mode="rgb_array")
    frames = []
    rewards = []

    # Close the environment even if reset/step/render or the policy raises.
    try:
        if max_steps is None:
            # Prefer the public spec field; keep the private attribute as a
            # fallback, then the original default of 1000.
            spec = getattr(env, 'spec', None)
            max_steps = getattr(spec, 'max_episode_steps', None)
            if max_steps is None:
                max_steps = getattr(env, '_max_episode_steps', None)
            if max_steps is None:
                max_steps = 1000

        obs, _ = env.reset()

        for _step in range(max_steps):
            if policy is None:
                action = env.action_space.sample()
            else:
                action, _state = policy.predict(obs, deterministic=True)

            obs, reward, terminated, truncated, _info = env.step(action)
            rewards.append(reward)
            frames.append(env.render())

            if terminated or truncated:
                break
    finally:
        env.close()

    episode_stats = {
        'length': len(frames),
        'total_reward': sum(rewards),
        # np.mean of an empty list is nan (with a warning); report 0.0 instead.
        'avg_reward': np.mean(rewards) if rewards else 0.0,
    }

    return frames, episode_stats

def _print_policy_stats(label: str, stats: dict) -> None:
    """Print episode length, total reward and average reward under a ``label`` heading."""
    print(f"{label} Policy Stats:")
    print(f"Episode Length: {stats['length']}")
    print(f"Total Reward: {stats['total_reward']:.2f}")
    print(f"Average Reward: {stats['avg_reward']:.2f}")


def compare_policies(env_ids: list):
    """
    Compare random and trained policies with visualizations.

    For every environment ID this rolls out one random-action episode and
    one episode driven by the model described in ``MODEL_CONFIGS[env_id]``,
    printing the statistics and showing a GIF for each, followed by the
    relative improvement in total reward.

    Parameters
    ----------
    env_ids : list
        Gymnasium environment IDs; each must be a key of ``MODEL_CONFIGS``.

    Raises
    ------
    Exception
        Any error raised while loading or evaluating the trained model is
        printed and then re-raised, which stops the loop.
    """
    for env_id in env_ids:
        print(f"\n{'='*20} Evaluating {env_id} {'='*20}")

        # Random policy trajectory
        print("\nRandom Policy Trajectory:")
        frames_random, stats_random = collect_trajectory(env_id, policy=None)
        _print_policy_stats("Random", stats_random)
        print("\nRandom Policy Visualization:")
        display_frames_as_gif(frames_random)

        # Trained policy trajectory
        try:
            print("\nTrained Policy Trajectory:")
            config = MODEL_CONFIGS[env_id]
            wrapped_model = load_and_wrap_model(config)

            frames_trained, stats_trained = collect_trajectory(env_id, policy=wrapped_model)
            _print_policy_stats("Trained", stats_trained)
            print("\nTrained Policy Visualization:")
            display_frames_as_gif(frames_trained)

            baseline = stats_random['total_reward']
            gain = stats_trained['total_reward'] - baseline
            if baseline == 0:
                # A relative improvement is undefined against a zero
                # baseline; report the absolute gain instead of dividing.
                print(f"\nImprovement in Total Reward: n/a (random baseline is 0; absolute gain {gain:.2f})")
            else:
                improvement = (gain / abs(baseline)) * 100
                print(f"\nImprovement in Total Reward: {improvement:.1f}%")

        except Exception as e:
            print(f"Error evaluating trained model: {e}")
            raise  # Show full error traceback




In [47]:
# Pretrained-model registry, keyed by Gymnasium environment ID.
# Each entry supplies what load_and_wrap_model reads from its config:
#   "repo_id"   - HuggingFace repository passed to load_from_hub
#   "filename"  - checkpoint file inside that repository
#   "algorithm" - SB3 algorithm class whose .load() restores the model
# compare_policies looks entries up with MODEL_CONFIGS[env_id].
MODEL_CONFIGS = {
    "CartPole-v1": {
        "repo_id": "sb3/demo-hf-CartPole-v1",
        "filename": "ppo-CartPole-v1.zip",
        "algorithm": PPO
    },
    "MountainCar-v0": {
        "repo_id": "sb3/dqn-MountainCar-v0",
        "filename": "dqn-MountainCar-v0.zip",
        "algorithm": DQN
    },
    "Acrobot-v1": {
        "repo_id": "sb3/dqn-Acrobot-v1",
        "filename": "dqn-Acrobot-v1.zip",
        "algorithm": DQN
    },
    "Pendulum-v1": {
        "repo_id": "sb3/ddpg-Pendulum-v1",
        "filename": "ddpg-Pendulum-v1.zip",
        "algorithm": DDPG
    },
    "LunarLander-v2": {
        "repo_id": "sb3/ppo-LunarLander-v2",
        "filename": "ppo-LunarLander-v2.zip",
        "algorithm": PPO
    },
    "BipedalWalker-v3": {
        "repo_id": "sb3/ddpg-BipedalWalker-v3",
        "filename": "ddpg-BipedalWalker-v3.zip",
        "algorithm": DDPG
    },
    "Walker2d-v4": {
        "repo_id": "jren123/sac-walker2d-v4",
        "filename": "SAC-Walker2d-v4.zip",
        "algorithm": SAC
    },
    "Humanoid-v4": {
        "repo_id": "jren123/sac-humanoid-v4", # https://huggingface.co/jren123/sac-humanoid-v4
        "filename": "SAC-Humanoid-v4.zip",
        "algorithm": SAC
    }
}


In [None]:


# Run evaluation
# Classic-control IDs to evaluate; each one has an entry in MODEL_CONFIGS.
control_env_ids = [
    "CartPole-v1",
    "MountainCar-v0",
    "Acrobot-v1",
    "Pendulum-v1"
]

# Roll out a random and a pretrained policy for every ID and compare them.
compare_policies(control_env_ids)

### Box2D environments

In [None]:
# Box2D environment IDs analysed in this cell.
box2d_env_ids = [
    "LunarLander-v2",     # Land a spacecraft
    "BipedalWalker-v3"   # Make a 2D robot walk
]

# Select the ID list and the label used in the printed banner.
env_ids, category_name = box2d_env_ids, "box2d"

# Print environment info and show one policy-free rollout per environment.
analyze_environment_category(env_ids, category_name)

### Load Pretrained Policies for Box2D environments

In [None]:

# Compare random and pretrained policies on the Box2D IDs.
compare_policies(box2d_env_ids)

## Atari Games

Loading pretrained models for Atari games is a bit complicated because of the pre-processing of the states. So we will not do that. 

In [None]:
import ale_py

# Register the environments provided by ale_py with Gymnasium
# (presumably what makes the "ALE/..." IDs below available to gym.make).
gym.register_envs(ale_py)

# Atari IDs analysed in this cell; no pretrained policy is loaded for them.
atari_env_ids = [
    "ALE/Pong-v5",        # Atari Pong
    "ALE/Breakout-v5"     # Atari Breakout
]

# Select the ID list and the label used in the printed banner, then analyse.
env_ids, category_name = atari_env_ids, "Atari"
analyze_environment_category(env_ids, category_name)



## Mujoco

In [None]:

# MuJoCo IDs analysed here; both have entries in MODEL_CONFIGS.
mujoco_env_ids = [
    "Walker2d-v4",          # Make a 2D bipedal robot walk
    "Humanoid-v4"         # Make a 3D humanoid walk
]

# Select the ID list and the label used in the printed banner, then analyse.
env_ids, category_name = mujoco_env_ids, "Mujoco"
analyze_environment_category(env_ids, category_name)

In [None]:
# Compare random and pretrained policies on the MuJoCo IDs.
compare_policies(mujoco_env_ids)