MDP Value iteration and Policy iteration implementation for the Frozen Lake environment from OpenAI Gym

In [2]:
import numpy as np
import gym
import time
from lake_envs import *

In [3]:
# Limit floating-point values in printed NumPy arrays to 3 digits of precision
# so the value functions printed below stay compact.
np.set_printoptions(precision=3)

"""
For policy_evaluation, policy_improvement, policy_iteration and value_iteration,
the parameters P, nS, nA, gamma are defined as follows:

    P: nested dictionary
        From gym.core.Environment
        For each pair of states in [0, nS - 1] and actions in [0, nA - 1], P[state][action] is a
        list of tuples of the form (probability, nextstate, reward, terminal) where
            - probability: float
                the probability of transitioning from "state" to "nextstate" with "action"
            - nextstate: int
                denotes the state we transition to (in range [0, nS - 1])
            - reward: int
                either 0 or 1, the reward for transitioning from "state" to
                "nextstate" with "action"
            - terminal: bool
              True when "nextstate" is a terminal state (hole or goal), False otherwise
    nS: int
        number of states in the environment
    nA: int
        number of actions in the environment
    gamma: float
        Discount factor. Number in range [0, 1)
"""

'\nFor policy_evaluation, policy_improvement, policy_iteration and value_iteration,\nthe parameters P, nS, nA, gamma are defined as follows:\n\n    P: nested dictionary\n        From gym.core.Environment\n        For each pair of states in [1, nS] and actions in [1, nA], P[state][action] is a\n        tuple of the form (probability, nextstate, reward, terminal) where\n            - probability: float\n                the probability of transitioning from "state" to "nextstate" with "action"\n            - nextstate: int\n                denotes the state we transition to (in range [0, nS - 1])\n            - reward: int\n                either 0 or 1, the reward for transitioning from "state" to\n                "nextstate" with "action"\n            - terminal: bool\n              True when "nextstate" is a terminal state (hole or goal), False otherwise\n    nS: int\n        number of states in the environment\n    nA: int\n        number of actions in the environment\n    gamma: floa

In [4]:
def policy_evaluation(P, nS, nA, policy, gamma=0.9, tol=1e-3):
    """Compute the state values obtained by following a fixed policy.

    Repeatedly applies a synchronous Bellman expectation backup: in every
    sweep each state is updated from the values of the previous sweep.
    The "terminal" flag stored in the transition tuples is not used.

    Parameters
    ----------
    P, nS, nA, gamma:
        defined at beginning of file
    policy: np.array[nS]
        The policy to evaluate. Maps states to actions.
    tol: float
        Sweeps are repeated while
            max |value_function(s) - prev_value_function(s)| > tol
    Returns
    -------
    value_function: np.ndarray[nS]
        The value function of the given policy, where value_function[s] is
        the value of state s
    """
    value_function = np.zeros(nS)

    # Large sentinel so the first convergence check lets the loop start.
    previous = np.full(nS, 1e7)

    while np.abs(previous - value_function).max() > tol:
        # Snapshot the last sweep; all backups below read from this copy.
        previous = value_function.copy()

        for state in range(nS):
            # Expected one-step return of the action prescribed by the policy.
            value_function[state] = sum(
                prob * (reward + gamma * previous[nxt_st])
                for prob, nxt_st, reward, _ in P[state][policy[state]]
            )

    return value_function

In [5]:
def policy_improvement(P, nS, nA, value_from_policy, policy, gamma=0.9):
    """Build the policy that is greedy with respect to a value function.

    For every state the expected one-step return of each action is computed
    from ``value_from_policy`` and the action with the largest return is
    selected (``np.argmax`` picks the lowest-index action on ties).

    Parameters
    ----------
    P, nS, nA, gamma:
        defined at beginning of file
    value_from_policy: np.ndarray
        The value calculated from the policy
    policy: np.array
        The previous policy. Accepted for interface compatibility; it is not
        read by this function.

    Returns
    -------
    new_policy: np.ndarray[nS]
        An array of integers. Each integer is the optimal action to take
        in that state according to the environment dynamics and the
        given value function.
    """
    new_policy = np.zeros(nS, dtype='int')

    def expected_return(state, action):
        # One-step lookahead: immediate reward plus discounted successor value.
        return sum(
            prob * (reward + gamma * value_from_policy[nxt_st])
            for prob, nxt_st, reward, _ in P[state][action]
        )

    for state in range(nS):
        new_policy[state] = np.argmax(
            [expected_return(state, action) for action in range(nA)]
        )

    return new_policy

In [6]:
def policy_iteration(P, nS, nA, gamma=0.9, tol=1e-3):
    """Runs policy iteration.

    Starting from the all-zeros policy, alternates policy_evaluation() and
    policy_improvement() until the improved policy is identical to the
    policy that was just evaluated. The number of evaluate/improve rounds
    is printed before returning.

    Parameters
    ----------
    P, nS, nA, gamma:
        defined at beginning of file
    tol: float
        tol parameter used in policy_evaluation()

    Returns
    -------
    value_function: np.ndarray[nS]
        Value function of the returned policy, as produced by the last call
        to policy_evaluation().
    policy: np.ndarray[nS]
        The policy that the final improvement step left unchanged.
    """
    policy = np.zeros(nS, dtype=int)

    i = 0
    while True:
        i += 1
        value_function = policy_evaluation(P, nS, nA, policy, gamma, tol)
        new_policy = policy_improvement(P, nS, nA, value_function, policy, gamma)

        # Compare element-wise. Summing the differences is not a valid
        # stability test: a +1 change in one state and a -1 change in another
        # cancel out and would end the loop with a policy that is still changing.
        if np.array_equal(new_policy, policy):
            break
        policy = new_policy

    print(f'Converged in {i} ierations')
    return value_function, policy

