# **Library Loading**


In [None]:
!pip install stable-baselines3[extra]
!pip install shimmy>=2.0
!pip install swig
!pip install pyvirtualdisplay
!pip install sbx-rl
!pip install optuna
!pip install "optuna>=3.3.0" "optuna-dashboard>=0.12.0"



In [None]:
import gymnasium as gym
from gymnasium import spaces
import optuna
from optuna.pruners import MedianPruner
from optuna.samplers import TPESampler
import numpy as np
import glob
import io
import base64
from IPython.display import HTML
from pyvirtualdisplay import Display
from IPython import display as ipythondisplay
from stable_baselines3.common.utils import set_random_seed
from stable_baselines3.common.vec_env import SubprocVecEnv
from stable_baselines3.common.env_util import make_vec_env
from math import radians
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.vec_env import SubprocVecEnv
from stable_baselines3 import DQN
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import EvalCallback
from stable_baselines3.common.monitor import Monitor
import random as rand
from stable_baselines3.her.her_replay_buffer import HerReplayBuffer
from stable_baselines3.common.buffers import ReplayBuffer
import torch
import torch.nn as nn

from typing import Any, Dict

# **Video Loading**

In [None]:
#'eval_video/*.mp4'

def show_video(path):
    """
    Display the first MP4 matching ``path`` inline in the notebook.

    :param path: (str) glob pattern for candidate files, e.g. ``'eval_video/*.mp4'``
    :return: None; prints "Could not find video" when nothing matches
    """
    # Sort so the chosen file does not depend on the filesystem's listing order.
    mp4list = sorted(glob.glob(path))
    if not mp4list:
        print("Could not find video")
        return

    # Read-only binary mode is enough here, and the context manager closes
    # the handle as soon as the bytes have been read.
    with open(mp4list[0], 'rb') as video_file:
        encoded = base64.b64encode(video_file.read())

    # Embed the video as a base64 data URI so no file serving is needed.
    ipythondisplay.display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))

# **Clear Training Videos**

In [None]:
! rm -rf /content/video
! rm -rf /content/mountain_cart_tensorboard
! rm -rf /content/eval_video_dqn
! rm -rf /content/eval_video_ppo

# **Output Quantizing** [2]

In [None]:
def discretize_state_obs(obs, env, state_buckets=(20, 20)):
  """
  Map a continuous observation onto integer bucket indices.

  Each component is rescaled from ``[low, high]`` (read from
  ``env.observation_space``) to ``[0, 1]``, multiplied by ``buckets - 1``,
  rounded, and clamped into ``[0, buckets - 1]``.

  :param obs: sequence of numeric observation components
  :param env: object exposing ``observation_space.low`` and ``observation_space.high``
  :param state_buckets: number of buckets per component; needs at least
      ``len(obs)`` entries
  :return: (tuple of int) one bucket index per observation component
  :raises AssertionError: if ``obs`` contains NaN or inf
  """
  try:
    # Raised explicitly rather than via ``assert`` so the checks still run
    # when Python is started with -O; the exception type is unchanged.
    if np.any(np.isnan(obs)):
      raise AssertionError("NaN in obs")
    if np.any(np.isinf(obs)):
      raise AssertionError("inf in obs")

    upper_bounds = env.observation_space.high
    lower_bounds = env.observation_space.low

    new_obs = []
    for i, value in enumerate(obs):
      last_bucket = state_buckets[i] - 1
      # Offset by the lower bound itself: adding abs(lower) is only
      # equivalent when the lower bound happens to be negative.
      fraction = (value - lower_bounds[i]) / (upper_bounds[i] - lower_bounds[i])
      bucket = int(round(last_bucket * fraction))
      # Clamp so out-of-range observations land in the edge buckets.
      new_obs.append(min(max(0, bucket), last_bucket))
    return tuple(new_obs)
  except Exception as e:
    # Log the offending observation, then let the caller decide what to do.
    print('Error in discretize_state_obs: ', e)
    print("OBS: ", obs)
    raise


In [None]:
def make_env(env_id, rank, seed=0):
    """
    Build a factory that creates one discretized env for a multiprocessed env.

    :param env_id: (str) the environment ID
    :param rank: (int) index of the subprocess
    :param seed: (int) the initial seed for RNG
    :return: (callable) zero-argument function that creates and seeds the env
    """
    # Seed the global RNGs once, when the factory is created.
    set_random_seed(seed)

    def _init():
        base_env = gym.make(env_id)
        wrapped_env = gym.wrappers.TransformObservation(
            base_env,
            lambda obs: discretize_state_obs(obs, base_env),
            base_env.observation_space,
        )
        # Offset the seed by the rank so that every worker gets its own
        # seed; otherwise they would all generate the same experiences.
        wrapped_env.reset(seed=seed + rank)
        return wrapped_env

    return _init

