In [1]:
import numpy as np

In [2]:
# Action dictionaries: each action represents a movement, and each has its own id
action_dict = {
    0: np.array([0,1]), #R
    1: np.array([0,-1]), #L
    2: np.array([-1,0]), #U
    3: np.array([1,0]), #D
}

action_dict_desc = {
    0: '>', #R
    1: '<', #L
    2: '^', #U
    3: 'v', #D
}

In [3]:
# Policy visualization: Puts arrows correpsonding to actions given by policy pi
def describe_policy(pi):
    dpi = [[' ' for _ in range(pi.shape[1])] for _ in range(pi.shape[0])]
    for i in range(pi.shape[0]):
        for j in range(pi.shape[1]):
            dpi[i][j] = action_dict_desc[pi[i,j]]
            
    return np.array(dpi)

### Environment specification

In [4]:
# Grid dimensions
H, W = 5, 5

# Gamma: Between 0 and 1.
#   0 - myopic agent
#   1 - farsighted agent
gamma = 0.9

# Special states
A = np.array((0, 1))
A_prime = np.array((4, 1))

B = np.array((0, 3))
B_prime = np.array((2, 3))

In [5]:
# Reward: Given a position and an action, return the new position and reward
def reward(pos, action):
    new_pos = pos+action
    
    if new_pos[0] < 0 or new_pos[0] >= H or new_pos[1] < 0 or new_pos[1] >= W:
        return pos, -1
    if np.all(pos == A):
        return A_prime, 10
    if np.all(pos==B):
        return B_prime, 5
    if np.all(pos==B_prime):
        return pos, 0
    if np.all(pos==A_prime):
        return pos, 0
    
    return new_pos, 0

### Value iteration

In [6]:
# Value iteration: Performs updates on state values until optimal values v_star are obtained
def value_iteration(V, pi):
    theta = 0.01
    while(True):
        delta = 0
        for i in range(V.shape[0]):
            for j in range(V.shape[1]):
                s = np.array([i,j])
                
                old_v = V[*s]
                action_vals = []
                for action_id in action_dict:
                    action = action_dict[action_id]
                    s_prime, r = reward(s, action)
                    
                    action_vals.append(r + gamma*V[*s_prime])
                    
                V[*s] = max(action_vals)
                delta = max(delta, abs(old_v - V[*s]))
                
        if delta < theta:
            return V

In [7]:
# Given a value function, for each state returns the action that maximizes the expected return
def greedy_policy(V):
    pi = np.zeros_like(V)
    
    for i in range(V.shape[0]):
        for j in range(V.shape[1]):
            s = np.array([i,j])
            
            action_vals = np.zeros(len(action_dict))
            for action_id in action_dict:
                action = action_dict[action_id]
                s_prime, r = reward(s, action)
                
                action_vals[action_id] = r + gamma*V[*s_prime]
                
            pi[*s] = np.argmax(action_vals)
            
    return pi

In [8]:
V = np.zeros((H,W))
pi = np.random.randint(0, 4, (H,W))

V_star = value_iteration(V, pi)
pi_star = greedy_policy(V_star)

pi_star_desc = describe_policy(pi_star)

print(pi_star_desc)

[['>' '>' '<' '>' 'v']
 ['>' '^' '<' '<' '<']
 ['>' '^' '<' '>' '^']
 ['>' '^' '<' '<' '<']
 ['^' '>' '^' '<' '<']]


### Policy iteration

In [9]:
# Policy evaluation: Given an initial value function and a policy, iterate the values until they converge
# to the values corresponding to the policy
def policy_evaluation(V, pi):
    delta_max=0.01
    while(True):
        delta=0
        for i in range(V.shape[0]):
            for j in range(V.shape[1]):
                val = V[i,j]
                state = np.array([i,j])
                action = action_dict[pi[i,j]]
                new_state, r = reward(state, action)
                V[i,j] = r + gamma*V[new_state[0], new_state[1]]
                
                delta = max(delta, np.abs(val - V[i,j]))

        if delta < delta_max:
            break
    
    return V 


In [10]:
# Given a policy pi and its value function V, imporve the policy by taking the greedy action wrt V
def policy_improvement(pi, V):
    new_pi = pi.copy()
    for i in range(V.shape[0]):
        for j in range(V.shape[1]):
            s = np.array([i,j])
            a = pi[s[0],s[1]]
            
            max_val = -float('inf')
            for action_id in [0,1,2,3]:
                action = action_dict[action_id]
                new_pos, r = reward(s, action)
                if r + gamma*V[new_pos[0],new_pos[1]] > max_val:
                    max_val = r + gamma*V[new_pos[0],new_pos[1]]
                    new_pi[i,j] = action_id
                    
    return new_pi

In [11]:
# Policy iteration: Combines the policy evaluation and policy imporvement steps, and iterates them until
# the optimal policy is found
def policy_iteration(pi, V):
    while True:
        V = policy_evaluation(V, pi)
        new_pi = policy_improvement(pi, V)
        
        if np.all(new_pi == pi):
            break
        else:
            pi = new_pi
            
    return pi

In [12]:
pi_star = policy_iteration(pi, V)
pi_star_desc = describe_policy(pi_star)

print(pi_star_desc)

[['>' '>' '<' '>' '<']
 ['>' '^' '<' '^' '<']
 ['>' '^' '<' '>' '^']
 ['>' '^' '<' '<' '^']
 ['^' '>' '^' '<' '^']]
