## Reinforcement Learning - Discrete State

This notebook contains the codes for reinforcement learning in a world with discrete states and discrete actions. It draws out the Gridworld, and implements policy evaluation, policy iteration and value iteration.

In [None]:
import numpy as np
import matplotlib.pyplot as plt

In [None]:
# Define Gridworld and functions for policy evaluation, policy iteration and value iteration

class Gridworld(object):
    def __init__(self):
        ############### Attributes of the Gridworld ################
        #shape
        self.shape = (6,6)
        #obstacle locations
        self.obstacle = [(1,1),(2,3),(2,5),(3,1),(4,1),(4,2),(4,4)]
        #terminal states
        self.terminal_locs = [(1,3),(4,3)]
        #terminal rewards
        self.term_reward = [10, -100]
        #rewards for other states
        self.other_reward = -1
        #starting location for value_iteration
        self.start = (2,0)
        #actions
        self.action = ['N', 'E', 'S', 'W']
        #number of actions
        self.action_size = len(self.action)
        #randoming action results
        self.action_randomising = [0.55, 0.15, 0.15, 0.15]
        ###########################################################
        
        ############## Internal states ############################
        
        #get attributes of the world
        state_size, T, R, terminal, locs, state_neighbours = self.build_gridworld()
        #to find who the neighbours are
        self.state_neighbours = state_neighbours
        #number of valid states
        self.state_size = state_size
        #transition matrix
        self.T = T
        #reward matrix 
        self.R = R
        #terminal states
        self.terminal = terminal
        #the locations of the valid states
        self.locs = locs
        #the locations of the valid states excluding terminal states
        self.move_locs = self.get_movable_locs(self.locs)
        #the number of the starting state for random start
        self.starting_state = self.loc_to_state(self.start, locs);
        #locating the initial state
        self.initial = np.zeros((1,len(locs)))
        self.initial[0, self.starting_state] = 1
        
        #placing walls on a bitmap
        self.walls = np.zeros(self.shape);
        for ob in self.obstacle:
            self.walls[ob]=1
        
        #placing terminals on a grid for illustration
        self.terminals = np.zeros(self.shape)
        for ter in self.terminal_locs:
            self.terminals[ter] = -1
        
        #placing the reward on a grid for illustration
        self.rewards = np.zeros(self.shape)
        for i, rew in enumerate(self.terminal_locs):
            self.rewards[rew] = self.term_reward[i]
        
        #illustrating the gridworld
        self.paint_maps()
        
        #############################################################

        

    ################## Internal functions #########################
    def get_movable_locs(self, locs):   
        #return the locations that are valid and not terminal
        
        #remove the terminal states
        move_loca = locs.copy()
        for loc in move_loca:
            if loc == self.terminal_locs[0]:
                move_loca.remove(loc)
            elif loc == self.terminal_locs[1]:
                move_loca.remove(loc)
        
        #get all the valid and none terminal locations
        move_locs = []
        for loca in move_loca:
            loci = self.loc_to_state(loca, locs)
            move_locs.append(loci)
            
        return move_locs
                
    
    def build_gridworld(self):
        #get the locations of all valid states and their neighbours by state number
        #and the terminal states (array of 0's with ones in the terminal state)
        
        locations, neighbours, terminal = self.get_topology()
        #get the number of states
        S = len(locations)
        #initialise the transition matrix
        T = np.zeros((S,S,4))
        
        #randomise the outcome of taking an action
        for action in range(4):
            for effect in range(4):
                outcome=(action+effect+1)%4
                if outcome == 0:
                    outcome ==3
                else:
                    outcome == -1
                
                #fill the transition matrix
                prob = self.action_randomising[effect]
                for prior_state in range(S):
                    post_state = neighbours[prior_state, outcome]
                    post_state = int(post_state)
                    T[post_state, prior_state,action]=T[post_state, prior_state,action]+prob
                
        #build the reward matrix
        R = self.other_reward*np.ones((S,S,4))
        for i, sr in enumerate(self.term_reward):
            post_state = self.loc_to_state(self.terminal_locs[i],locations)
            R[post_state,:,:] = sr
        
        return S, T, R, terminal, locations, neighbours

    
    
    def get_topology(self):
        #obtain the locations and neighbours of every state in the Gridworld
        width = self.shape[1]
        height = self.shape[0]
        
        index = 1
        locs = []
        neighbour_locs = []
        
        for i in range(height):
            for j in range(width):
                #get the location of each state
                loc = (i,j)
                #append to the valid state locs if it is in the grid and not an obstacle
                if (self.is_loc(loc)):
                    locs.append(loc)
                    #get an array of neighbours of each state, in terms of locations
                    local_neighbours = [self.get_neighbour(loc, direction) for direction in ['nr', 'ea', 'so', 'we']]
                    neighbour_locs.append(local_neighbours)
                
        #translate neighbour lists from locations to states
        num_states = len(locs)
        state_neighbours = np.zeros((num_states,4))
        
        for state in range(num_states):
            for direction in range(4):
                #find neighbour location
                nloc = neighbour_locs[state][direction]
                #turn location into a state number
                nstate = self.loc_to_state(nloc, locs)
                #insert into neighbour matrix
                state_neighbours[state][direction]=nstate;
        
        #translates terminal locations into terminal state indices
        terminal = np.zeros((1, num_states))
        for a in self.terminal_locs:
            terminal_state = self.loc_to_state(a, locs)
            terminal[0][terminal_state]=1
        
        return locs, state_neighbours, terminal

    
    def is_loc(self, loc):
        #location is valid if it is in the grid and not an obstacle
        if (loc[0]<0 or loc[1]<0 or loc[0]>self.shape[0]-1 or loc[1]>self.shape[1]-1):
            return False
        elif (loc in self.obstacle):
            return False
        else:
            return True
    
    
    def get_neighbour(self,loc,direction):
        #find the valid neighbours i.e. in the grid and not obstacles
        i = loc[0]
        j = loc[1]
        
        nr = (i-1,j)
        ea = (i,j+1)
        so = (i+1,j)
        we = (i,j-1)
        
        #if the neighbour is a valid location, accept it, otherwise, don't move
        if (direction == 'nr' and self.is_loc(nr)):
            return nr
        elif (direction == 'ea' and self.is_loc(ea)):
            return ea
        elif (direction == 'so' and self.is_loc(so)):
            return so
        elif (direction == 'we' and self.is_loc(we)):
            return we
        else:
            return loc
            
    
    def loc_to_state(self, loc, locs):
        #takes list of locations and gives the index of the corresponding location
        return locs.index(tuple(loc))
    
    
    def paint_maps(self):
        #draw the Gridworld
        plt.figure()
        plt.subplot(1,3,1)
        plt.imshow(self.walls)
        plt.title('obstacles')
        plt.subplot(1,3,2)
        plt.imshow(self.terminals)
        plt.title('terminal states')
        plt.subplot(1,3,3)
        plt.imshow(self.rewards)
        plt.title('reward states')
        plt.show()

    
    
    def epsilon_greedy_action(self, policy, initial):
        #choose an action given an epsilon-greedy policy
        
        all_moves = policy[int(initial),:]
        a = all_moves[0]
        b = a + all_moves[1]
        c= b + all_moves[2]
        d = c + all_moves[3]
        roll1 = np.random.rand()
        if roll1<=a:
            mov = 0
        elif roll1 >a and roll1<=b:
            mov = 1
        elif roll1 >b and roll1<=c:
            mov = 2
        elif roll1 >c and roll1 <=d:
            mov = 3
        return mov

    
    def get_trace(self, policy):
        #policy should be a 29x4 matrix
        
        #get the reward matrix, which is not known to agent but needed for getting trace
        R = self.get_reward_matrix()
        
        #store the state, action and reward of each step in three separate lists
        S = []
        A = []
        Re = []
        
        start = np.random.randint(len(self.move_locs))
        S.append(start)
        
        #find the terminal state numbers
        terminating = []
        for a in self.terminal_locs:
            terminal_state = self.loc_to_state(a, self.locs)
            terminating.append(terminal_state)
        
        #keep going until reaching the terminal state
        while S[-1] != terminating[0] and S[-1] != terminating[1]:
            initial = S[-1]
            
            #generate move in nr, ea, so or we according to the policy
            move = self.epsilon_greedy_action(policy, initial)
           
            #generate the actual move after choosing the action
            w = self.action_randomising[0]
            x= w + self.action_randomising[1]
            y= x + self.action_randomising[2]
            z = y +self.action_randomising[3]
            roll2 = np.random.rand()
            if roll2<=w:
                action = move
            elif roll2>w and roll2<=x:
                action = (move+1)%4
            elif roll2>x and roll2<=y:
                action = (move+2)%4
            elif roll2>y and roll2<=z:
                action = (move+3)%4
            
            #get the final state
            final = self.state_neighbours[int(initial)][action]
            rewa = R[int(final), int(initial), int(action)]
            A.append(move)
            Re.append(rewa)
            S.append(final)
        
        return S,A,Re
            
        
        
        
        