# **Hyperparameter Tuning for PPO**

In [None]:
# Number of parallel envs (not referenced by the cells visible in this notebook).
N_ENVS = 16

# Optuna study size and per-trial training/evaluation budget.
N_TRIALS = 100
N_STARTUP_TRIALS = 5
N_EVALUATIONS = 5
N_TIMESTEPS = 200_000
N_EVAL_EPISODES = 100
# Evaluate N_EVALUATIONS times, evenly spaced over a trial's training run.
EVAL_FREQ = N_TIMESTEPS // N_EVALUATIONS

# MountainCar with observations bucketed by discretize_state_obs.
ENV = gym.make("MountainCar-v0")
ENV = gym.wrappers.TransformObservation(
    ENV,
    lambda obs: discretize_state_obs(obs, ENV),
    ENV.observation_space,
)

# Keyword arguments shared by every PPO model built during the search.
DEFAULT_HYPERPARAMS = dict(
    policy="MlpPolicy",
    env=ENV,
    normalize_advantage=True,
)

def sample_ppo_params(trial: optuna.Trial) -> Dict[str, Any]:
    """
    Draw one set of PPO hyperparameters from an Optuna trial.

    ``gamma`` and ``gae_lambda`` are searched on a log scale as their distance
    from 1, so the values stored under those names in ``trial.params`` are
    ``1 - gamma`` and ``1 - gae_lambda``.

    :param trial: the Optuna trial to sample from
    :return: keyword arguments for the ``PPO`` constructor (``n_steps`` is fixed at 1024)
    """
    one_minus_gamma = trial.suggest_float("gamma", 0.0001, 0.1, log=True)
    grad_norm_limit = trial.suggest_float("max_grad_norm", 0.3, 5.0, log=True)
    one_minus_lambda = trial.suggest_float("gae_lambda", 0.001, 0.2, log=True)
    step_size = trial.suggest_float("lr", 1e-5, 1, log=True)
    value_weight = trial.suggest_float("vf_coef", 0.01, 1, log=True)
    entropy_weight = trial.suggest_float("ent_coef", 0.00001, 0.1, log=True)
    epochs = trial.suggest_int("n_epochs", 1, 15, log=True)
    clip = trial.suggest_float("clip_range", 0.1, 0.4, log=True)

    return dict(
        n_steps=1024,
        n_epochs=epochs,
        gamma=1.0 - one_minus_gamma,
        gae_lambda=1.0 - one_minus_lambda,
        learning_rate=step_size,
        ent_coef=entropy_weight,
        vf_coef=value_weight,
        max_grad_norm=grad_norm_limit,
        clip_range=clip,
    )


In [None]:
class TrialEvalCallback(EvalCallback):
    """
    EvalCallback that reports every evaluation result to an Optuna trial.

    After each evaluation the mean reward is reported to the trial; when the
    trial says it should be pruned, ``is_pruned`` is set and ``_on_step``
    returns False.
    """

    def __init__(
        self,
        eval_env: gym.Env,
        trial: optuna.Trial,
        n_eval_episodes: int = 100,
        eval_freq: int = 10000,
        deterministic: bool = True,
        verbose: int = 0,
    ):
        """
        :param eval_env: environment used for the periodic evaluations
        :param trial: Optuna trial that receives the intermediate results
        :param n_eval_episodes: episodes per evaluation
        :param eval_freq: number of callback calls between evaluations
        :param deterministic: forwarded to ``EvalCallback``
        :param verbose: forwarded to ``EvalCallback``
        """
        super().__init__(
            eval_env=eval_env,
            n_eval_episodes=n_eval_episodes,
            eval_freq=eval_freq,
            deterministic=deterministic,
            verbose=verbose,
        )
        self.trial = trial
        self.eval_idx = 0  # number of evaluations reported so far
        self.is_pruned = False  # set once the trial asks to be pruned

    def _on_step(self) -> bool:
        """Run a due evaluation, report it, and return False if the trial is pruned."""
        evaluation_due = self.eval_freq > 0 and self.n_calls % self.eval_freq == 0
        if not evaluation_due:
            return True

        super()._on_step()
        self.eval_idx += 1
        self.trial.report(self.last_mean_reward, self.eval_idx)

        if not self.trial.should_prune():
            return True
        self.is_pruned = True
        return False

In [None]:
def _make_discretized_mountain_car():
    """Create a fresh MountainCar-v0 env whose observations go through discretize_state_obs."""
    base_env = gym.make("MountainCar-v0")
    return gym.wrappers.TransformObservation(
        base_env,
        lambda obs: discretize_state_obs(obs, base_env),
        base_env.observation_space,
    )


