# Navigating a RaceTrack

## Imports

In [19]:
import numpy as np
import pygame
import time
import matplotlib.pyplot as plt #For plotting state-action values, Code for this has been commented out

## Environment and Helper Functions

In [20]:
class Environment:
    """Racetrack grid world in which the car moves by its current velocity.

    A state is a list ``[x, y, vx, vy]``: ``x`` indexes the first axis of
    ``track_info`` (0..7), ``y`` the second axis (0..20), and ``vx``/``vy``
    are the displacements applied to ``x``/``y`` on the next step.
    """

    def __init__(self):
        """Create Track:
            -Valid Road: 0
            -Wall: -1
            -Finish Line: +1 """
        # The track is hard-coded manually.
        track = np.zeros((8, 21))
        track[0, :] = -1
        track[1, 0:8] = -1
        track[1, 20] = -1
        track[2:6, 0] = -1
        track[2:6, 20] = -1
        track[3, 15:18] = -1
        track[4, 9:14] = -1
        track[4, 19] = -1
        track[5, 0:15] = -1
        track[5, 16] = -1
        track[5, 19] = -1
        track[6, 13] = -1
        track[6, 19] = -1
        track[7, 13] = -1
        track[7, 19] = -1
        track[7, 14:19] = 1
        self.track_info = track

        # The three cells a car can start from, always at zero velocity.
        self.START_STATE_1 = [2, 1, 0, 0]
        self.START_STATE_2 = [3, 1, 0, 0]
        self.START_STATE_3 = [4, 1, 0, 0]
        self.START_STATES = [self.START_STATE_1, self.START_STATE_2, self.START_STATE_3]

    def take_action(self, curr, action):
        """Advance the car by one step.

        Args:
            curr: current state ``[x, y, vx, vy]``.
            action: pair ``(dvx, dvy)`` added to the velocity.

        Returns:
            ``(next_state, reward)``:
              * ``(new_state, -1)`` for an ordinary move;
              * ``(None, -1)`` when the finish line is reached;
              * ``(random start state, -100)`` when a wall is hit.
        """
        # Candidate next state, valid only if no wall or finish cell is entered:
        # the position moves by the *current* velocity, the velocity by the action.
        new = [
            curr[0] + curr[2],
            curr[1] + curr[3],
            curr[2] + action[0],
            curr[3] + action[1],
        ]

        # Walk the cells crossed along the velocity vector and report the
        # first non-road cell entered (if any).
        posi = [curr[0], curr[1]]
        posf = [new[0], new[1]]
        path = [curr[0], curr[1]]
        signal = self.collision_detection(posi, posf, path)

        # Normal state-action
        if signal == 0:
            return (new, -1)

        # None as next state means finish line reached
        elif signal == 1:
            return (None, -1)

        # Wall hit: reset to a uniformly random start state.
        elif signal == -1:
            # Draw a scalar directly; int() on a size-1 array is deprecated in NumPy.
            choice = int(np.random.randint(0, high=len(self.START_STATES)))
            # Hand out a copy so callers can never mutate the shared start states.
            return (list(self.START_STATES[choice]), -100)

    def collision_detection(self, posi, posf, path):
        """Trace the move from ``posi`` to ``posf`` cell by cell.

        Args:
            posi: start position ``[x, y]``.
            posf: end position ``[x, y]``.
            path: list ``[x, y]`` initialised to the start position; it is
                advanced in place as cells are entered.

        Returns:
            The ``track_info`` value of the first non-road cell entered
            (-1 wall, 1 finish), or 0 if every cell entered is road.

        NOTE(review): only non-negative displacements are handled (``path``
        is only ever incremented and ``np.linspace`` rejects a negative
        ``num``); the agents in this file restrict velocities to 0..5.
        """
        # Line through posi and posf written as y = slope * x + c.  Left
        # undefined for a move with no x displacement; in that case
        # ``verticals`` is empty and the line equation is never evaluated.
        if posf[0] != posi[0]:
            slope = (posf[1] - posi[1]) / (posf[0] - posi[0])
            c = posi[1] - slope * posi[0]

        # Cell borders crossed when advancing along y (one per unit of y travelled)
        horizontals = np.linspace(posi[1] + 0.5, posf[1] - 0.5, num=posf[1] - posi[1])

        # Cell borders crossed when advancing along x (one per unit of x travelled)
        verticals = np.linspace(posi[0] + 0.5, posf[0] - 0.5, num=posf[0] - posi[0])
        n_vert = verticals.shape[0]

        marker = 0  # index of the next x border that has not been crossed yet
        signal = 0  # wall hit (-1), finish line (1) or normal (0)
        for horz in horizontals:
            if marker >= n_vert:
                # All x borders already crossed: only y steps remain.
                path[1] += 1
                signal = self.track_info[path[0], path[1]]
                if signal != 0:
                    break
            else:
                # x coordinate at which the line crosses this y border
                contact_x = (horz - c) / slope
                for i in range(marker, n_vert):
                    if verticals[i] < contact_x:
                        # The x border comes first: step in x.
                        path[0] += 1
                        marker = i + 1
                        signal = self.track_info[path[0], path[1]]
                        if signal != 0:
                            break
                        if marker >= n_vert:
                            # That was the last x border, so the y border follows.
                            path[1] += 1
                            signal = self.track_info[path[0], path[1]]
                            if signal != 0:
                                break
                    elif verticals[i] == contact_x:
                        # The line passes exactly through a cell corner: step diagonally.
                        path[0] += 1
                        path[1] += 1
                        marker = i + 1
                        signal = self.track_info[path[0], path[1]]
                        break
                    else:
                        # The y border comes first: step in y.
                        path[1] += 1
                        signal = self.track_info[path[0], path[1]]
                        break
                if signal != 0:
                    break

        # Any x borders left after the last y border are pure x steps.
        if signal == 0 and marker < n_vert:
            for i in range(marker, n_vert):
                path[0] += 1
                signal = self.track_info[path[0], path[1]]
                if signal != 0:
                    break

        # The caller handles signal = -1 or 1
        return signal
        

