In [1]:
import numpy as np

import matplotlib.pyplot as plt
from random import randrange
from random import choices
import pandas as pd
import sys
import math
from collections import defaultdict
from sklearn.model_selection import learning_curve


ModuleNotFoundError: No module named 'sklearn'

In [None]:
class GridWorld(object):
    
    def __init__(self):
        
        
        #dimension of Gridworld 
        self.shape = (6,6)
        #locations of the obstacles in the grid
        self.obstacle_locs = [(1,1),(2,3),(2,5),(3,1),(4,1),(4,2),(4,4)]
        #locations of the absorbing states
        self.absorbing_locs = [(1,2),(4,3)]
        #specific rewards respective to each terminal state 
        self.special_rewards = [10, -100]
        #rewards for all other states
        self.default_reward = -1
        # Starting location
        while True:
            self.starting_loc = (np.random.randint(6),np.random.randint(6))
            if (self.starting_loc in self.obstacle_locs)==False and (self.starting_loc in self.absorbing_locs)==False:
                break
        #names of the four actions possible
        self.action_names = ['N','E','S','W']
        #number of actions
        self.action_size = len(self.action_names)
        #defines probability of the desired direction
        self.p = 0.45
        #computes probability of the three other directions (all equal)
        self.r = ((1 - self.p) / 3)
        #randomizing action results: [1 0 0 0] to no Noise in the action results
        self.action_randomizing_array = [self.p, self.r, self.r, self.r]
        
        ############################################
        
        ### Internal State ###
        
        #define attributes of the grid
        state_size, T, R, absorbing, locs = self.build_grid_world()
        #Number of valid states in the gridworld (there are 11 of them)\n",
        self.state_size = state_size
        #transition operator (3D tensor)
        self.T = T
        #reward function (3D tensor)
        self.R = R
        #absorbing states
        self.absorbing = absorbing
        #locations of valid states
        self.locs = locs
        #defines location of starting state
        self.starting_state = self.loc_to_state(self.starting_loc, locs);
        #initialise starting state 
        self.initial = np.zeros((1,len(locs)));
        self.initial[0,self.starting_state] = 1
        #defines walls of the grid 
        self.walls = np.zeros(self.shape);
        for ob in self.obstacle_locs:
            self.walls[ob]=1 
        #defines abosrbers of the grid 
        self.absorbers = np.zeros(self.shape)
        for ab in self.absorbing_locs:
            self.absorbers[ab] = -1
        #defines rewarders of the grid
        self.rewarders = np.zeros(self.shape)
        for i, rew in enumerate(self.absorbing_locs):
            self.rewarders[rew] = self.special_rewards[i]
        #illustrate maps 
        self.paint_maps()
        ############################################
        
    ####### Getters ###########
    
    #computes transition matrix 
    def get_transition_matrix(self):
        return self.T
    #computes reward matrix
    def get_reward_matrix(self):
        return self.R
    
    ###########################
    
    ####### Methods #########
    
    def value_iteration(self, discount = 0.2, threshold = 0.0001):
        
        T = self.get_transition_matrix()
        R = self.get_reward_matrix()
        
        #Initialisation
        epochs = 0
        #initialise delta before increasing
        delta = threshold
        #define each value state at 0 for all states 
        V = np.zeros(self.state_size)
        
        while delta >= threshold:
            #increment epoch 
            epochs += 1 
            #reinitialise delta value
            delta = 0
            
            #considers each state 
            for state_idx in range(self.state_size):
                
                #IF not an absorbing state 
                if not(self.absorbing[0, state_idx]):  
                
                    #stores value computed at the previous state
                    v = V[state_idx]
                
                    #computes Q value
                    Q = np.zeros(4)
                    for state_idx_prime in range(self.state_size):
                        Q += T[state_idx_prime,state_idx,:] * (R[state_idx_prime,state_idx, :] + discount * V[state_idx_prime])
                        
                    #set the new state value to the maximum of Q
                    V[state_idx]= np.max(Q)
                
                    #compute new delta
                    delta = max(delta, np.abs(v - V[state_idx]))
                
        #once loop is finished - fill in optimal policy 
        #initialise optimal policy
        optimal_policy = np.zeros((self.state_size, selft.action_size))
        
        #considers each state 
        for state_idx in range(self.state_size):
            
            #computes Q value 
            Q = np.zeros(4)
            #for successor state
            for state_idx_prime in range(self.state_size):
                Q += T[state_idx_prime,state_idx,:] * (R[state_idx_prime,state_idx, :] + discount * V[state_idx_prime])

            #action that maximises Q values gets probability of 1 
            optimal_policy[state_idx, np.argmax(Q)] = 1
            
        return optimal_policy, epochs
    
    def policy_iteration(self, discount = 0.2, threshold = 0.0001):
        
        #get transition and reward matrices
        R = self.get_reward_matrix()
        T = self.get_transition_matrix()
        
        #initialisation of policy
        #sets policy to a vector of 0 
        policy = np.zeros((self.state_size, self.action_size)) 
        #asks policy to always choose an action
        policy[:,0] = 1
        epochs = 0
        #sets condition to stop the main loop 
        policy_stable = False
        
        while not(policy_stable):
            
            #POLICY EVALUATION 
            V, epochs_eval = self.policy_evaluation(policy, threshold, discount)
            #increment epoch 
            epochs += epochs_eval 
            #sets policy to the boolean True - IF policy unstable, will return false
            policy_stable = True
            
            #POLICY ITERATION
            for state_idx in range(policy.shape[0]):
                #IF not an absorbing state 
                if not(self.absorbing[0, state_idx]):
                    
                    # stores old action
                    old_action = np.argmax(policy[state_idx,:])
                    #computes Q value
                    #initialise Q value to 0 
                    Q = np.zeros(4)
                    for state_idx_prime in range(policy.shape[0]):
                        Q += T[state_idx_prime,state_idx,:] * (R[state_idx_prime,state_idx, :] + discount * V[state_idx_prime])
                        
                    #compute corresponding policy
                    new_policy = np.zeros(4)
                    #action that maximises the Q value gets probability of 1 
                    new_policy[np.argmax(Q)] = 1
                    #update policy
                    policy[state_idx] = new_policy
                    
                    #check if policy has converged 
                    if old_action != np.argmax(policy[state_idx]):
                        #stops main loop if policy unstable
                        policy_stable = False
                        
        return V, policy, epochs
    
    def policy_evaluation(self, policy, threshold, discount):
    
        #set a delta bigger than the threshold 
        delta= 2*threshold
        #get transition and reward matrices
        R = self.get_reward_matrix()
        T = self.get_transition_matrix()
        #initialise the value at 0 
        V = np.zeros(policy.shape[0])
        #make a copy of the value array to hold while evaluation's updating
        Vnew = np.copy(V)
        epoch=0
        #while value is being converged (not yet converged)
        while delta>threshold:
            epoch += 1
            for state_idx in range(policy.shape[0]):
                #If absorbing state - continue 
                if(self.absorbing[0,state_idx]):
                     continue  
                #accumulator variable for the Value of a state - temporary value?
                tmpV = 0 
                for action_idx in range(policy.shape[1]):
                    #accumulator variable for the State-Action value 
                    tmpQ = 0
                    for state_idx_prime in range(policy.shape[0]):
                        tmpQ = tmpQ + T[state_idx_prime,state_idx,action_idx] * (R[state_idx_prime,state_idx,action_idx] + discount* V[state_idx_prime])
                        
                    tmpV += policy[state_idx,action_idx] * tmpQ
                
                #updtate state value 
                Vnew[state_idx] = tmpV
                #once all state values updated - update delta 
                delta =  max(abs(Vnew-V))
                #save new value into the old one 
                V=np.copy(Vnew)
            #return optimal value function (?)
            return V, epoch
        
        
    ##########################
    
    
    #### TD METHOD ###
    
    def eps_greedy(self, eps, Q):
       
        # assign same value to all elements
        policy = np.full((Q.shape[0],Q.shape[1]), eps/self.action_size)
        for state_idx in range(policy.shape[0]):
            policy[state_idx, np.argmax(Q[state_idx])] = 1-eps + eps/self.action_size
           
        return policy
    
    
    def sarsa(self, n, eps, gamma, alpha):
        ## Initialization ##
        Q = np.zeros((self.state_size, self.action_size))
        epsilon_decay = 0.99
        rewards = np.zeros(n)
       
        # value function for estimation error
        V = np.zeros(self.state_size)
        V_track = np.zeros((n,self.state_size))
       
        for i in range(n):
           
            # initialize S
            state_idx = randrange(self.state_size)
            while self.absorbing[0,state_idx]:
                state_idx = randrange(self.state_size)
           
            # choose A using policy derivated from Q
            policy = self.eps_greedy(eps, Q)
            action = self.get_action(policy[state_idx])
           
            # for each step of the episode
            while not self.absorbing[0,state_idx]:
               
                # take A from S and observe S' and R
                state_prime_idx = self.get_next_state(state_idx, action)
                reward = self.R[state_prime_idx, state_idx, action]
                rewards[i] = rewards[i] + reward
               
                # choose A' form S' with new policy
                policy = self.eps_greedy(eps, Q)
                action_prime = self.get_action(policy[state_prime_idx])
               
                # update Q
                Q[state_idx,action] = Q[state_idx,action] + math.log(i+1)/(i+1)*(
                    reward + gamma*Q[state_prime_idx,action_prime] - Q[state_idx,action])
               
                # estimate V
                V[int(state_idx)] = Q[int(state_idx), np.argmax(policy[int(state_idx)])]
               
                action = action_prime
                state_idx = state_prime_idx
               
            #save Values
            V_track[i] = V
           
            # decay epsilon
            eps = eps * epsilon_decay
           
        return policy, Q, rewards, V_track
            

    ########### Internal Drawing Functions #####################
        
    def draw_deterministic_policy(self,Policy):
        #draw a deterministic policy - needs to be a np array of 11 values 
        plt.figure()
        #create graph of the grid
        plt.imshow(self.walls+self.rewarders+self.absorbers)
         
        for state, action in enumerate(Policy):
            if(self.absorbing[0,state]):
                continue
            #defines list of arrows corresponding to each possible action
            arrows = [r"$\uparrow$",r"$\rightarrow$", r"$\downarrow$", r"$\leftarrow$"] 
            action_arrow= arrows[action]
            #compute value location on graph
            location = self.locs[state]
            #place it on graph          
            plt.text(location[1], location[0], action_arrow, ha='center', va='center')
        plt.show()
    
    
    def draw_value(self, Value):
        plt.figure()
        #creates graph of the grid
        plt.imshow(self.walls+self.rewarders+self.absorbers)
        for state, value in enumerate(Value):
            if(self.absorbing[0,state]):
               continue
            #computes value location on graph
            location = self.locs[state]
            #places it on graph
            plt.text(location[1], location[0], round(value,2), ha='center', va='center')
        plt.show()
  
    
    def draw_deterministic_policy_grid(self, Policy, title, n_columns, n_lines):
            plt.figure(figsize=(20,8))
            for subplot in range (len(Policy)):
                ax = plt.subplot(n_columns, n_lines, subplot+1)
                ax.imshow(self.walls+self.rewarders+self.absorbers)
                for state, action in enumerate(Policy[subplot]):
                    if(self.absorbing[0,state]):
                        continue
                    arrows = [r"$\uparrow$",r"$\rightarrow$", r"$\downarrow$", r"$\leftarrow$"] 
                    action_arrow = arrows[action]
                    location = self.locs[state]
                    plt.text(location[1], location[0], action_arrow, ha='center',va='center')
                ax.title.set_text(title[subplot])
            plt.show()
   
      
    def draw_value_grid(self, Value, title, n_columns, n_lines):
        plt.figure(figsize=(20,8)) #NEED TO CHANGE VALUES??
        #goes through all values            
        for subplot in range (len(Value)):
            #creates a subplot for each value computed  
            ax = plt.subplot(n_columns, n_lines, subplot+1)
            #creates graph of the grid
            ax.imshow(self.walls+self.rewarders+self.absorbers)
            for state, value in enumerate(Value[subplot]):
                #IF absorbing state - dont plot value 
                if(self.absorbing[0,state]):
                      continue
                #computes value location on graph
                location = self.locs[state]
                #places on the graph      
                plt.text(location[1], location[0], round(value,1), ha='center', va='center')
            ax.title.set_text(title[subplot])
        plt.show()
                      
   ##########################
                      
   ########### Internal Helper Functions #####################
               
    def paint_maps(self):
        plt.figure()
        plt.subplot(1,3,1)
        plt.imshow(self.walls)
        plt.title('Obstacles')
        plt.subplot(1,3,2)
        plt.imshow(self.absorbers)
        plt.title('Absorbing States')
        plt.subplot(1,3,3)
        plt.imshow(self.rewarders)
        plt.title('Reward States')              
        plt.show()
                      
    def build_grid_world(self):
        #get : locations of all valid states, the neighbours of each state (by state number), the absorbing states (array of zeros WITH 1 when absorbing state)
        locations, neighbours, absorbing = self.get_topology()              
        
        #get number of states 
        S = len(locations)
        
        #initialise the transition matrix 
        T = np.zeros((S,S,4))
        
        for action in range(4):
            for effect in range(4):
                #randomise the outcome when taking an action       
                outcome = (action+effect+1) % 4
                if outcome == 0:
                    outcome = 3
                else:
                    outcome -= 1
        
                #fill transition matrix
                prob = self.action_randomizing_array[effect]
                for prior_state in range(S):
                      post_state = neighbours[prior_state, outcome]
                      post_state = int(post_state)
                      T[post_state, prior_state,action] = T[post_state, prior_state,action]+prob
        
        #build reward matrix
        R = self.default_reward*np.ones((S,S,4))
        for i, sr in enumerate(self.special_rewards):
            post_state = self.loc_to_state(self.absorbing_locs[i], locations)
            R[post_state,:,:]=sr          
        return S,T,R, absorbing, locations               
                      
    def get_topology(self):
        height = self.shape[0]
        width = self.shape[1]
        
        index = 1
        locs = []
        neighbour_locs = []
        
        for i in range(height):
            for j in range(width):
                #get location of each state
                loc = (i,j)
                #append it to valid state locations (IF valid state - not absorbing)
                if(self.is_location(loc)):
                    locs.append(loc)

                    #get array with neighbours of each state
                    local_neighbours = [self.get_neighbour(loc,direction) for direction in ['nr', 'ea', 'so', 'we']]
                    neighbour_locs.append(local_neighbours)
        
        #translate neighbours lists from locations to states 
        num_states = len(locs)
        state_neighbours = np.zeros((num_states,4))
        
        for state in range(num_states):
            for direction in range(4):
                #find neighbour location 
                nloc = neighbour_locs[state][direction]
                      
                #turn location into a state number
                nstate = self.loc_to_state(nloc,locs)
                      
                #insert the state number into neighbour matrix
                state_neighbours[state,direction] = nstate;
                      
        #translate absorbing locations into absorbing state indices 
        absorbing = np.zeros((1,num_states))
        for a in self.absorbing_locs:
            absorbing_state = self.loc_to_state(a,locs)
            absorbing[0,absorbing_state]=1
        return locs, state_neighbours, absorbing
        
                      
    def loc_to_state(self,loc,locs):
        #takes list of locations and gives index that corresponds to input loc
        return locs.index(tuple(loc))
    
    def is_location(self,loc):
        #IF in the grid: valid location - not an obstacle
        if(loc[0]<0 or loc[1]<0 or loc[0]>self.shape[0]-1 or loc[1]>self.shape[1]-1):
            return False
        elif(loc in self.obstacle_locs):
            return False
        else:
            return True
    
    def get_neighbour(self,loc,direction):
        #look for valid neighbours
        i = loc[0]
        j = loc[1]
        #defines the transitions 'movements'
        nr = (i-1,j)
        ea = (i,j+1)              
        so = (i+1,j)              
        we = (i, j-1)   
        
        #IF valid location - executes direction requested - otherwise rejects it
        if (direction == 'nr' and self.is_location(nr)):
            return nr 
        elif(direction == 'ea' and self.is_location(ea)):
            return ea
        elif(direction == 'so' and self.is_location(so)):
            return so 
        elif(direction == 'we' and self.is_location(we)):
            return we 
        #IF no other alternative - return to the same location 
        else: 
            return loc
    
        ###########################################         
          
                      

