vr2 -> vr3


*   Cropped the observation image, removing some backgrounds
*   Changed RGB to Grayscale

vr3 -> vr4


*   Changed the width and height from 64x64 to 36x36
*   Changed the total timesteps from 500000 to 250000

skip vr5 (w/ multiple images)

vr4 -> vr6


*   Changed parameters: `ent_coef=0.01`, `use_sde=True`, `learning_rate=0.0001`

# Install Dependencies

## pybullet-gym

In [None]:
# # Clone the pybullet-gym repository
# !git clone https://github.com/benelot/pybullet-gym.git
# %cd pybullet-gym

# # Install the package
# !pip install -e .

# # Return to the previous directory
# %cd ..

In [None]:
# # Verify the installation and ensure the path is updated
# import sys
# if '/content/pybullet-gym' not in sys.path:
#     sys.path.append('/content/pybullet-gym')

## Stable-Baselines3

In [None]:
!pip install stable-baselines3[extra]
!pip install sb3-contrib



## OpenCV

In [None]:
!pip install opencv-python-headless



## Tensorboard

In [None]:
!pip install tensorboard



# Code Starts From Here

Import Libraries

In [None]:
import gym
from gym import spaces

import cv2
#import pybulletgym
import numpy as np
import math
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation

from stable_baselines3 import PPO
from sb3_contrib import RecurrentPPO
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.vec_env import DummyVecEnv

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
from torch.optim import Adam
from torch.distributions import MultivariateNormal

Set hyperparameters that apply to all policies

In [None]:
# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Training budget, in environment steps, passed to every `learn` call below.
total_timesteps = 250_000

# Number of episodes used whenever a policy is evaluated.
episodes_test = 50

Delete the logs of past runs so that TensorBoard does not plot past and current runs on the same board.


-> This does not seem to be needed if you **Disconnect and delete the runtime** and then **Reconnect**.

In [None]:
# import shutil

# shutil.rmtree('./logs/PPOwO/')  # PPO trained with One image
# shutil.rmtree('./logs/RPPOwO/')  # RecurrentPPO trained with One image
# shutil.rmtree('./logs/PPOwM/')  # PPO trained with Multiple images
# shutil.rmtree('./logs/RPPOwM/')  # RecurrentPPO trained with Multiple images

First, we calculate the Mean reward of the Random Policy to set the baseline.

In [None]:
# Baseline: episodic return of a uniformly random policy on the plain
# (state-observation) Pendulum-v1 task, averaged over `episodes_test` episodes.
env = gym.make("Pendulum-v1")

total_rewards = []

for _ in range(episodes_test):
    obs = env.reset()  # the initial observation is not used by a random policy
    done = False
    episode_reward = 0
    while not done:
        # Sample an action directly from the action space
        action = env.action_space.sample()
        step_result = env.step(action)
        if len(step_result) == 5:
            # 5-value step signature: (obs, reward, terminated, truncated, info)
            obs, reward, terminated, truncated, info = step_result
            done = terminated or truncated
        else:
            # 4-value step signature: (obs, reward, done, info)
            obs, reward, done, info = step_result
        episode_reward += reward
    total_rewards.append(episode_reward)

# Release the baseline environment; `env` is re-created for the image-based runs.
env.close()

mean_reward_random = np.mean(total_rewards)
std_reward_random = np.std(total_rewards)

