In [None]:
import numpy as np
import gym
import matplotlib.pyplot as plt
from f110_gym.envs.base_classes import Integrator
import random
import os
import pandas as pd
import time
from sklearn.preprocessing import normalize
from scipy.sparse import csr_matrix
import sys
from f110_gym.envs.f110_env import F110Env

In [None]:
class F1Tenth_navigation:

    def __init__(self,gym_env_code='f110_gym:f110-v0', num_agents=1, map_path=['./f1tenth_racetracks/Austin/Austin_map'], map_ext='.png', sx=0., sy=0., stheta=0., map_centers_file=None, save_path=None, track_name=None, inference=None,reward_file=None,collision_file=None):
        '''
        F1Tenth_navigation class is used to train the agent on the F110 environment.
        Args:
            gym_env_code (str): Gym environment code.
            num_agents (int): Number of agents in the environment.
            map_path (list): List of map paths.
            map_ext (str): Map extension.
            sx (float): Initial x position of the agent.
            sy (float): Initial y position of the agent.
            stheta (float): Initial theta position of the agent.
            map_centers_file (str): Path to the map centers file.
            save_path (str): Path to save the weights and rewards.
            track_name (list): List of track names.
            inference (str): Path to the inference weights file.
            reward_file (str): Path to store the reward file.
            collision_file (str): Path to store the time to collison file.
        '''

        # Environment setup
        self.path_counter = 0
        self.sx, self.sy, self.stheta = sx, sy, stheta
        self.save_path = save_path
        self.track_name = track_name
        self.num_agents = num_agents
        self.map_path = map_path
        self.map_ext = map_ext
        self.map_centers_file = map_centers_file
    
        self.env = gym.make(gym_env_code, map=self.map_path[self.path_counter], map_ext=self.map_ext, num_agents=self.num_agents, timestep=0.01, integrator=Integrator.RK4)
        self.env.add_render_callback(self.render_callback)
       
        file = pd.read_csv(self.map_centers_file[self.path_counter])
        file.columns = ['x', 'y', 'w_r', 'w_l']
        file.index = file.index.astype(int)
        self.map_centers = file.values[:, :2]
        self.track_width = file.loc[0,'w_r'] + file.loc[0,'w_l']
        self.reward_file = reward_file
        self.collision_file = collision_file
        self.track_center_counter = self.map_centers.shape[0]  # Number of centers in the track

        # Random Seed
        self.random_seed = 42
        np.random.seed(self.random_seed)

        # Environment Observation Parameters
        self.num_beams = 1080
        self.n_features = 11
        self.angle = 270

        # LiDAR downsampling parameters
        self.n_sectors = 40
        self.normalized_lidar = np.zeros((1,self.n_sectors))

        # Action Space Parameters
        self.num_angles = self.n_sectors
        self.num_speeds = 10


        # State Space Parameters
        self.num_states = 2 ** self.n_features

        # Speed Parameters
        self.min_speed = 0.8
        self.max_speed = 2

        # Action Space
        self.angles_deg = np.linspace(-self.angle // 2, self.angle // 2, self.num_angles)[::-1]
        self.angles = np.radians(self.angles_deg)
        self.speeds = np.linspace(self.min_speed, self.max_speed, self.num_speeds)
    
        # State Space - Q-Table
        if inference is not None:
            self.weights = np.load(inference)
            self.num_collisions = int(inference.split('_')[-1].split('.')[0])
            print(f'Loaded Weights')
        else:
            self.weights = np.zeros((self.num_states,self.num_angles,self.num_speeds))
            self.num_collisions = 0
        
        self.max_weight = 5

        # ELigibility Trace
        self.ET = np.zeros((1,self.num_states))
        self.IS = np.zeros((self.num_angles,self.num_speeds))

        # projection matrix
        if self.n_features == 10:
            zero_prob = 0.85
            one_prob = 0.15
        if self.n_features == 11:
            zero_prob = 0.8
            one_prob = 0.2
        self.projection_matrix = self.get_projection_matrix(zero_prob=zero_prob,one_prob=one_prob)

        # binary powers
        self.binary_powers = np.array([2 ** i for i in range(self.n_features)])

        # Training Variables
        self.curr_state = None
        self.next_state = None
        
        self.action_threshold_decay = 0.99998
        self.action_threshold = 0.25 * (self.action_threshold_decay ** self.num_collisions)

        # BTSP Parameters
        self.learning_rate = 0.1
        self.ET_decay_rate = 0.9
        self.IS_decay_rate = 0.7

        # Reward
        self.reward = 0
        self.episode_reward = 0
        self.cumulative_reward = 0
        self.episodic_rewards = [0]

        # Time
        self.collision_times = [0]       


    def __update_map(self):
        '''
        Update the map of the environment to the next map in the list.
        This function is called after fixed no. of collision with the environment, usually set to 2000 in the training loop.

        '''
        if self.env.renderer is not None:
            self.env.renderer.close()
        self.path_counter += 1
        if self.path_counter == len(self.map_path):
            self.path_counter = 0
        self.env.map_name = self.map_path[self.path_counter]
        self.env.update_map(f'{self.map_path[self.path_counter]}.yaml',self.map_ext)
        F110Env.renderer = None
        file = pd.read_csv(self.map_centers_file[self.path_counter])
        file.columns = ['x', 'y', 'w_r', 'w_l']
        file.index = file.index.astype(int)
        self.map_centers = file.values[:, :2]
        self.track_width = file.loc[0,'w_r'] + file.loc[0,'w_l']
        self.track_center_counter = self.map_centers.shape[0]  # Reset the counter to the number of centers
        print(f'Map updated to {self.track_name[self.path_counter]}')
        self.logger.info('-------'*20)
        self.logger.info(f'Initializing F1Tenth_navigation with map: {self.map_path[self.path_counter]}')
        
    def render_callback(self, env_renderer):
        '''
        An inbuilt-function that renders map of the environment.
        Render callback function to update the map of the environment.
        Do not modify this function.
        '''
        e = env_renderer
        x = e.cars[0].vertices[::2]
        y = e.cars[0].vertices[1::2]
        top, bottom, left, right = max(y), min(y), min(x), max(x)
        e.score_label.x = left
        e.score_label.y = top - 700
        e.left = left - 800
        e.right = right + 800
        e.top = top + 800
        e.bottom = bottom - 800

    def get_statistical_properties(self,lidar_input,n_sectors=None):
        '''
        Function that is used to downsample the Lidar input.
        This function takes the Lidar input and divides it into n_sectors number of sectors.
        It then calculates the median of each sector and returns the median values.
        Args:
            lidar_input (np.ndarray): Lidar input.
            n_sectors (int): Number of sectors to downsample the Lidar input into.
        '''
        assert n_sectors is not None, "Number of sectors must be provided"
        #  The [100 :-100] is for selecting only those rays corresponding to 220 fov.
        sector_size = np.asarray(lidar_input[100:-100],dtype=np.float32).shape[0] // n_sectors
        sectors = lidar_input[:sector_size * n_sectors].reshape(n_sectors, sector_size)
        return np.median(sectors, axis=1).reshape(1,-1)
    
    def binarize_vector(self,vector):
        '''
        Function that is used to binarize the input.
        This function takes the projected downsampled-Lidar and binarizes it based on the threshold.
        Args:
            vector (np.ndarray): Projected downsampled-Lidar input.
        Returns:
            np.ndarray: Binary represnetation of the Lidar data which is used as a state.
        '''

        threshold = (np.min(vector)+ np.max(vector))/2
        return np.where(vector > threshold, 1, 0)

    def get_projection_matrix(self,zero_prob=0.5,one_prob=0.5):
        '''
        Function that is used to generate the projection matrix.
        This function takes the number of features and the number of angles and generates a random projection matrix.
        This function is called only once to generate the projection matrix and saves it to a file.
        Args:
            zero_prob (float): Probability of selecting 0.
            one_prob (float): Probability of selecting 1.
        Returns:
            np.ndarray: Projection matrix.
        '''

        # Generate a random matrix with values 0 and 1 based on the given probabilities [prob_0,prob_1]
        if not os.path.exists('Projection_matrices'):
            os.mkdir('Projection_matrices')
        if not os.path.exists(os.path.join('Projection_matrices', f'projection_{self.n_features}f_{self.num_angles}a_s{self.random_seed}.npy')):
            matrix = np.random.choice([0, 1], size=(self.n_sectors, self.n_features), p=[zero_prob,one_prob])
            np.save(os.path.join('Projection_matrices', f'projection_{self.n_features}f_{self.num_angles}a_s{self.random_seed}.npy'), matrix)
        else:
            matrix = np.load(os.path.join('Projection_matrices', f'projection_{self.n_features}f_{self.num_angles}a_s{self.random_seed}.npy'))
        return matrix

    def get_binary_representation(self,lidar_input):
        '''
        Function that is used to get the binary representation of the Lidar input.
        This function takes the Lidar input and projects it using the projection matrix.
        It then binarizes the projected Lidar input and returns the binary representation.
        the bias used is here is some types of non linear projections. It is set to zero here.
        Args:
            lidar_input (np.ndarray): Lidar input.
        Returns:
            np.ndarray: Binary representation of the Lidar input.
        '''
        self.normalized_lidar = normalize(lidar_input,axis=1)
        # Do not normalize, just use the raw data
        return self.binarize_vector(np.dot(lidar_input,self.projection_matrix))
    

    def get_state(self, binary):
        '''
        Function that is used to get the state of the agent.
        This function takes the binary representation of the Lidar input and converts it to a state.
        Args:
            binary (np.ndarray): Binary representation of the Lidar input.
        Returns:
            int: State of the agent.
        '''
        return np.dot(binary[0], self.binary_powers)
    

    def select_action(self, state):
        '''
        Function that is used to select the action of the agent.
        This function takes the state of the agent and selects an action based on the Weight matrix.
        The action is selected based on the epsilon greedy policy.
        Args:
            state (int): State of the agent.
        Returns:
            tuple: Angle index and speed index of the selected action.
        '''
        random_number = np.random.rand()
        if random_number < self.action_threshold:
            angle_index = np.random.randint(0, self.num_angles)
            speed_index = np.random.randint(0, self.num_speeds)
        else:
            max_value = np.max(self.weights[state])
            max_indices = np.argwhere(self.weights[state] == max_value)
            angle_index, speed_index  = max_indices[np.random.choice(np.arange(len(max_indices)))]
        
        return angle_index, speed_index

    def select_action_inference(self, state):
        '''
        Function that is used to select the action of the agent during inference.
        This function takes the state of the agent and selects an action based on the Weight matrix.
        Args:
            state (int): State of the agent.
        Returns:
            tuple: Angle index and speed index of the selected action.
        '''
        max_indices = np.argwhere(self.weights[state] == np.max(self.weights[state]))
        angle_index, speed_index  = max_indices[np.random.choice(np.arange(len(max_indices)))]
        return angle_index, speed_index

    def BSTP_weight_update(self,angle_idx,speed_idx,point_reward):
        '''
        The BSTP weight update function is used to update the weights of the agent.The core logic of the BSTP algorithm is implemented here.
        The weights are updated based on the eligibility traces and the IS decay.
        Args:
            angle_idx (int): Angle index of the selected action.
            speed_idx (int): Speed index of the selected action.
            point_reward (float): Reward received from the environment.
        '''
        # Using distance at specific location with IS decay
        ET_IS_product = (csr_matrix(self.ET).T.dot(self.IS.reshape(1,-1))).reshape(self.num_states,self.num_angles,self.num_speeds)
        non_zero_indices = np.argwhere(self.ET!=0)[:,-1]
        self.weights[self.curr_state,angle_idx,speed_idx] += point_reward 
        delta = (self.max_weight - self.weights[non_zero_indices]) * ET_IS_product[non_zero_indices]
        np.add(self.weights[non_zero_indices], self.learning_rate * delta, out=self.weights[non_zero_indices])

    def set_eligibility_traces(self,angle_idx,speed_idx):
        '''
        The set eligibility traces function is used to set the eligibility traces of the agent for the location (state,action) it is in currently.
        Args:
            angle_idx (int): Angle index of the selected action.
            speed_idx (int): Speed index of the selected action.
        '''
        self.ET[0,self.curr_state] = 1
        self.IS[angle_idx,speed_idx] = self.normalized_lidar[0,angle_idx]

    def decay_eligibility_traces(self):
        ''' 
        The decay eligibility traces function is used to decay the eligibility traces of the agent.
        This function decays the eligibility traces based on the decay rate for all the states and action pairs
        '''
        self.ET *= self.ET_decay_rate
        self.IS *= self.IS_decay_rate
        self.IS[self.IS < 1e-6] = 0
        self.ET[self.ET < 1e-4] = 0

    def save_reward_time(self):
        '''
        Helper function to store the episodic reward and collison times.
        '''
        if not os.path.exists(os.path.join(self.save_path)):
            os.mkdir(os.path.join(self.save_path))
        
        if self.reward_file is not None:
            r = np.append(np.load(self.reward_file), self.episodic_rewards)
            t = np.append(np.load(self.collision_file), self.collision_times)
            np.save(os.path.join(self.save_path, f'rewards.npy'), np.array(r))
            np.save(os.path.join(self.save_path, f'times.npy'), np.array(t))
        else:
            np.save(os.path.join(self.save_path, f'rewards.npy'), np.array(self.episodic_rewards))
            np.save(os.path.join(self.save_path, f'times.npy'), np.array(self.collision_times))

    def save_weights(self):
        '''
        Helper function to store the weights of the agent. 
        '''
        if not os.path.exists(os.path.join(self.save_path)):
            os.mkdir(os.path.join(self.save_path))
        np.save(os.path.join(self.save_path, f'{self.track_name[self.path_counter]}_{self.num_collisions + 1}.npy'), self.weights)
        # print(f'File saved')

    def inference(self):
        try:
            obs, step_reward, done, info = self.env.reset(np.array([[self.sx, self.sy, self.stheta[self.track_name[self.path_counter]]]]))
            lidar = obs['scans'][0]
            lidar_down_sampled = self.get_statistical_properties(lidar,n_sectors=self.n_sectors)
            self.curr_state = self.get_state(self.get_binary_representation(lidar_down_sampled))
            angle_index,speed_index = self.select_action_inference(self.curr_state)
            while not done:
                steering_angle,speed = self.angles[angle_index],self.speeds[speed_index]
                obs, step_reward, done, info = self.env.step(np.array([[steering_angle, speed]]))
                lidar = obs['scans'][0]
                lidar_down_sampled = self.get_statistical_properties(lidar,n_sectors=self.n_sectors)
                self.next_state = self.get_state(self.get_binary_representation(lidar_down_sampled))
                angle_index,speed_index = self.select_action_inference(self.next_state)
                self.curr_state = self.next_state
                
                self.env.render(mode='human')
            raise Exception('Done')
        except Exception as e:
            print(f'Exception: {e}')
            self.env.renderer.close()
            self.env.close()
            F110Env.renderer = None

In [None]:
path = './f1tenth_racetracks'
all_map_paths=[]
map_centers = []
maps = []
track_lengths=[]
for folder in os.listdir(path):
    if folder not in ['README.md','.gitignore','convert.py','LICENSE','rename.py','.git']:
        folder_name=folder
        file_name=folder_name.replace(' ','')+'_map'
        map_center = folder_name.replace(' ','')+'_centerline.csv'
        track_lengths.append(len(pd.read_csv(f'{path}/{folder_name}/{map_center}')))
        maps.append(folder_name)
        all_map_paths.append(f'{path}/{folder_name}/{file_name}')
        map_centers.append(f'{path}/{folder_name}/{map_center}')

track_length_list = list(zip(maps,track_lengths))
track_length_list

In [None]:
train_maps = ['Spa','Hockenheim','Shanghai','Nuerburgring','Montreal', 'Austin','Mexico City']
test_maps = [i[0] for i in track_length_list if i[0] not in train_maps]
print(f'Train Maps: {train_maps}')
print(f'Test Maps: {test_maps}')

In [None]:
global num_agents,map_path,map_ext,sx,sy,stheta,indices
num_agents = 1
map_ext = '.png'
sx = 0.
sy = 0.
stheta = {'Oschersleben':0, 
          'BrandsHatch':0.5, 
          'Monza':1, 
          'Catalunya':1, 
          'SaoPaulo':2, 
          'Sepang':0, 
          'Silverstone':1, 
          'YasMarina':0, 
          'Sochi':1, 
          'Melbourne':2, 
          'Budapest':-0.5, 
          'Spielberg':0, 
          'Zandvoort':1, 
          'Sakhir':1, 
          'MoscowRaceway':1,
          'Spa':2, 
          'Hockenheim':2, 
          'Shanghai':0, 
          'Nuerburgring':1, 
          'Montreal':1.5, 
          'Austin':-0.5, 
          'Mexico City':0,
          'IMS_map' : -1.0}

indices = [idx for idx,i in enumerate(maps) if i in test_maps]
# indices = [idx for idx,i in enumerate(maps) if i in train_maps]
map_path_subset = [all_map_paths[i] for i in indices]
map_centers_subset = [map_centers[i] for i in indices]
map_names_subset = [maps[i] for i in indices]

In [None]:
inference_file = 'BTSP_220_Multiple_training/Hockenheim_45500.npy'
reward_file='BTSP_220_Multiple_training/rewards.npy'
collision_file='BTSP_220_Multiple_training/times.npy'

map_index = 0

map_path = [map_path_subset[map_index]]
map_center = [map_centers_subset[map_index]]
map_name = [map_names_subset[map_index]]

save_path = 'Weights_BTSP/'

simulator = F1Tenth_navigation(num_agents=num_agents,map_path=map_path,map_ext=map_ext,sx=sx,sy=sy,stheta=stheta,map_centers_file=map_centers,save_path=save_path,track_name=map_name,inference=inference_file,reward_file=reward_file,collision_file=collision_file)
print(f'BTSP Inference on {map_names_subset[map_index]}')
simulator.inference()

In [None]:
inference_file = 'SARSA_220_Multiple_training/Nuerburgring_52000.npy'
reward_file='SARSA_220_Multiple_training/rewards.npy'
collision_file='SARSA_220_Multiple_training/times.npy'

save_path = 'Weights_SARSA/'

simulator = F1Tenth_navigation(num_agents=num_agents,map_path=map_path,map_ext=map_ext,sx=sx,sy=sy,stheta=stheta,map_centers_file=map_center,save_path=save_path,track_name=map_name,inference=inference_file,reward_file=reward_file,collision_file=collision_file)
print(f'SARSA Inference on {map_names_subset[map_index]}')
simulator.inference()

# Simple track inference

In [None]:
# Code to run the easy track with no curves
inference_file = 'BTSP_220_Multiple_training/Mexico City_35000.npy'
# inference_file = 'SARSA_220_Multiple_training/Nuerburgring_52000.npy'
reward_file='BTSP_220_Multiple_training/rewards.npy'
collision_file='BTSP_220_Multiple_training/times.npy'

map_path_subset = ['/home/praneeth/shared_f1_tenth /IMS/IMS_map']
map_centers_subset = ['/home/praneeth/shared_f1_tenth /IMS/IMS_centerline.csv']
map_names_subset = ['IMS_map']
map_index = 0

map_path = [map_path_subset[map_index]]
map_center = [map_centers_subset[map_index]]
map_name = [map_names_subset[map_index]]

save_path = 'Weights_BTSP/'

simulator = F1Tenth_navigation(num_agents=num_agents,map_path=map_path_subset,map_ext=map_ext,sx=sx,sy=sy,stheta=stheta,map_centers_file=map_centers_subset,save_path=save_path,track_name=map_names_subset,inference=inference_file,reward_file=reward_file,collision_file=collision_file)
print(f'BTSP Inference on {map_name}')
simulator.inference()