In [1]:
import numpy as np
import pprint
import sys
if "../" not in sys.path:
  sys.path.append("../") 
from lib.envs.gridworld import GridworldEnv

In [2]:
pp = pprint.PrettyPrinter(indent=2)
env = GridworldEnv()

In [17]:
# Taken from Policy Evaluation Exercise!

# def policy_eval(policy, env, discount_factor=1.0, theta=0.00001):
#     """
#     Evaluate a policy given an environment and a full description of the environment's dynamics.
    
#     Args:
#         policy: [S, A] shaped matrix representing the policy.
#         env: OpenAI env. env.P represents the transition probabilities of the environment.
#             env.P[s][a] is a (prob, next_state, reward, done) tuple.
#         theta: We stop evaluation one our value function change is less than theta for all states.
#         discount_factor: lambda discount factor.
    
#     Returns:
#         Vector of length env.nS representing the value function.
#     """
#     # Start with a random (all 0) value function
#     V = np.zeros(env.nS)
#     while True:
#         delta = 0
#         # For each state, perform a "full backup"
#         for s in range(env.nS):
#             v = 0
#             # Look at the possible next actions
#             for a, action_prob in enumerate(policy[s]):
#                 # For each action, look at the possible next states...
#                 for  prob, next_state, reward, done in env.P[s][a]:
#                     # Calculate the expected value
#                     v += action_prob * prob * (reward + discount_factor * V[next_state])
#             # How much our value function changed (across any states)
#             delta = max(delta, np.abs(v - V[s]))
#             V[s] = v
#         # Stop evaluating once our value function change is below a threshold
#         if delta < theta:
#             break
#     return np.array(V)

def policy_eval(policy, env, init_V, discount_factor=1.0, theta=0.00000001):
    """
    Evaluate a policy given an environment and a full description of the environment's dynamics.
    
    Args:
        policy: [S, A] shaped matrix representing the policy.
        env: OpenAI env. env.P represents the transition probabilities of the environment.
            env.P[s][a] is a list of transition tuples (prob, next_state, reward, done).
        init_V: initialze state values, the result depends it
        theta: We stop evaluation once our value function change is less than theta for all states.
        discount_factor: gamma discount factor.
    
    Returns:
        Vector of length env.nS representing the value function.
    """
    # Start with a random (all 0) value function
    V = np.copy(init_V)
    while True:
        delta = 0.0 # 每轮都需要更细delta
        for s in range(env.nS):
            v = V[s]
            
            new_state_value = 0.0
            for a, action_prob in enumerate(policy[s]):
                action_value = 0.0
                for prob, next_state, reward, done in env.P[s][a]:
                    new_state_value += action_prob * prob * (reward + discount_factor * V[next_state])
            V[s] = new_state_value
            
            delta = max(np.abs(V[s] - v), delta)
        
        if(delta < theta):
            break
            
    return np.array(V)

In [21]:
# def policy_improvement(env, policy_eval_fn=policy_eval, discount_factor=1.0):
#     """
#     Policy Improvement Algorithm. Iteratively evaluates and improves a policy
#     until an optimal policy is found.
    
#     Args:
#         env: The OpenAI envrionment.
#         policy_eval_fn: Policy Evaluation function that takes 3 arguments:
#             policy, env, discount_factor.
#         discount_factor: Lambda discount factor.
        
#     Returns:
#         A tuple (policy, V). 
#         policy is the optimal policy, a matrix of shape [S, A] where each state s
#         contains a valid probability distribution over actions.
#         V is the value function for the optimal policy.
        
#     """
#     # Start with a random policy
#     policy = np.ones([env.nS, env.nA]) / env.nA
    
#     r = 0
#     while True:
#         # Implement this!
#         V = policy_eval_fn(policy, env, discount_factor)
# #         print("Round %d state function\n" % r, V.reshape(env.shape))
# #         print("Round %d policy\n" % r, np.reshape(np.argmax(policy, axis=1), env.shape))
        
#         policy_stable = True
#         for s in range(env.nS):
#             b = np.copy(policy[s])  # 必须deep copy，否则永远相等，只有一轮就退出
            
#             all_values = []
#             for a, _ in enumerate(policy[s]):       
#                 this_reward = [prob * (reward + discount_factor * V[next_state]) \
#                                for prob, next_state, reward, done in env.P[s][a]]
#                 action_vector = np.zeros(env.nA)
#                 action_vector[a] = 1
#                 all_values.append((action_vector, this_reward))
                
#             max_action = max(all_values, key = lambda x:x[1])[0] 
#             policy[s] = max_action    
            
#             if not np.array_equal(policy[s], b):
#                 policy_stable = False
          
#         r += 1
#         if policy_stable:
#             return policy, V

        
        