##################################################################################

          
    ############# Get info ###########
    def get_transition_matrix(self):
        return self.T
    
    def get_reward_matrix(self):
        return self.R
    
    #################################
    

            
    ################################# Methods #######################################
    def td_learning(self, iteration, alpha, epsilon, gamma):
        #get the reward matrix
        R = self.get_reward_matrix()
        
        #initialise the state-action values and a policy
        Q = np.zeros((self.state_size, self.action_size))
        policy = np.zeros((self.state_size, self.action_size))
        
        #set arbitrary policy
        policy[:,0]=((1-epsilon)+(epsilon/self.action_size))
        policy[:,1:4] = epsilon/self.action_size
        
        #find the terminal state numbers
        terminating = []
        for a in self.terminal_locs:
            terminal_state = self.loc_to_state(a, self.locs)
            terminating.append(terminal_state)
        
        #get the total reward
        total_reward = []
        
        #get the state values for all states after each iteration, stored as a nested list
        intervalues = []
        
        #start learning
        for i in range(iteration):
            start = np.random.randint(len(self.move_locs))
            S = start
            #generate an action in nr, ea, so or we according to the eps-greedy policy
            A = self.epsilon_greedy_action(policy, S)
            rew = 0
            #take the action, observe the reward and S'
            count = -1
            while S != terminating[0] and S != terminating[1]:
                
                count += 1
                
                #generate the actual move after choosing the action and the next state
                w = self.action_randomising[0]
                x= w+self.action_randomising[1]
                y= x+ self.action_randomising[2]
                z = y +self.action_randomising[3]
                roll2 = np.random.rand()
                if roll2<=w:
                    action = A
                elif roll2>w and roll2<=x:
                    action = (A+1)%4
                elif roll2>x and roll2<=y:
                    action = (A+2)%4
                elif roll2>y and roll2<=z:
                    action = (A+3)%4
            
                #get S' and R
                S_prime = self.state_neighbours[int(S)][action]
                immed_reward = R[int(S_prime), int(S), int(action)]
                rew+= immed_reward*(gamma**count)
                
                #get A'
                A_prime = self.epsilon_greedy_action(policy, S_prime)
                Q[int(S),int(A)] = Q[int(S),int(A)]+alpha*(immed_reward+gamma*Q[int(S_prime),int(A_prime)]-Q[int(S),int(A)])
                policy[int(S)] = epsilon/self.action_size
                policy[int(S)][np.argmax(Q[int(S),:])] = ((1-epsilon)+(epsilon/self.action_size))
                S = S_prime
                A = A_prime
            
            #get the current value functions
            intervalue = np.zeros(Q.shape[0])
            for i in range(Q.shape[0]):
                intervalue[i] = np.max(Q[i,:])
            intervalues.append(intervalue)
    
            total_reward.append(rew)
        
        return Q, policy, total_reward, intervalues
    
    
    
    
    def monte_carlo(self, iteration, alpha, epsilon, gamma):
        
        #initiate Q and policy
        Q = np.zeros((self.state_size, self.action_size))
        policy = np.zeros((self.state_size, self.action_size))
        
        #set arbitrary policy
        policy[:,0]=((1-epsilon)+(epsilon/self.action_size))
        policy[:,1:4] = epsilon/self.action_size
        
        total_reward = []
        intervalues = []
        
        for i in range(iteration):
            #get trace with the policy
            S, A, Re = self.get_trace(policy)
            #drop the terminal state, for ease of analysis
            S.pop(-1)
            
            #store the already-visited states in this list,
            #so they are not counted twice in one trace
            gone = []
            
            #get reward for this episode
            total = 0
            for kk in range(len(Re)):
                total += Re[kk]*(gamma**kk)
            total_reward.append(total)
            
            for j in range(len(S)):
                if not (S[j]*100+A[j] in gone):
                    haha = S[j]*100+A[j]
                    gone.append(haha)
                    ret = 0
                    #calculate total discounted return from the occurence of S,A
                    for lol in range(len(S)-j):
                        ret += (gamma**lol)*Re[lol+j]
                    #update state-action value
                    Q[int(S[j]),int(A[j])] = Q[int(S[j]),int(A[j])] + alpha*(ret - Q[int(S[j]),int(A[j])])
            
            #get the current value functions
            intervalue = np.zeros(Q.shape[0])
            for i in range(Q.shape[0]):
                intervalue[i] = np.max(Q[i,:])
            intervalues.append(intervalue)
            
            #set policy to be epsilon greedy
            for state in range(self.state_size):
                if not state in self.terminal:
                    policy[state] = epsilon/self.action_size
                    policy[state][np.argmax(Q[state,:])] = ((1-epsilon)+(epsilon/self.action_size))
        
        return Q, policy, total_reward, intervalues

    
    
    def value_iteration(self, gamma, threshold):
        #get transition and reward matrices
        T = self.get_transition_matrix()
        R = self.get_reward_matrix()
        
        #rate is the number of iterations
        rate = 0
        delta = threshold
        V = np.zeros(self.state_size)
        
        #keep going until delta falls below the threshold
        while delta >= threshold:
            rate += 1
            delta = 0
            
            for st in range(self.state_size):
                if not (self.terminal[0, st]):
                    v = V[st]
                    #compute the Q value
                    Q = np.zeros(4)
                    for stp in range(self.state_size):
                        Q+= T[stp, st,:]*(R[stp, st, :]+gamma*V[stp])
                    
                    V[st]=np.max(Q)
                    
                    delta = max(delta, np.abs(v - V[st]))
        
        #when the while loop is finished
        opt_policy = np.zeros((self.state_size, self.action_size))
        
        for st in range(self.state_size):
            #compute Q value
            Q = np.zeros(4)
            for stp in range(self.state_size):
                Q += T[stp,st,:] * (R[stp,st, :] + gamma * V[stp])
            
            #set the optimal policy to be the action with the highest return in each state
            opt_policy[st, np.argmax(Q)]=1
        
        return V, opt_policy, rate
                
    

    def draw_deterministic_policy(self, policy):
        #draw a deterministic policy
        #the policy needs to be a np array of 29 values between 0 and 3 with
        # N->0, E->1, S->2, W->3
        plt.figure()
        
        plt.imshow(self.walls+self.rewards+self.terminals)
        for state, action in enumerate(policy):
            if(self.terminal[0,state]):
                continue
            arrows = [r"$\uparrow$",r"$\rightarrow$", r"$\downarrow$", r"$\leftarrow$"]
            action_arrow = arrows[action]
            location = self.locs[state]
            plt.text(location[1],location[0],action_arrow,ha='center', va='center')
        plt.show()
    
    
    def draw_value(self, Value):
        # Draw a policy value function
        # The value need to be a np array of 29 values 
        plt.figure()
        
        plt.imshow(self.walls+self.rewards +self.terminals) # Create the graph of the grid
        for state, value in enumerate(Value):
            if(self.terminal[0,state]): # If it is an absorbing state, don't plot any value
                continue
            location = self.locs[state] # Compute the value location on graph
            plt.text(location[1], location[0], round(value,2), ha='center', va='center') # Place it on graph
        plt.show()


