# SimpleSim Non-Holonomic Navigation Challenge

This notebook attempts to train an agent to solve a SimpleSim non-holonomic driving navigation problem with 1 target and a random spawn location.


## Install Dependencies and Stable Baselines3 Using Pip



In [1]:
!pip install "stable-baselines3[extra]>=2.0.0a4"



### Setup Tensorboard Logging

In [2]:
# Load the TensorBoard notebook extension (it provides the %tensorboard magic used further down)
%load_ext tensorboard

# Clear any logs from previous runs
# NOTE: this deletes the whole ./logs/ directory every time the cell is run
!rm -rf ./logs/

##  Custom Gym Envs

Below are a couple of simpler, lower-order gridworld-type gym environments that can be used as testing and debugging examples, as well as our main SimpleSim non-holonomic driving environment (which is imported from the separate source files env.py and env_gym.py).

In [3]:
import numpy as np
import gymnasium as gym
from gymnasium import spaces

In [4]:
class GoLeftEnv(gym.Env):
    """
    Minimal 1D grid world that follows the gym interface.

    The agent starts in the right-most cell and has to walk to the left-most
    cell (position 0). Reaching that cell yields a reward of 1 and terminates
    the episode; every other step yields a reward of 0.
    """

    # Only console rendering is offered (no GUI 'human' mode because of google colab)
    metadata = {"render_modes": ["console"]}

    # Action codes
    LEFT = 0
    RIGHT = 1

    def __init__(self, grid_size=10, render_mode="console"):
        """
        :param grid_size: (int) number of cells in the 1D grid
        :param render_mode: (str) ``render`` only prints when this is "console"
        """
        super().__init__()
        self.render_mode = render_mode
        self.grid_size = grid_size
        # The agent starts at the right end of the grid
        self.agent_pos = grid_size - 1

        # Two discrete actions: LEFT and RIGHT
        self.action_space = spaces.Discrete(2)
        # The observation is the agent coordinate, stored in a 1-element float32 Box
        self.observation_space = spaces.Box(
            low=0, high=self.grid_size - 1, shape=(1,), dtype=np.float32
        )

    def _observation(self):
        # The observation must be a numpy array; float32 keeps it general
        return np.array([self.agent_pos]).astype(np.float32)

    def reset(self, seed=None, options=None):
        """
        Put the agent back at the right end of the grid.

        :return: (np.array, dict) the initial observation and an empty info dict
        """
        super().reset(seed=seed, options=options)
        self.agent_pos = self.grid_size - 1
        return self._observation(), {}

    def step(self, action):
        """
        Move the agent one cell left or right, clipped to the grid.

        :param action: LEFT (0) or RIGHT (1)
        :return: (observation, reward, terminated, truncated, info)
        :raises ValueError: if ``action`` is neither LEFT nor RIGHT
        """
        if action == self.LEFT:
            delta = -1
        elif action == self.RIGHT:
            delta = 1
        else:
            raise ValueError(
                f"Received invalid action={action} which is not part of the action space"
            )

        # Apply the move and keep the agent inside the grid
        self.agent_pos = np.clip(self.agent_pos + delta, 0, self.grid_size - 1)

        # The goal is the left-most cell; the number of steps is never limited here
        at_goal = bool(self.agent_pos == 0)
        reward = 1 if at_goal else 0

        # No additional info is reported for now
        return self._observation(), reward, at_goal, False, {}

    def render(self):
        """Print the grid as one line: 'x' marks the agent, '.' every other cell."""
        if self.render_mode != "console":
            return
        cells_right = (self.grid_size - self.agent_pos) - 1
        print("." * self.agent_pos + "x" + "." * cells_right)

    def close(self):
        """Nothing to release."""
        pass



