<a href="https://colab.research.google.com/github/Aakash3101/Deep-Learning/blob/master/trajectory_highway_planning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Behavioural Planning for Autonomous Highway Driving


##  Setup

We first install and import the modules needed for the environment, the agent, and the visualization.

In [None]:
# Environment
!pip install highway-env
# to finite mdp
!pip install git+https://github.com/eleurent/finite-mdp

In [2]:
import gym
import highway_env

In [3]:
import numpy as np

class ValueIterationAgent:
    """Planning agent that acts greedily on action values obtained by value iteration.

    The agent works on a finite MDP description exposing ``mode``, ``transition``,
    ``reward`` and ``terminal`` attributes. If the environment is not already a
    finite-MDP environment, it is converted through ``env.unwrapped.to_finite_mdp()``.
    """

    def __init__(self, env, gamma, num_iterations):
        """Build the agent and compute an initial action-value table.

        Args:
            env: Either a ``FiniteMDPEnv`` (exposing ``env.mdp``) or an environment
                whose unwrapped version provides ``to_finite_mdp()``.
            gamma: Discount factor applied to the value of successor states.
            num_iterations: Maximum number of fixed-point iterations.

        Raises:
            TypeError: If the environment is not a finite MDP and the conversion
                raises ``AttributeError`` (e.g. no ``to_finite_mdp`` method).
        """
        self.finite_mdp = self.is_finite_mdp(env)
        if self.finite_mdp:
            self.mdp = env.mdp
        else:
            try:
                self.mdp = env.unwrapped.to_finite_mdp()
            except AttributeError as err:
                raise TypeError("Environment must be of type finite_mdp, install module") from err

        self.env = env
        self.gamma = gamma
        self.iterations = num_iterations
        # Computed up front so that `act` always has a table to read from, including
        # when the environment is already a finite MDP and `act` never recomputes it.
        self.state_action_value = self.get_state_action_value()

    def act(self, state):
        """Return the index of the greedy action for ``state``.

        Args:
            state: State index into the action-value table. Ignored when the
                environment is not a finite MDP: the state is then read from the
                freshly converted MDP instead.

        Returns:
            The ``np.argmax`` over the action values of the state.
        """
        # If environment is not a finite mdp, it must be converted to one and the state must be recovered
        if not self.finite_mdp:
            self.mdp = self.env.unwrapped.to_finite_mdp()
            state = self.mdp.state
            self.state_action_value = self.get_state_action_value()
        return np.argmax(self.state_action_value[state, :])

    def get_state_value(self):
        """Iterate the Bellman optimality operator on state values.

        Returns:
            Array with one entry per state (first axis of ``mdp.transition``).
        """
        return self.fixed_point_iteration(
            lambda v: self.best_action_value(self.bellman_expectation(v)),
            np.zeros((self.mdp.transition.shape[0],))
        )

    def get_state_action_value(self):
        """Iterate the Bellman optimality operator on state-action values.

        Returns:
            Array shaped like the first two axes of ``mdp.transition``.
        """
        return self.fixed_point_iteration(
            lambda q: self.bellman_expectation(self.best_action_value(q)),
            np.zeros((self.mdp.transition.shape[0:2]))
        )

    @staticmethod
    def best_action_value(action_values):
        """Return the maximum of ``action_values`` along its last axis."""
        return action_values.max(axis=-1)

    def bellman_expectation(self, value):
        """Apply one Bellman backup to a state-value array.

        Args:
            value: State values, indexed by the entries of ``mdp.transition``
                (presumably ``transition[s, a]`` is the next-state index — confirm
                against the finite-mdp package).

        Returns:
            ``mdp.reward + gamma * next_value``, where the next value is zeroed for
            entries selected by ``mdp.terminal``.

        Raises:
            ValueError: If ``mdp.mode`` is anything other than ``"deterministic"``.
        """
        if self.mdp.mode == "deterministic":
            # Integer-array indexing returns a copy, so the in-place write below
            # does not modify the caller's `value`.
            next_v = value[self.mdp.transition]
        else:
            raise ValueError("Unknown mode")

        # No bootstrapping from terminal states.
        next_v[self.mdp.terminal] = 0
        return self.mdp.reward + self.gamma * next_v

    def fixed_point_iteration(self, operator, initial):
        """Repeatedly apply ``operator`` starting from ``initial``.

        Stops after ``self.iterations`` applications, or earlier once two
        consecutive iterates are equal up to ``np.allclose`` tolerance.

        Returns:
            The last accepted iterate (the one preceding convergence detection).
        """
        value = initial
        for _ in range(self.iterations):
            next_value = operator(value)
            if np.allclose(value, next_value):
                break
            value = next_value
        return value

    @staticmethod
    def is_finite_mdp(env):
        """Tell whether ``env`` is an instance of ``finite_mdp``'s ``FiniteMDPEnv``.

        Returns:
            ``True`` if the ``finite_mdp`` package is importable and ``env`` is a
            ``FiniteMDPEnv``; ``False`` otherwise (including when the package is
            missing).
        """
        try:
            # `__import__` with a dotted name returns the top-level package, with
            # the submodules loaded as attributes.
            finite_mdp = __import__("finite_mdp.envs.finite_mdp_env")
            return isinstance(env, finite_mdp.envs.finite_mdp_env.FiniteMDPEnv)
        except (ImportError, TypeError):
            return False