print(f"Mean reward, random policy: {mean_reward_random} +/- {std_reward_random}")

  deprecation(
  deprecation(
  if not isinstance(terminated, (bool, np.bool8)):


Mean reward, random policy: -1222.8205031183095 +/- 264.8168020435486


# One observation image per step

### Define the Environment with Environment Wrapper

In [None]:
import gym
import numpy as np
import cv2
from gym import spaces
#from stable_baselines3.common.envs import DummyVecEnv, VecTransposeImage
from stable_baselines3 import PPO

class ImageObservationWrapper(gym.ObservationWrapper):
    """Replace the wrapped environment's observation with a rendered grayscale frame.

    Each observation is built by rendering the wrapped environment as an RGB
    array, cropping a square around the centre of the frame, converting the
    crop to grayscale and resizing it to ``(height, width)``. The result is a
    ``uint8`` array of shape ``(height, width, 1)`` with values in [0, 255]
    (no normalization is applied).

    ``reset`` returns ``(observation, info)`` and ``step`` returns
    ``(observation, reward, terminated, truncated, info)``.

    Args:
        env: Environment to wrap; it must support ``render(mode='rgb_array')``.
        width: Width in pixels of the returned observation.
        height: Height in pixels of the returned observation.
        crop_size: Side length in pixels of the central square that is kept
            before resizing.
    """

    def __init__(self, env, width=36, height=36, crop_size=250):
        super().__init__(env)
        self.width = width
        self.height = height
        self.crop_size = crop_size
        self.observation_space = spaces.Box(low=0, high=255, shape=(height, width, 1), dtype=np.uint8)

    def observation(self, obs):
        """Return the current rendered frame as a ``(height, width, 1)`` uint8 image.

        The ``obs`` argument (the wrapped environment's own observation) is
        ignored; the image is taken from ``render`` instead.
        """
        # Capture the rendered image from the environment
        frame = self.env.render(mode='rgb_array')

        # Crop a centred square, assuming the pendulum sits in the middle of the frame.
        # The start indices are clamped at 0 so that a frame smaller than the crop
        # does not produce negative indices (which would wrap around and yield an
        # empty slice).
        center_x, center_y = frame.shape[1] // 2, frame.shape[0] // 2
        half = self.crop_size // 2
        top = max(center_y - half, 0)
        left = max(center_x - half, 0)
        frame = frame[top:center_y + half, left:center_x + half]

        # Convert to grayscale, then resize to the configured observation size
        gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
        gray = cv2.resize(gray, (self.width, self.height))

        # Add a channel dimension, (height, width) -> (height, width, 1),
        # to make the image compatible with CnnPolicy
        return gray[:, :, None]

    def reset(self, **kwargs):
        """Reset the wrapped environment and return ``(image_observation, info)``."""
        result = self.env.reset(**kwargs)
        # The wrapped env may return either a bare observation or an (obs, info) pair.
        if isinstance(result, tuple) and len(result) == 2 and isinstance(result[1], dict):
            obs, info = result
        else:
            obs, info = result, {}
        return self.observation(obs), info

    def step(self, action):
        """Step the wrapped environment and return the 5-value step tuple.

        Returns:
            ``(image_observation, reward, terminated, truncated, info)``.
        """
        result = self.env.step(action)
        if len(result) == 5:
            obs, reward, terminated, truncated, info = result
        else:
            obs, reward, done, info = result
            # With the 4-value signature, gym's TimeLimit wrapper flags episodes cut
            # off by the step limit through info["TimeLimit.truncated"]. Report those
            # as truncations rather than terminations so the learner can still
            # bootstrap from the final state.
            truncated = bool(info.get("TimeLimit.truncated", False))
            terminated = bool(done) and not truncated
        return self.observation(obs), reward, terminated, truncated, info

    def render(self, mode='human', **kwargs):
        """Delegate rendering to the wrapped environment."""
        return self.env.render(mode=mode, **kwargs)


# Training callback that records the cumulative reward of every finished episode
class TotalRewardLoggerCallback(BaseCallback):
    """Accumulate per-step rewards and store one total per completed episode.

    Only index 0 of the ``rewards`` and ``dones`` entries in ``self.locals`` is
    read, i.e. the first environment of the vectorized env. Finished-episode
    totals are appended to ``episode_rewards`` in completion order.
    """

    def __init__(self, verbose=0):
        super().__init__(verbose)
        self.current_episode_reward = 0  # running total of the episode in progress
        self.episode_rewards = []  # one entry per completed episode

    def _on_step(self) -> bool:
        """Add the latest reward and close out the episode when it is done."""
        step_reward = self.locals['rewards'][0]
        self.current_episode_reward += step_reward

        episode_finished = self.locals['dones'][0]
        if not episode_finished:
            return True

        # Episode ended: store its total and start counting the next one from zero
        self.episode_rewards.append(self.current_episode_reward)
        self.current_episode_reward = 0
        return True


def make_image_env():
    """Build a Pendulum-v1 environment wrapped to emit image observations."""
    return ImageObservationWrapper(gym.make('Pendulum-v1'))


# Vectorized environment with a single sub-environment, used by the models below.
# A named factory is used instead of `lambda: env` so the callable does not close
# over the very name (`env`) that this statement rebinds.
env = DummyVecEnv([make_image_env])



## RecurrentPPO

### Define and Train the Model

In [None]:
# RecurrentPPO with the 'CnnLstmPolicy' policy, trained on one image per step
model_RPPOwO = RecurrentPPO(
    'CnnLstmPolicy',
    env,
    learning_rate=0.0001,
    batch_size=256,
    ent_coef=0.01,
    use_sde=True,
    tensorboard_log="./logs/RPPOwO/",
    verbose=1,
    device=device,
)

# Callback that collects the total reward of each training episode
total_reward_logger_RPPOwO = TotalRewardLoggerCallback()

# Train for the shared step budget while logging per-episode rewards
model_RPPOwO.learn(
    total_timesteps=total_timesteps,
    callback=total_reward_logger_RPPOwO,
)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
|    std                  | 1             |
|    value_loss           | 8.24e+03      |
-------------------------------------------
-------------------------------------------
| time/                   |               |
|    fps                  | 39            |
|    iterations           | 471           |
|    time_elapsed         | 1517          |
|    total_timesteps      | 60288         |
| train/                  |               |
|    approx_kl            | 1.1920929e-07 |
|    clip_fraction        | 0             |
|    clip_range           | 0.2           |
|    entropy_loss         | -3.34         |
|    explained_variance   | 5.96e-08      |
|    learning_rate        | 0.0001        |
|    loss                 | 3.61e+03      |
|    n_updates            | 4700          |
|    policy_gradient_loss | -1.76e-06     |
|    std                  | 1             |
|    value_loss           | 7.23e+03      |
-----------

### Save the Model

### Evaluate the Model

In [None]:
# Load the TensorBoard notebook extension so the %tensorboard magic is available
%load_ext tensorboard

# Start TensorBoard on port 6006, reading the RecurrentPPO logs from ./logs/RPPOwO/
%tensorboard --logdir ./logs/RPPOwO/ --port 6006

In [None]:
# Package the RecurrentPPO TensorBoard logs and download them (requires Google Colab)
import shutil
from google.colab import files

# Create RPPOwO_pendulum_tensorboard.zip from the contents of ./logs/RPPOwO/
shutil.make_archive(
    base_name='RPPOwO_pendulum_tensorboard',
    format='zip',
    root_dir="./logs/RPPOwO/",
)

# Send the archive to the browser
files.download('RPPOwO_pendulum_tensorboard.zip')

# To view the logs later, unzip the archive and run: tensorboard --logdir=<path_to_unzipped_folder>

In [None]:
# Training curve: total reward of each completed episode (RecurrentPPO, one image per step)
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(total_reward_logger_RPPOwO.episode_rewards, label='Total Reward per Episode')
ax.set_xlabel('Episode')
ax.set_ylabel('Total Reward')
ax.set_title('<RecurrentPPO with one image>\n Total Reward per Episode During Training')
ax.legend()
plt.show()

### Test the Model

In [None]:
# Evaluate the trained RecurrentPPO model over `episodes_test` episodes on the same
# vectorized image environment that was passed to the model for training.
mean_reward, std_reward = evaluate_policy(model_RPPOwO, env, n_eval_episodes=episodes_test)
print(f"Mean reward, RecurrentPPO with one image per step: {mean_reward} +/- {std_reward}")

## PPO

### Define and Train the Model

In [None]:
# PPO with the 'CnnPolicy' policy, trained on one image per step.
# `device=device` uses the notebook-wide device selected in the hyperparameter cell
# (GPU when available, otherwise CPU) instead of a hard-coded 'cuda', which fails
# on CPU-only runtimes and was inconsistent with the RecurrentPPO cell.
model_PPOwO = PPO(
    'CnnPolicy',
    env,
    verbose=1,
    device=device,
    ent_coef=0.01,
    use_sde=True,
    learning_rate=0.0001,
    batch_size=256,
    tensorboard_log="./logs/PPOwO/",
)

# Initialize the custom callback that collects the total reward of each episode
total_reward_logger_PPOwO = TotalRewardLoggerCallback()

# Train the model with the custom callback
model_PPOwO.learn(total_timesteps=total_timesteps, callback=total_reward_logger_PPOwO)

### Evaluate the Model

Tensorboard

In [None]:
# Load the TensorBoard notebook extension so the %tensorboard magic is available
%load_ext tensorboard

# Start TensorBoard on port 6007, reading the PPO logs from ./logs/PPOwO/
%tensorboard --logdir ./logs/PPOwO/ --port 6007

In [None]:
# Download the PPO TensorBoard results (requires Google Colab).
# The imports are active so this cell runs on its own instead of relying on an
# earlier download cell having been executed first.
import shutil
from google.colab import files

# Zip the TensorBoard logs
shutil.make_archive('PPOwO_pendulum_tensorboard', 'zip', "./logs/PPOwO/")

# Download the zip file
files.download('PPOwO_pendulum_tensorboard.zip')

# To view the logs later, unzip the archive and run: tensorboard --logdir=<path_to_unzipped_folder>

In [None]:
# Training curve: total reward of each completed episode (PPO, one image per step)
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(total_reward_logger_PPOwO.episode_rewards, label='Total Reward per Episode')
ax.set_xlabel('Episode')
ax.set_ylabel('Total Reward')
ax.set_title('<PPO with one image>\n Total Reward per Episode During Training')
ax.legend()
plt.show()

Test the Model

In [None]:
# Evaluate the trained PPO model over `episodes_test` episodes on the same
# vectorized image environment that was passed to the model for training.
# Note: this overwrites `mean_reward` / `std_reward` from the RecurrentPPO evaluation.
mean_reward, std_reward = evaluate_policy(model_PPOwO, env, n_eval_episodes=episodes_test)
print(f"Mean reward, PPO with one image per step: {mean_reward} +/- {std_reward}")

======================================================================================================================================================================================================================================

======================================================================================================================================================================================================================================