### Practicing Policy Evaluation, Policy Improvement, Policy Iteration and Value Iteration. 

In [34]:
# Requirements.

import numpy as np
import gymnasium as gym
import timeit


In [35]:
# Environments. 

# Only the transition table is needed by the dynamic-programming routines below,
# so the environment object itself is not kept around. The table is indexed as
# table[state][action] and yields (probability, next_state, reward, done) tuples.
frozen_lake_env = gym.make('FrozenLake-v1').unwrapped.P

# Hand-written five-state walk, laid out as
#     {state: {action: [(probability, next_state, reward, done), ...]}}
# Action 0 moves to the lower-numbered neighbour and action 1 to the
# higher-numbered one with probability 0.8; with probability 0.2 the agent
# slips to the opposite neighbour.
slippery_bandit_walk = {
    # State 0: absorbing terminal state, no reward.
    0: {
        0: [(1.0, 0, 0.0, True)],
        1: [(1.0, 0, 0.0, True)],
    },
    # State 1: neighbours are terminal state 0 and state 2.
    1: {
        0: [(0.8, 0, 0.0, True), (0.2, 2, 0.0, False)],
        1: [(0.8, 2, 0.0, False), (0.2, 0, 0.0, True)],
    },
    # State 2: neighbours are states 1 and 3.
    2: {
        0: [(0.8, 1, 0.0, False), (0.2, 3, 0.0, False)],
        1: [(0.8, 3, 0.0, False), (0.2, 1, 0.0, False)],
    },
    # State 3: entering state 4 from here is the only rewarded transition (+1.0).
    3: {
        0: [(0.8, 2, 0.0, False), (0.2, 4, 1.0, True)],
        1: [(0.8, 4, 1.0, True), (0.2, 2, 0.0, False)],
    },
    # State 4: absorbing terminal state, no further reward once inside it.
    4: {
        0: [(1.0, 4, 0.0, True)],
        1: [(1.0, 4, 0.0, True)],
    },
}

In [36]:
# Policy Evaluation, an algorithm that allows us to compute the value function of a given policy. This allows us to compare policies. 

def policy_evaluation(pi, mdp, gamma=1, theta=1e-10):
    ''' Iteratively compute the state-value function of policy `pi` on `mdp`.

        pi:    indexable by state, giving the action taken in that state.
        mdp:   mdp[state][action] is an iterable of
               (probability, next_state, reward, done) tuples; states are
               expected to be the integers 0 .. len(mdp) - 1.
        gamma: discount factor applied to the value of the next state.
        theta: sweeps stop once the largest per-state change is below this.

        Returns a numpy array with one value per state. '''

    n_states = len(mdp)
    previous = np.zeros(n_states)

    while True:

        # Every sweep writes into a fresh array, so `previous` is read-only here.
        updated = np.zeros(n_states)

        for s in range(n_states):

            expected_return = 0.0
            for prob, s_next, reward, done in mdp[s][pi[s]]:
                # Multiplying by (not done) drops the bootstrap term on terminal transitions.
                expected_return += prob * (reward + gamma * previous[s_next] * (not done))

            updated[s] = expected_return

        largest_change = np.max(np.abs(previous - updated))
        if largest_change < theta:
            return updated

        previous = updated



In [37]:
# Policy Improvement, an algorithm that iterates over the actions in each state, computing the action-value. We then create our new policy by selecting greedily over the action-values in each state. 

def policy_constructor(Q_vals):
    ''' Build the greedy policy from a matrix of Q values.

        Q_vals: 2-D array-like with one row per state and one column per action.

        Returns a dict {state: action} where action is the column index of the
        largest Q value in that state's row (the first one on ties, as with
        np.argmax). Actions are plain Python ints rather than numpy scalars so
        the policy prints as {0: 1, ...} on every NumPy version and can be
        serialised directly. '''

    # argmax along axis 1 picks one action per row; tolist() converts the
    # numpy integers to built-in ints.
    greedy_actions = np.argmax(Q_vals, axis=1).tolist()

    return dict(enumerate(greedy_actions))

