# Setup

Install the intersection example.

**Note**: The runtime needs to be restarted after installing the dependencies, hence `os.kill(os.getpid(), 9)` is added to stop the current session. Please ignore any resulting error message and simply continue to execute the subsequent cells as per normal.

In [None]:
%%capture --no-stderr
# Clone the SMARTS repository into /content/SMARTS.
!git clone https://github.com/huawei-noah/SMARTS /content/SMARTS
# Check out `intersection-v0`, then pip-install the intersection example from its own folder.
!cd /content/SMARTS && git checkout intersection-v0 && cd /content/SMARTS/examples/rl/intersection && pip install --force-reinstall .
import sys

# Put the cloned repository at the front of the module search path.
# NOTE(review): this in-memory change is lost when the process is killed
# below -- confirm whether it is still needed.
sys.path.insert(0, "/content/SMARTS/")
import os

# Kill the current process with signal 9 so that the runtime restarts and
# picks up the freshly installed dependencies.
os.kill(os.getpid(), 9)

# Develop reinforcement learning code

We begin by building the necessary environment wrappers. Firstly, an info wrapper is built to help log instances when the ego agent successfully completes an unprotected left turn.

In [None]:
from typing import Any, Dict, Tuple

import gym


class Info(gym.Wrapper):
    """Gym wrapper that records episode success in the step `info` dict."""

    def __init__(self, env: gym.Env):
        super().__init__(env)

    def step(self, action: Any) -> Tuple[Any, float, bool, Dict[str, Any]]:
        """Forward `action` to the wrapped environment and tag the result.

        The `info` dict returned by the wrapped environment gains an
        "is_success" entry holding the truthiness of its "score" entry.

        Args:
            action (Any): Action for the agent.

        Returns:
            Tuple[ Any, float, bool, Dict[str, Any] ]:
                Observation, reward, done flag, and the augmented info.
        """
        observation, reward, done, info = self.env.step(action)

        succeeded = bool(info["score"])
        info["is_success"] = succeeded

        return observation, reward, done, info

The `intersection-v0` environment has a continuous action space of `gym.spaces.Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32)` described by
+ Throttle: [0,1]
+ Brake: [0,1]
+ Steering: [-1,1]

In order to build a simple reinforcement learning policy, we discretise the action space using an action wrapper.

In [None]:
from typing import Callable, Tuple

import gym
import numpy as np


class Action(gym.ActionWrapper):
    """Action wrapper that exposes a discrete action space to the policy."""

    def __init__(self, env: gym.Env):
        super().__init__(env)
        to_env_action, discrete_space = _discrete()
        self._wrapper = to_env_action
        self.action_space = discrete_space

    def action(self, action):
        """Converts the policy's action into the wrapped environment's format.

        Note: Users should not directly call this method.
        """
        return self._wrapper(action)


def _discrete() -> Tuple[Callable[[int], np.ndarray], gym.Space]:
    """Builds the discrete action space and its action lookup function.

    Returns:
        Tuple[Callable[[int], np.ndarray], gym.Space]:
            A function mapping a discrete action index to a float32
            `[throttle, brake, steering]` array, and the 4-way discrete
            action space itself.
    """
    # One (throttle, brake, steering) row per discrete action index.
    controls = (
        (0.3, 0, 0),  # 0: keep_direction
        (0, 1, 0),  # 1: slow_down
        (0.3, 0, -0.5),  # 2: turn_left
        (0.3, 0, 0.5),  # 3: turn_right
    )
    action_map = dict(enumerate(controls))

    def wrapper(model_action: int) -> np.ndarray:
        return np.array(action_map[model_action], dtype=np.float32)

    return wrapper, gym.spaces.Discrete(n=4)

Next, we define the rewards using a reward wrapper. The agent is rewarded based on the distance travelled (in meters) per step and is penalised when it collides, goes off-road, goes off-route, goes wrong-way, or drives on the road shoulder.

In [None]:
from typing import Dict

import gym
import numpy as np


class Reward(gym.Wrapper):
    """Wrapper that replaces the environment reward with a shaped reward."""

    def __init__(self, env: gym.Env):
        super().__init__(env)

    def reset(self, **kwargs):
        """Resets the wrapped environment, forwarding all keyword arguments."""
        return self.env.reset(**kwargs)

    def step(self, action):
        """Steps the wrapped environment and substitutes the shaped reward.

        When the episode ends, the reason is reported; an episode that ends
        without any known event raises an exception.

        Note: Users should not directly call this method.
        """
        obs, env_reward, done, info = self.env.step(action)
        shaped_reward = self._reward(obs, env_reward)

        # Nothing more to report while the episode is still running.
        if not done:
            return obs, shaped_reward, done, info

        events = obs["events"]
        if events["reached_goal"]:
            print("ENV: Hooray! Vehicle reached goal.")
        elif events["reached_max_episode_steps"]:
            print("ENV: Vehicle reached max episode steps.")
        else:
            # Failure events were already reported by `_reward`.
            failed = (
                events["off_road"]
                | events["collisions"]
                | events["off_route"]
                | events["on_shoulder"]
                | events["wrong_way"]
            )
            if not failed:
                print("Events: ", events)
                raise Exception("Episode ended for unknown reason.")

        return obs, shaped_reward, done, info

    def _reward(self, obs: Dict[str, gym.Space], env_reward: np.float64) -> np.float64:
        """Computes the shaped reward for a single step.

        The first penalised event found, in the order listed below, yields a
        reward of -10 and prints a message. If none occurred, the reward is
        the environment's own reward.
        """
        baseline = 0
        events = obs["events"]

        # (event key, message) pairs, checked in priority order.
        penalised_events = (
            ("off_road", "ENV: Vehicle went off road."),
            ("on_shoulder", "ENV: Vehicle went on road shoulder."),
            ("wrong_way", "ENV: Vehicle went wrong way."),
            ("collisions", "ENV: Vehicle collided."),
            ("off_route", "ENV: Vehicle went off route."),
        )
        for event_key, message in penalised_events:
            if events[event_key]:
                print(message)
                return np.float64(baseline - 10)

        # Reward for distance travelled
        return np.float64(baseline + env_reward)

