In [2]:
import numpy as np
import sys
# if "../" not in sys.path:
#   sys.path.append("../") 
from lib.envs.gridworld import GridworldEnv, Action

"""
UP = 0
RIGHT = 1
DOWN = 2
LEFT = 3
UPRIGHT = 4
DOWNRIGHT = 5
DOWNLEFT = 6
UPLEFT = 7
"""
# Bind each action's integer code to a module-level name; the codes are read
# from Action so they cannot drift from the environment's own definition.
UP, RIGHT, DOWN, LEFT = (
    Action.UP.value,
    Action.RIGHT.value,
    Action.DOWN.value,
    Action.LEFT.value,
)
UPRIGHT, DOWNRIGHT, DOWNLEFT, UPLEFT = (
    Action.UPRIGHT.value,
    Action.DOWNRIGHT.value,
    Action.DOWNLEFT.value,
    Action.UPLEFT.value,
)
# Echo the codes so the mapping is visible in the cell output.
print(UP, RIGHT, DOWN, LEFT, UPRIGHT, DOWNRIGHT, DOWNLEFT, UPLEFT)

0 1 2 3 4 5 6 7


In [3]:
# Create the grid world that every later cell reads through the global `env`.
# The outputs recorded below show a 6x6 grid with an action set of size 8.
env = GridworldEnv([6,6], actionmode=8) # actionmode = 4 or 8

## Policy Evaluation (Prediction Problem)

预测问题和控制问题如何联系起来？

In [4]:
def policy_evaluation(env, policy, discount_factor = 1.0, theta = 1e-4):
    """
    Iterative policy evaluation: estimate the state values of a fixed policy.

    Every sweep replaces V[s] by the expected one-step return obtained when
    actions are drawn from `policy`. The update is done in place, so states
    visited later in a sweep already see the values refreshed earlier in that
    same sweep. Sweeping stops once the largest change is below `theta`.

    Args:
        env: Environment description. env.nS is the number of states, env.nA
            the number of actions, and env.P[s][a] is a list of transition
            tuples (prob, next_state, reward, done).
        policy: 2D array indexed as policy[s][a]: probability of taking
            action a in state s.
        discount_factor: Gamma discount factor.
        theta: Sweeping stops once no state value changed by theta or more
            during a full sweep.

    Returns:
        A tuple (V, count). V is a vector of length env.nS holding the
        estimated value of each state under `policy`; count is the number of
        sweeps that were performed.
    """

    def action_values_for(state, values):
        """
        One-step lookahead from `state` using `values` as the estimate of
        the successor states.

        Returns:
            A vector of length env.nA with the expected return of each action.
        """
        returns = []
        for act in range(env.nA):
            total = 0.0
            # The `done` flag of a transition is not needed for the backup.
            for p, nxt, rew, _terminal in env.P[state][act]:
                total += p * (rew + discount_factor * values[nxt])
            returns.append(total)
        return np.array(returns, dtype=float)

    V = np.zeros(env.nS)
    count = 0
    converged = False
    while not converged:
        count += 1
        # Largest absolute change seen in this sweep.
        delta = 0
        for s in range(env.nS):
            action_values = action_values_for(s, V)
            # Average the action values with the policy's action probabilities.
            expected = 0
            for act, weight in enumerate(policy[s]):
                expected += weight * action_values[act]
            change = abs(expected - V[s])
            if change > delta:
                delta = change
            V[s] = expected
        converged = delta < theta
    return V, count

In [5]:
# Evaluate the uniform random policy (every action equally likely in every state).
uniform_random_policy = np.ones([env.nS, env.nA]) / env.nA
V_eval, count = policy_evaluation(env, uniform_random_policy, discount_factor=0.8)

print('count =',count)

# Flat view: one value per state id.
print("Value Function:")
print(V_eval.round(3))

# Same values laid out like the grid.
print("Reshaped Grid Value Function:")
print(V_eval.reshape(env.shape).round(3))


count = 29
Value Function:
[ 0.    -3.896 -4.619 -4.846 -4.92  -4.947 -3.896 -4.108 -4.652 -4.833
 -4.896 -4.92  -4.619 -4.652 -4.746 -4.817 -4.833 -4.846 -4.846 -4.833
 -4.817 -4.746 -4.652 -4.619 -4.92  -4.896 -4.833 -4.652 -4.108 -3.897
 -4.947 -4.92  -4.846 -4.619 -3.897  0.   ]