def policy_improvement(env, init_V, policy_eval_fn=policy_eval, discount_factor=1.0):
    """
    Policy Improvement Algorithm. Iteratively evaluates and improves a policy
    until an optimal policy is found.
    
    Args:
        env: The OpenAI envrionment.
        policy_eval_fn: Policy Evaluation function that takes 3 arguments:
            policy, env, discount_factor.
        discount_factor: Lambda discount factor.
        
    Returns:
        A tuple (policy, V). 
        policy is the optimal policy, a matrix of shape [S, A] where each state s
        contains a valid probability distribution over actions.
        V is the value function for the optimal policy.
        
    """
    # Start with a random policy
    policy = np.ones([env.nS, env.nA]) / env.nA
    V = np.copy(init_V)
    
    r = 0
    while True:
        # Implement this!
        V = policy_eval_fn(policy, env, V, discount_factor)
#         print("Round %d state function\n" % r, V.reshape(env.shape))
#         print("Round %d policy\n" % r, np.reshape(np.argmax(policy, axis=1), env.shape))
        
        policy_stable = True
        for s in range(env.nS):
            b = np.copy(policy[s])  # 必须deep copy，否则永远相等，只有一轮就退出
            
            all_values = []
            for a, _ in enumerate(policy[s]):       
                this_reward = [prob * (reward + discount_factor * V[next_state]) \
                               for prob, next_state, reward, done in env.P[s][a]]
                action_vector = np.zeros(env.nA)
                action_vector[a] = 1
                all_values.append((action_vector, this_reward))
                
            max_action = max(all_values, key = lambda x:x[1])[0] 
            policy[s] = max_action    
            
            if not np.array_equal(policy[s], b):
                policy_stable = False
          
        r += 1
        if policy_stable:
            return policy, V

In [22]:
policy, v = policy_improvement(env, np.zeros(env.nS))
print("Policy Probability Distribution:")
print(policy)
print("")

print("Reshaped Grid Policy (0=up, 1=right, 2=down, 3=left):")
print(np.reshape(np.argmax(policy, axis=1), env.shape))
print("")

print("Value Function:")
print(v)
print("")

print("Reshaped Grid Value Function:")
print(v.reshape(env.shape))
print("")



Policy Probability Distribution:
[[ 1.  0.  0.  0.]
 [ 0.  0.  0.  1.]
 [ 0.  0.  0.  1.]
 [ 0.  0.  1.  0.]
 [ 1.  0.  0.  0.]
 [ 1.  0.  0.  0.]
 [ 1.  0.  0.  0.]
 [ 0.  0.  1.  0.]
 [ 1.  0.  0.  0.]
 [ 1.  0.  0.  0.]
 [ 0.  1.  0.  0.]
 [ 0.  0.  1.  0.]
 [ 1.  0.  0.  0.]
 [ 0.  1.  0.  0.]
 [ 0.  1.  0.  0.]
 [ 1.  0.  0.  0.]]

Reshaped Grid Policy (0=up, 1=right, 2=down, 3=left):
[[0 3 3 2]
 [0 0 0 2]
 [0 0 1 2]
 [0 1 1 0]]

Value Function:
[ 0. -1. -2. -3. -1. -2. -3. -2. -2. -3. -2. -1. -3. -2. -1.  0.]

Reshaped Grid Value Function:
[[ 0. -1. -2. -3.]
 [-1. -2. -3. -2.]
 [-2. -3. -2. -1.]
 [-3. -2. -1.  0.]]



In [23]:
# Test the value function
expected_v = np.array([ 0, -1, -2, -3, -1, -2, -3, -2, -2, -3, -2, -1, -3, -2, -1,  0])
np.testing.assert_array_almost_equal(v, expected_v, decimal=1)

In [24]:
# 初始化状态不同，最终收敛的策略和值函数也是不同的
policy, v = policy_improvement(env, np.array(range(env.nS)))
print("Policy Probability Distribution:")
print(policy)
print("")

print("Reshaped Grid Policy (0=up, 1=right, 2=down, 3=left):")
print(np.reshape(np.argmax(policy, axis=1), env.shape))
print("")

print("Value Function:")
print(v)
print("")

print("Reshaped Grid Value Function:")
print(v.reshape(env.shape))
print("")


Policy Probability Distribution:
[[ 1.  0.  0.  0.]
 [ 0.  1.  0.  0.]
 [ 0.  1.  0.  0.]
 [ 0.  0.  1.  0.]
 [ 0.  1.  0.  0.]
 [ 0.  1.  0.  0.]
 [ 0.  1.  0.  0.]
 [ 0.  0.  1.  0.]
 [ 0.  1.  0.  0.]
 [ 0.  1.  0.  0.]
 [ 0.  1.  0.  0.]
 [ 0.  0.  1.  0.]
 [ 0.  1.  0.  0.]
 [ 0.  1.  0.  0.]
 [ 0.  1.  0.  0.]
 [ 1.  0.  0.  0.]]

Reshaped Grid Policy (0=up, 1=right, 2=down, 3=left):
[[0 1 1 2]
 [1 1 1 2]
 [1 1 1 2]
 [1 1 1 0]]

Value Function:
[ 0 10 11 12 10 11 12 13 11 12 13 14 12 13 14 15]

Reshaped Grid Value Function:
[[ 0 10 11 12]
 [10 11 12 13]
 [11 12 13 14]
 [12 13 14 15]]