In [2]:
###defines the grid
print("Representation of my Grid world:\n")
grid = GridWorld()
Policy = np.zeros((grid.state_size, grid.action_size))
Policy = Policy + 0.25

Representation of my Grid world:



NameError: name 'GridWorld' is not defined

In [4]:
#TD learning curve 

number_of_iteration = 300
number_of_episode=500
array_episodes=np.arange(0,499)
reward_array = np.zeros([number_of_iteration,number_of_episode])
for i in range(number_of_iteration):
    policy, Q, reward_4, V_track = grid.sarsa(number_of_episode,0.01,0.2,0.5)
    reward_array[i,:]=reward_4 

#observation: as we increase prob and gamma - becomes more accurate (same for numb of iteration)

#part a 
rewards_avg_0 = reward_array.mean(axis=0)

print("\nNumber of episodes vs Rewards\n")
plt.plot((array_episodes, rewards_avg),linewidth=0.1,color='r',label="average_reward")
plt.show()

rewards_avg_1 = reward_array.mean(axis=0) - reward_array.std(axis=0)
#print(rewards_avg_0) #1D 
rewards_avg_2 = reward_array.mean(axis=0) + reward_array.std(axis=0)


#part b -std 
print("\nNumber of episodes vs Rewards(-std)\n")
plt.plot((array_episodes, rewards_avg_1),linewidth=0.1,color='r',label="-std")
plt.show()

#part b +std 
print("\nNumber of episodes vs Rewards(+std)\n")
plt.plot((array_episodes, rewards_avg_2),linewidth=0.1,color='r',label="+std")
plt.show()

#array_episodes=np.arange(500)

#plt.plot(array_episodes, rewards_avg)
#plt.show()

KeyboardInterrupt: 