In [1]:
import numpy as np
import pprint
import sys
if "../" not in sys.path:
  sys.path.append("../") 
from lib.envs.gridworld import GridworldEnv

In [2]:
pp = pprint.PrettyPrinter(indent=2)
env = GridworldEnv()

In [3]:
# Taken from Policy Evaluation Exercise!

def policy_eval(policy, env, discount_factor=1.0, theta=0.00001):
    """
    Evaluate a policy given an environment and a full description of the environment's dynamics.
    
    Args:
        policy: [S, A] shaped matrix representing the policy.
        env: OpenAI env. env.P represents the transition probabilities of the environment.
            env.P[s][a] is a list of transition tuples (prob, next_state, reward, done).
            env.nS is a number of states in the environment. 
            env.nA is a number of actions in the environment.
        theta: We stop evaluation once our value function change is less than theta for all states.
        discount_factor: Gamma discount factor.
    
    Returns:
        Vector of length env.nS representing the value function.
    """
    # Start with a random (all 0) value function
    V = np.zeros(env.nS)
    while True:
        delta = 0
        # For each state, perform a "full backup"
        for s in range(env.nS):
            v = 0
            # Look at the possible next actions
            for a, action_prob in enumerate(policy[s]):
                # For each action, look at the possible next states...
                for  prob, next_state, reward, done in env.P[s][a]:
                    # Calculate the expected value
                    v += action_prob * prob * (reward + discount_factor * V[next_state])
            # How much our value function changed (across any states)
            delta = max(delta, np.abs(v - V[s]))
            V[s] = v
        # Stop evaluating once our value function change is below a threshold
        if delta < theta:
            break
    return np.array(V)

In [4]:
def policy_improvement(env, policy_eval_fn=policy_eval, discount_factor=1.0):
    """
    Policy Improvement Algorithm. Iteratively evaluates and improves a policy
    until an optimal policy is found.
    
    Args:
        env: The OpenAI environment.
        policy_eval_fn: Policy Evaluation function that takes 3 arguments:
            policy, env, discount_factor.
        discount_factor: gamma discount factor.
        
    Returns:
        A tuple (policy, V). 
        policy is the optimal policy, a matrix of shape [S, A] where each state s
        contains a valid probability distribution over actions.
        V is the value function for the optimal policy.
        
    """

    def one_step_lookahead(state, V):
        """
        Helper function to calculate the value for all action in a given state.
        
        Args:
            state: The state to consider (int)
            V: The value to use as an estimator, Vector of length env.nS
        
        Returns:
            A vector of length env.nA containing the expected value of each action.
        """
        A = np.zeros(env.nA)
        for a in range(env.nA):
            for prob, next_state, reward, done in env.P[state][a]:
                A[a] += prob * (reward + discount_factor * V[next_state])
        return A
    
    # Start with a random policy
    policy = np.ones([env.nS, env.nA]) / env.nA
    
    while True:
        # Evaluate the current policy
        V = policy_eval_fn(policy, env, discount_factor)
        
        # Will be set to false if we make any changes to the policy
        policy_stable = True
        
        # For each state...
        for s in range(env.nS):
            # The best action we would take under the current policy
            chosen_a = np.argmax(policy[s])
            
            # Find the best action by one-step lookahead
            # Ties are resolved arbitarily
            action_values = one_step_lookahead(s, V)
            best_a = np.argmax(action_values)
            
            # Greedily update the policy
            if chosen_a != best_a:
                policy_stable = False
            policy[s] = np.eye(env.nA)[best_a]
        
        # If the policy is stable we've found an optimal policy. Return it
        if policy_stable:
            return policy, V

In [13]:
policy, v = policy_improvement(env)
print("Policy Probability Distribution:")
print(policy)
print("")

print("Reshaped Grid Policy (0=up, 1=right, 2=down, 3=left):")
print(np.reshape(np.argmax(policy, axis=1), env.shape))
print("")

print("Value Function:")
print(v)
print("")

print("Reshaped Grid Value Function:")
print(v.reshape(env.shape))
print("")



Policy Probability Distribution:
[[1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]]

Reshaped Grid Policy (0=up, 1=right, 2=down, 3=left):
[[0 3 3 2]
 [0 0 0 2]
 [0 0 1 2]
 [0 1 1 0]]

Value Function:
[ 0. -1. -2. -3. -1. -2. -3. -2. -2. -3. -2. -1. -3. -2. -1.  0.]

Reshaped Grid Value Function:
[[ 0. -1. -2. -3.]
 [-1. -2. -3. -2.]
 [-2. -3. -2. -1.]
 [-3. -2. -1.  0.]]



In [2]:
import numpy as np
import gym

# Define the Policy Iteration function
def policy_iteration(env, gamma=0.99, theta=1e-6):
    nA = env.action_space.n  # Number of actions
    nS = env.observation_space.n  # Number of states

    # Initialize the policy with a random action for each state
    policy = np.ones([nS, nA]) / nA
    V = np.zeros(nS)  # Initialize the value function with zeros

    while True:
        # Policy Evaluation: Compute the value function for the current policy
        while True:
            delta = 0
            for s in range(nS):
                v = V[s]
                new_v = 0
                for a in range(nA):
                    for prob, next_state, reward, _ in env.P[s][a]:
                        new_v += policy[s][a] * prob * (reward + gamma * V[next_state])
                V[s] = new_v
                delta = max(delta, abs(v - V[s]))
            if delta < theta:
                break

        policy_stable = True

        # Policy Improvement: Update the policy based on the current value function
        for s in range(nS):
            old_action = np.argmax(policy[s])
            action_values = [sum(prob * (reward + gamma * V[next_state]) for prob, next_state, reward, _ in env.P[s][a]) for a in range(nA)]
            best_action = np.argmax(action_values)

            # Update the policy
            new_policy = np.zeros(nA)
            new_policy[best_action] = 1
            policy[s] = new_policy

            if old_action != best_action:
                policy_stable = False

        if policy_stable:
            break

    return policy, V

# Create the Taxi-v3 environment
env = gym.make('Taxi-v3')

# Run Policy Iteration
optimal_policy, optimal_value = policy_iteration(env)

# Print the optimal policy and value function
print("Optimal Policy:")
print(optimal_policy)

print("Optimal Value Function:")
print(optimal_value)


Optimal Policy:
[[0. 0. 0. 0. 1. 0.]
 [0. 0. 0. 0. 1. 0.]
 [0. 0. 0. 0. 1. 0.]
 ...
 [0. 1. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0.]
 [0. 0. 0. 1. 0. 0.]]
Optimal Value Function:
[944.72361809 864.01317574 903.55733909 873.75068256 789.53804327
 864.01317574 789.53804327 816.7669381  864.01317574 826.0272102
 903.55733909 835.3810204  807.59926872 826.0272102  807.59926872
 873.75068256 955.27638191 873.75068256 913.69428191 883.58654804
 934.27638191 854.37304398 893.5217657  864.01317574 798.52327603
 873.75068256 798.52327603 826.0272102  854.37304398 816.7669381
 893.5217657  826.0272102  816.7669381  835.3810204  816.7669381
 883.58654804 944.72361809 883.58654804 903.55733909 893.5217657
 883.58654804 807.59926872 844.82931354 816.7669381  844.82931354
 923.93361809 844.82931354 873.75068256 844.82931354 807.59926872
 883.58654804 816.7669381  826.0272102  844.82931354 826.0272102
 893.5217657  893.5217657  934.27638191 893.5217657  903.55733909
 873.75068256 798.52327603 835.3810204  8