def objective(trial: optuna.Trial) -> float:
    """
    Optuna objective: train PPO with sampled hyperparameters and score it.

    :param trial: the Optuna trial supplying the hyperparameters
    :return: mean reward of the last evaluation, or NaN if training failed
    :raises optuna.exceptions.TrialPruned: if the pruner stopped the trial
    """
    kwargs = DEFAULT_HYPERPARAMS.copy()
    kwargs.update(sample_ppo_params(trial))
    # Every trial trains on its own env. The shared env in DEFAULT_HYPERPARAMS
    # would otherwise be closed by the cleanup below after the first trial and
    # then be reused, already closed, by every later trial.
    kwargs["env"] = _make_discretized_mountain_car()

    # Create the RL model.
    model = PPO(**kwargs)
    eval_env = Monitor(_make_discretized_mountain_car())

    eval_callback = TrialEvalCallback(
        eval_env, trial, n_eval_episodes=N_EVAL_EPISODES, eval_freq=EVAL_FREQ, deterministic=True
    )

    nan_encountered = False
    try:
        model.learn(N_TIMESTEPS, callback=eval_callback)
    except (AssertionError, ValueError) as e:
        # Sometimes, random hyperparams can generate NaN. The observation
        # checks raise AssertionError; NaN network outputs are presumably
        # rejected with ValueError by torch's distribution validation.
        # Either way the trial is marked failed instead of aborting the study.
        print(e)
        nan_encountered = True
    finally:
        # Free memory.
        model.env.close()
        eval_env.close()

    # Tell the optimizer that the trial failed.
    if nan_encountered:
        return float("nan")

    if eval_callback.is_pruned:
        raise optuna.exceptions.TrialPruned()

    return eval_callback.last_mean_reward