def policy_improvement(value_function, mdp, gamma=1):
    ''' Compute the action-value of every (state, action) pair under
        `value_function` and return the policy that is greedy with respect
        to those action-values.

        value_function: indexable by state, giving the current state values.
        mdp:            mdp[state][action] is an iterable of
                        (probability, next_state, reward, done) tuples.
        gamma:          discount factor applied to the next state's value.

        Returns whatever policy_constructor builds from the Q matrix. '''

    n_states = len(mdp)
    n_actions = len(mdp[0])  # Every state is assumed to offer the same actions as state 0.

    # Rows are states, columns are actions; entries are float64 accumulators.
    action_values = np.zeros((n_states, n_actions), dtype=np.float64)

    # np.ndindex walks the (state, action) grid in row-major order.
    for s, a in np.ndindex(n_states, n_actions):
        for prob, s_next, reward, done in mdp[s][a]:
            # (not done) zeroes the bootstrap term for terminal transitions.
            action_values[s, a] += prob * (reward + gamma * value_function[s_next] * (not done))

    return policy_constructor(action_values)

In [38]:
# Policy Iteration, an algorithm that alternates between policy evaluation and policy improvement to render the best policy. 

def policy_iteration(mdp, gamma=1, theta=1e-10):
    ''' Find a policy by alternating policy evaluation and policy improvement
        until the improvement step no longer changes the policy.

        mdp:   mdp[state][action] is an iterable of
               (probability, next_state, reward, done) tuples.
        gamma: discount factor, forwarded to both sub-steps.
        theta: convergence threshold, forwarded to policy_evaluation.

        Returns (policy, value_function): the final {state: action} policy and
        the value function that policy_evaluation computed for it. '''

    state_space = len(mdp)
    action_space = len(mdp[0])

    # Start from a randomly drawn deterministic policy (uses numpy's global RNG,
    # so call np.random.seed beforehand for a repeatable starting point).
    random_actions = np.random.randint(0, action_space, size=state_space).tolist()
    current_policy = dict(enumerate(random_actions))

    while True:

        value_function = policy_evaluation(current_policy, mdp, gamma, theta)

        improved_policy = policy_improvement(value_function, mdp, gamma)

        # Dict equality compares key/value pairs regardless of insertion order,
        # so the two policies can be compared directly.
        if improved_policy == current_policy:
            break

        current_policy = improved_policy

    return improved_policy, value_function

### Policy Iteration for Slippery Bandit Walk and Frozen Lake 

In [39]:
# Slippery Bandit Walk: solve with policy iteration and report the result.
sbw_optimal_policy, sbw_value_function = policy_iteration(slippery_bandit_walk)
print(
    f"SBW Optimal Policy: {sbw_optimal_policy}",
    f"SBW Value Function: {sbw_value_function}\n",
    sep="\n",
)



SBW Optimal Policy: {0: 0, 1: 1, 2: 1, 3: 1, 4: 0}
SBW Value Function: [0.         0.75294118 0.94117647 0.98823529 0.        ]



In [40]:
# Frozen Lake: solve with policy iteration and report the result.
fl_optimal_policy, fl_value_function = policy_iteration(frozen_lake_env)
print(
    f"FL Optimal Policy: {fl_optimal_policy}",
    f"FL Value Function: {fl_value_function}\n",
    sep="\n",
)


FL Optimal Policy: {0: 0, 1: 3, 2: 3, 3: 3, 4: 0, 5: 0, 6: 0, 7: 0, 8: 3, 9: 1, 10: 0, 11: 0, 12: 0, 13: 2, 14: 1, 15: 0}
FL Value Function: [0.82352941 0.82352941 0.82352941 0.82352941 0.82352941 0.
 0.52941176 0.         0.82352941 0.82352941 0.76470588 0.
 0.         0.88235294 0.94117647 0.        ]



### Value Iteration

