In [4]:
import numpy as np

In [59]:
class Grid_World:
    """This class constructs the Grid World and transition probabililties and discount factor according to the CID.
       It provides th optimal policy and the actual value function states, based on the value iteration algorithm."""
    
    def __init__(self, cid):
        """This funciton sets-up the grid as well as the probabililtes, discount factor, and relevant matrices
           for the computation of the value iteration algorithm. It provides the position of all the unblocked 
           states of the GridWorld of interest.
           input: an (8x1) numpy array with each CID digit."""
        
        self.cid = cid
        
        self.states_names = ["s1", "s2", "s3", "s4", "s5", "s6", "s7", "s8", "s9", "s10", "s11"]
        self.states_pos = np.array([[0, 0], [0, 1], [0, 2], [0, 3], [1, 0], [1, 1], [1, 3], [2, 1], [2, 2], [2, 3], [3, 2]])
        self.dic_states = {self.states_names[0]: self.states_pos[0], self.states_names[1]: self.states_pos[1],\
                           self.states_names[2]: self.states_pos[2], self.states_names[3]: self.states_pos[3],\
                           self.states_names[4]: self.states_pos[4], self.states_names[5]: self.states_pos[5],\
                           self.states_names[6]: self.states_pos[6], self.states_names[7]: self.states_pos[7],\
                           self.states_names[8]: self.states_pos[8], self.states_names[9]: self.states_pos[9],\
                           self.states_names[10]: self.states_pos[10]}
        self.directions = ["north", "south", "east", "west"]
        
        self.gamma = 0.2 + 0.5*0.1*cid[6] 
        self.pba = 0.25 + 0.5*0.1*cid[5]
        self.pbo = (1-self.pba)/3
        self.v = np.zeros(len(self.states_names))
        self.policy = np.zeros(len(self.states_names))
        self.delta = 0
        self.diff = 10
        self.grid = np.ones((4 ,4))
        self.values = [0, 0, 0, 0]
        self.reward_state = self.dic_states[self.states_names[(cid[7]+1)%3]]
        
        
    def new_states(self):
        """Figures out if the agent is in a terminal state and computes the potential new states 
        it could take i.e all its neigbhours. Doing so, it ensures that the agent comes back to its 
        current position if a transition would lead him outside of the Grid World boundary.
        input: self//all the defined parameters
        output: a (4,) np.array of 4 (2,) arrays representing the potential new positions it could take."""
    
        self.terminal = 0 #flage to indicate if the current state is terminal 
        
        if (np.all(self.current_state == self.reward_state) or \
            np.all(self.current_state == self.dic_states["s11"])):  #checking if the current position is terminal
            self.new_state = self.current_state
            self.terminal = 1   #flag to show terminal had been reached
            self.pot_new_state = []  #empty list of potential new states as it is static

        else:
            self.translation = np.array([[-1, 0], [1, 0], [0, 1], [0, -1]])   #Transitions arrays: North, S, E, W
            self.pot_new_state = self.current_state + self.translation  #Updating the potential positions
            for i in range(len(self.pot_new_state)): 
                if not((self.pot_new_state[i].tolist() in self.states_pos.tolist())):  #Checking if out of bounds
                    self.pot_new_state[i] = self.current_state  #Imposing the current position for out of bounds potential

        return (self.pot_new_state, self.terminal)
    
    
    
    def get_rewards(self):
        """Gathers the value for the rewards for each of the four potential new states in which the agent would 
        move. Recall that the if the agent is not in a terminal state, the reward is -100 getting to s11, +10 
        getting to the positive reward state (here s2), and -1 to any other state in the GridWorld. If it is in a
        terminal state, the reward is 0."""
        self.reward = np.zeros(4)

        for i in range(len(self.pot_new_state)):
            if self.terminal == 1:    
                self.reward[i] = 0    #reward of 0 if in termnial state
            else:
                if np.all(self.pot_new_state[i] == self.reward_state):
                    self.reward[i] = 10    #reward of +10 if getting to the positive termnial state
                elif np.all(self.pot_new_state[i] == self.dic_states["s11"]):
                    self.reward[i] = -100  #reward of -100 if getting to the negative termnial state
                else:
                    self.reward[i] = -1    #reward of -1 for any other new state

        self.reward = np.reshape(self.reward, (4, 1))
        return(self.reward)

    
    
    def get_value(self):
        """Copmutes the optimal value function at a given state and the policy which derived it. Performs the
        calculation of the BOE.
        input: self
        output: the updated (11,) state value np array and (11,) policy array"""
        self.p = np.ones((4, 4))*self.pbo
        
        for l in range (self.p.shape[0]):
            self.p[l, l] = self.pba   #input the diagonal terms of the probablility matrix to be the p 
                                      #given by the CID, this is done to perfom the matrix calculation later
                                      #with the highest porbability given to the direction which matches the action
                                    
        if self.terminal == 1:        #all 0s if the state is terminal
            self.v[self.s] = 0
            self.policy[self.s] = np.nan   
        else:
            self.values = np.zeros((4, 1))
            self.new_state = np.zeros(4)
            
            for n in range (self.new_state.shape[0]):   #get the index in the state position list 
                                                        #of each of the potential new states
                self.new_state[n] = self.states_pos.tolist().index(self.pot_new_state[n].tolist())
                
            #gather the present value functions of each of the potential states in a (4, 1) np array
            self.mini_values = np.reshape(np.array([self.v[int(self.new_state[0])], self.v[int(self.new_state[1])],\
                                                    self.v[int(self.new_state[2])], self.v[int(self.new_state[3])]]),\
                                          (4, 1)) 
            self.values = np.matmul(self.p, (self.reward + self.gamma*self.mini_values))  #the 4 potential value fucntions
            self.policy[self.s] = np.argmax(np.array(self.values)) #the favored policy 0:North, 1:South, 3:East, 4:West
            self.v[self.s] = np.max(self.values) #update the value function with maximum
            
        return (self.v[self.s], self.policy[self.s])
    
    def map_policy(self):
        """Maps the best policy given by numbers with letter names. Acts like a dictionary.
        input: self
        output: list of length 11, correspopnding to the policy to take at each respective state."""
        self.policies = ["north", "south", "east", "west"]
        self.pol = []
        for i in range(self.policy.size):
            if np.isnan(self.policy[i]):
                self.pol.append("nan")  #nan for the terminal states
            else:
                self.pol.append(self.policies[int(self.policy[i])])
        return self.pol
    
    def grid_form(self):
        """Displays the grid as a (4x4) array to match the Grid World and clearly represent the value funciton
        input: self
        output: np (4x4) array with the value function at each state. nan in the blocked positions."""
        
        self.grid = np.nan*np.ones((4,4))
        for i in range(len(self.states_names)):
            self.x = self.states_pos[i].tolist()[0]
            self.y = self.states_pos[i].tolist()[1]
            self.grid[self.x, self.y] = self.v[i]
        print(self.grid)
        
        return 

    def policy_form(self):
        """Displays the grid as a (4x4) array to match the Grid World and clearly represent the policy
        input: self
        output: np (4x4) array with the policy at each state. nan in the blocked positions and terminal states."""
        
        l = [6, 8, 12, 13, 15]
        for j in range(len(l)):
            self.policy_directions.insert(l[j], 'nan')
        self.policy_directions = np.reshape(np.asarray(self.policy_directions), (4,4))
        print(self.policy_directions)
        
        return
    
    def value_iteration(self):
        """Performs the value iteration algorithm up to a given convergence threshold delta.
        input: self
        output:"""
        
        while self.diff > 0.01: 
            self.v_old = self.v.copy()
            for self.s in range(len(self.states_names)):
                self.current_state = self.dic_states[self.states_names[self.s]]
                (self.pot_new_state, self.terminal) = self.new_states()
                self.reward = self.get_rewards()
                (self.v[self.s], self.policy[self.s]) = self.get_value()
            self.diff = max(self.delta, np.linalg.norm(self.v_old - self.v))
            self.policy_directions = self.map_policy()
        self.grid_form()
        self.policy_form()
        return (self.v, self.policy_directions)


In [60]:
world = Grid_World(np.array([0,1,0,6,8,4,2,0]))

In [61]:
(final_values, final_policy) = world.value_iteration()

[[10.  0. 10.  9.]
 [ 9. 10. nan  8.]
 [nan  9.  8.  7.]
 [nan nan  0. nan]]
[['east' 'nan' 'west' 'west']
 ['north' 'north' 'nan' 'north']
 ['nan' 'north' 'west' 'north']
 ['nan' 'nan' 'nan' 'nan']]