Reshaped Grid Value Function:
[[ 0.    -3.896 -4.619 -4.846 -4.92  -4.947]
 [-3.896 -4.108 -4.652 -4.833 -4.896 -4.92 ]
 [-4.619 -4.652 -4.746 -4.817 -4.833 -4.846]
 [-4.846 -4.833 -4.817 -4.746 -4.652 -4.619]
 [-4.92  -4.896 -4.833 -4.652 -4.108 -3.897]
 [-4.947 -4.92  -4.846 -4.619 -3.897  0.   ]]


## Policy Iteration To Find Optimal Policy (Control Problem)

In [6]:
def policy_improvement(env, start_policy, policy_eval_fn=policy_evaluation, discount_factor=1.0):
    """
    Policy iteration: alternately evaluate the current policy and make it
    greedy with respect to the resulting value function, until the greedy
    step no longer changes the policy.

    Args:
        env: The environment. env.nS is the number of states, env.nA the
            number of actions, and env.P[s][a] is a list of transition tuples
            (prob, next_state, reward, done).
        start_policy: The initial policy, an array of shape [env.nS, env.nA]
            whose rows are action probabilities. It is copied, so the caller's
            array is never modified. Passing None starts from the uniform
            random policy.
        policy_eval_fn: Policy evaluation function. It is called as
            policy_eval_fn(env, policy, discount_factor) and must return a
            tuple (V, sweeps). It has to terminate for every policy visited.
        discount_factor: Gamma discount factor.

    Returns:
        A tuple (policy, V, count).
        policy is the final policy, a matrix of shape [S, A] where each state s
        contains a valid probability distribution over actions.
        V is the value function returned by the last call to policy_eval_fn,
        i.e. the value function of the returned policy.
        count is the number of evaluate/improve rounds that were run.

    Raises:
        ValueError: If start_policy is not None and does not have shape
            (env.nS, env.nA).
    """

    def qValueInState(state, V):
        """
        Helper function to calculate the value for all action in a given state.

        Args:
            state: The state to consider (int)
            V: The value to use as an estimator, Vector of length env.nS

        Returns:
            A vector of length env.nA containing the expected value of each action.
        """
        Q = np.zeros(env.nA)
        for a in range(env.nA):
            for prob, next_state, reward, done in env.P[state][a]:
                Q[a] += prob * (reward + discount_factor * V[next_state])
        return Q

    # Honour the caller's starting policy (it used to be ignored and replaced
    # by the uniform random policy). Work on a float copy because the rows are
    # overwritten in place below.
    if start_policy is None:
        policy = np.ones([env.nS, env.nA]) / env.nA
    else:
        policy = np.array(start_policy, dtype=float)
        if policy.shape != (env.nS, env.nA):
            raise ValueError(
                "start_policy must have shape ({}, {}), got {}".format(
                    env.nS, env.nA, policy.shape))

    # Row i of the identity matrix is the one-hot distribution for action i.
    one_hot = np.eye(env.nA)

    count = 0
    while True:
        count += 1
        # Evaluate the current policy; the sweep count of the evaluation is
        # not part of this function's result.
        V, _eval_sweeps = policy_eval_fn(env, policy, discount_factor)

        # Will be set to false if we make any changes to the policy
        policy_stable = True

        # For each state...
        for s in range(env.nS):
            # Find the best action by one-step lookahead.
            # Ties are resolved in favour of the lowest action index (np.argmax).
            best_a = np.argmax(qValueInState(s, V))

            # Compare the whole row, not just its argmax: a stochastic row
            # whose most likely action happens to be greedy is still changed
            # by the update, so V would no longer belong to the new policy.
            if not np.array_equal(policy[s], one_hot[best_a]):
                policy_stable = False
                # Greedily update the policy
                policy[s] = one_hot[best_a]

        # If the policy is stable the greedy step changed nothing. Return it
        if policy_stable:
            break

    return policy, V, count

In [7]:
# Run policy iteration, handing it the uniform random policy as the start policy.
uniform_random_policy = np.ones([env.nS, env.nA]) / env.nA
policy_opt, v_opt, count = policy_improvement(env, start_policy=uniform_random_policy)

print("Action Set Size:", env.nA, ', count =', count)
print("Optimal Policy Probability Distribution (row: state, column: action probability):")
print(policy_opt)

# The legend depends on whether the environment has 4 or 8 actions.
print(
    "Reshaped Grid Policy (0=up, 1=right, 2=down, 3=left):"
    if env.nA == 4
    else "Reshaped Grid Policy (0=up, 1=right, 2=down, 3=left, 4=upright, 5=downright, 6=downleft, 7=upleft):"
)
# Most probable action of every state, laid out like the grid.
print(policy_opt.argmax(axis=1).reshape(env.shape))

print("Value Function:")
print(v_opt)