The observation space of `intersection-v0` environment is described in `SMARTS/smarts/env/intersection_env.py`. 

In this tutorial, only the top-down rgb image from the observation space is used as input to our reinforcement learning policy. Therefore, a wrapper is built to filter the top-down rgb image observation.  

In [None]:
from typing import Dict

import gym
import numpy as np


class Observation(gym.ObservationWrapper):
    """Observation wrapper that keeps only the top-down rgb image,
    rearranged to channel-first layout."""

    def __init__(self, env: gym.Env):
        super().__init__(env)
        # Move the last axis of the original image shape to the front.
        *leading_dims, channels = env.observation_space["top_down_rgb"].shape
        self.observation_space = gym.spaces.Box(
            low=0,
            high=255,
            shape=(channels, *leading_dims),
            dtype=np.uint8,
        )

    def observation(self, obs: Dict[str, gym.Space]) -> np.ndarray:
        """Extracts the top-down rgb image as a channel-first uint8 array."""
        channel_first = obs["top_down_rgb"].transpose(2, 0, 1)
        return np.uint8(channel_first)

We proceed to make the `intersection-v0` environment and wrap it with the previously built wrappers. The environment is additionally wrapped with `VecFrameStack`, from the Stable Baselines3 library, to give a sense of time to the policy network.

In [None]:
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack, VecMonitor


def make_env(config: Dict[str, Any]) -> gym.Env:
    """Creates the wrapped and vectorised `intersection-v0` environment.

    Args:
        config (Dict[str, Any]): Settings. The keys "img_meters",
            "img_pixels", "n_stack", and "logdir" are read here.

    Returns:
        gym.Env: The environment wrapped by the custom wrappers and then by
            `DummyVecEnv`, `VecFrameStack`, and `VecMonitor`.
    """
    base_env = gym.make(
        "smarts.env:intersection-v0",
        headless=True,
        visdom=False,
        sumo_headless=True,
        img_meters=config["img_meters"],
        img_pixels=config["img_pixels"],
    )

    # Apply the custom wrappers, innermost first.
    wrapped_env = base_env
    for wrapper_cls in (Info, Action, Reward, Observation):
        wrapped_env = wrapper_cls(env=wrapped_env)

    # Check custom environment
    check_env(wrapped_env)

    # Vectorise, stack frames channel-first, and monitor with SB3 wrappers.
    vec_env = DummyVecEnv([lambda: wrapped_env])
    stacked_env = VecFrameStack(
        venv=vec_env, n_stack=config["n_stack"], channels_order="first"
    )
    return VecMonitor(
        venv=stacked_env,
        filename=str(config["logdir"]),
        info_keywords=("is_success",),
    )

The training and evaluation code is prepared using the Stable Baselines3 API. Training progress is checkpointed using a callback. At the end, the trained agent is saved and evaluated.

In [None]:
from datetime import datetime
from typing import Any, Dict

import gym
import stable_baselines3 as sb3lib
from stable_baselines3.common.callbacks import CheckpointCallback, EvalCallback
from stable_baselines3.common.evaluation import evaluate_policy