In [None]:
# Plot learning

grid = Gridworld()

values, policy, iteration = grid.value_iteration(0.4, 0.00001)


optimal_Q, opt_policy, total_re, intvalues = grid.monte_carlo(2000, 0.03, 0.6, 0.4)
n=optimal_Q.shape[0]

tdQ, tdpolicy, tdreward, tdvalues = grid.td_learning(2000, 0.03, 0.6, 0.4)

RMS = []
for i in range(len(intvalues)):
    error = 0
    for j in range(n):
        error += (intvalues[i][j]-values[j])**2
    haha = (error/n)**(0.5)
    RMS.append(haha)

tdRMS = []
for i in range(len(tdvalues)):
    error = 0
    for j in range(n):
        error += (tdvalues[i][j]-values[j])**2
    haha = (error/n)**(0.5)
    tdRMS.append(haha)
    
    
plt.figure()
plt.scatter(tdRMS, tdreward, s=15)
plt.xlabel("Root mean square error")
plt.ylabel("Total reward")
plt.show()
    

In [None]:
values, policy, iteration = grid.policy_iteration(0.4, 0.0001)

grid.draw_deterministic_policy(np.array([np.argmax(policy[row,:]) for row in range(grid.state_size)]))
grid.draw_value(values)


In [None]:
# illustrate an arbitrary policy using draw_deterministic_policy()