print("Reshaped Grid Value Function:")
print(v_opt.reshape(env.shape))

Action Set Size: 8 , count = 3
Optimal Policy Probability Distribution (row: state, column: action probability):
[[1. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 1. 0. 0. 0. 0.]
 [0. 0. 0. 1. 0. 0. 0. 0.]
 [0. 0. 0. 1. 0. 0. 0. 0.]
 [0. 0. 0. 1. 0. 0. 0. 0.]
 [0. 0. 1. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 1.]
 [0. 0. 0. 1. 0. 0. 0. 0.]
 [0. 0. 0. 1. 0. 0. 0. 0.]
 [0. 0. 1. 0. 0. 0. 0. 0.]
 [0. 0. 1. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 1.]
 [0. 0. 1. 0. 0. 0. 0. 0.]
 [0. 0. 1. 0. 0. 0. 0. 0.]
 [0. 0. 1. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 1. 0. 0.]
 [0. 0. 1. 0. 0. 0. 0. 0.]
 [0. 0. 1. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 1. 0. 0.]
 [0. 0. 1. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0. 

## Value Iteration To Find Optimal Policy (Control Problem)

07/08/2019 目前的版本感觉有些问题，在同一辅对角线上的策略应该是存在多个选择的。这个后续再研究一下。

07/09/2019 update: value function 在迭代计算的时候使用了每个动作达到的state，靠近边界的state和靠近中间的state肯定是有所区别的。 

In [10]:
def value_iteration(env, discount_factor = 1.0, theta = 1e-4):
    """
    Value iteration: repeatedly back up every state with the value of its
    best action, then read a greedy policy off the converged values.

    The backups are done in place, so states visited later in a sweep already
    see the values refreshed earlier in that same sweep.

    Args:
        env: Environment description. env.nS is the number of states, env.nA
            the number of actions, and env.P[s][a] is a list of transition
            tuples (prob, next_state, reward, done).
        discount_factor: Gamma discount factor.
        theta: Sweeping stops once no state value changed by theta or more
            during a full sweep.

    Returns:
        A tuple (policy, V, count). policy is a one-hot matrix of shape
        [env.nS, env.nA] selecting the greedy action of each state, V is the
        value vector of length env.nS, and count is the number of sweeps
        that were performed.
    """

    def lookahead(state, values):
        """
        One-step lookahead from `state` using `values` as the estimate of
        the successor states.

        Returns:
            A vector of length env.nA with the expected return of each action.
        """
        returns = []
        for act in range(env.nA):
            total = 0.0
            # The `done` flag of a transition is not needed for the backup.
            for p, nxt, rew, _terminal in env.P[state][act]:
                total += p * (rew + discount_factor * values[nxt])
            returns.append(total)
        return np.array(returns, dtype=float)

    V = np.zeros(env.nS)
    count = 0
    converged = False
    while not converged:
        count += 1
        # Largest absolute change seen in this sweep.
        delta = 0
        for s in range(env.nS):
            # Bellman optimality backup. Ref: Sutton book eq. 4.10.
            best_action_value = lookahead(s, V).max()
            change = abs(best_action_value - V[s])
            if change > delta:
                delta = change
            V[s] = best_action_value
        converged = delta < theta

    # Deterministic policy: all probability mass on the greedy action.
    policy = np.zeros([env.nS, env.nA])
    for s in range(env.nS):
        policy[s, lookahead(s, V).argmax()] = 1.0

    return policy, V, count

In [11]:
# Solve the control problem with value iteration (undiscounted).
policy_opt, v_opt, count = value_iteration(env, discount_factor=1.0)

print("Action Set Size:", env.nA, ', count =', count)
print("Optimal Policy Probability Distribution (row: state, column: action probability):")
print(policy_opt)

# The legend depends on whether the environment has 4 or 8 actions.
print(
    "Reshaped Grid Policy (0=up, 1=right, 2=down, 3=left):"
    if env.nA == 4
    else "Reshaped Grid Policy (0=up, 1=right, 2=down, 3=left, 4=upright, 5=downright, 6=downleft, 7=upleft):"
)
# Selected action of every state, laid out like the grid.
print(policy_opt.argmax(axis=1).reshape(env.shape))

print("Value Function:")
print(v_opt)

print("Reshaped Grid Value Function:")
print(v_opt.reshape(env.shape))

Action Set Size: 8 , count = 6
Optimal Policy Probability Distribution (row: state, column: action probability):
[[1. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 1. 0. 0. 0. 0.]
 [0. 0. 0. 1. 0. 0. 0. 0.]
 [0. 0. 0. 1. 0. 0. 0. 0.]
 [0. 0. 0. 1. 0. 0. 0. 0.]
 [0. 0. 1. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 1.]
 [0. 0. 0. 1. 0. 0. 0. 0.]
 [0. 0. 0. 1. 0. 0. 0. 0.]
 [0. 0. 1. 0. 0. 0. 0. 0.]
 [0. 0. 1. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 1.]
 [0. 0. 1. 0. 0. 0. 0. 0.]
 [0. 0. 1. 0. 0. 0. 0. 0.]
 [0. 0. 1. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 1. 0. 0.]
 [0. 0. 1. 0. 0. 0. 0. 0.]
 [0. 0. 1. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 1. 0. 0.]
 [0. 0. 1. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0. 

## Act in the Environment

In [46]:
# Smoke-test the environment: roll out episodes with randomly sampled actions.
iter_num = 1
for i_episode in range(iter_num):
    observation = env.reset()
    print(observation)
    count = 0
    done = False
    while not done:
        count += 1
        env.render() # show the state the agent is about to act from
        action = env.action_space.sample()
        observation, reward, done, info = env.step(action)
        print(observation, reward, done, info)
    # The episode is over: show the final state and the step count.
    env.render()
    print("Episode {}/{} finished after {} timesteps".format(i_episode+1, iter_num, count))

7
T  o  o  o  o  o
o  x  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  T
12 -1.0 False {'prob': 1.0}
T  o  o  o  o  o
o  o  o  o  o  o
x  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  T
6 -1.0 False {'prob': 1.0}
T  o  o  o  o  o
x  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  T
1 -1.0 False {'prob': 1.0}
T  x  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  T
8 -1.0 False {'prob': 1.0}
T  o  o  o  o  o
o  o  x  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  T
3 -1.0 False {'prob': 1.0}
T  o  o  x  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  T
4 -1.0 False {'prob': 1.0}
T  o  o  o  x  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  T
3 -1.0 False {'prob': 1.0}
T  o  o  x  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  

In [47]:
# Roll out episodes that follow policy_opt, the policy computed above.
iter_num = 1
for i_episode in range(iter_num):
    observation = env.reset() # the observation is the state id
    print(observation)
    count = 0
    done = False
    while not done:
        count += 1
        env.render() # show the state the agent is about to act from
        # Take the action that carries the most probability in this state's row.
        action = policy_opt[observation].argmax()
        observation, reward, done, info = env.step(action)
        print(observation, reward, done, info)
    # The episode is over: show the final state and the step count.
    env.render()
    print("Episode {}/{} finished after {} timesteps".format(i_episode+1, iter_num, count))

25
T  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  x  o  o  o  o
o  o  o  o  o  T
19 -1.0 False {'prob': 1.0}
T  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  x  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  T
13 -1.0 False {'prob': 1.0}
T  o  o  o  o  o
o  o  o  o  o  o
o  x  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  T
7 -1.0 False {'prob': 1.0}
T  o  o  o  o  o
o  x  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  T
0 -1.0 True {'prob': 1.0}
x  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  T
Episode 1/1 finished after 4 timesteps


In [12]:
# Act greedily with respect to V_eval, the value function that was computed
# for the uniform random policy (it is not the optimal value function).
iter_num = 1
# Same discount as the one passed to policy_evaluation when V_eval was computed.
discount_factor = 0.8
for i_episode in range(iter_num):
    observation = env.reset() # the observation is the state id
    print(observation)
    count = 0
    done = False
    while not done:
        count += 1
        env.render() # show the state the agent is about to act from
        # One-step lookahead: expected return of every action from this state.
        Q = np.zeros(env.nA)
        for a in range(env.action_space.n): # range(env.nA)
            # Distinct names keep the episode's `reward`/`done` from being rebound here.
            for prob, next_state, step_reward, is_terminal in env.P[observation][a]:
                # in this env, reward is given along with next_state
                Q[a] += prob * (step_reward + discount_factor * V_eval[next_state])
        action = Q.argmax()
        observation, reward, done, info = env.step(action)
        print(observation, reward, done, info)
    # The episode is over: show the final state and the step count.
    env.render()
    print("Episode {}/{} finished after {} timesteps".format(i_episode+1, iter_num, count))

27
T  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  x  o  o
o  o  o  o  o  T
34 -1.0 False {'prob': 1.0}
T  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  x  T
35 -1.0 True {'prob': 1.0}
T  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  o
o  o  o  o  o  x
Episode 1/1 finished after 2 timesteps


In [48]:
# Close the environment once all rollouts above are finished; cells that use
# `env` should not be re-run after this without creating it again.
env.close()