def run(env: gym.Env, eval_env: gym.Env, config: Dict[str, Any]):
    """Trains a new model or loads a saved one, then evaluates it.

    Args:
        env (gym.Env): Environment used for training.
        eval_env (gym.Env): Environment used by the evaluation callback and
            by the final policy evaluation.
        config (Dict[str, Any]): Settings. When config["mode"] is
            "evaluate", the model is loaded from config["model"]; otherwise
            a new model is trained, checkpointed, and saved under
            config["logdir"].
    """
    callbacks = [
        CheckpointCallback(
            save_freq=config["checkpoint_freq"],
            save_path=config["logdir"] / "checkpoint",
            name_prefix=config["alg"],
        ),
        EvalCallback(
            eval_env=eval_env,
            n_eval_episodes=config["eval_eps"],
            eval_freq=config["eval_freq"],
            log_path=config["logdir"] / "eval",
            best_model_save_path=config["logdir"] / "eval",
            deterministic=True,
        ),
    ]

    if config["mode"] != "evaluate":
        print("\nStart training from scratch.\n")
        algorithm_cls = getattr(sb3lib, config["alg"])
        model = algorithm_cls(
            env=env,
            verbose=1,
            tensorboard_log=config["logdir"] / "tensorboard",
            **config["alg_kwargs"],
        )
        model.learn(total_timesteps=config["train_steps"], callback=callbacks)

        # Save the final model under a timestamped file name.
        train_dir = config["logdir"] / "train"
        train_dir.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
        model.save(train_dir / f"model_{timestamp}")
        print("\nSaved trained model.\n")
    else:
        print("\nStart evaluation.\n")
        algorithm_cls = getattr(sb3lib, config["alg"])
        model = algorithm_cls.load(config["model"], print_system_info=True)

    print("\nEvaluate policy.\n")
    mean_reward, std_reward = evaluate_policy(
        model,
        eval_env,
        n_eval_episodes=config["eval_eps"],
        deterministic=True,
    )
    print(f"Mean reward:{mean_reward:.2f} +/- {std_reward:.2f}")
    print("\nFinished evaluating.\n")

We specify several training parameters.

In [None]:
import warnings

# Silence deprecation and resource warnings for the rest of the session.
warnings.simplefilter("ignore", category=DeprecationWarning)
warnings.simplefilter("ignore", category=ResourceWarning)

# Settings read by `make_env` and `run`. The "logdir", "mode", and "model"
# entries are added in later cells.
config = {
    "img_meters": 50,  # Observation image area size in meters.
    "img_pixels": 112,  # Observation image size in pixels.
    "n_stack": 3,  # Number of frames to stack as input to policy network.
    "train_steps": 2500,  # Number of training steps.
    "checkpoint_freq": 1e3,  # Save a model every checkpoint_freq calls to env.step().
    "eval_eps": 10,  # Number of evaluation episodes.
    "eval_freq": 1e3,  # Evaluate the trained model every eval_freq steps and save the best model.
    "alg": "PPO",  # Stable Baselines3 algorithm.
    "alg_kwargs": {"policy": "CnnPolicy", "target_kl": 0.1},  # Keyword arguments forwarded to the algorithm constructor, including the network policy.
}


def main(config):
    """Builds the training and evaluation environments and runs the experiment.

    Both environments are closed when this function exits, whether `run`
    completes normally or raises.

    Args:
        config: Settings dictionary passed unchanged to `make_env` and `run`.
    """
    from contextlib import closing

    # `closing` calls `close()` on exit from the `with` block, so neither
    # environment is left open if a later step fails.
    with closing(make_env(config=config)) as env:
        with closing(make_env(config=config)) as eval_env:
            # Run training or evaluation.
            run(env=env, eval_env=eval_env, config=config)

Finally, we are ready to train the agent.

In [None]:
from datetime import datetime
from pathlib import Path

# Create a timestamped directory to hold this run's logs.
time = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
logdir = Path("/content/SMARTS/examples/rl/intersection/logs") / time
logdir.mkdir(parents=True, exist_ok=True)
config["logdir"] = logdir
# Any mode other than "evaluate" makes `run` train a new model.
config["mode"] = "train"

# Train the agent
main(config)

# Evaluate a pre-trained agent

Evaluate a pre-trained agent and compare its performance with the newly-trained agent above. 

In [None]:
# Download the pre-trained agent.
!curl -o /content/SMARTS/examples/rl/intersection/logs/pretrained/intersection.zip --create-dirs -L https://github.com/Adaickalavan/SMARTS-zoo/raw/main/intersection-v0/PPO_5800000_steps.zip

# Create a fresh timestamped log directory for the evaluation run.
# `datetime` and `Path` are imported in the training cell above.
time = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
logdir = Path("/content/SMARTS/examples/rl/intersection/logs") / time
logdir.mkdir(parents=True, exist_ok=True)
config["logdir"] = logdir
# In "evaluate" mode, `run` loads the model from config["model"] instead of training.
config["mode"] = "evaluate"
# Path of the downloaded model, given without the ".zip" extension.
# NOTE(review): presumably the loader appends ".zip" itself -- confirm.
config[
    "model"
] = "/content/SMARTS/examples/rl/intersection/logs/pretrained/intersection"

# Evaluate the pre-trained agent
main(config)

# Tensorboard

For reference, you may want to view the training logs captured during the training of the pre-trained agent.

In [None]:
# Download the tensorboard logs of the pre-trained agent
!curl -o /content/SMARTS/examples/rl/intersection/logs/pretrained/events.out.tfevents.1651601587.gx3.16.0 --create-dirs -L https://github.com/Adaickalavan/SMARTS-zoo/raw/main/intersection-v0/events.out.tfevents.1651601587.gx3.16.0

# Load the TensorBoard extension
%load_ext tensorboard
# Point TensorBoard at the folder that holds the downloaded event file.
%tensorboard --logdir /content/SMARTS/examples/rl/intersection/logs/pretrained/