## Run an episode

In [4]:
# Make environment: instantiate the registered "highway-v0" task.

env = gym.make("highway-v0")
# NOTE(review): `simplify()` comes from highway-env and is not defined in this
# notebook; presumably it reduces the scene complexity — confirm against the
# installed highway-env version.
env.simplify()
# Start an episode; as the cell's last expression, the initial observation
# returned by `reset()` is displayed as the cell output.
env.reset()

array([[ 1.        ,  1.        ,  0.75      ,  0.41666667,  0.        ],
       [ 1.        ,  0.12989487,  0.        , -0.15206854,  0.        ],
       [ 1.        ,  0.26013313, -0.5       , -0.1731086 ,  0.        ],
       [ 1.        ,  0.41287558,  0.        , -0.0983182 ,  0.        ],
       [ 1.        ,  0.56036382,  0.        , -0.1211421 ,  0.        ]])

In [5]:
import pprint

# Pretty-print the environment's configuration dictionary for inspection.
pprint.pprint(env.config)

{'action': {'type': 'DiscreteMetaAction'},
 'centering_position': [0.3, 0.5],
 'collision_reward': -1,
 'controlled_vehicles': 1,
 'duration': 40,
 'ego_spacing': 2,
 'initial_lane_id': None,
 'lanes_count': 4,
 'manual_control': False,
 'observation': {'type': 'Kinematics'},
 'offroad_terminal': False,
 'offscreen_rendering': False,
 'other_vehicles_type': 'highway_env.vehicle.behavior.IDMVehicle',
 'policy_frequency': 1,
 'real_time_rendering': False,
 'render_agent': True,
 'reward_speed_range': [20, 30],
 'scaling': 5.5,
 'screen_height': 150,
 'screen_width': 600,
 'show_trajectories': False,
 'simulation_frequency': 15,
 'vehicles_count': 50,
 'vehicles_density': 1}


In [None]:
# Visualisation
import sys

!git clone https://github.com/eleurent/highway-env.git
!pip install gym pyvirtualdisplay
!apt-get install -y xvfb python-opengl ffmpeg
sys.path.insert(0, './highway-env/scripts/')


In [7]:
from utils import record_videos, show_videos, capture_intermediate_frames
from tqdm.notebook import trange

# `record_videos` and `capture_intermediate_frames` come from the cloned
# highway-env `scripts/utils.py`; presumably they set up video recording of the
# episode — confirm against that module.
# NOTE(review): re-running this cell re-wraps the already wrapped `env`.
env = record_videos(env)
obs, done = env.reset(), False
capture_intermediate_frames(env)

agent = ValueIterationAgent(env, gamma=0.9, num_iterations=100)

# Run for at most `duration` steps (read from the environment's config).
for step in trange(env.unwrapped.config["duration"], desc="Running..."):

    # get agent to pick action given state s
    action = agent.act(obs)
    obs, reward, done, info = env.step(action)

    # Stop as soon as the episode ends: stepping a finished episode without a
    # reset is not a valid use of the environment.
    if done:
        break

env.close()
show_videos()

HBox(children=(FloatProgress(value=0.0, description='Running...', max=40.0, style=ProgressStyle(description_wi…