class GoDownLeftEnv(gym.Env):
    """
    Small 2D grid world that follows the gym interface.

    The agent starts in the cell (grid_size - 1, grid_size - 1) and has to
    reach the cell (0, 0). Reaching that cell yields a reward of 1 and
    terminates the episode; every other step yields a reward of 0.
    """

    # Only console rendering is offered (no GUI 'human' mode because of google colab)
    metadata = {"render_modes": ["console"]}

    # Action codes
    NOTHING = 0
    RIGHT = 1
    UP = 2
    LEFT = 3
    DOWN = 4

    def __init__(self, grid_size=10, render_mode="console"):
        """
        :param grid_size: (int) number of cells along each axis of the square grid
        :param render_mode: (str) ``render`` only prints when this is "console"
        """
        super().__init__()
        self.render_mode = render_mode
        self.grid_size = grid_size
        # The agent starts at the maximum x and y coordinates
        self.agent_pos_x = grid_size - 1
        self.agent_pos_y = grid_size - 1

        # Five discrete actions: NOTHING, RIGHT, UP, LEFT, DOWN
        self.action_space = spaces.Discrete(5)
        # The observation holds the x and y coordinates of the agent in a 2-element float32 Box
        self.observation_space = spaces.Box(
            low=0, high=self.grid_size - 1, shape=(2,), dtype=np.float32
        )

    def _observation(self):
        # The observation must be a numpy array; float32 keeps it general
        return np.array([self.agent_pos_x, self.agent_pos_y]).astype(np.float32)

    def reset(self, seed=None, options=None):
        """
        Put the agent back at its starting cell.

        :return: (np.array, dict) the initial observation and an empty info dict
        """
        super().reset(seed=seed, options=options)
        self.agent_pos_x = self.grid_size - 1
        self.agent_pos_y = self.grid_size - 1
        return self._observation(), {}

    def step(self, action):
        """
        Apply one action and clip the agent position to the grid.

        :param action: one of NOTHING, RIGHT, UP, LEFT, DOWN
        :return: (observation, reward, terminated, truncated, info)
        :raises ValueError: if ``action`` is not one of the five action codes
        """
        if action == self.NOTHING:
            dx, dy = 0, 0
        elif action == self.RIGHT:
            dx, dy = 1, 0
        elif action == self.UP:
            dx, dy = 0, 1
        elif action == self.LEFT:
            dx, dy = -1, 0
        elif action == self.DOWN:
            dx, dy = 0, -1
        else:
            raise ValueError(
                f"Received invalid action={action} which is not part of the action space"
            )

        # Apply the move and keep the agent inside the grid on both axes
        upper = self.grid_size - 1
        self.agent_pos_x = np.clip(self.agent_pos_x + dx, 0, upper)
        self.agent_pos_y = np.clip(self.agent_pos_y + dy, 0, upper)

        # The goal is the cell (0, 0); the number of steps is never limited here
        at_goal = bool(self.agent_pos_x == 0 and self.agent_pos_y == 0)
        reward = 1 if at_goal else 0

        # No additional info is reported for now
        return self._observation(), reward, at_goal, False, {}

    def render(self):
        """Print the grid with the highest row first: 'x' marks the agent, '.' every other cell."""
        if self.render_mode != "console":
            return
        cells_right = (self.grid_size - self.agent_pos_x) - 1
        for row in range(self.grid_size - 1, -1, -1):
            marker = "x" if row == self.agent_pos_y else "."
            print("." * self.agent_pos_x + marker + "." * cells_right)

    def close(self):
        """Nothing to release."""
        pass


# Import our main environment
from env_gym import SimpleSimGym
# Constructor arguments passed to SimpleSimGym wherever this notebook instantiates it
# NOTE(review): the meaning/units of these values are defined in env_gym.py, not here — confirm there
STARTING_BUDGET = 200
NUM_TARGETS = 1
PLAYER_FOV = 60
RENDER_MODE = "rgb_array"  # used for the training and evaluation envs; the check_env cell passes render_mode=None instead


# This simple toggle can be used to switch which environment we are training the notebook on
env_mode = 2  # 0 for GoLeftEnv, 1 for GoDownLeftEnv, 2 for SimpleSimGym

  and should_run_async(code)