Policy2 = np.zeros(29).astype(int)
Policy2[2] = 3
Policy2[6] = 2
Policy2[18] = 1
grid.draw_deterministic_policy(Policy2)

In [None]:
policy = np.zeros((grid.state_size, grid.action_size))
policy = policy + 0.25
print("The policy is : {}".format(policy))

val, iterations = grid.policy_evaluation(policy, 0.01, 0.4)
print("The value of that policy is : {}".format(val))
print("Convergence took place after {} iterations.".format(iterations))

In [None]:
# How learning rate affects optimisation

grid = Gridworld()

alpha = [0.05, 0.06, 0.8]

alphass = []
for alp in alpha:
    alphh = []
    for i in range(200):
        optimal_Q, opt_policy, total_re = grid.td_learning(500, alp, 0.6, 0.4)
        alphh.append(total_re)
    alphass.append(alphh)
    
plt.figure()
for i in range(len(alphass)):
    haha = alphass[i]
    rewa00 = np.array(haha)
    rewa00.reshape(200,500)
    rewa0 = np.mean(rewa00, axis=0)
    plt.plot(range(500), rewa0)
plt.xlabel("number of episodes, alpha varies")
plt.ylabel("total reward")
plt.show()

In [None]:
# How the exploration parameter affects optimisation

