In [24]:
import numpy as np
import pprint
import sys
if "../" not in sys.path:
    sys.path.append("../") 
from lib.envs.gridworld import GridworldEnv

In [25]:
# Create the gridworld environment using the constructor's default arguments.
env = GridworldEnv()

In [26]:
# Print the current grid as text.
# NOTE(review): presumably T marks a terminal cell and x the agent's position — confirm in GridworldEnv.
env.render()

T  o  o  o
o  o  o  o
x  o  o  o
o  o  o  T


In [27]:
def policy_eval(policy, env, discount_factor=1.0, theta=0.000001):
    """
    Iterative policy evaluation: estimate the state-value function of a policy.

    Args:
        policy: Indexable by state; policy[s] is a sequence holding the
            probability of taking each action in state s.
        env: Environment exposing env.nS (number of states) and env.P, where
            env.P[s][a] is an iterable of (prob, next_state, reward, done)
            tuples.
        discount_factor: Gamma discount factor applied to successor values.
        theta: Sweeping stops once the largest value change in a full sweep
            is less than or equal to theta.

    Returns:
        A numpy vector of length env.nS holding the estimated state values.
    """
    values = np.zeros(env.nS)

    converged = False
    while not converged:
        largest_change = 0
        for state in range(env.nS):
            # Expected one-step return under the policy, using the current
            # estimates (states updated earlier in this sweep are already new).
            backup = sum(
                pi_sa * trans_prob * (reward + discount_factor * values[succ])
                for action, pi_sa in enumerate(policy[state])
                for trans_prob, succ, reward, _done in env.P[state][action]
            )
            largest_change = max(largest_change, abs(values[state] - backup))
            values[state] = backup
        converged = largest_change <= theta

    return values

In [31]:
def policy_iter(env, policy_eval_fn=policy_eval, discount_factor=1.0):
    """
    Policy Iteration. Alternately evaluates the current policy and makes it
    greedy with respect to that evaluation, stopping once the greedy action
    no longer changes in any state.

    Args:
        env: Environment exposing env.nS (number of states), env.nA (number
            of actions) and env.P, where env.P[s][a] is an iterable of
            (prob, next_state, reward, done) tuples.
        policy_eval_fn: Policy Evaluation function that takes 3 arguments:
            policy, env, discount_factor, and returns a value indexable by
            state.
        discount_factor: gamma discount factor.

    Returns:
        policy: a matrix of shape [S, A] in which each row is a one-hot
        distribution selecting the greedy action for that state.
        The value function is not returned; recompute it with
        policy_eval_fn(policy, env, discount_factor) if needed.
    """

    def one_step_lookahead(s, v):
        # Expected return of every action in state s, backing up from v.
        action_values = np.zeros(env.nA)
        for a in range(env.nA):
            for prob, next_s, reward, _is_done in env.P[s][a]:
                action_values[a] += prob * (reward + discount_factor * v[next_s])
        return action_values

    # Rows of the identity matrix serve as one-hot action distributions.
    one_hot = np.eye(env.nA)

    # Start from the uniform random policy.
    policy = np.ones((env.nS, env.nA)) / env.nA

    while True:
        # Evaluate with the caller-supplied function (previously the
        # module-level policy_eval was hard-coded and the argument ignored).
        v = policy_eval_fn(policy, env, discount_factor)

        policy_stable = True
        for s in range(env.nS):
            # Action the current policy prefers (first index on ties).
            chosen_a = np.argmax(policy[s])

            # Greedy action with respect to the freshly evaluated values.
            best_a = np.argmax(one_step_lookahead(s, v))

            # Any change means another evaluate/improve round is required.
            if chosen_a != best_a:
                policy_stable = False
            policy[s] = one_hot[best_a]

        if policy_stable:
            break

    return policy
            
    

In [32]:
# Run policy iteration with the default evaluator and discount factor (1.0).
optimal_policy = policy_iter(env)

In [34]:
# Display the mapping from action index to direction name.
env.get_action_meanings()

{0: 'UP', 1: 'RIGHT', 2: 'DOWN', 3: 'LEFT'}

In [37]:
# Name the highest-probability action of each state (first index on ties)
# and lay the 16 names out as a 4x4 grid.
np.reshape([env.get_action_meanings()[np.argmax(x)] for x in optimal_policy],(4,4))

array([['UP', 'LEFT', 'LEFT', 'DOWN'],
       ['UP', 'UP', 'UP', 'DOWN'],
       ['UP', 'UP', 'RIGHT', 'DOWN'],
       ['UP', 'RIGHT', 'RIGHT', 'UP']], dtype='<U5')

In [20]:
# Evaluate the policy found above (default discount factor and theta) to get its state values.
optimal_value_fn = policy_eval(optimal_policy, env)

In [22]:
# View the state values as a 4x4 grid.
optimal_value_fn.reshape(4,4)

array([[ 0., -1., -2., -3.],
       [-1., -2., -3., -2.],
       [-2., -3., -2., -1.],
       [-3., -2., -1.,  0.]])

In [23]:
# Regression check: the computed state values must match these reference
# values to 2 decimal places.
expected_v = np.array([ 0, -1, -2, -3, -1, -2, -3, -2, -2, -3, -2, -1, -3, -2, -1,  0])
np.testing.assert_array_almost_equal(optimal_value_fn, expected_v, decimal=2)