In [21]:
# Map action number to action value
def act_map(st):
    """Return the ``(dvx, dvy)`` velocity change for action id ``st``.

    ``st`` may be an int or a numeric string; ids 0..8 enumerate every
    combination of -1, 0, +1 for the two velocity components, with the
    first component varying slowest.
    """
    steps = (-1, 0, 1)
    velocity_changes = [(d_first, d_second) for d_first in steps for d_second in steps]
    return velocity_changes[int(st)]

# Map state to its string form
# "XX-YY-Vx-Vy" format of 6 characters
# For example: [2,1,0,0] --> "020100"
def state_map(list):
    """Concatenate the entries of a state into its string key.

    The first two entries (the position) are prefixed with "0" when they
    are below 10, so they occupy two characters; the remaining entries
    are written as-is.  Example: [2, 1, 0, 0] -> "020100".
    """
    pieces = []
    for position, value in enumerate(list):
        text = str(value)
        if value < 10 and position < 2:
            text = "0" + text
        pieces.append(text)
    return "".join(pieces)

## Visualiser (PyGame, Custom Made)

In [None]:
def visualise(state_path):
    """Replay a recorded episode on the racetrack in a PyGame window.

    ``state_path`` is a sequence of state strings: characters 0-1 and 2-3
    are read as the car's two grid coordinates, characters 4 and 5 as its
    two single-digit velocity components.  One frame is shown per state
    (0.5 s each), followed by one extra frame for the position after the
    last recorded state.  The whole replay is shown at most twice, then
    the window is closed.
    """
    ENV = Environment()
    # Define constants
    CELL_SIZE = 25
    SCREEN_WIDTH = CELL_SIZE * 8
    SCREEN_HEIGHT = CELL_SIZE * 21

    # Initialize Pygame and create the screen
    pygame.init()
    screen = pygame.display.set_mode((SCREEN_WIDTH, SCREEN_HEIGHT))

    def cell_rect(cell_x, cell_y):
        # Pixel rectangle ((left, top), (width, height)) of one grid cell.
        return ((cell_x * CELL_SIZE, cell_y * CELL_SIZE), (CELL_SIZE, CELL_SIZE))

    def draw_segment(start, end):
        # Yellow path segment between two points given in cell units.
        pygame.draw.line(
            screen,
            (255, 255, 0),
            (start[0] * CELL_SIZE, start[1] * CELL_SIZE),
            (end[0] * CELL_SIZE, end[1] * CELL_SIZE),
            2,
        )

    # Define a function to draw one complete frame
    def draw_grid(state_str, state_path):
        # Racetrack
        screen.fill(0)
        for i in range(21):
            for j in range(8):
                cell = ENV.track_info[j, i]
                if i == 1 and j in (2, 3, 4):
                    fill = (0, 255, 0)      # start cells
                elif cell == -1 or (j in (6, 7) and (i == 20 or i <= 12)):
                    fill = (255, 0, 0)      # walls, plus the off-track cells of rows 6 and 7
                elif cell == 0:
                    fill = (0, 0, 255)      # road
                elif cell == 1:
                    fill = (255, 165, 0)    # finish line
                else:
                    fill = None
                if fill is not None:
                    pygame.draw.rect(screen, fill, cell_rect(j, i))
                # Black one-pixel outline around every cell
                pygame.draw.rect(screen, (0, 0, 0), cell_rect(j, i), 1)

        # Car current position (white)
        car_x = int(state_str[:2])
        car_y = int(state_str[2:4])
        pygame.draw.rect(screen, (255, 255, 255), cell_rect(car_x, car_y))

        # Car start position (black)
        start_x = int(state_path[0][:2])
        start_y = int(state_path[0][2:4])
        pygame.draw.rect(screen, (0, 0, 0), cell_rect(start_x, start_y))

        # Car path: straight segments between the centres of consecutive states
        previous = None
        last_index = len(state_path) - 1
        for index, visited in enumerate(state_path):
            centre_x = int(visited[:2]) + 0.5
            centre_y = int(visited[2:4]) + 0.5
            if previous is not None:
                draw_segment(previous, (centre_x, centre_y))
            previous = (centre_x, centre_y)
            if index == last_index:
                # Extend the path by the final velocity towards the finish.
                end_x = centre_x + int(visited[4:5])
                end_y = centre_y + int(visited[5:6])
                if end_x > 7.5:
                    end_x = 7.5
                if end_y > 18.5:
                    end_y = centre_y
                draw_segment(previous, (end_x, end_y))

    # Main loop
    running = True
    passes = 0
    while running:
        passes += 1
        # Handle events
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False
        # The second pass is always the last one.
        if passes == 2:
            running = False
        for frame_state in state_path:
            # Draw the grid
            draw_grid(frame_state, state_path)

            # Update the screen
            pygame.display.update()

            time.sleep(0.5)

        # The terminal position is not part of state_path, so derive it from
        # the last recorded state and its velocity.
        last_state = state_path[-1]
        last_x = last_state[:2]
        last_y = last_state[2:4]
        new_x = int(last_x) + int(last_state[4:5])
        new_y = int(last_y) + int(last_state[5:6])
        if new_x > 7:
            new_x = 7
        if new_y > 18:
            new_y = int(last_y)
        draw_grid("0" + str(new_x) + str(new_y), state_path)
        pygame.display.update()
        time.sleep(0.5)

    # Quit Pygame
    pygame.display.quit()
    pygame.quit()


