In [363]:
import random as random
import numpy as np

# SARSA Algorithm Implementation for Maze
- Here we implement the SARSA algorithm to find an optimum path through a square maze
- There are several *pitfalls* - places where the agent cannot go, and only one exit
- We punish pitfalls with $-50$, and reward finding the exit with $10^5$
- We use a $\epsilon$-greedy policy with $\epsilon(t) = \frac{0.4}{1+10^{-4}t}$
- At each step of the algorihtm, the action-value function is updated according to
$q(s,a) \to q(s,a) - \alpha(r(s') + \gamma q(s',a') - q(s,a))$
- Here $(s,a)$ are the current state and action choice. $s'$ is the state we will go to next under action $a$, and $a'$ is the action the greed-policy algo would choose at this current instance in state $s'$.
- If we hit a wall, the action-value function is updated but the state is not: we stay put and try again
- $\alpha$ is the learning rate and $\gamma$ is the discount factor (incentives high reward actions in the near future)

### Designing the maze

In [367]:
length = 5 #maze is a length x length box
pitfalls = [np.array((0,1)),np.array((1,3)),
            np.array((1,4)),np.array((2,2)),
            np.array((3,3)),np.array((4,3)),
            np.array((4,4))]

start = np.array((4,0))
end = np.array((0,4))

In [381]:
class Agent:
    """
    Agent is the algorithm which moves around the board.
    We initialize the agent with an initial position.
    It has internal clock t which measures its number of steps taken
    The action_val_funcs determines probabilistically which policy the agent will take, it is updated in each experiment
    
    """
    
    
    def __init__(self,initial_pos):
        self.initial_pos = np.array(initial_pos)
        self.current_pos = np.array(initial_pos)
        self.t=0
        self.action_val_funcs = np.zeros((length*length,4)) #num states x num actions
        self.finished = False
        self.gamma = 0.9
        self.alpha = 0.1
        
        
    def _epsilon(self):
        return(0.4/(1+(10**-4)*self.t))
        
    def _find_best_choice(self,state):
        """Given a state, this identifies the best action
        If multiple have the same reward, it chooses a random one.
        """
        index = self._convert_state_to_matrix_index(state)
        find_max_indices = np.argwhere(self.action_val_funcs[index] == np.amax(self.action_val_funcs[index]))
        if np.size(find_max_indices) == 1:
            return(np.argmax(self.action_val_funcs[index]))
        else: 
            return random.choice(find_max_indices)[0]
            
        
    def _greedy_choice(self,state):
        """Implement the epsilon greedy policy: choose the best policy (1-epsilon) times.
        """
        best_option = self._find_best_choice(state)
        if random.random() <= (1-self._epsilon()):
            return best_option
        else:
            return(random.choice(np.delete(np.array(range(4)),best_option)))
    
    def _get_action(self,index):
        options = [np.array((1,0)),np.array((-1,0)),np.array((0,1)),np.array((0,-1))]
        return options[index]
    
    def _convert_state_to_matrix_index(self,state):
        #convert state (a,b) to 0,..lengthxlength-1 matrix index
        return state[0]+length*state[1]
    
    def _get_penalty(self,pos):
        if np.array_equal(pos,end):
            self.finished = True
            return 10**5
        if np.any(np.all(pos == pitfalls, axis=1)):
            self.finished = True
            return -50
        if pos[0] <0 or pos [0] >length-1 or pos[1] <0 or pos[1] >length-1:
            return -2
        else:
            return -1
    
        
    def move_step(self):
        """Moves the agent one step in its environment
        Computes penalty and updates action-value functions based on new data
        """
        action_choice = self._greedy_choice(self.current_pos) 
        
        new_pos = self.current_pos + self._get_action(action_choice)
        
        penalty = self._get_penalty(new_pos)
        state_index = self._convert_state_to_matrix_index(self.current_pos)
        new_state_index = self._convert_state_to_matrix_index(new_pos)
                        
        if penalty == -2 or penalty == -50: #then new state is old state
            new_action_choice = self._greedy_choice(self.current_pos)
            self.action_val_funcs[state_index][action_choice] += self.alpha *(penalty + (self.gamma*self.action_val_funcs[state_index][new_action_choice])
                                                                              - self.action_val_funcs[state_index][action_choice])                                 
        else:   
            new_action_choice = self._greedy_choice(new_pos)
            self.action_val_funcs[state_index][action_choice] += self.alpha *(penalty + (self.gamma*self.action_val_funcs[new_state_index][new_action_choice])
                                                                              - self.action_val_funcs[state_index][action_choice])
                                                                           
        if penalty != -2: #update to new_pos unless illegal move.
            self.current_pos = new_pos
        self.t +=1
        
        
    def _reset(self):
        self.current_pos = np.array(self.initial_pos)
        self.t =0
        self.finished = False
        
    def do_experiment(self):
        while self.t < 1000 and self.finished != True:
            self.move_step()
        self._reset()
    
    def do_experiment_of_length_L(self,L):
        while self.t < L and self.finished !=True:
            self.move_step()
        self._reset()
    
    def get_action_val_func(self):
        return(self.action_val_funcs)
        


In [382]:
agent = Agent((4,0))
for i in range(1000):
    agent.do_experiment()

## Visualizing the Outcome
- The action-value function takes the form of a matrix. Each entry shows the "reward" of choosing a particular action given a particular state

In [383]:
output = agent.get_action_val_func()
output

array([[26624.68733271,  9273.24797948,  3989.57071342,  5221.29554506],
       [20970.32151493, 15694.93261534, 40102.7418863 , 27793.25382699],
       [19245.16437225, 29523.2665254 , 30318.92658173, 23496.3522064 ],
       [15226.88866781, 25398.60091857, 20159.38954819, 19355.50484595],
       [15262.70378069, 20077.3602207 , 16656.01796288, 15866.8579092 ],
       [    0.        ,     0.        ,     0.        ,     0.        ],
       [31083.53655922, 43320.49049783, 55628.20698675, 21202.72257781],
       [21689.01617837, 41239.93468692, 29171.17080115, 23114.58523602],
       [15021.92290463, 32364.21590105,  9307.41640291, 19730.71340709],
       [11762.70546143, 25107.83461293,  5425.90153487, 14406.60517137],
       [51193.08925123, 63470.68564348, 82742.48747292, 66140.52138912],
       [52452.61954037, 67108.75811386, 52992.05202135, 35330.54011644],
       [    0.        ,     0.        ,     0.        ,     0.        ],
       [ 2265.89195589,  3761.68501222,  6115.07613

## Converting Outcome to Directionality
- We convert the above matrix notation into the diagram which demonstrates what the agent has learned.
- We see that in each position of the matrix, it has identified a best action to take
- The blank options are cases where the algorithm has reached no determination (either the region is out of bounds, or a pitfall)

In [384]:
def convert_output_to_diagram(output):
    results = [["","","","",""],["","","","",""],["","","","",""],["","","","",""],["","","","",""]]
    for i in range(length**2):
        translate = {0:"right",1:"left",2:"up",3:"down"}
        if np.array_equal(output[i], np.array((0,0,0,0))):
            results[length-1-int(i/length)][i%length] == "n/a"
        else:
            results[length-1-int(i/length)][i%length] = translate[np.argmax(output[i])]
        
    return(results)

convert_output_to_diagram(output)

[['', '', '', '', ''],
 ['up', '', '', '', ''],
 ['up', 'left', '', 'down', 'down'],
 ['', 'up', 'left', 'left', 'left'],
 ['right', 'up', 'up', 'left', 'left']]