In [None]:
if __name__ == '__main__':
  # Keep PyTorch to one thread while the study runs.
  torch.set_num_threads(1)

  sampler = TPESampler(n_startup_trials=N_STARTUP_TRIALS)
  pruner = MedianPruner(n_startup_trials=N_STARTUP_TRIALS, n_warmup_steps=N_EVALUATIONS // 4)

  # Trials are persisted to a local SQLite file.
  study = optuna.create_study(sampler=sampler, storage="sqlite:///db.sqlite3", pruner=pruner, direction="maximize")

  try:
    study.optimize(objective, n_trials=N_TRIALS, timeout=86400)
  except KeyboardInterrupt:
    # A manual interrupt ends the search early; results so far are reported.
    pass
  finally:
    # Release the shared env even if the search stops with an error.
    ENV.close()

  print("Number of finished trials: ", len(study.trials))

  # best_trial raises ValueError when no trial has completed yet (for example
  # after an early interrupt), so guard the report.
  try:
    trial = study.best_trial
  except ValueError:
    trial = None
    print("No trial completed; nothing to report.")

  if trial is not None:
    print("Best Trial:")
    print("  Value: ", trial.value)
    print("  Params: ")
    for k, v in trial.params.items():
      print("    {}: {}".format(k,v))

    print("User Attributes:")
    for k,v in trial.user_attrs.items():
      print("  {}: {}".format(k,v))



## **PPO**

In [None]:
# Single-process training env with discretized observations. An alternative
# using SubprocVecEnv([make_env("MountainCar-v0", i, <random seed>) for i in
# range(16)], start_method="fork") was tried earlier and is left disabled.
train_env = gym.make("MountainCar-v0")
train_env = gym.wrappers.TransformObservation(
    train_env,
    lambda obs: discretize_state_obs(obs, train_env),
    train_env.observation_space,
)

# Recorded search result:
#-104.99 and parameters: {'gamma': 0.003998818000297534, 'max_grad_norm': 0.754852106599228, 'gae_lambda': 0.007067184406571027, 'lr': 0.0004789987110124482, 'vf_coef': 0.012947193897824389, 'ent_coef': 0.00048159514652158363, 'n_epochs': 15}. Best is trial 40 with value: -104.99.
# NOTE(review): the values used below differ from the trial recorded above -
# presumably they come from a later study; confirm their origin.
PPO_TUNED_PARAMS = dict(
    gamma=1-0.00037648854617733244,
    learning_rate=0.0008036518212978304,
    max_grad_norm=0.9558861968131098,
    gae_lambda=1-0.06267444573408165,
    ent_coef=0.035073620842585695,
    vf_coef=0.17925670781108438,
    n_epochs=12,
    clip_range=0.7344958587946594,
)

model_ppo = PPO(
    "MlpPolicy",
    train_env,
    verbose=0,
    tensorboard_log="./ppo_mountain_cart_tensorboard/",
    normalize_advantage=True,
    **PPO_TUNED_PARAMS,
)
model_ppo.learn(
    total_timesteps=1_000_000,
    log_interval=1,
    tb_log_name="PPOUntunedRun",
    progress_bar=True,
)
model_ppo.save("ppo_mountain_cart")

train_env.close()

Output()

## **DQN**

In [None]:
# Single-process training env with discretized observations. An alternative
# using SubprocVecEnv([make_env("MountainCar-v0", i, <random seed>) for i in
# range(16)], start_method="fork") was tried earlier and is left disabled.
train_env = gym.make("MountainCar-v0")
# Wrap the env created above. The previous code wrapped `test_env`, which is
# only defined in a later cell, so this cell failed on a fresh kernel.
train_env = gym.wrappers.TransformObservation(
    train_env,
    lambda obs: discretize_state_obs(obs, train_env),
    train_env.observation_space,
)

# Untuned baseline, kept for reference:
#model_dqn = DQN("MlpPolicy", train_env, verbose=0, tensorboard_log="./dqn_mountain_cart_tensorboard/")
model_dqn = DQN("MlpPolicy", train_env, verbose=0, tensorboard_log="./dqn_mountain_cart_tensorboard/",
                gamma=1- 0.04195072356404917,
                max_grad_norm=1.7026730091251607,
                tau=0.24348727902136028,
                learning_starts=361,
                learning_rate=0.0002588035975848367,
                train_freq=100,
                replay_buffer_class=ReplayBuffer,
                buffer_size=69164
                )
model_dqn.learn(total_timesteps=int(1000000), log_interval=1, tb_log_name="DQNUntunedRun", progress_bar=True)
model_dqn.save("dqn_mountain_cart")

# Release the training env once the model is saved, as the PPO cell does.
train_env.close()

In [None]:
# Fresh evaluation env with the same observation discretization as training.
test_env = gym.make("MountainCar-v0")
test_env = gym.wrappers.TransformObservation(test_env, lambda obs: discretize_state_obs(obs, test_env), test_env.observation_space)
#test_env = CustomRewardWrapper(test_env)
# Wrap in Monitor, like the eval env used during the hyperparameter search,
# so evaluate_policy reads episode rewards from the Monitor statistics.
test_env = Monitor(test_env)

# Mean and standard deviation of the episode reward over 100 episodes.
mean_reward, std_reward = evaluate_policy(model_ppo, test_env, n_eval_episodes=100)

print(f"mean_reward={mean_reward:.2f} +/- {std_reward:.2f}")

test_env.close()



mean_reward=-129.67 +/- 33.62


## **Video**

In [None]:
#'eval_video/*.mp4'

# NOTE(review): this redefines the show_video helper already declared in the
# "Video Loading" section near the top of the notebook; keep a single copy.
def show_video(path):
    """
    Display the first MP4 matching ``path`` inline in the notebook.

    :param path: (str) glob pattern for candidate files, e.g. ``'eval_video/*.mp4'``
    :return: None; prints "Could not find video" when nothing matches
    """
    # Sort so the chosen file does not depend on the filesystem's listing order.
    mp4list = sorted(glob.glob(path))
    if not mp4list:
        print("Could not find video")
        return

    # Read-only binary mode is enough here, and the context manager closes
    # the handle as soon as the bytes have been read.
    with open(mp4list[0], 'rb') as video_file:
        encoded = base64.b64encode(video_file.read())

    # Embed the video as a base64 data URI so no file serving is needed.
    ipythondisplay.display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))

In [None]:
# Render to RGB frames so the RecordVideo wrapper has something to capture.
eval_env = gym.make("MountainCar-v0", render_mode="rgb_array")
eval_env = gym.wrappers.TransformObservation(eval_env, lambda obs: discretize_state_obs(obs, eval_env), eval_env.observation_space)
#eval_env = CustomRewardWrapper(eval_env)
eval_env = gym.wrappers.RecordVideo(eval_env, 'eval_video_ppo')

# Roll out a single deterministic episode and count its steps.
steps = 0
try:
    obs, info = eval_env.reset()
    episode_over = False
    while not episode_over:
        steps += 1
        action, _states = model_ppo.predict(obs, deterministic=True)
        obs, reward, terminated, truncated, info = eval_env.step(action)
        episode_over = terminated or truncated
finally:
    # Close even if the rollout fails, so the env and the video recorder are
    # released and any open recording can be finalized.
    eval_env.close()

  """
  from pkg_resources import resource_stream, resource_exists
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)


In [None]:
# Print the step count of the recorded episode, then embed the first video
# found in the eval_video_ppo directory.
print(steps)
show_video('eval_video_ppo/*.mp4')

111