## On-Policy Agent

In [22]:
class On_Agent:
    """On-policy first-visit Monte Carlo control agent (epsilon-soft).

    Keys used throughout:
      * a state key is the 6-character string built by ``state_map``;
      * a state-action key is the state key followed by the action id
        ("0".."8", see ``act_map``);
      * ``state_action_vals`` maps a state-action key to
        ``[visit count, mean return]`` and additionally holds, per state,
        ``state + "max"`` -> ``[greedy action, its value]`` and
        ``state + "done"`` -> 1 once every legal action has an entry.
    """

    def __init__(self):
        # Velocity key "vxvy" -> (number of legal actions, list of legal action ids).
        # An action is legal when the resulting velocity components stay in
        # 0..5 and are not both zero.
        self.possible_actions = {}
        for i in range(0, 6):
            for j in range(0, 6):
                legal = []
                for k in range(0, 9):
                    tup = act_map(str(k))
                    i_new = i + tup[0]
                    j_new = j + tup[1]
                    if 0 <= i_new <= 5 and 0 <= j_new <= 5 and not (i_new == 0 and j_new == 0):
                        legal.append(str(k))
                self.possible_actions[str(i) + str(j)] = (len(legal), legal)

        self.policy = {}    # Greedy Policy
        self.state_action_vals = {}  # Q-values
        self.first_visits = {}  # First-visit records for one episode

    def decide_action(self, curr_state_str, epsilon):
        """Pick an action id for ``curr_state_str`` with an epsilon-soft rule.

        With probability ``epsilon`` a uniformly random legal action is
        returned; otherwise the greedy action from ``self.policy``.  If the
        state has no greedy action yet, a random legal action is chosen and
        recorded as the greedy one.
        """
        num_act, act_lst = self.possible_actions[curr_state_str[-2:]]
        # Draw a scalar directly; int() on a size-1 array is deprecated in NumPy.
        choice = int(np.random.randint(0, high=num_act))

        # Tossing an epsilon-biased coin
        if np.random.rand() < epsilon:
            return act_lst[choice]

        # Greedy action
        action = self.policy.get(curr_state_str)

        # If a greedy action exists but the state is flagged as having an
        # untried action, explore instead.
        # NOTE(review): nothing visible in this class ever stores 0 under
        # the "done" key, so this branch looks unreachable - confirm.
        if action is not None and self.state_action_vals.get(curr_state_str + "done") == 0:
            action = act_lst[choice]

        # If greedy action not yet decided
        if action is None:
            action = act_lst[choice]
            self.policy[curr_state_str] = action
        return action

    def episode(self, env, epsilon, print_path=0):
        """Run one episode from a random start state until the finish line.

        Records in ``self.first_visits`` the step index of the first
        occurrence of every state-action pair.

        Returns:
            ``(count, wall_list)`` or, when ``print_path == 1``,
            ``(count, wall_list, state_list)`` where ``count`` is the number
            of steps, ``wall_list[t]`` is 1 if step ``t`` hit a wall and
            ``state_list`` holds the visited state keys.
        """
        curr_state = env.START_STATES[int(np.random.randint(0, high=len(env.START_STATES)))]

        count = 0
        self.first_visits.clear()
        state_list = []
        wall_list = []
        while curr_state is not None:
            curr_state_str = state_map(curr_state)
            state_list.append(curr_state_str)

            # Decide epsilon-soft action using data from greedy policy
            action = self.decide_action(curr_state_str, epsilon)

            # Remember the step number if this is the first visit
            if self.first_visits.get(curr_state_str + action) is None:
                self.first_visits[curr_state_str + action] = [count, 0]

            # Get next state and reward
            curr_state, curr_reward = env.take_action(curr_state, act_map(action))
            count += 1

            # The environment signals a wall hit with a reward of -100
            wall_list.append(1 if curr_reward == -100 else 0)

        if print_path != 1:
            return (count, wall_list)
        return (count, wall_list, state_list)

    def calc_return(self, state_act, episode_info, gamma):
        """Discounted return following the first visit of ``state_act``.

        Args:
            state_act: state-action key present in ``self.first_visits``.
            episode_info: ``(count, wall_list)`` as returned by ``episode``.
            gamma: discount factor.

        Returns:
            Sum over the remaining steps of ``gamma**k * reward_k`` with a
            reward of -1 per step and an extra -10 on steps that hit a wall.
        """
        first_step = self.first_visits[state_act][0]
        steps_left = episode_info[0] - first_step

        # Return from timesteps: -(1 + gamma + ... + gamma**(steps_left - 1)).
        # The closed form divides by (1 - gamma), so gamma == 1 is handled apart.
        if gamma == 1:
            state_act_ret = -float(steps_left)
        else:
            state_act_ret = -(1 - pow(gamma, steps_left)) / (1 - gamma)

        # Return from wall hits.
        # NOTE(review): the environment reports -100 for a wall hit while the
        # return uses -10; confirm which penalty is intended.
        for i in range(first_step, len(episode_info[1])):
            if episode_info[1][i] == 1:
                state_act_ret += pow(gamma, i - first_step) * -10

        return state_act_ret

    def update_values_policy(self, state_act, state_act_ret):
        """Fold one sampled return into the Q-value and refresh the greedy action."""
        # Update the state-action value (incremental mean over visits)
        curr_set = self.state_action_vals.get(state_act)
        if curr_set is None:
            self.state_action_vals[state_act] = [1, state_act_ret]
        else:
            visits = curr_set[0]
            self.state_action_vals[state_act] = [
                visits + 1,
                (curr_set[1] * visits + state_act_ret) / (visits + 1),
            ]

        # Find the new greedy action for given state
        updated_val = self.state_action_vals[state_act][1]
        state_str = state_act[:6]
        act_str = state_act[6:]
        state_max = self.state_action_vals.get(state_str + "max")
        legal_actions = self.possible_actions[state_str[4:]][1]

        # This action now beats the recorded maximum.
        if state_max is None or state_max[1] < updated_val:
            self.state_action_vals[state_str + "max"] = [act_str, updated_val]
            self.policy[state_str] = act_str

        # The value of the greedy action itself changed without increasing:
        # re-scan all known actions so that "max" holds the true best action
        # and its current value rather than a stale one.
        elif state_max[0] == act_str:
            best_act, best_val = act_str, updated_val
            for poss_act in legal_actions:
                temp = self.state_action_vals.get(state_str + poss_act)
                if temp is not None and best_val < temp[1]:
                    best_act, best_val = poss_act, temp[1]
            self.state_action_vals[state_str + "max"] = [best_act, best_val]
            self.policy[state_str] = best_act

        # If there is an action which has not been explored for given state,
        # register it with value 0 and make it the greedy action so that it
        # gets tried.
        if self.state_action_vals.get(state_str + "done") in (None, 0):
            mark = 0
            for poss_act in legal_actions:
                if self.state_action_vals.get(state_str + poss_act) is None:
                    mark = 1
                    self.state_action_vals[state_str + poss_act] = [0, 0]
                    self.state_action_vals[state_str + "max"] = [poss_act, 0]
                    self.policy[state_str] = poss_act
                    break
            if mark == 0:
                self.state_action_vals[state_str + "done"] = 1

        return

    def evaluate_episode(self, episode_info, gamma):
        """Update values and policy for every pair first-visited in the last episode."""
        for state_act in self.first_visits:
            state_act_ret = self.calc_return(state_act, episode_info, gamma)
            self.update_values_policy(state_act, state_act_ret)

        return

    def learn(self, env, epsilons):
        """Train on ``env``.

        Args:
            env: environment providing ``START_STATES`` and ``take_action``.
            epsilons: iterable of ``(epsilon, number_of_episodes)`` pairs.
                Within each pair the first 50 episodes use epsilon 0.6 and
                the remaining ones the pair's own epsilon.  The discount
                factor is fixed at 0.9.
        """
        # Optional Q-value tracing for plotting (kept disabled):
        # self.list1 = []
        # self.list2 = []
        gamma = 0.9
        for epsilon in epsilons:
            for i in range(1, epsilon[1] + 1):
                epsil = 0.6 if i <= 50 else epsilon[0]
                ep_len = self.episode(env, epsil)
                self.evaluate_episode(ep_len[:2], gamma)
                # self.list1.append(None if self.state_action_vals.get("0201005") == None else self.state_action_vals.get("0201005")[1])
                # self.list2.append(None if self.state_action_vals.get("0301005") == None else self.state_action_vals.get("0301005")[1])
        # plt.plot(agent.list1)
        # plt.show()
        return
    


        