In [41]:
def policy_constructor(Q_vals):
    ''' Build the greedy policy from a matrix of Q values.

        Q_vals: 2-D array-like with one row per state and one column per action.

        Returns a dict {state: action} mapping each row index to the column
        index of its largest Q value (first one on ties, as with np.argmax).
        Actions are returned as plain Python ints rather than numpy scalars,
        so the policy prints cleanly on every NumPy version. '''

    # NOTE: this repeats the policy_constructor defined in the Policy
    # Improvement cell and shadows it once this cell runs; keep the two
    # identical, or delete this copy.
    return {state: int(action) for state, action in enumerate(np.argmax(Q_vals, axis=1))}


def value_iteration(mdp, gamma=1, theta=1e-10):
    ''' Value iteration: repeatedly back up the best action-value of every
        state until the value function stops changing, then read the greedy
        policy off the last Q matrix. No separate policy-evaluation or
        policy-improvement phase is used.

        mdp:   mdp[state][action] is an iterable of
               (probability, next_state, reward, done) tuples.
        gamma: discount factor applied to the next state's value.
        theta: stop once the largest per-state change is below this.

        Returns (policy, value_function). '''

    n_states = len(mdp)
    n_actions = len(mdp[0])  # Every state is assumed to offer the same actions as state 0.
    value_function = np.zeros(n_states)

    converged = False
    while not converged:

        action_values = np.zeros((n_states, n_actions), dtype=np.float64)

        for s in range(n_states):
            for a in range(n_actions):

                backup = 0.0
                for prob, s_next, reward, done in mdp[s][a]:
                    # (not done) zeroes the bootstrap term for terminal transitions.
                    backup += prob * (reward + gamma * value_function[s_next] * (not done))

                action_values[s, a] = backup

        best_values = np.max(action_values, axis=1)
        converged = np.max(np.absolute(value_function - best_values)) < theta

        # On the converging sweep the previous estimate is kept and returned;
        # it differs from best_values by less than theta in every state.
        if not converged:
            value_function = best_values

    return policy_constructor(action_values), value_function

### Value Iteration on Slippery Bandit Walk and Frozen Lake. 

In [42]:
# Slippery Bandit Walk: solve with value iteration and report the result.
sbw_optimal_policy, sbw_value_function = value_iteration(slippery_bandit_walk)
print(
    f"SBW Optimal Policy: {sbw_optimal_policy}",
    f"SBW Value Function: {sbw_value_function}\n",
    sep="\n",
)

SBW Optimal Policy: {0: 0, 1: 1, 2: 1, 3: 1, 4: 0}
SBW Value Function: [0.         0.75294118 0.94117647 0.98823529 0.        ]



In [43]:
# Frozen Lake, solved with value iteration so that this section actually
# exercises value_iteration (re-run the cell to refresh the recorded output).
fl_optimal_policy, fl_value_function = value_iteration(frozen_lake_env)
print(f"FL Optimal Policy: {fl_optimal_policy}")
print(f"FL Value Function: {fl_value_function}\n")

FL Optimal Policy: {0: 0, 1: 3, 2: 3, 3: 3, 4: 0, 5: 0, 6: 0, 7: 0, 8: 3, 9: 1, 10: 0, 11: 0, 12: 0, 13: 2, 14: 1, 15: 0}
FL Value Function: [0.82352941 0.82352941 0.82352941 0.82352941 0.82352941 0.
 0.52941176 0.         0.82352941 0.82352941 0.76470588 0.
 0.         0.88235294 0.94117647 0.        ]



### Time Comparison Between Policy Iteration and Value Iteration. 

In [48]:
# Time both solvers on the Frozen Lake transition table.
# NOTE: policy_iteration starts from a randomly drawn policy on every call, so
# its timing varies from run to run; value_iteration always starts from zeros.
%timeit policy_iteration(frozen_lake_env)
%timeit value_iteration(frozen_lake_env)

30.3 ms ± 3.15 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
51.2 ms ± 553 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
