In [21]:
pip install gym[toy_text]

Defaulting to user installation because normal site-packages is not writeable
Collecting pygame==2.1.0
  Downloading pygame-2.1.0-cp39-cp39-win_amd64.whl (4.8 MB)
     ---------------------------------------- 4.8/4.8 MB 5.8 MB/s eta 0:00:00
Installing collected packages: pygame
Successfully installed pygame-2.1.0
Note: you may need to restart the kernel to use updated packages.


In [1]:
from __future__ import print_function
from __future__ import division
import numpy as np
import gym
from gym import wrappers
from gym.wrappers import OrderEnforcing
import time
import scipy

In [2]:
env=gym.make('FrozenLake8x8-v1', render_mode='human')

# env = gym.make('FrozenLake8x8-v1',render_mode="rgb_array")
# env = gym.make('FrozenLake8x8-v1', desc=None, map_name="4x4", is_slippery=True)
env.reset()
env.render()

In [12]:
# print(env.nA)
print(env.action_space)
print(env.env.nS)
print(env.env.P[0][1])

Discrete(4)


AttributeError: 'FrozenLakeEnv' object has no attribute 'nS'

In [5]:
 for episode in range(1):
        state = env.reset() 
        for time_step in range(1000): 
            action = env.action_space.sample()
            env.step(action)
            env.render()
            state, reward, done, _, x = env.step(action)
            if done:
                break 


# Value Iteration
Value iteration computes the optimal state value function by iteratively improving the estimate of V(s). The algorithm initialize V(s) to arbitrary random values. It repeatedly updates the Q(s, a) and V(s) values until they converges. Value iteration is guaranteed to converge to the optimal values.

In [11]:

def run_episode(env, policy, gamma, render = False):
    """ Evaluates policy by using it to run an episode and finding its
    total reward.
    args:
    env: gym environment.
    policy: the policy to be used.
    gamma: discount factor.
    render: boolean to turn rendering on/off.
    returns:
    total reward: real value of the total reward recieved by agent under policy.
    """
    obs = env.reset()
    total_reward = 0
    step_idx = 0
    while True:
        if render:
            env.render()
        obs, reward, done , _ = env.step(int(policy[obs]))
        total_reward += (gamma ** step_idx * reward)
        step_idx += 1
        if done:
            break
    return total_reward


def evaluate_policy(env, policy, gamma,  n = 100):
    """ Evaluates a policy by running it n times.
    returns:
    average total reward
    """
    scores = [
            run_episode(env, policy, gamma=gamma, render = False)
            for _ in range(n)]
    return np.mean(scores)

def extract_policy(v, gamma):
    """ Extract the policy given a value-function """
    policy = np.zeros(env.nS)
    for s in range(env.nS):
        q_sa = np.zeros(env.action_space.n)
        for a in range(env.action_space.n):
            for next_sr in env.P[s][a]:
                # next_sr is a tuple of (probability, next state, reward, done)
                p, s_, r, _ = next_sr
                q_sa[a] += (p * (r + gamma * v[s_]))
        policy[s] = np.argmax(q_sa)
    return policy


def value_iteration(env, gamma, epsilon=1e-20, max_iterations=100000):
    """
    This function implements value iteration algorithm for the infinite
    horizon discounted MDPs. If the sup norm of v_k - v_{k-1} is less than
    epsilon or number of iterations reaches max_iterations, it should return
    the value function.
    """
    start = time.time()
    v = np.zeros(env.nS)  # initialize value-function
    ########################### Your Code Here ###########################
    # Hint: see implementation of extract_policy
    
    for i in range(max_iterations):
        prev_v = np.copy(v)
        for s in range(env.nS):
            q_sa = [sum([p*(r + prev_v[s_]) for p, s_, r, _ in env.P[s][a]]) for a in range(env.nA)] 
            v[s] = max(q_sa)
        if (np.sum(np.fabs(prev_v - v)) <= epsilon):
            print ('Value-iteration converged at iteration# %d.' %(i+1))
            break
    
                    
    ########################### End of your code #########################
    end = time.time()
    print("Value iteration took {0} seconds.".format(end - start))
    return v