In [23]:
def On_policy():
    """Train a fresh ``On_Agent`` on a new ``Environment`` and return it.

    Training uses the single schedule entry (0.3, 120), i.e. 120 episodes.
    """
    track = Environment()
    learner = On_Agent()
    learner.learn(track, [(0.3, 120)])
    return learner

def On_policy_test(agent):
    """Run one episode with epsilon 0 and replay the visited states on screen."""
    track = Environment()
    # With print_path=1 the episode result carries the visited states at index 2.
    visited_states = agent.episode(track, 0, 1)[2]
    visualise(visited_states)





## Off-Policy Agent

In [24]:
class Off_Agent:
    """Off-policy every-visit Monte Carlo control agent.

    Episodes are generated by an epsilon-greedy behaviour policy and the
    greedy target policy is evaluated with weighted importance sampling.

    Keys used throughout:
      * a state key is the 6-character string built by ``state_map``;
      * a state-action key is the state key followed by the action id
        ("0".."8", see ``act_map``);
      * ``state_action_vals`` maps a state-action key to
        ``[cumulative weight, value estimate]`` and additionally holds, per
        state, ``state + "max"`` -> ``[greedy action, its value]``.
    """

    def __init__(self):
        # Velocity key "vxvy" -> (number of legal actions, list of legal action ids).
        # An action is legal when the resulting velocity components stay in
        # 0..5 and are not both zero.
        self.possible_actions = {}
        for i in range(0, 6):
            for j in range(0, 6):
                legal = []
                for k in range(0, 9):
                    tup = act_map(str(k))
                    i_new = i + tup[0]
                    j_new = j + tup[1]
                    if 0 <= i_new <= 5 and 0 <= j_new <= 5 and not (i_new == 0 and j_new == 0):
                        legal.append(str(k))
                self.possible_actions[str(i) + str(j)] = (len(legal), legal)

        self.policy = {}    # Greedy Policy
        self.state_action_vals = {}  # Q-values
        self.every_visits = []  # every-visit records for one episode

    def episode(self, env, epsilon, print_path=0):
        """Run one episode with the epsilon-greedy behaviour policy.

        Records every visited state-action key, in order, in
        ``self.every_visits``.

        Returns:
            ``(count, wall_list, prob_list)`` or, when ``print_path == 1``,
            ``(count, wall_list, prob_list, state_list)``.  ``count`` is the
            number of steps, ``wall_list[t]`` is 1 if step ``t`` hit a wall,
            ``prob_list[t]`` is the behaviour-policy probability of the
            action taken at step ``t`` and ``state_list`` holds the visited
            state keys.
        """
        curr_state = env.START_STATES[int(np.random.randint(0, high=len(env.START_STATES)))]
        count = 0
        self.every_visits.clear()
        state_list = []
        wall_list = []
        prob_list = []  # For getting the importance sampling weights
        ep = epsilon
        while curr_state is not None:
            curr_state_str = state_map(curr_state)
            state_list.append(curr_state_str)

            # ---------------- Behaviour policy ----------------
            num_act, act_lst = self.possible_actions[curr_state_str[-2:]]
            # Draw a scalar directly; int() on a size-1 array is deprecated in NumPy.
            choice = int(np.random.randint(0, high=num_act))
            greedy = self.policy.get(curr_state_str)

            # Tossing an epsilon-biased coin
            if np.random.rand() < ep:
                action = act_lst[choice]
            elif greedy is None:
                # Greedy action not yet decided: pick one at random and keep it
                action = act_lst[choice]
                self.policy[curr_state_str] = action
            else:
                action = greedy

            # Probability with which the behaviour policy selects `action`,
            # whichever branch happened to produce it.  Without a greedy
            # action both branches sample uniformly; with one, the greedy
            # action can also come out of the exploration branch.
            if greedy is None:
                prob_list.append(1 / num_act)
            elif action == greedy:
                prob_list.append(1 - ep + ep / num_act)
            else:
                prob_list.append(ep / num_act)
            # ---------------------------------------------------

            # Update every visit list
            self.every_visits.append(curr_state_str + action)

            # Get next state and reward
            curr_state, curr_reward = env.take_action(curr_state, act_map(action))
            count += 1

            # The environment signals a wall hit with a reward of -100
            wall_list.append(1 if curr_reward == -100 else 0)

        if print_path != 1:
            return (count, wall_list, prob_list)
        return (count, wall_list, prob_list, state_list)

    def evaluate_episode(self, episode_length, gamma):
        """Update Q-values and the target policy from the last episode.

        Args:
            episode_length: ``(count, wall_list, prob_list)`` as returned by
                ``episode``.
            gamma: discount factor.

        The episode is processed backwards; processing stops at the first
        step whose action differs from the target policy, because the
        importance weight of all earlier steps would be zero.
        """
        state_act_ret = 0
        state_act_W = 1
        for index in range(len(self.every_visits) - 1, -1, -1):
            state_act = self.every_visits[index]

            # Discounted return G <- gamma * G + R with R = -1 per timestep
            state_act_ret = -1 + gamma * state_act_ret

            # Extra penalty if a wall hit occurred on this step.
            # NOTE(review): the environment reports -100 for a wall hit while
            # the return uses -10; confirm which penalty is intended.
            if episode_length[1][index] == 1:
                state_act_ret += -10

            # Weighted importance-sampling update of the state-action value
            curr_set = self.state_action_vals.get(state_act)
            if curr_set is None:
                self.state_action_vals[state_act] = [state_act_W, state_act_ret]
            else:
                total_w = curr_set[0] + state_act_W
                self.state_action_vals[state_act] = [
                    total_w,
                    curr_set[1] + state_act_W * (state_act_ret - curr_set[1]) / total_w,
                ]

            updated_val = self.state_action_vals[state_act][1]
            state_str = state_act[:6]
            act_str = state_act[6:]
            state_max = self.state_action_vals.get(state_str + "max")

            # This action now beats the recorded maximum.
            if state_max is None or state_max[1] < updated_val:
                self.state_action_vals[state_str + "max"] = [act_str, updated_val]
                self.policy[state_str] = act_str

            # The value of the greedy action itself changed without
            # increasing: re-scan all known actions so that "max" holds the
            # true best action and its current value rather than a stale one.
            elif state_max[0] == act_str:
                best_act, best_val = act_str, updated_val
                for poss_act in self.possible_actions[state_str[4:]][1]:
                    temp = self.state_action_vals.get(state_str + poss_act)
                    if temp is not None and best_val < temp[1]:
                        best_act, best_val = poss_act, temp[1]
                self.state_action_vals[state_str + "max"] = [best_act, best_val]
                self.policy[state_str] = best_act

            # If the action taken is not the target policy's action, the
            # next weight would be 0 (trajectory probability zero): stop.
            if act_str != self.policy[state_str]:
                break

            # Update relative trajectory probability (weight)
            state_act_W /= episode_length[2][index]
        return

    def learn(self, env, epsilons):
        """Train on ``env``.

        Args:
            env: environment providing ``START_STATES`` and ``take_action``.
            epsilons: iterable of ``(epsilon, number_of_episodes)`` pairs.
                Within each pair the first 40000 episodes use epsilon 0.3
                and the remaining ones the pair's own epsilon.  The discount
                factor is fixed at 0.9.
        """
        gamma = 0.9
        for epsilon in epsilons:
            # Optional Q-value tracing for plotting (kept disabled):
            # self.list1 = []
            # self.list2 = []
            for i in range(1, epsilon[1] + 1):
                epsil = 0.3 if i <= 40000 else epsilon[0]
                ep_len = self.episode(env, epsil)
                self.evaluate_episode(ep_len[:3], gamma)
                # self.list1.append(None if self.state_action_vals.get("0201005") == None else self.state_action_vals.get("0201005")[1])
                # self.list2.append(None if self.state_action_vals.get("0301005") == None else self.state_action_vals.get("0301005")[1])
        # plt.plot(agent.list1)
        # plt.show()
        return
    


        

In [25]:
def Off_policy():
    """Train a fresh ``Off_Agent`` on a new ``Environment`` and return it.

    Training uses the single schedule entry (0.3, 50000), i.e. 50000 episodes.
    """
    track = Environment()
    learner = Off_Agent()
    learner.learn(track, [(0.3, 50000)])
    return learner

def Off_policy_test(agent):
    """Run one episode with epsilon 0 and replay the visited states on screen."""
    track = Environment()
    # With print_path=1 the episode result carries the visited states at index 3.
    visited_states = agent.episode(track, 0, 1)[3]
    visualise(visited_states)



## Run Agents

### On-Policy

In [None]:
# Train the on-policy agent. Run this cell first: the visualisation cell
# below needs the trained agent bound to On_agent.
On_agent = On_policy()

In [None]:
# For visualisation; run multiple times to see paths from different start positions
# (each run starts from a randomly chosen start state).
On_policy_test(On_agent)

### Off-Policy

In [None]:
# Train agent, run this first and then visualise
Off_Agent = Off_policy()

In [None]:
# For Visualisation, run multiple times to see paths from different start posiitons
Off_policy_test(Off_Agent)