# Behavior Planner for Navigating Traffic
Base code from the Udacity - Self-Driving Car Nanodegree.

Navigate through highway traffic where the car needs to be in a specific lane at some distance down the road.

In [1]:
import numpy as np
import random
import time
from IPython.display import clear_output

## Road

Road is in charge of populating and updating the position of all the cars on the road. This includes our car that is navigating traffic refered to as 'ego'.

In [36]:
class Road(object):
    update_width = 70
    
    # Simple marker
    # ego_rep = " *** "

    # State based marker
    ego_rep = {'KL':   "  V  ", 
               'PLCL': "  V: ", 
               'PLCR': " :V  ", 
               'LCL':  " V>> ", 
               'LCR':  " <<V "}
    
    ego_key = -1
    def __init__(self, speed_limit, traffic_density, lane_speeds):
        self.num_lanes = len(lane_speeds)  # each lane has its own speed flow
        self.lane_speeds = lane_speeds
        self.speed_limit = speed_limit
        self.density = traffic_density
        self.camera_center = self.update_width / 2
        self.vehicles = {}
        self.vehicles_added = 0
        
    def get_ego(self):
        return self.vehicles[self.ego_key]
    
    def populate_traffic(self):
        start_s = int(max(self.camera_center - (self.update_width / 2), 0))
        for l in range(self.num_lanes):
            lane_speed = self.lane_speeds[l]
            vehicle_just_added = False
            for s in range(start_s, start_s + self.update_width):
                if vehicle_just_added:
                    vehicle_just_added = False
                    continue
                if random.random() < self.density:
                    vehicle = Vehicle(l, s, lane_speed, 0)
                    vehicle.state = "CS"
                    self.vehicles_added += 1
                    self.vehicles[self.vehicles_added] = vehicle
                    vehicle_just_added = True

    def advance(self):
        predictions = {}
        for v_id, v in self.vehicles.items():
            preds = v.generate_predictions()
            predictions[v_id] = preds
        for v_id, v in self.vehicles.items():
            if v_id == self.ego_key:
                v.update_state(predictions)
                v.realize_state(predictions)
            v.increment()

    def add_ego(self, lane_num, s, config_data):
        for v_id, v in self.vehicles.items():
            if v.lane == lane_num and v.s == s:
                del self.vehicles[v_id]
        ego = Vehicle(lane_num, s, self.lane_speeds[lane_num], 0)
        ego.configure(config_data)
        ego.state = "KL"
        self.vehicles[self.ego_key] = ego

    def cull(self):
        ego = self.vehicles[self.ego_key]
        center_s = ego.s
        claimed = set([(v.lane, v.s) for v in self.vehicles.values()])
        for v_id, v in self.vehicles.items():
            if v.s > (center_s + self.update_width / 2) or v.s < (center_s - self.update_width / 2):
                try:
                    claimed.remove((v.lane,v.s))
                except:
                    continue
                del self.vehicles[v_id]

                placed = False
                while not placed:
                    lane_num = random.choice(range(self.num_lanes))
                    ds = random.choice(range(self.update_width/2-15,self.update_width/2 -1 ))
                    if lane_num > self.num_lanes / 2:
                        ds *= -1
                    s = center_s + ds
                    if (lane_num, s) not in claimed:
                        placed = True
                        speed = self.lane_speeds[lane_num]
                        vehicle = Vehicle(lane_num, s, speed, 0)
                        self.vehicles_added += 1
                        self.vehicles[self.vehicles_added] = vehicle
                        print ('adding vehicle {} at lane {} with s={}'.format(self.vehicles_added, lane_num, s))

    def __repr__(self):
        s = self.vehicles.get(self.ego_key).s
        self.camera_center = max(s, self.update_width / 2)
        s_min = max(self.camera_center - self.update_width /2, 0)
        s_max = s_min + self.update_width
        road = [["     " if i % 3 == 0 else "     "for ln in range(self.num_lanes)] for i in range(self.update_width)]
        for v_id, v in self.vehicles.items():
            if s_min <= v.s < s_max:
                if v_id == self.ego_key:
                    # Simple ' *** ' marker
