In [18]:
from collections import namedtuple, deque
from tqdm import tqdm

import gymnasium as gym
import numpy as np

import torch
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import DummyVecEnv

# Enable autograd anomaly detection so that a backward pass producing NaN
# raises immediately instead of silently propagating.
torch.autograd.set_detect_anomaly(True)

# Pick the compute device. Priority: Apple-silicon GPU ("mps"), then CUDA, then CPU.
# ref: M1 GPU support
#   https://developer.apple.com/metal/pytorch/
#   https://pytorch.org/blog/introducing-accelerated-pytorch-training-on-mac/
if torch.backends.mps.is_available():
    _device_name = "mps"
elif torch.cuda.is_available():
    _device_name = "cuda"
else:
    _device_name = "cpu"
device = torch.device(_device_name)

# 1. MountainCar-Continuous Environment

In [16]:
""" Environment information
ref: https://github.com/openai/gym/blob/master/gym/envs/classic_control/continuous_mountain_car.py#L27

Observation Space
    The observation is a `ndarray` with shape `(2,)` where the elements correspond to the following:
    | Num | Observation                          | Min  | Max | Unit         |
    |-----|--------------------------------------|------|-----|--------------|
    | 0   | position of the car along the x-axis | -Inf | Inf | position (m) |
    | 1   | velocity of the car                  | -Inf | Inf | velocity (v) |
Action Space
    The action is a `ndarray` with shape `(1,)`, representing the directional force applied on the car.
    The action is clipped in the range `[-1,1]` and multiplied by a power of 0.0015.
Reward
    A negative reward of *-0.1 * action<sup>2</sup>* is received at each timestep to penalise for
    taking actions of large magnitude. If the mountain car reaches the goal then a positive reward of +100
    is added to the negative reward for that timestep.
Starting State
    The position of the car is assigned a uniform random value in `[-0.6 , -0.4]`.
    The starting velocity of the car is always assigned to 0.
Episode End
    The episode ends if either of the following happens:
    1. Termination: The position of the car is greater than or equal to 0.45 (the goal position on top of the right hill)
    2. Truncation: The length of the episode is 999.
"""
# Instantiate the environment described in the notes above.
# render_mode="human" requests an on-screen window (Gymnasium render mode).
env = gym.make('MountainCarContinuous-v0', render_mode="human")
# Uncomment to reset the environment and draw its initial state:
# env.reset()
# env.render()

Sample environment image

<img width=300 src="mountain_car.png" />

In [183]:
from collections import deque
from gym import spaces
import numpy as np

class ConcatObs(gym.Wrapper):
    """Wrapper that returns the ``k`` most recent observations stacked together.

    Every observation handed to the agent is an array of shape
    ``(k,) + env.observation_space.shape`` whose last row is the newest frame.
    On reset the first frame is repeated ``k`` times so the shape is constant
    from the very first step.

    Args:
        env: Environment to wrap.
        k: Number of consecutive observations to stack (must be >= 1).
        bound_value: Finite magnitude used for the low/high limits of the
            stacked observation space.

    Raises:
        ValueError: If ``k`` is smaller than 1.
    """

    def __init__(self, env, k, bound_value=1e6):
        super().__init__(env)
        if k < 1:
            raise ValueError(f"k must be >= 1, got {k}")
        self.k = k
        # Bounded deque: appending a new frame automatically evicts the oldest one.
        self.frames = deque([], maxlen=k)
        shp = env.observation_space.shape
        # Use large finite values for bounds instead of infinity
        self.observation_space = gym.spaces.Box(low=-bound_value, high=bound_value, shape=((k,) + shp), dtype=np.float32)

    def reset(self, seed=None, options={"low": -1, "high": 0.3}):
        """Reset the wrapped environment and fill the stack with the first frame.

        Args:
            seed: Forwarded to the wrapped environment's ``reset``. Defaults to
                ``None`` (no reseeding): a fixed default seed would restart
                every default ``reset()`` call from the same state.
            options: Reset options forwarded to the wrapped environment. The
                default widens the initial-position range to ``[-1, 0.3]``.

        Returns:
            Tuple ``(stacked_observation, info)``.
        """
        # Hand the env a copy so the shared default dict can never be mutated downstream.
        reset_options = dict(options) if options is not None else None
        ob, info = self.env.reset(seed=seed, options=reset_options)
        for _ in range(self.k):
            self.frames.append(ob)
        return self._get_ob(), info

    def step(self, action):
        """Step the wrapped environment and return the updated observation stack.

        Both ``terminated`` and ``truncated`` are passed through unchanged so
        that the episode time limit (truncation) still ends the episode.

        Returns:
            Tuple ``(stacked_observation, reward, terminated, truncated, info)``.
        """
        ob, reward, terminated, truncated, info = self.env.step(action)
        self.frames.append(ob)
        return self._get_ob(), reward, terminated, truncated, info

    def _get_ob(self):
        """Return the buffered frames as one float32 array, oldest frame first."""
        # dtype is pinned to match the dtype declared in ``observation_space``.
        return np.array(self.frames, dtype=np.float32)

# Smoke test: wrap a fresh environment so that 4 observations are stacked, then
# reset it. As the last expression of the cell, the (observation, info) pair
# returned by reset() is displayed as the cell output.
env = gym.make('MountainCarContinuous-v0', render_mode="human")
concat_env = ConcatObs(env, 4)
concat_env.reset()

(array([[-0.46703225,  0.        ],
        [-0.46703225,  0.        ],
        [-0.46703225,  0.        ],
        [-0.46703225,  0.        ]], dtype=float32),
 {})

In [184]:
# Inspect the wrapped environment: print both spaces and one sample of each.
print("observation_space: ", concat_env.observation_space)