### Validate the environment

Stable Baselines3 provides a [helper](https://stable-baselines3.readthedocs.io/en/master/common/env_checker.html) to check that your environment follows the Gym interface. It also optionally checks that the environment is compatible with Stable-Baselines (and emits warning if necessary).

In [5]:
from stable_baselines3.common.env_checker import check_env

  if not hasattr(tensorboard, "__version__") or LooseVersion(


In [6]:
# Build the environment selected by `env_mode`
# (any value other than 0 or 1 selects SimpleSimGym, here without rendering)
if env_mode not in (0, 1):
    env = SimpleSimGym(
        starting_budget=STARTING_BUDGET,
        num_targets=NUM_TARGETS,
        player_fov=PLAYER_FOV,
        render_mode=None,
    )
elif env_mode == 0:
    env = GoLeftEnv()
else:
    env = GoDownLeftEnv()

# An error is thrown if the environment does not follow the interface
check_env(env, warn=True)

Total Ep Reward: 20


  and should_run_async(code)


## Setup Callback (Auto Saving of the Best Model)

Using the monitoring wrapper, we can save statistics of the environment, and use them to determine the mean training reward. This allows us to save the best model while training.

In [7]:
import os
import numpy as np
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.results_plotter import load_results, ts2xy

In [8]:
class SaveOnBestTrainingRewardCallback(BaseCallback):
    """
    Callback for saving a model (the check is done every ``check_freq`` steps)
    based on the training reward (in practice, we recommend using ``EvalCallback``).

    :param check_freq: (int) Number of callback calls between two checks.
    :param log_dir: (str) Path to the folder that contains the file(s) created
      by the ``Monitor`` wrapper; the training reward is read from there.
    :param save_dir: (str) Path to the folder where the best model is saved
      (as ``best_model`` inside that folder).
    :param verbose: (int) When > 0, the current and best mean reward are
      printed at every check.
    """

    def __init__(self, check_freq, log_dir, save_dir, verbose=1):
        super().__init__(verbose)
        self.check_freq = check_freq
        self.log_dir = log_dir
        self.save_path = os.path.join(save_dir, "best_model")
        self.best_mean_reward = -np.inf

    def _init_callback(self) -> None:
        """Create the log folder and the folder the best model is written to, if needed."""
        if self.log_dir is not None:
            os.makedirs(self.log_dir, exist_ok=True)
        # The model is saved under save_dir, so make sure that folder exists as well.
        # An empty dirname means "current directory", which needs no creation.
        save_parent = os.path.dirname(self.save_path)
        if save_parent:
            os.makedirs(save_parent, exist_ok=True)

    def _on_step(self) -> bool:
        """
        Every ``check_freq`` calls, compare the mean training reward with the
        best one seen so far and save the model when it improved.

        :return: (bool) always True
        """
        if self.n_calls % self.check_freq != 0:
            return True

        # Retrieve training reward from the monitor file(s)
        x, y = ts2xy(load_results(self.log_dir), "timesteps")
        if len(x) == 0:
            # No finished episode has been logged yet
            return True

        # Mean training reward over the last 100 episodes
        mean_reward = np.mean(y[-100:])
        if self.verbose > 0:
            print("Num timesteps: {}".format(self.num_timesteps))
            print(
                "Best mean reward: {:.2f} - Last mean reward per episode: {:.2f}".format(
                    self.best_mean_reward, mean_reward
                )
            )

        # New best model: remember the reward and save the agent
        if mean_reward > self.best_mean_reward:
            self.best_mean_reward = mean_reward
            # These messages are printed regardless of ``verbose``
            print("Saving new best model at {} timesteps".format(x[-1]))
            print("Saving new best model to {}.zip".format(self.save_path))
            self.model.save(self.save_path)

        return True

### Baseline Testing the Environment

Test the performance of an untrained (random) policy on the environment so that we can get a baseline performance to compare to.

In [9]:
# if env_mode == 0:
#   env = GoLeftEnv(grid_size=10)
# elif env_mode == 1:
#   env = GoDownLeftEnv(grid_size=10)
# else:
#   env = SimpleSimGym(starting_budget=STARTING_BUDGET, num_targets=NUM_TARGETS, player_fov=PLAYER_FOV, render_mode=None)

# obs, _ = env.reset()
# env.render()

# print(env.observation_space)
# print(env.action_space)
# print(env.action_space.sample())

# if env_mode == 0:
#   GO_LEFT = 0
#   # Hardcoded best agent: always go left!
#   n_steps = 20
#   for step in range(n_steps):
#       print(f"Step {step + 1}")
#       obs, reward, terminated, truncated, info = env.step(GO_LEFT)
#       done = terminated or truncated
#       print("obs=", obs, "reward=", reward, "done=", done)
#       env.render()
#       if done:
#           print("Goal reached!", "reward=", reward)
#           break
# else:
#   GO_LEFT = 3
#   GO_DOWN = 4
#   # Hardcoded best agent: always go left!
#   n_steps = 20
#   for step in range(n_steps):
#       # Go left
#       print(f"Step {step + 1}")
#       obs, reward, terminated, truncated, info = env.step(GO_LEFT)
#       done = terminated or truncated
#       print("obs=", obs, "reward=", reward, "done=", done)
#       env.render()
#       # Then, go down
#       print(f"Step {step + 1}")
#       obs, reward, terminated, truncated, info = env.step(GO_DOWN)
#       done = terminated or truncated
#       print("obs=", obs, "reward=", reward, "done=", done)
#       env.render()

#       if done:
#           print("Goal reached!", "reward=", reward)
#           break

## Train the Model

In [10]:
# Settings shared by the training, saving and loading cells
config = dict(
    policy='MlpPolicy',
    total_timesteps=500_000,
    logdir="logs/",
    savedir="saved_models/",
)

# Make sure the log directory exists
os.makedirs(config["logdir"], exist_ok=True)

# Make sure the directory for saved models exists
os.makedirs(config["savedir"], exist_ok=True)

In [None]:
from stable_baselines3 import PPO, A2C, DQN
from stable_baselines3.common.env_util import make_vec_env

# Pick the environment class and its constructor arguments from `env_mode`
if env_mode == 0:
    train_env_cls, train_env_kwargs = GoLeftEnv, dict(grid_size=10)
elif env_mode == 1:
    train_env_cls, train_env_kwargs = GoDownLeftEnv, dict(grid_size=10)
else:
    train_env_cls = SimpleSimGym
    train_env_kwargs = dict(
        starting_budget=STARTING_BUDGET,
        num_targets=NUM_TARGETS,
        player_fov=PLAYER_FOV,
        render_mode=RENDER_MODE,
    )

# Instantiate and wrap the env; monitor files go to the log dir, which the callback below reads
env = make_vec_env(train_env_cls, n_envs=1, monitor_dir=config["logdir"], env_kwargs=train_env_kwargs)

# Setup callbacks
auto_save_callback = SaveOnBestTrainingRewardCallback(
    check_freq=1000,
    log_dir=config["logdir"],
    save_dir=config["savedir"],
    verbose=0,
)

# Create the agent
model = A2C(config["policy"], env, tensorboard_log=config["logdir"], verbose=0)

# Train the agent
model.learn(config["total_timesteps"], callback=auto_save_callback, progress_bar=True)


Output()

### Load Model?

In [None]:
# model = A2C.load(f"{config['savedir']}/{config['policy']}")

# Load the best model
# This is the "best_model" file that SaveOnBestTrainingRewardCallback writes into config["savedir"],
# so the training cell must have saved at least one best model before this cell is run
best_model = A2C.load(f"{config['savedir']}/best_model.zip")

## Continue Training or Run Duplicate Experiments?

This cell can be used to either continue training on an existing model (use `reset_num_timesteps=False`) or to run additional duplicate experiments training from scratch to test training consistency

In [None]:
# Run additional duplicate experiments from scratch to test training consistency
# model = A2C(config["policy"], env, tensorboard_log=config["logdir"], verbose=0)
# model.learn(config["total_timesteps"], callback=auto_save_callback, progress_bar=True)

# model = A2C(config["policy"], env, tensorboard_log=config["logdir"], verbose=0)
# model.learn(config["total_timesteps"], callback=auto_save_callback, progress_bar=True)

# model.learn(200_000, tb_log_name="run", reset_num_timesteps=False)
# model.learn(200_000, tb_log_name="run", reset_num_timesteps=False)
# model.learn(200_000, tb_log_name="run", reset_num_timesteps=False)
# model.learn(200_000, tb_log_name="run", reset_num_timesteps=False)

### Save Model ?

In [None]:
# Save the most recently trained model (`model`, not `best_model`) into config["savedir"]
# The file is named after the policy, so the model will be saved under MlpPolicy.zip
model.save(f"{config['savedir']}/{config['policy']}")

# Show Tensorboard Logs

In [None]:
# Open Tensorboard Logging
# NOTE: the log directory is hard-coded here; it matches config["logdir"] ("logs/") — keep the two in sync
%tensorboard --logdir logs/

### Check Performance

Check if the policy can consistently succeed in the environment over multiple episodes.

In [None]:
# Pick the evaluation environment class and its constructor arguments from `env_mode`
if env_mode == 0:
    eval_env_cls, eval_env_kwargs = GoLeftEnv, dict(grid_size=10)
elif env_mode == 1:
    eval_env_cls, eval_env_kwargs = GoDownLeftEnv, dict(grid_size=10)
else:
    eval_env_cls = SimpleSimGym
    eval_env_kwargs = dict(
        starting_budget=STARTING_BUDGET,
        num_targets=NUM_TARGETS,
        player_fov=PLAYER_FOV,
        render_mode=RENDER_MODE,
    )

# Instantiate the eval env (a single-env VecEnv, without a monitor directory)
eval_env = make_vec_env(eval_env_cls, n_envs=1, env_kwargs=eval_env_kwargs)




# from stable_baselines3.common.evaluation import evaluate_policy

# # Check performance of best vs last model
# models = {"last": model, "best": best_model}

# for key in models.keys():
#     eval_model = models[key]

#     # Instantiate the eval env
#     if env_mode == 0:
#         eval_env = make_vec_env(GoLeftEnv, n_envs=1, env_kwargs=dict(grid_size=10))
#     elif env_mode == 1:
#         eval_env = make_vec_env(GoDownLeftEnv, n_envs=1, env_kwargs=dict(grid_size=10))
#     else:
#         eval_env = make_vec_env(SimpleSimGym, n_envs=1, env_kwargs=dict(starting_budget=STARTING_BUDGET, num_targets=NUM_TARGETS, player_fov=PLAYER_FOV, render_mode=RENDER_MODE))


#     # Test average reward over multiple episodes
#     mean_reward, std_reward = evaluate_policy(eval_model, eval_env, n_eval_episodes=50)
#     print(f"MODEL TYPE: {key}")
#     print(f"mean_reward:{mean_reward:.2f} +/- {std_reward:.2f}\n")

### Visualize Policy with Printouts

In [None]:
# The model evaluated by this cell and the cells below
eval_model = best_model

if env_mode in (0, 1):
    # Count how many of 10 episodes reach the end within 20 steps
    eval_steps = 20
    successes = 0
    for episode in range(10):
        obs = eval_env.reset()
        for step in range(eval_steps):
            action, _ = eval_model.predict(obs, deterministic=True)
            obs, reward, done, info = eval_env.step(action)
            if done:
                # Note that the VecEnv resets automatically
                # when a done signal is encountered
                successes += 1
                break

    separator = "======================================"
    print(separator)
    print(f"{successes} successes / 10 tries")
    print(separator)

In [None]:
if env_mode in (0, 1):
    # Roll out the trained agent on the vecenv, printing and rendering every step
    obs = eval_env.reset()
    print("Step 0")
    print("obs=", obs)
    eval_env.render()

    n_steps = 40
    for step in range(n_steps):
        action, _ = eval_model.predict(obs, deterministic=True)
        print(f"Step {step + 1}")
        print("Action: ", action)
        obs, reward, done, info = eval_env.step(action)
        print("obs=", obs, "reward=", reward, "done=", done)
        eval_env.render()
        if not done:
            continue
        # Note that the VecEnv resets automatically
        # when a done signal is encountered
        print("Goal reached!", "reward=", reward)
        break

### Prepare Video Recording

In [None]:
# Set up fake display; otherwise rendering will fail
import os
# Start an Xvfb server on display :1 in the background (trailing '&');
# the return value of os.system is not checked, so a failed launch goes unnoticed here
os.system("Xvfb :1 -screen 0 1024x768x24 &")
# Point this process at the display started above
os.environ['DISPLAY'] = ':1'

import base64
from pathlib import Path

from IPython import display as ipythondisplay


def show_videos(video_path="", prefix=""):
    """
    Embed every matching .mp4 file of a folder in the notebook output.

    Taken from https://github.com/eleurent/highway-env

    :param video_path: (str) Path to the folder containing videos
    :param prefix: (str) Filter the videos, showing only the ones whose file name starts with this prefix
    """
    # One <video> element per file; the file content is inlined as a base64 data URI
    video_tag = """<video alt="{}" autoplay
                    loop controls style="height: 400px;">
                    <source src="data:video/mp4;base64,{}" type="video/mp4" />
                </video>"""
    pattern = "{}*.mp4".format(prefix)
    html = [
        video_tag.format(mp4, base64.b64encode(mp4.read_bytes()).decode("ascii"))
        for mp4 in Path(video_path).glob(pattern)
    ]
    ipythondisplay.display(ipythondisplay.HTML(data="<br>".join(html)))


from stable_baselines3.common.vec_env import VecVideoRecorder #, DummyVecEnv

# # Create videos dir
# videos_dir = "./videos/"
# os.makedirs(videos_dir, exist_ok=True)

def record_video(eval_env, model, video_length=500, prefix="", video_folder="videos/"):
    """
    Record a video of ``model`` acting in ``eval_env``.

    The environment is wrapped in a ``VecVideoRecorder`` that starts recording
    at step 0; the model is then stepped ``video_length`` times, with actions
    taken from ``model.predict(obs)``.

    :param eval_env: (vec env) environment to record
    :param model: (RL model) object whose ``predict(obs)`` returns ``(action, state)``
    :param video_length: (int) number of steps to run and record
    :param prefix: (str) name prefix passed to the recorder for the video file
    :param video_folder: (str) folder passed to the recorder for the video file
    """
    # Start the video at step=0 and record `video_length` steps
    recorder = VecVideoRecorder(
        eval_env,
        video_folder=video_folder,
        record_video_trigger=lambda step: step == 0,
        video_length=video_length,
        name_prefix=prefix,
    )

    try:
        obs = recorder.reset()
        for _ in range(video_length):
            action, _ = model.predict(obs)
            obs, _, _, _ = recorder.step(action)
    finally:
        # Close the video recorder even if the rollout fails part-way
        # NOTE(review): closing the recorder presumably also closes the wrapped
        # eval_env — confirm before reusing that env afterwards
        recorder.close()

### Visualize Trained Agent with Video



In [None]:
# A video is only recorded when env_mode is exactly 2 (the SimpleSimGym setting)
if env_mode == 2:
    # Record 500*3 = 1500 steps of eval_model (set in the "Visualize Policy with Printouts" cell);
    # record_video's default folder "videos/" is used, with file prefix "a2c-simplesim"
    record_video(eval_env, eval_model, video_length=500*3, prefix="a2c-simplesim")
    # Embed every .mp4 in "videos" whose file name starts with "a2c"
    show_videos("videos", prefix="a2c")