grid = Gridworld()

epsilon = [0, 0.25, 0.5, 0.75, 1]

epss = []
for eps in epsilon:
    epp = []
    for i in range(1000):
        optimal_Q, opt_policy, total_re = grid.td_learning(100, 0.03, eps, 0.4)
        epp.append(total_re)
    epss.append(epp)
    
plt.figure()
for i in range(len(epss)):
    eps0 = epss[i]
    rewa00 = np.array(eps0)
    rewa00.reshape(1000,100)
    rewa0 = np.mean(rewa00, axis=0)
    plt.plot(range(100), rewa0)
plt.xlabel("number of episodes, epsilon varies")
plt.ylabel("total reward")
plt.show()

In [None]:
# Plot rewards
grid = Gridworld()


rewardss = []
for i in range(2000):
    optimal_Q, opt_policy, total_re = grid.td_learning(100, 0.03, 0.6, 0.4)
    rewardss.append(total_re)

rewa = np.array(rewardss)
rewa.reshape(2000,100)
avg_re = np.mean(rewa, axis=0)
re_std = np.std(rewa, axis=0)
avg_less = avg_re - re_std
avg_more = avg_re + re_std

plt.figure()
plt.plot(range(100), avg_re)
plt.plot(range(100), avg_less)
plt.plot(range(100), avg_more)
plt.xlabel("number of episodes")
plt.ylabel("total reward")
plt.show()