obs, _ = concat_env.reset()  # ConcatObs.reset's default options request a start position in [-1, 0.3]
print("sample obs: ", obs)

print("action_space: ", concat_env.action_space)
print("sample action: ", concat_env.action_space.sample())

observation_space:  Box(-1000000.0, 1000000.0, (4, 2), float32)
sample obs:  [[-0.8535572  0.       ]
 [-0.8535572  0.       ]
 [-0.8535572  0.       ]
 [-0.8535572  0.       ]]
action_space:  Box(-1.0, 1.0, (1,), float32)
sample action:  [-0.8062074]


# 2. PPO algorithm

In [188]:
# (option 1) Train model on stacked observations

# Environment pipeline: raw env -> ConcatObs (4 stacked frames) -> DummyVecEnv.
env = gym.make('MountainCarContinuous-v0')  # pass render_mode="human" to watch training
concat_env = ConcatObs(env, 4)
vec_env = DummyVecEnv([lambda: concat_env])

# PPO agent with an MLP policy acting on the vectorized, wrapped environment.
model = PPO(policy="MlpPolicy", env=vec_env, gamma=0.99, verbose=1)

# Training budget in environment steps; adjust to your needs.
total_timesteps = 100_000
model.learn(total_timesteps=total_timesteps)

Using cpu device
-----------------------------
| time/              |      |
|    fps             | 6520 |
|    iterations      | 1    |
|    time_elapsed    | 0    |
|    total_timesteps | 2048 |
-----------------------------
------------------------------------------
| time/                   |              |
|    fps                  | 1156         |
|    iterations           | 2            |
|    time_elapsed         | 3            |
|    total_timesteps      | 4096         |
| train/                  |              |
|    approx_kl            | 0.0067263376 |
|    clip_fraction        | 0.0256       |
|    clip_range           | 0.2          |
|    entropy_loss         | -1.37        |
|    explained_variance   | 0.00735      |
|    learning_rate        | 0.0003       |
|    loss                 | -0.00662     |
|    n_updates            | 10           |
|    policy_gradient_loss | -0.0113      |
|    std                  | 0.919        |
|    value_loss           | 0.0633       |

<stable_baselines3.ppo.ppo.PPO at 0x139060430>

In [None]:
# (option 2) Train model with early stop
# NOTE: this option trains on the environment without the ConcatObs wrapper, so
# the resulting `model` is fitted to single `(2,)` observations rather than the
# `(4, 2)` stacks that the ConcatObs-based cells produce.
env = make_vec_env('MountainCarContinuous-v0', n_envs=1)
model = PPO("MlpPolicy", env, verbose=1)

# Train the model with early stopping
# Upper bound on the number of training steps; the callback may stop earlier.
total_timesteps = int(1e6)
# Returns of the most recently finished episodes (at most 100 are kept).
last_100_rewards = deque(maxlen=100)

# Custom callback for early stopping
def early_stopping_callback(_locals, _globals):
    """Stop training once the mean return of the last 100 episodes exceeds 75.

    The callback looks for finished-episode statistics in ``_locals["infos"]``:
    an ``info`` dict carrying an ``"episode"`` entry contributes its total
    episode return (``info["episode"]["r"]``) to ``last_100_rewards``.
    Individual step rewards are deliberately NOT recorded; averaging them would
    compare a per-step quantity against a per-episode threshold.

    NOTE(review): the ``"episode"`` entry is what Stable-Baselines3's ``Monitor``
    wrapper emits when an episode ends; ``make_vec_env`` is expected to apply
    that wrapper. Without it, no returns are recorded and training simply runs
    for the full step budget.

    Args:
        _locals: Local variables of the training loop (a dict). Only the
            optional ``"infos"`` entry, a sequence of per-environment info
            dicts, is read.
        _globals: Global variables of the training loop (unused).

    Returns:
        ``False`` to stop training, ``True`` to continue.
    """
    for info in _locals.get("infos") or ():
        episode = info.get("episode")
        if episode is not None:
            last_100_rewards.append(episode["r"])

    # Only decide once a full window of finished episodes is available.
    window_full = len(last_100_rewards) == last_100_rewards.maxlen
    if window_full and np.mean(last_100_rewards) > 75.0:
        print("Early stopping: Average reward of last 100 episodes is greater than 75.0")
        return False  # Returning False stops the training
    return True  # Returning True continues the training

# Start training; the callback is consulted during training and ends the run
# early as soon as it returns False.
model.learn(total_timesteps=total_timesteps, callback=early_stopping_callback)

In [193]:
# Test the trained agent
# NOTE: `model` must have been trained on ConcatObs(k=4) observations (option 1);
# a model trained on the unwrapped environment expects a different observation shape.
env = gym.make('MountainCarContinuous-v0', render_mode="human")
env = ConcatObs(env, 4)
try:
    obs, info = env.reset()
    for _step in range(1000):
        obs = np.asarray(obs, dtype=np.float32)
        action, _states = model.predict(obs, deterministic=True)
        obs, rewards, dones, truncated, info = env.step(action)
        env.render()

        # Stop on either end-of-episode signal: goal reached or time limit hit.
        if dones or truncated:
            break
finally:
    # Close the environment even if prediction or stepping raised, so the
    # render window is not left open.
    env.close()

In [192]:
# Save the model
# Persists the current `model` under the given path
# (NOTE(review): Stable-Baselines3 is expected to append ".zip" — confirm).
model.save("ppo_mountaincarcontinuous")

# Load the trained model (optional, for demonstration here)
# Loaded into a separate name so the in-memory `model` is left untouched.
_model = PPO.load("ppo_mountaincarcontinuous")