if __name__ == '__main__':
    np.random.seed(1111)
    env_name  = 'FrozenLake8x8-v1'
    for gamma in [.9, .95, .99, .9999, 1]:
        print("-"*10, "Gamma={0}".format(gamma) ,"-"*10)
        env = gym.make(env_name)
        optimal_v = value_iteration(env, gamma);
        policy = extract_policy(optimal_v, gamma)
        policy_score = evaluate_policy(env, policy, gamma, n=1000)
        print('Average score = ', policy_score)

---------- Gamma=0.9 ----------


AttributeError: 'FrozenLakeEnv' object has no attribute 'nS'

# Policy Iteration
policy-iteration instead of repeated improving the value-function estimate, it will re-define the policy at each step and compute the value according to this new policy until the policy converges. Policy iteration is also guaranteed to converge to the optimal policy and it often takes less iterations to converge than the value-iteration algorithm.

In [7]:
def compute_policy_v(env, policy, gamma):
    """ Iteratively evaluate the value-function under policy.
    Alternatively, we could formulate a set of linear equations in iterms of v[s] 
    and solve them to find the value function.
    """
    v = np.zeros(env.nS)
    eps = 1e-10
    while True:
        prev_v = np.copy(v)
        for s in range(env.nS):
            policy_a = policy[s]
            v[s] = sum([p * (r + gamma * prev_v[s_]) for p, s_, r, _ in env.P[s][policy_a]])
        if (np.sum((np.fabs(prev_v - v))) <= eps):
            # value converged
            break
    return v

def policy_iteration(env, gamma, max_iterations=100000):
    """
    This function implements policy iteration algorithm.
    """
    start = time.time()
    policy = np.random.choice(env.nA, size=(env.nS))  # initialize a random policy
    ########################### Your Code Here ###########################
    for i in range(max_iterations):
        old_policy_v = compute_policy_v(env, policy, gamma)
        new_policy = extract_policy(old_policy_v, gamma)
        if (np.all(policy == new_policy)):
            end = time.time()
            print("Policy iteration took {0} seconds.".format(end - start))
            print ('Policy-Iteration converged at step %d.' %(i+1))
            break
        policy = new_policy
    return policy
    
    
    ########################### End of your code #########################
    end = time.time()
    print("Policy iteration took {0} seconds.".format(end - start))
    return policy


if __name__ == '__main__':
    np.random.seed(1111)
    env_name  = 'FrozenLake8x8-v0'
    for gamma in [.9, .95, .99, .9999, 1]:
        print("-"*10, "Gamma={0}".format(gamma) ,"-"*10)
        env = gym.make(env_name)
        optimal_policy = policy_iteration(env, gamma=gamma)
        scores = evaluate_policy(env, optimal_policy, gamma=gamma)
        print('Average scores = ', np.mean(scores))

---------- Gamma=0.9 ----------
Policy iteration took 0.15405559539794922 seconds.
Policy-Iteration converged at step 5.
Average scores =  0.00473197770117812
---------- Gamma=0.95 ----------
Policy iteration took 0.15418267250061035 seconds.
Policy-Iteration converged at step 3.
Average scores =  0.05249329649060406
---------- Gamma=0.99 ----------
Policy iteration took 0.8403522968292236 seconds.
Policy-Iteration converged at step 8.
Average scores =  0.38577824957168594
---------- Gamma=0.9999 ----------
Policy iteration took 2.7064201831817627 seconds.
Policy-Iteration converged at step 12.
Average scores =  0.9009111682246035
---------- Gamma=1 ----------
Policy iteration took 1.6615610122680664 seconds.
Policy-Iteration converged at step 6.
Average scores =  0.92