In [None]:
# Draw deterministic policy
grid = Gridworld()

optimal_Q, opt_policy, total_re = grid.td_learning(2000, 0.03, 0.6, 0.4)

values = np.zeros((optimal_Q.shape[0]))
for i in range(optimal_Q.shape[0]):
    values[i] = max(optimal_Q[i,:])
grid.draw_value(values)

grid.draw_deterministic_policy(np.array([np.argmax(opt_policy[row,:]) for row in range(grid.state_size)]))

print(optimal_Q)
print(opt_policy)



In [None]:
# Draw learning curve

grid = Gridworld()

alpha = [0.05, 0.06, 0.8]

alphass = []
for alp in alpha:
    alphh = []
    for i in range(5000):
        optimal_Q, opt_policy, total_re, haha = grid.monte_carlo(100, alp, 0.6, 0.4)
        alphh.append(total_re)
    alphass.append(alphh)
    
plt.figure()
for i in range(len(alphass)):
    haha = alphass[i]
    rewa00 = np.array(haha)
    rewa00.reshape(5000,100)
    rewa0 = np.mean(rewa00, axis=0)
    plt.plot(range(100), rewa0)
plt.xlabel("number of episodes, alpha varies")
plt.ylabel("total reward")
plt.show()

In [None]:
# Draw learning curves

grid = Gridworld()

epsilon = [0, 0.25, 0.5, 0.75, 1]

epss = []
for eps in epsilon:
    epp = []
    for i in range(2000):
        optimal_Q, opt_policy, total_re, haha = grid.monte_carlo(100, 0.03, eps, 0.4)
        epp.append(total_re)
    epss.append(epp)
    
plt.figure()
for i in range(len(epss)):
    eps0 = epss[i]
    rewa00 = np.array(eps0)
    rewa00.reshape(2000,100)
    rewa0 = np.mean(rewa00, axis=0)
    plt.plot(range(100), rewa0)
plt.xlabel("number of episodes, epsilon varies")
plt.ylabel("total reward")
plt.show()

In [None]:
# Total reward

grid = Gridworld()

rewardss=[]
for i in range(2000):
    optimal_Q, opt_policy, total_re, haha = grid.monte_carlo(100, 0.03, 0.6, 0.4)
    rewardss.append(total_re)

rewa = np.array(rewardss)
rewa.reshape(2000,100)
avg_re = np.mean(rewa, axis = 0)
standev = np.std(rewa, axis = 0)
avg_more = avg_re + standev
avg_less = avg_re - standev


plt.figure()
plt.plot(range(100), avg_re)
plt.plot(range(100),avg_more)
plt.plot(range(100), avg_less)
plt.xlabel("number of episodes")
plt.ylabel("total reward")
plt.show()

In [None]:
grid = Gridworld()
optimal_Q, opt_policy, total_re, haha = grid.monte_carlo(2000, 0.03, 0.6, 0.4)

values = np.zeros((optimal_Q.shape[0]))
for i in range(optimal_Q.shape[0]):
    values[i] = max(optimal_Q[i,:])
grid.draw_value(values)

grid.draw_deterministic_policy(np.array([np.argmax(opt_policy[row,:]) for row in range(grid.state_size)]))

print(optimal_Q)
print(opt_policy)

In [None]:
grid = Gridworld()
values, policy, iteration = grid.value_iteration(0.4, 0.00001)
policy[:,[0,1,2,3]]= policy[:,[3,0,1,2]]


grid.draw_value(values)
grid.draw_deterministic_policy(np.array([np.argmax(policy[row,:]) for row in range(grid.state_size)]))
print(policy)
print(values)