#                     marker = self.ego_rep

                    # State based marker
                    marker = self.ego_rep[self.vehicles[self.ego_key].state]
                else:
                    marker = " %03d " % v_id
                road[int(v.s - s_min)][v.lane] = marker
        s = ""
        i = s_min
        for l in road:
            if i % 20 == 0:
                s += "%03d - " % i
            else:
                s += "      "
            i += 1
            s += "|" + "|".join(l) + "|"
            s += "\n"
        return s

## Vehicle

Vehicles are responsible for their own behavior such as slowing down to not run into a slower car ahead.

This is also where our behavior planning happens in update_state().

In [70]:
class Vehicle(object):
    L = 1
    preferred_buffer = 6 # impacts "keep lane" behavior.

    def __init__(self, lane, s, v, a):
        self.lane = lane
        self.s = s 
        self.v = v
        self.a = a
        self.state = "CS"
        self.max_acceleration = None

    # TODO - Implement this method.
    def update_state(self, predictions):
        """
        Updates the "state" of the vehicle by assigning one of the
        following values to 'self.state':

        "KL" - Keep Lane
        - The vehicle will attempt to drive its target speed, unless there is 
        traffic in front of it, in which case it will slow down.

        "LCL" or "LCR" - Lane Change Left / Right
        - The vehicle will IMMEDIATELY change lanes and then follow longitudinal
        behavior for the "KL" state in the new lane.

        "PLCL" or "PLCR" - Prepare for Lane Change Left / Right
        - The vehicle will find the nearest vehicle in the adjacent lane which is
        BEHIND itself and will adjust speed to try to get behind that vehicle.

        INPUTS
        - predictions 
        A dictionary. The keys are ids of other vehicles and the values are arrays
        where each entry corresponds to the vehicle's predicted location at the 
        corresponding timestep. The FIRST element in the array gives the vehicle's
        current position. Example (showing a car with id 3 moving at 2 m/s):

        {
        3 : [
        {"s" : 4, "lane": 0},
        {"s" : 6, "lane": 0},
        {"s" : 8, "lane": 0},
        {"s" : 10, "lane": 0},
        ]
        }

        """
        
        # defining the structure of the finite state machine
        next_states = {'KL': ['KL', 'PLCL', 'PLCR', 'LCL', 'LCR'], 
                       'PLCL': ['PLCL', 'LCL', 'KL'], 
                       'PLCR': ['PLCR', 'LCR', 'KL'], 
                       'LCL': ['KL'], 
                       'LCR': ['KL']}
        
        current_time = 0
        
        next_states_cost = []
        
        # we are in self.state. we need to estimate cost for all options
        for next_state in next_states[self.state]:
            # lane changing 
            if next_state == "LCL":
                delta = 1
            elif next_state == "LCR":
                delta = -1
            else:
                delta = 0
            target_lane = self.lane + delta
            target_s    = self.s
            
            # acceleration
            target_acceleration = self._max_accel_for_lane(predictions, target_lane, target_s)
            
            # collision detection
            collision_detected = False  # general case
            if delta != 0: # changing lanes
                # two possible problems
                # 1. going off the road
                # 2. bumping onto another car
                if (target_lane >= self.lanes_available or target_lane < 0): # 1. off road
                    collision_detected = True
                else: # car is on the road. 2. check for collision with other cars
                    for car_key in predictions:
                        other_car_lane = predictions[car_key][current_time]['lane']
                        other_car_s = predictions[car_key][current_time]['s']
                        if (other_car_lane == target_lane) and (abs(other_car_s - target_s) <= self.L):
                            collision_detected = True
                            
            # calculating cost
            # 1. collision cost
            collision_cost = 1000000000 * int(collision_detected)
            # 2. achieving goal lane
            ds = self.goal_s - self.s
            dl = abs(self.goal_lane - target_lane)
            ds = 0.1 if ds==0 else ds # avoid division by zero
            goal_lane_cost = dl/ds
            # 3. going slow cost
            acceleration_cost = (1 - target_acceleration / self.max_acceleration)
            
            # total cost
            cost = 1.0 * collision_cost + 1000.0 * goal_lane_cost + 10.0 * acceleration_cost
            
            next_states_cost.append((cost, next_state))
            
        # now i know the cost for every option    
        next_states_cost.sort() # sort on next state cost
        # i choose as next state the one with the lower cost
        self.state = next_states_cost[0][1] # the first element on the list, the second in the tuple.

        
    def configure(self, road_data):
        """
        Called by simulator before simulation begins. Sets various
        parameters which will impact the ego vehicle. 
        """
        self.target_speed = road_data['speed_limit']
        self.lanes_available = road_data["num_lanes"]
        self.max_acceleration = road_data['max_acceleration']
        goal = road_data['goal']
        self.goal_lane = goal[1]
        self.goal_s = goal[0]

    def __repr__(self):
        s = "s:    {}\n".format(self.s)
        s +="lane: {}\n".format(self.lane)
        s +="v:    {}\n".format(self.v)
        s +="a:    {}\n".format(self.a)
        return s

    def increment(self, dt=1):
        self.s += self.v * dt
        self.v += self.a * dt


    def state_at(self, t):
        """
        Predicts state of vehicle in t seconds (assuming constant acceleration)
        """
        s = self.s + self.v * t + self.a * t * t / 2
        v = self.v + self.a * t
        return self.lane, s, v, self.a

    def collides_with(self, other, at_time=0):
        """
        Simple collision detection.
        """
        l,   s,   v,   a = self.state_at(at_time)
        l_o, s_o, v_o, a_o = other.state_at(at_time)
        return l == l_o and abs(s-s_o) <= L 

    def will_collide_with(self, other, timesteps):
        for t in range(timesteps+1):
            if self.collides_with(other, t):
                return True, t
        return False, None

    def realize_state(self, predictions):
        """
        Given a state, realize it by adjusting acceleration and lane.
        Note - lane changes happen instantaneously.
        """
        state = self.state
        if   state == "CS"  : self.realize_constant_speed()
        elif state == "KL"  : self.realize_keep_lane(predictions)
        elif state == "LCL" : self.realize_lane_change(predictions, "L")
        elif state == "LCR" : self.realize_lane_change(predictions, "R")
        elif state == "PLCL": self.realize_prep_lane_change(predictions, "L")
        elif state == "PLCR": self.realize_prep_lane_change(predictions, "R")


    def realize_constant_speed(self):
        self.a = 0

    def _max_accel_for_lane(self, predictions, lane, s):
        delta_v_til_target = self.target_speed - self.v
        max_acc = min(self.max_acceleration, delta_v_til_target)
        in_front = [v for (v_id, v) in predictions.items() if v[0]['lane'] == lane and v[0]['s'] > s ]
        if len(in_front) > 0:
            leading = min(in_front, key=lambda v: v[0]['s'] - s)
            next_pos = leading[1]['s']
            my_next = s + self.v
            separation_next = next_pos - my_next
            available_room = separation_next - self.preferred_buffer
            max_acc = min(max_acc, available_room)
        return max_acc

    def realize_keep_lane(self, predictions):
        self.a = self._max_accel_for_lane(predictions, self.lane, self.s)

    def realize_lane_change(self, predictions, direction):
        delta = -1
        if direction == "L": 
            delta = 1
        self.lane += delta
        self.a = self._max_accel_for_lane(predictions, self.lane, self.s)

    def realize_prep_lane_change(self, predictions, direction):
        delta = -1
        if direction == "L": 
            delta = 1
        lane = self.lane + delta
        ids_and_vehicles = [(v_id, v) for (v_id, v) in predictions.items() if v[0]['lane'] == lane and v[0]['s'] <= self.s]
        if len(ids_and_vehicles) > 0:
            vehicles = [v[1] for v in ids_and_vehicles]

            nearest_behind = max(ids_and_vehicles, key=lambda v: v[1][0]['s'])

            print ("nearest behind : {}".format(nearest_behind))
            nearest_behind = nearest_behind[1]
            target_vel = nearest_behind[1]['s'] - nearest_behind[0]['s']
            delta_v = self.v - target_vel
            delta_s = self.s - nearest_behind[0]['s']
            if delta_v != 0:
                print ("delta_v {}".format(delta_v))
                print ("delta_s {}".format(delta_s))
                time = -2 * delta_s / delta_v
                if time == 0:
                    a = self.a
                else:
                    a = delta_v / time
                print ("raw a is {}".format(a))
                if a > self.max_acceleration: 
                    a = self.max_acceleration
                if a < -self.max_acceleration: 
                    a = -self.max_acceleration
                self.a = a
                print ("time : {}".format(time))
                print ("a: {}".format(self.a))
            else :
                min_acc = max(-self.max_acceleration, -delta_s)
                self.a = min_acc

    def generate_predictions(self, horizon=10):
        predictions = []
        for i in range(horizon):
            lane, s, v, a = self.state_at(i)
            predictions.append({'s':s, 'lane': lane})
        return predictions

