## **Creating Your Custom Environment for `highway-env` package**

### **Rendering component declaration for `highway-env` outputs**

__This section declares the import statements needed to render the display on a Google Colab server and the package installation commands needed to run highway-env.__

__Note: These import statements are different from the ones used in default OpenAI environments.__

In [2]:
# Package download statements.
# Note 1: This time we also use the ffmpeg package to handle video recording tasks.
# Note 2: No specific package versions are required; the default versions work.

# We are building our own version of highway-env with new driving scenarios.
# !pip install git+https://github.com/eleurent/highway-env

!apt-get update
!pip install pyvirtualdisplay
!apt-get install -y xvfb python-opengl ffmpeg
# Importing gym and highway_env for loading different environment scenarios.
# import gym
# import highway_env

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:7 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:8 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Get:10 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:12 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:13 http://security.ubuntu.com/ubuntu bionic-security/main amd64 Packages [1,929 kB]
Get:14 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]

In [7]:
# Similar to the earlier rendering procedure for creating a virtual display.
from IPython import display as ipythondisplay
from pyvirtualdisplay import Display
from gym.wrappers import Monitor
from pathlib import Path
import base64

# Declaring display screen size for rendering highway-env environment.
display = Display(visible=0, size=(1366, 768))
display.start()

# These functions are also available in utils.py under the '/highway-env/scripts/'
# directory of the highway-env GitHub repository. They can be imported directly with
# the commands below, but we define them here to fix the size of the recorded videos.
# Note: we have also changed these functions slightly, so refer to the documentation.

# !git clone https://github.com/eleurent/highway-env.git
# import sys
# sys.path.insert(0, './highway-env/scripts/')
# from utils import record_videos, show_videos, capture_intermediate_frames

def wrap_env(env):
    return Monitor(env, './video', force=True, video_callable=lambda episode: True)


def show_video():
    html = []
    for mp4 in Path('./video').glob("*.mp4"):
        video_b64 = base64.b64encode(mp4.read_bytes())
        html.append('''<video alt="{}" autoplay
                      loop controls style="height: 252px;">
                      <source src="data:video/mp4;base64,{}" type="video/mp4" />
                 </video>'''.format(mp4, video_b64.decode('ascii')))
    ipythondisplay.display(ipythondisplay.HTML(data="<br>".join(html)))


def capture_intermediate_frames(env):
    env.unwrapped.automatic_rendering_callback = env.video_recorder.capture_frame
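
The `show_video` helper above embeds each recording in the notebook as a base64 data URI. As a rough sketch of that step in isolation, the snippet below builds the same kind of `<video>` tag from raw bytes and checks how much the encoding inflates the payload. The function name `video_tag`, its `height_px` parameter, and the stand-in bytes are hypothetical and only serve the illustration.

```python
import base64
import math


def video_tag(data: bytes, alt: str, height_px: int = 252) -> str:
    """Return an HTML <video> tag with the clip embedded as a base64 data URI."""
    payload = base64.b64encode(data).decode("ascii")
    return (
        f'<video alt="{alt}" autoplay loop controls style="height: {height_px}px;">'
        f'<source src="data:video/mp4;base64,{payload}" type="video/mp4" />'
        "</video>"
    )


# Stand-in bytes; a real call would pass the bytes read from an .mp4 file.
fake_clip = bytes(range(10))
tag = video_tag(fake_clip, alt="demo")

# Base64 emits 4 characters per 3 input bytes (with padding),
# so an embedded video grows by roughly one third.
encoded_len = len(base64.b64encode(fake_clip))
expected_len = 4 * math.ceil(len(fake_clip) / 3)
print(encoded_len, expected_len)
```

Because the whole file ends up inside the notebook output, this approach is best suited to short clips such as the ones recorded here.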

### **Creating new environments in highway-env package**

__Below we create a U-Turn environment for simulating collision-avoidance scenarios at high speed. We build the environment with the help of the provided documentation and then pip install `highway-env`.__

__Note: When reinstalling the package after running `pip uninstall`, restart the runtime so that the updated `highway-env` package is used, and define the rendering helper functions again in the new runtime.__

In [3]:
# Cloning the GitHub repository into the '/content/' directory.
!git clone https://github.com/eleurent/highway-env.git

Cloning into 'highway-env'...
remote: Enumerating objects: 112, done.
remote: Counting objects: 100% (112/112), done.
remote: Compressing objects: 100% (77/77), done.
remote: Total 7190 (delta 59), reused 67 (delta 35), pack-reused 7078
Receiving objects: 100% (7190/7190), 22.06 MiB | 10.74 MiB/s, done.
Resolving deltas: 100% (4926/4926), done.


In [6]:
# Checking the already existing environments.
# Note: Since 'u_turn_env.py' is already committed to the highway-env
# package, it is listed below.
!ls /content/highway-env/highway_env/envs/

common		intersection_env.py  parking_env.py	two_way_env.py
highway_env.py	lane_keeping_env.py  roundabout_env.py	u_turn_env.py
__init__.py	merge_env.py	     summon_env.py


In [7]:
# exploring the __init__.py file
# We have to register our environment in the __init__.py file.
!cat /content/highway-env/highway_env/envs/__init__.py
# We add the last import statement shown below.
# Note: Since 'u_turn_env.py' is already committed to the highway-env
# package, its import is already listed. Otherwise, we would have to add an
# import for the module of the newly added environment (an xx_env.py file).

from highway_env.envs.highway_env import *
from highway_env.envs.merge_env import *
from highway_env.envs.parking_env import *
from highway_env.envs.summon_env import *
from highway_env.envs.roundabout_env import *
from highway_env.envs.two_way_env import *
from highway_env.envs.intersection_env import *
from highway_env.envs.lane_keeping_env import *
from highway_env.envs.u_turn_env import *


In [5]:
%%writefile /content/highway-env/highway_env/envs/__init__.py

from highway_env.envs.highway_env import *
from highway_env.envs.merge_env import *
from highway_env.envs.parking_env import *
from highway_env.envs.summon_env import *
from highway_env.envs.roundabout_env import *
from highway_env.envs.two_way_env import *
from highway_env.envs.intersection_env import *
from highway_env.envs.lane_keeping_env import *
from highway_env.envs.u_turn_env import *

Overwriting /content/highway-env/highway_env/envs/__init__.py


In [9]:
# Checking the __init__.py file after making our changes.
!cat /content/highway-env/highway_env/envs/__init__.py


from highway_env.envs.highway_env import *
from highway_env.envs.merge_env import *
from highway_env.envs.parking_env import *
from highway_env.envs.summon_env import *
from highway_env.envs.roundabout_env import *
from highway_env.envs.two_way_env import *
from highway_env.envs.intersection_env import *
from highway_env.envs.lane_keeping_env import *
from highway_env.envs.u_turn_env import *

In [10]:
%%writefile /content/highway-env/highway_env/envs/u_turn_env.py

import numpy as np
from gym.envs.registration import register


from highway_env import utils
from highway_env.envs.common.abstract import AbstractEnv
from highway_env.road.lane import LineType, StraightLane, CircularLane
from highway_env.road.road import Road, RoadNetwork
from highway_env.vehicle.controller import MDPVehicle


class UTurnEnv(AbstractEnv):

    """
    U-Turn risk analysis task: the agent overtakes vehicles that are blocking the
    traffic. High speed overtaking must be balanced with ensuring safety.
    """

    """Penalization received for vehicle collision."""
    COLLISION_REWARD: float = -1.0
    """Reward received for maintaining left most lane."""
    LEFT_LANE_REWARD: float = 0.1
    """Reward received for maintaining cruising speed."""
    HIGH_SPEED_REWARD: float = 0.4

    @classmethod
    def default_config(cls) -> dict:
        config = super().default_config()
        config.update({
            "observation": {
                "type": "TimeToCollision",
                "horizon": 16
            },
            "action": {
                "type": "DiscreteMetaAction",
            },
            "screen_width": 789,
            "screen_height": 289,
            "duration": 10,
            "reward_speed_range": [8, 24],
            "offroad_terminal": False
        })
        return config

    def _reward(self, action: int) -> float:
        """
        The vehicle is rewarded for driving with high speed and collision avoidance.
        :param action: the action performed
        :return: the reward of the state-action transition
        """
        neighbours = self.road.network.all_side_lanes(self.vehicle.lane_index)
        lane = self.vehicle.lane_index[2]
        scaled_speed = utils.lmap(self.vehicle.speed, self.config["reward_speed_range"], [0, 1])
        reward = \
            + self.COLLISION_REWARD * self.vehicle.crashed \
            + self.LEFT_LANE_REWARD * lane / max(len(neighbours) - 1, 1) \
            + self.HIGH_SPEED_REWARD * np.clip(scaled_speed, 0, 1)
        reward = utils.lmap(reward, [self.COLLISION_REWARD, self.HIGH_SPEED_REWARD + self.LEFT_LANE_REWARD], [0, 1])
        reward = 0 if not self.vehicle.on_road else reward
        return reward

    def _is_terminal(self) -> bool:
        """
        The episode is over if the ego vehicle crashed or the time is out.
        """
        return self.vehicle.crashed or \
            self.steps >= self.config["duration"]

    def _cost(self, action: int) -> float:
        """
        The constraint signal is the occurrence of collisions.
        """
        return float(self.vehicle.crashed)

    def _reset(self) -> None:
        self._make_road()
        self._make_vehicles()

    def _make_road(self, length=128):
        """
        Making double lane road with counter-clockwise U-Turn.
        :return: the road
        """
        net = RoadNetwork()

        # Defining upper starting lanes after the U-Turn.
        # These Lanes are defined from x-coordinate 'length' to 0.
        net.add_lane("c", "d", StraightLane([length, StraightLane.DEFAULT_WIDTH], [0, StraightLane.DEFAULT_WIDTH],
                                            line_types=(LineType.CONTINUOUS_LINE, LineType.STRIPED)))
        net.add_lane("c", "d", StraightLane([length, 0], [0, 0],
                                            line_types=(LineType.NONE, LineType.CONTINUOUS_LINE)))

        # Defining counter-clockwise circular U-Turn lanes.
        center = [length, StraightLane.DEFAULT_WIDTH + 20]  # [m]
        radius = 20  # [m]
        alpha = 0  # [deg]

        radii = [radius, radius+StraightLane.DEFAULT_WIDTH]
        n, c, s = LineType.NONE, LineType.CONTINUOUS, LineType.STRIPED
        line = [[c, s], [n, c]]
        for lane in [0, 1]:
            net.add_lane("b", "c",
                         CircularLane(center, radii[lane], np.deg2rad(90 - alpha), np.deg2rad(-90+alpha),
                                      clockwise=False, line_types=line[lane]))

        offset = 2*radius

        # Defining lower starting lanes before the U-Turn.
        # These Lanes are defined from x-coordinate 0 to 'length'.
        net.add_lane("a", "b", StraightLane([0, ((2 * StraightLane.DEFAULT_WIDTH + offset) - StraightLane.DEFAULT_WIDTH)],
                                            [length, ((2 * StraightLane.DEFAULT_WIDTH + offset) - StraightLane.DEFAULT_WIDTH)],
                                            line_types=(LineType.CONTINUOUS_LINE,
                                                        LineType.STRIPED)))
        net.add_lane("a", "b", StraightLane([0, (2 * StraightLane.DEFAULT_WIDTH + offset)],
                                            [length, (2 * StraightLane.DEFAULT_WIDTH + offset)],
                                            line_types=(LineType.NONE,
                                                        LineType.CONTINUOUS_LINE)))

        road = Road(network=net, np_random=self.np_random, record_history=self.config["show_trajectories"])
        self.road = road

    def _make_vehicles(self) -> None:
        """
        Strategic addition of vehicles for testing safety behavior limits
        while performing U-Turn manoeuvre at given cruising interval.

        :return: the ego-vehicle
        """

        # These variables add small variations to the driving behavior.
        position_deviation = 2
        speed_deviation = 2

        ego_lane = self.road.network.get_lane(("a", "b", 0))
        ego_vehicle = self.action_type.vehicle_class(self.road,
                                                     ego_lane.position(0, 0),
                                                     speed=16)
        # Stronger anticipation for the turn
        ego_vehicle.PURSUIT_TAU = MDPVehicle.TAU_DS
        # Lower speed range
        ego_vehicle.SPEED_MIN = 8
        ego_vehicle.SPEED_MAX = 24
        ego_vehicle.SPEED_COUNT = 3
        try:
            ego_vehicle.plan_route_to("d")
        except AttributeError:
            pass

        self.road.vehicles.append(ego_vehicle)
        self.vehicle = ego_vehicle

        vehicles_type = utils.class_from_path(self.config["other_vehicles_type"])

        # Note: randomize_behavior() is enabled only for Vehicle 1; uncomment it for the
        # other vehicles if more randomized interactions are needed for the experiment.

        # Vehicle 1: Blocking the ego vehicle
        vehicle = vehicles_type.make_on_lane(self.road,
                                                   ("a", "b", 0),
                                                   longitudinal=25 + self.np_random.randn()*position_deviation,
                                                   speed=13.5 + self.np_random.randn() * speed_deviation)
        vehicle.plan_route_to('d')
        vehicle.randomize_behavior()
        self.road.vehicles.append(vehicle)

        # Vehicle 2: Forcing risky overtake
        vehicle = vehicles_type.make_on_lane(self.road,
                                                   ("a", "b", 1),
                                                   longitudinal=56 + self.np_random.randn()*position_deviation,
                                                   speed=14.5 + self.np_random.randn() * speed_deviation)
        vehicle.plan_route_to('d')
        # vehicle.randomize_behavior()
        self.road.vehicles.append(vehicle)

        # Vehicle 3: Blocking the ego vehicle
        vehicle = vehicles_type.make_on_lane(self.road,
                                                   ("b", "c", 1),
                                                   longitudinal=0.5 + self.np_random.randn()*position_deviation,
                                                   speed=4.5 + self.np_random.randn() * speed_deviation)
        vehicle.plan_route_to('d')
        # vehicle.randomize_behavior()
        self.road.vehicles.append(vehicle)

        # Vehicle 4: Forcing risky overtake
        vehicle = vehicles_type.make_on_lane(self.road,
                                                   ("b", "c", 0),
                                                   longitudinal=17.5 + self.np_random.randn()*position_deviation,
                                                   speed=5.5 + self.np_random.randn() * speed_deviation)
        vehicle.plan_route_to('d')
        # vehicle.randomize_behavior()
        self.road.vehicles.append(vehicle)

        # Vehicle 5: Blocking the ego vehicle
        vehicle = vehicles_type.make_on_lane(self.road,
                                                   ("c", "d", 0),
                                                   longitudinal=1 + self.np_random.randn()*position_deviation,
                                                   speed=3.5 + self.np_random.randn() * speed_deviation)
        vehicle.plan_route_to('d')
        # vehicle.randomize_behavior()
        self.road.vehicles.append(vehicle)

        # Vehicle 6: Forcing risky overtake
        vehicle = vehicles_type.make_on_lane(self.road,
                                                   ("c", "d", 1),
                                                   longitudinal=30 + self.np_random.randn()*position_deviation,
                                                   speed=5.5 + self.np_random.randn() * speed_deviation)
        vehicle.plan_route_to('d')
        # vehicle.randomize_behavior()
        self.road.vehicles.append(vehicle)


register(
    id='u-turn-v0',
    entry_point='highway_env.envs:UTurnEnv'
)


Overwriting /content/highway-env/highway_env/envs/u_turn_env.py
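
The road built in `_make_road` only works if the two straight sections meet the circular section exactly. The following sketch re-derives the lane end points with plain trigonometry to check this. It assumes `StraightLane.DEFAULT_WIDTH` is 4 metres and that a circular lane's points lie at `center + radius * (cos, sin)`; the helper `arc_point` is a hypothetical name used only here.

```python
import math

DEFAULT_WIDTH = 4.0  # assumed value of StraightLane.DEFAULT_WIDTH, in metres
length, radius = 128.0, 20.0
center = (length, DEFAULT_WIDTH + radius)
radii = [radius, radius + DEFAULT_WIDTH]


def arc_point(r, angle_deg):
    """Point on a circle of radius r around `center` at the given angle."""
    a = math.radians(angle_deg)
    return (center[0] + r * math.cos(a), center[1] + r * math.sin(a))


offset = 2 * radius
# y-coordinates where the lower ("a" -> "b") lanes end and the upper ("c" -> "d") lanes start.
lower_end_y = [2 * DEFAULT_WIDTH + offset - DEFAULT_WIDTH, 2 * DEFAULT_WIDTH + offset]
upper_start_y = [DEFAULT_WIDTH, 0.0]

for lane in (0, 1):
    entry = arc_point(radii[lane], 90)    # where the arc begins (+90 degrees)
    exit_ = arc_point(radii[lane], -90)   # where the arc ends (-90 degrees)
    print(lane, entry, (length, lower_end_y[lane]), exit_, (length, upper_start_y[lane]))
```

For both lane indices the arc entry matches the end of the lower lane and the arc exit matches the start of the upper lane, up to floating-point rounding.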


In [7]:
# Uninstalling the highway-env package in case an issue occurred
# while coding your new environment.

# !pip uninstall -y highway-env

# Note: This command comes in handy because compilation, semantic,
# or functional errors may not surface until later stages.

Uninstalling highway-env-1.0.dev0:
  Successfully uninstalled highway-env-1.0.dev0


In [8]:
# installing the highway-env package.
!pip install /content/highway-env/

# Note: Press the restart button when reinstalling the highway-env
# package as prompted by the output text below.

Processing ./highway-env
Building wheels for collected packages: highway-env
  Building wheel for highway-env (setup.py) ... done
  Created wheel for highway-env: filename=highway_env-1.0.dev0-cp36-none-any.whl size=86227 sha256=bd10419f642d1e338e4baed64fa54575f95ab14cf16b2fac609e808e33de78b2
  Stored in directory: /root/.cache/pip/wheels/86/f8/1a/561333f2df5a08999032373dfa6641cc1cd21b091cc818e8ff
Successfully built highway-env
Installing collected packages: highway-env
Successfully installed highway-env-1.0.dev0


### **Sample Environment Output**

__Below we output basic environment behavior for referencing and reviewing our changes.__

In [1]:
import gym
import highway_env

In [11]:
env = gym.make("u-turn-v0")
env = wrap_env(env)
obs, done = env.reset(), False
capture_intermediate_frames(env)
for _ in range(3):
    action = env.action_space.sample()
    obs, reward, done, info = env.step(action)
    # Calling env.render() here would produce an error, because an
    # 'automatic_rendering_callback' is already set to capture
    # intermediate frames.
env.close()
show_video()

# Note 1: At stage one, only develop the roads and check their functionality.
# After that, add other vehicles at your desired locations.
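
The `reward` returned by `env.step` comes from `_reward` in `u_turn_env.py`, which combines three terms and rescales the result to the range [0, 1]. The sketch below reproduces that arithmetic in plain Python as a worked example. `lmap` is re-implemented here on the assumption that `utils.lmap` is a plain linear interpolation, and `u_turn_reward` with its parameters is a hypothetical helper, not part of `highway-env`.

```python
COLLISION_REWARD = -1.0
LEFT_LANE_REWARD = 0.1
HIGH_SPEED_REWARD = 0.4
SPEED_RANGE = (8.0, 24.0)  # "reward_speed_range" from default_config


def lmap(v, x, y):
    """Linearly map v from interval x to interval y (assumed to match utils.lmap)."""
    return y[0] + (v - x[0]) * (y[1] - y[0]) / (x[1] - x[0])


def u_turn_reward(speed, lane, lane_count=2, crashed=False, on_road=True):
    """Re-compute the normalized reward from speed, lane index and crash flag."""
    scaled_speed = min(max(lmap(speed, SPEED_RANGE, (0.0, 1.0)), 0.0), 1.0)
    raw = (COLLISION_REWARD * crashed
           + LEFT_LANE_REWARD * lane / max(lane_count - 1, 1)
           + HIGH_SPEED_REWARD * scaled_speed)
    best = HIGH_SPEED_REWARD + LEFT_LANE_REWARD
    normalized = lmap(raw, (COLLISION_REWARD, best), (0.0, 1.0))
    return normalized if on_road else 0.0


print(u_turn_reward(speed=16, lane=0))                # about 0.8
print(u_turn_reward(speed=24, lane=1))                # about 1.0, the best case
print(u_turn_reward(speed=16, lane=0, crashed=True))  # about 0.13
```

At the initial speed of 16 in lane 0 the raw reward is 0.4 * 0.5 = 0.2, which maps to (0.2 + 1) / 1.5 = 0.8. A crash at the same speed drops the raw value to -0.8 and the normalized value to roughly 0.13, so a collision costs far more than any lane or speed bonus can recover.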

### **Training baselines for our newly created environment**

__Below we train baseline models from the `rl-agents` package on the 'u-turn-v0' environment.__

```
1. We clone the whole rl-agents repository and install the package.
2. After this, we add the necessary configuration files for our baseline models.
3. Then we change to the execution directory and run the evaluation command.
4. Finally, we create a function to display the videos of our trained agents.
```

In [16]:
# Cloning the rl-agents package and installing it.
!git clone https://github.com/eleurent/rl-agents.git
!pip install git+https://github.com/eleurent/rl-agents

Cloning into 'rl-agents'...
remote: Enumerating objects: 255, done.
remote: Counting objects: 100% (255/255), done.
remote: Compressing objects: 100% (138/138), done.
remote: Total 6402 (delta 144), reused 184 (delta 117), pack-reused 6147
Receiving objects: 100% (6402/6402), 975.47 KiB | 10.06 MiB/s, done.
Resolving deltas: 100% (4548/4548), done.
Collecting git+https://github.com/eleurent/rl-agents
  Cloning https://github.com/eleurent/rl-agents to /tmp/pip-req-build-rmw7_fvd
  Running command git clone -q https://github.com/eleurent/rl-agents /tmp/pip-req-build-rmw7_fvd
Collecting tensorboardX
  Downloading https://files.pythonhosted.org/packages/af/0c/4f41bcd45db376e6fe5c619c01100e9b7531c55791b7244815bac6eac32c/tensorboardX-2.1-py2.py3-none-any.whl (308kB)
Building wheels for collected packages: rl-agents
  Building wheel for rl-agents (setup.py) ... done
  Created wheel for rl-agents: filename=

In [17]:
# Making respective directories for MCTSAgent and writing the json files in it.
%mkdir rl-agents/scripts/configs/UTurnEnv/
%mkdir rl-agents/scripts/configs/UTurnEnv/agents/
%mkdir rl-agents/scripts/configs/UTurnEnv/agents/MCTSAgent/ 
# Note: This directory and file structure can be referenced from the already
# existing agent configurations in the `rl-agents` package.

In [18]:
%%writefile  rl-agents/scripts/configs/UTurnEnv/env.json

{
    "id": "u-turn-v0",
    "import_module": "highway_env"
}

Writing rl-agents/scripts/configs/UTurnEnv/env.json


In [19]:
%%writefile  rl-agents/scripts/configs/UTurnEnv/agents/MCTSAgent/baseline.json

{
    "__class__": "<class 'rl_agents.agents.tree_search.mcts.MCTSAgent'>",
    "env_preprocessors": [{"method":"simplify"}]
}

Writing rl-agents/scripts/configs/UTurnEnv/agents/MCTSAgent/baseline.json


In [20]:
# Outputting the json file for write operation verification.
!cat rl-agents/scripts/configs/UTurnEnv/env.json


{
    "id": "u-turn-v0",
    "import_module": "highway_env"
}

In [21]:
# Outputting the json file for write operation verification.
!cat  rl-agents/scripts/configs/UTurnEnv/agents/MCTSAgent/baseline.json


{
    "__class__": "<class 'rl_agents.agents.tree_search.mcts.MCTSAgent'>",
    "env_preprocessors": [{"method":"simplify"}]
}
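
The directory creation and the two `%%writefile` cells above can also be done from Python, which validates the JSON at the same time. The sketch below writes the same two configurations into a temporary directory that stands in for `rl-agents/scripts/configs`. The variable names are illustrative, and the temporary location is an assumption made so the snippet can run anywhere.

```python
import json
import tempfile
from pathlib import Path

env_config = {"id": "u-turn-v0", "import_module": "highway_env"}
agent_config = {
    "__class__": "<class 'rl_agents.agents.tree_search.mcts.MCTSAgent'>",
    "env_preprocessors": [{"method": "simplify"}],
}

# A temporary directory stands in for rl-agents/scripts/configs here.
configs_root = Path(tempfile.mkdtemp()) / "configs"
agent_dir = configs_root / "UTurnEnv" / "agents" / "MCTSAgent"
agent_dir.mkdir(parents=True, exist_ok=True)  # creates all intermediate directories

env_path = configs_root / "UTurnEnv" / "env.json"
agent_path = agent_dir / "baseline.json"
env_path.write_text(json.dumps(env_config, indent=4))
agent_path.write_text(json.dumps(agent_config, indent=4))

# Reading the files back confirms they are valid JSON.
print(json.loads(env_path.read_text())["id"])
print(json.loads(agent_path.read_text())["__class__"])
```

Using `mkdir(parents=True)` replaces the three separate `%mkdir` calls, and `json.dumps` rules out syntax slips such as a trailing comma that a hand-written file could contain.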

In [22]:
# Changing to execution directory.
%cd rl-agents/scripts/

/content/rl-agents/scripts


In [None]:
# Running the execution command.
!python experiments.py evaluate /content/rl-agents/scripts/configs/UTurnEnv/env.json \
/content/rl-agents/scripts/configs/UTurnEnv/agents/MCTSAgent/baseline.json --train --episodes=10

In [None]:
# Download command for checking agent output videos locally.
!zip -r /content/file.zip /content/rl-agents/scripts/out/UTurnEnv/MCTSAgent/

from google.colab import files
files.download("/content/file.zip")
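
Where the `zip` command-line tool is not available, the archive can be built with the standard library instead. This is a small sketch using `shutil.make_archive`; the temporary directory and the dummy file are stand-ins for the real agent output folder, and the download step through `google.colab.files` is left out because it only works inside Colab.

```python
import shutil
import tempfile
from pathlib import Path

# Stand-in for the agent output directory, holding one dummy file.
work = Path(tempfile.mkdtemp())
out_dir = work / "out" / "UTurnEnv" / "MCTSAgent"
out_dir.mkdir(parents=True)
(out_dir / "dummy.mp4").write_bytes(b"\x00" * 16)

# make_archive appends the ".zip" suffix itself and returns the archive path.
archive_path = shutil.make_archive(str(work / "file"), "zip", root_dir=str(out_dir))
print(archive_path)
```

With `root_dir` set to the output folder, the paths stored inside the archive are relative to that folder rather than to the filesystem root.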

In [None]:
# Creating the configuration directory for DeterministicPlannerAgent.
%mkdir /content/rl-agents/scripts/configs/UTurnEnv/agents/DeterministicPlannerAgent/

In [None]:
%%writefile  /content/rl-agents/scripts/configs/UTurnEnv/agents/DeterministicPlannerAgent/baseline.json

{
    "__class__": "<class 'rl_agents.agents.tree_search.deterministic.DeterministicPlannerAgent'>",
    "env_preprocessors": [{"method":"simplify"}],
    "budget": 75,
    "gamma": 0.7
}

In [None]:
# Running the execution command.
!python experiments.py evaluate /content/rl-agents/scripts/configs/UTurnEnv/env.json \
/content/rl-agents/scripts/configs/UTurnEnv/agents/DeterministicPlannerAgent/baseline.json --train --episodes=10

In [None]:
# Download command for checking agent output videos locally.
!zip -r /content/file.zip /content/rl-agents/scripts/out/UTurnEnv/DeterministicPlannerAgent/

from google.colab import files
files.download("/content/file.zip")

In [None]:
# Creating the configuration directory for MDPGapEAgent.
%mkdir /content/rl-agents/scripts/configs/UTurnEnv/agents/MDPGapEAgent/

In [None]:
%%writefile  /content/rl-agents/scripts/configs/UTurnEnv/agents/MDPGapEAgent/baseline.json


{
    "__class__": "<class 'rl_agents.agents.tree_search.mdp_gape.MDPGapEAgent'>",
    "env_preprocessors": [{"method":"simplify"}],
    "gamma": 0.8,
    "budget": 100,
    "accuracy": 0.1,
    "confidence": 1,
    "max_next_states_count": 1,
    "upper_bound":
    {
        "type": "kullback-leibler",
        "time": "global",
        "threshold": "1*np.log(time)"
    },
    "continuation_type": "uniform",
    "step_strategy": "reset"
}

In [None]:
# Running the execution command.
!python experiments.py evaluate /content/rl-agents/scripts/configs/UTurnEnv/env.json \
/content/rl-agents/scripts/configs/UTurnEnv/agents/MDPGapEAgent/baseline.json --train --episodes=64

In [None]:
# Download command for checking agent output videos locally.
!zip -r /content/file.zip /content/rl-agents/scripts/out/UTurnEnv/MDPGapEAgent/

from google.colab import files
files.download("/content/file.zip")

In [None]:
# Outputting specific video files of the trained agent's output directory.
def show_trained_video(path, filename):
    html = []
    for mp4 in Path(path).glob(filename):
        video_b64 = base64.b64encode(mp4.read_bytes())
        html.append('''<video alt="{}" autoplay
                      loop controls style="height: 476px;">
                      <source src="data:video/mp4;base64,{}" type="video/mp4" />
                 </video>'''.format(mp4, video_b64.decode('ascii')))
    ipythondisplay.display(ipythondisplay.HTML(data="<br>".join(html)))

In [None]:
# Note: Your run ID will differ from the one shown below.
show_trained_video('/content/rl-agents/scripts/out/UTurnEnv/DeterministicPlannerAgent/run_20210211-214834_2398', 'openaigym.video.0.2398.video000000.mp4')

In [None]:
# Note: Your run ID will differ from the one shown below.
show_trained_video('/content/rl-agents/scripts/out/UTurnEnv/MCTSAgent/run_20210211-210340_1781', 'openaigym.video.0.1781.video000001.mp4')
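
Since the run ID changes on every execution, it can be convenient to look up the newest run directory instead of typing it by hand. The helper below is a sketch that assumes run directories are named `run_<date>-<time>_<id>`, as in the two paths above, so that sorting by name also sorts by time. `latest_run_videos` is a hypothetical helper, and the demo runs on a throwaway directory with empty placeholder files.

```python
import tempfile
from pathlib import Path


def latest_run_videos(agent_out_dir):
    """Return the .mp4 files of the most recent run_* directory, sorted by name."""
    runs = sorted(p for p in Path(agent_out_dir).glob("run_*") if p.is_dir())
    if not runs:
        return []
    # Names start with a timestamp, so the lexicographically
    # largest name is assumed to be the newest run.
    return sorted(runs[-1].glob("*.mp4"))


# Demo on a throwaway directory with two fake runs.
root = Path(tempfile.mkdtemp())
for run, clip in [("run_20210211-210340_1781", "a.mp4"),
                  ("run_20210211-214834_2398", "b.mp4")]:
    (root / run).mkdir()
    (root / run / clip).write_bytes(b"")

videos = latest_run_videos(root)
print([v.name for v in videos])
```

The parent directory and file name of each returned path can then be passed to `show_trained_video` in place of the hard-coded values.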