In [7]:
def value_iteration(P, nS, nA, gamma=0.9, tol=1e-3):
    """
    Compute a value function and a greedy policy with value iteration.

    Each sweep applies a synchronous Bellman optimality backup (every state is
    updated from the previous sweep's values). Once the sweeps stop, the
    policy is read off by a one-step lookahead on the final value function.
    The number of sweeps performed is printed.

    Parameters:
    ----------
    P, nS, nA, gamma:
        defined at beginning of file
    tol: float
        Sweeps are repeated while
            max |value_function(s) - prev_value_function(s)| > tol
    Returns:
    ----------
    value_function: np.ndarray[nS]
    policy: np.ndarray[nS]
    """
    value_function = np.zeros(nS)
    policy = np.zeros(nS, dtype=int)

    def action_values(state, values):
        # Expected one-step return of every action in `state` under `values`.
        return [
            sum(
                prob * (reward + gamma * values[nxt_st])
                for prob, nxt_st, reward, _ in P[state][action]
            )
            for action in range(nA)
        ]

    # Large sentinel so the first convergence check lets the loop start.
    previous = np.full(nS, 1e7)
    i = 0
    while np.abs(previous - value_function).max() > tol:
        i += 1
        previous = value_function.copy()
        for state in range(nS):
            value_function[state] = np.max(action_values(state, previous))
    print(f'Converged in {i} iterations')

    # Greedy policy with respect to the final value function.
    for state in range(nS):
        policy[state] = np.argmax(action_values(state, value_function))

    return value_function, policy

In [8]:
def render_single(env, policy, max_steps=100):
    """
    Renders policy once on environment. Watch your agent play!

    The environment is reset, then stepped with the action ``policy[ob]`` for
    the current observation until ``step`` reports ``done`` or ``max_steps``
    steps have been taken. Each frame is rendered, with a 0.25 s pause between
    steps, and a summary line is printed at the end.

    Parameters
    ----------
    env: gym.core.Environment
      Environment to play on. Must provide reset(), render() and
      step(action), where step returns a 4-tuple
      (observation, reward, done, info).
    policy: np.array of shape [env.nS]
      The action to take at a given state
    max_steps: int
      Maximum number of environment steps before giving up.
    """

    episode_reward = 0
    # Defined before the loop so the final check also works when the loop body
    # never runs (max_steps <= 0); previously that raised UnboundLocalError.
    done = False
    ob = env.reset()
    for t in range(max_steps):
        env.render()
        time.sleep(0.25)
        a = policy[ob]
        ob, rew, done, _ = env.step(a)
        episode_reward += rew
        if done:
            break
    # Show the final state reached by the agent.
    env.render()
    if not done:
        print("The agent didn't reach a terminal state in {} steps.".format(max_steps))
    else:
        print("Episode reward: %f" % episode_reward)




In [17]:
# Edit below to run policy and value iteration on different environments and
# visualize the resulting policies in action!
# You may change the parameters in the functions below
if __name__ == "__main__":

    # Comment/uncomment these lines to switch between the deterministic
    # (4x4 / 8x8) and stochastic environments.
    # NOTE(review): these environment ids are presumably registered by the
    # `lake_envs` import at the top of the file -- confirm.
    #env = gym.make("Deterministic-4x4-FrozenLake-v0")
    #env = gym.make("Deterministic-8x8-FrozenLake-v0")
    
    env = gym.make("Stochastic-4x4-FrozenLake-v0")

    print("\n" + "-"*25 + "\nBeginning Policy Iteration\n" + "-"*25)

    # Solve with policy iteration, report the result, then render one episode
    # (at most 100 steps) that follows the resulting policy.
    V_pi, p_pi = policy_iteration(env.P, env.nS, env.nA, gamma=0.9, tol=1e-3)
    print(f'For policy iteration \nOptimal Value function : {V_pi} \nOptimal policy : {p_pi} \n ')
    render_single(env, p_pi, 100)

    print("\n" + "-"*25 + "\nBeginning Value Iteration\n" + "-"*25)

    # Same procedure with value iteration, using the same gamma and tol so the
    # two printed results can be compared.
    V_vi, p_vi = value_iteration(env.P, env.nS, env.nA, gamma=0.9, tol=1e-3)
    print(f'For value iteration \nOptimal Value function : {V_vi} \nOptimal policy : {p_vi} \n')      
    render_single(env, p_vi, 100)




-------------------------
Beginning Policy Iteration
-------------------------
Converged in 6 ierations
For policy iteration 
Optimal Value function : [0.062 0.056 0.07  0.051 0.086 0.    0.11  0.    0.141 0.244 0.297 0.
 0.    0.377 0.638 0.   ] 
Optimal policy : [0 3 0 3 0 0 0 0 3 1 0 0 0 2 1 0] 
 

[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
SFFF
FHFH
[41mF[0mFFH
HFFG
  (Up)
SFFF
FHFH
F[41mF[0mFH
HFFG
  (Down)
SFFF
FHFH
FFFH
H[41mF[0mFG
  (Right)
SFFF
FHFH
FFFH
HF[41mF[0mG
  (Down)
SFFF
FHFH
FFFH
HFF[41mG[0m
Episode reward: 1.000000

-------------------------
Beginning Value Iteration
-------------------------
Converged in 27 iterations
For v