## Run Simulation

Here we set some hyperparameters and run the simulation.  The simulation outputs an animation of our target car navigating through traffic.

In [72]:
# impacts default behavior for most states
SPEED_LIMIT = 60 

# all traffic in lane (besides ego) follow these speeds
LANE_SPEEDS = [6,7,8,9]
# Random lane speeds
# LANE_SPEEDS = np.random.randint(1, 10, 4)

# Number of available "cells" which should have traffic
TRAFFIC_DENSITY = 0.15
# Random traffic density
# TRAFFIC_DENSITY = np.random.uniform(0.1, 0.5)

# At each timestep, ego can set acceleration to value between 
# -MAX_ACCEL and MAX_ACCEL
MAX_ACCEL = 2

# s value and lane number of goal.
GOAL = (300, 3)

# Apply a random goal distance and goal lane
# GOAL = (np.random.randint(100,500), np.random.randint(0,len(LANE_SPEEDS)-1))

# These affect the visualization
FRAMES_PER_SECOND = 5
AMOUNT_OF_ROAD_VISIBLE = 40


road = Road(SPEED_LIMIT, TRAFFIC_DENSITY, LANE_SPEEDS)
road.update_width = AMOUNT_OF_ROAD_VISIBLE
road.populate_traffic()
ego_config = config = {
    'speed_limit' : SPEED_LIMIT,
    'num_lanes' : len(LANE_SPEEDS),
    'goal' : GOAL,
    'max_acceleration': MAX_ACCEL
}
road.add_ego(2, 0, ego_config)
timestep = 0
while road.get_ego().s <= GOAL[0]:
    timestep += 1
    if timestep > 150: 
        print("Taking too long to reach goal. Go faster!")
        break
    road.advance()
    clear_output(wait=True)
    print(road)
    time.sleep(float(1.0) / FRAMES_PER_SECOND)
ego = road.get_ego()
if ego.lane == GOAL[1]:
    print("You got to the goal in {} seconds!".format(timestep))
else:
    print("You missed the goal. You are in lane {} instead of {}.".format(ego.lane, GOAL[1]))

      |     |     |     |     |
      |     |     |     |     |
      |     |     |     |     |
      |     |     |     |     |
      |     |     |     |     |
300 - |     |     |     |     |
      |     |     |     |     |
      |     |     |     |     |
      |     |     |     |     |
      |     |     |     |     |
      |     |     |     |     |
      |     |     |     |     |
      |     |     |     |     |
      |     |     |     |     |
      |     |     |     |     |
      |     |     |     |     |
      |     |     |     |     |
      |     |     |     |     |
      |     |     |     |     |
      |     |     |     |     |
      |     |     |     |  V  |
      |     |     |     |     |
      |     |     |     |     |
      |     |     |     |     |
      |     |     |     |     |
320 - |     |     |     |     |
      |     |     |     |     |
      |     |     |     |     |
      |     |     |     |     |
      |     |     |     |     |
      |     |     |     |     |
      | 