In [1]:
import gym
from gym.envs.registration import register
from gym.envs.toy_text.frozen_lake import LEFT, RIGHT, DOWN, UP

# Register a non-slippery variant of the 4x4 FrozenLake map and instantiate it.
register(
    id='Deterministic-4x4-FrozenLake-v0',
    entry_point='gym.envs.toy_text.frozen_lake:FrozenLakeEnv',
    kwargs={
        'map_name': '4x4',
        'is_slippery': False,
    },
)

env = gym.make('Deterministic-4x4-FrozenLake-v0')

# One-letter label for each action id, keyed by the constants imported from frozen_lake.
action_names = {LEFT: 'L', DOWN: 'D', RIGHT: 'R', UP: 'U'}

#step function

[2023-01-16 15:23:56,006] Making new env: Deterministic-4x4-FrozenLake-v0
  result = entry_point.load(False)


In [2]:
import pandas as pd
# Number of states, then number of actions, one per line.
print(env.nS, env.nA, sep="\n")
# Transition table: one row per state, one column per action; every cell is a
# list of (probability, next state, reward, is terminal) tuples.
print(pd.DataFrame(env.P).T)

16
4
                          0                        1                        2  \
0    [(1.0, 0, 0.0, False)]   [(1.0, 4, 0.0, False)]   [(1.0, 1, 0.0, False)]   
1    [(1.0, 0, 0.0, False)]    [(1.0, 5, 0.0, True)]   [(1.0, 2, 0.0, False)]   
2    [(1.0, 1, 0.0, False)]   [(1.0, 6, 0.0, False)]   [(1.0, 3, 0.0, False)]   
3    [(1.0, 2, 0.0, False)]    [(1.0, 7, 0.0, True)]   [(1.0, 3, 0.0, False)]   
4    [(1.0, 4, 0.0, False)]   [(1.0, 8, 0.0, False)]    [(1.0, 5, 0.0, True)]   
5       [(1.0, 5, 0, True)]      [(1.0, 5, 0, True)]      [(1.0, 5, 0, True)]   
6     [(1.0, 5, 0.0, True)]  [(1.0, 10, 0.0, False)]    [(1.0, 7, 0.0, True)]   
7       [(1.0, 7, 0, True)]      [(1.0, 7, 0, True)]      [(1.0, 7, 0, True)]   
8    [(1.0, 8, 0.0, False)]   [(1.0, 12, 0.0, True)]   [(1.0, 9, 0.0, False)]   
9    [(1.0, 8, 0.0, False)]  [(1.0, 13, 0.0, False)]  [(1.0, 10, 0.0, False)]   
10   [(1.0, 9, 0.0, False)]  [(1.0, 14, 0.0, False)]   [(1.0, 11, 0.0, True)]   
11     [(1.0, 11, 0, Tr

In [3]:
def print_policy(policy, action_names):
    """Return a string array in which each action id is replaced by its label.

    Despite the name, nothing is printed. ``policy`` is converted with
    ``astype('str')`` and every entry equal to a key of ``action_names`` is
    overwritten with the corresponding value. Entries without a matching key
    keep their stringified number.
    """
    labels = policy.astype('str')
    for number, label in action_names.items():
        labels[policy == number] = label
    return labels

def plot_policy(policy, length):
    """Print the policy as ``length`` lines of ``length`` action labels each.

    Labels are produced by ``print_policy`` with the module-level
    ``action_names`` mapping. The label shown at (row, col) is the one for flat
    index ``row * length + col``.
    """
    labels = print_policy(policy, action_names)
    for row in range(length):
        offset = row * length
        print("".join(labels[offset + col] for col in range(length)))

def run_optimal_policy(env, opp, gamma=0.9, max_steps=int(1e4), render=True):
    """Roll out one episode by following policy ``opp`` and return its discounted return.

    Parameters
    ----------
    env : object
        Environment with ``reset()`` returning the initial state, ``step(action)``
        returning ``(next_state, reward, is_terminal, info)`` and ``render()``.
    opp : indexable
        Policy: ``opp[state]`` is the action taken in ``state``.
    gamma : float
        Discount factor; the reward of step ``k`` (0-based) is weighted by ``gamma ** k``.
    max_steps : int or None
        Upper bound on the number of steps. Without it a policy that never
        reaches a terminal state (e.g. one that keeps walking into a wall)
        would loop forever. Pass ``None`` to disable the cap.
    render : bool
        When true, ``env.render()`` is called after the reset and after every step.

    Returns
    -------
    tuple
        ``(total_reward, num_steps)``: the discounted return and the number of
        steps actually taken.
    """
    state = env.reset()
    if render:
        env.render()

    total_reward = 0
    num_steps = 0

    while max_steps is None or num_steps < max_steps:
        state, reward, is_terminal, _ = env.step(opp[state])
        if render:
            env.render()

        total_reward += pow(gamma, num_steps) * reward
        num_steps += 1

        if is_terminal:
            break

    return total_reward, num_steps

In [13]:
import numpy as np
import time

# Bellman backup: q(s,a) = sum over s' of p(s'|s,a) * (r(s,a,s') + gamma * v(s'))
def value_iteration(env, gamma, max_iterations=int(1e3), tol=1e-3):
    """Run synchronous value iteration on a tabular environment.

    ``env`` must expose ``nS`` (number of states), ``nA`` (number of actions)
    and ``P[s][a]``, a list of ``(prob, nextstate, reward, is_terminal)`` tuples.
    Every sweep backs up all states from a snapshot of the previous sweep's
    values. Transitions flagged terminal contribute no future value.

    Returns ``(V, policy, iteration_cnt)``: the state values, the greedy action
    per state (ties go to the lowest action index) and the number of sweeps
    performed. Iteration stops once the largest per-state change in a sweep is
    below ``tol`` or after ``max_iterations`` sweeps.
    """

    def backup(transitions, values):
        # Expected one-step return of a single (state, action) pair.
        total = 0
        for prob, nextstate, reward, is_terminal in transitions:
            future = 0 if is_terminal else values[nextstate]
            total += prob * (reward + gamma * future)
        return total

    V = np.zeros(env.nS)
    policy = np.zeros(env.nS, dtype='int')

    iteration_cnt = 0
    for iteration_cnt in range(1, max_iterations + 1):
        previous = V.copy()  # values from the previous sweep
        delta = 0  # largest change seen in this sweep
        for s in range(env.nS):
            best_value, best_action = None, policy[s]
            for a in range(env.nA):
                q = backup(env.P[s][a], previous)
                # Strict comparison: the first action reaching the maximum wins.
                if best_value is None or q > best_value:
                    best_value, best_action = q, a
            policy[s] = best_action
            V[s] = best_value
            delta = max(delta, abs(previous[s] - V[s]))
        if delta < tol:
            break

    return V, policy, iteration_cnt

def cal_value_iteration(env, gamma=0.9):
    """Solve ``env`` with value iteration, print the policy, and replay one episode.

    Prints the number of sweeps ``value_iteration`` needed and the greedy policy
    as a square grid, then rolls the policy out with ``run_optimal_policy``
    (which renders every step). Returns nothing.

    ``gamma`` is used both for planning and for discounting the rollout, so the
    two always agree.
    """
    value_func, policy, iteration_cnt = value_iteration(env, gamma=gamma)
    print("Value Iternation:%d" % iteration_cnt)
    print("Show me the policy:")
    # Side length of the square grid, derived from the state count
    # (4 for the 16-state map) instead of being hard-coded.
    plot_policy(policy, int(round(env.nS ** 0.5)))
    print("")
    run_optimal_policy(env, policy, gamma=gamma)

In [5]:
# Value iteration on the deterministic (is_slippery=False) 4x4 map: prints the
# sweep count and greedy policy, then renders one rollout of that policy.
cal_value_iteration(env)

Value Iternation:7
Show me the policy:
DRDL
DLDL
RDDL
LRRL


[41mS[0mFFF
FHFH
FFFH
HFFG
  (Down)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Down)
SFFF
FHFH
[41mF[0mFFH
HFFG
  (Right)
SFFF
FHFH
F[41mF[0mFH
HFFG
  (Down)
SFFF
FHFH
FFFH
H[41mF[0mFG
  (Right)
SFFF
FHFH
FFFH
HF[41mF[0mG
  (Right)
SFFF
FHFH
FFFH
HFF[41mG[0m


In [9]:
def evaluate_policy(env, gamma, policy, value_func, max_iterations=int(1e3), tol=1e-3):
    """Iteratively evaluate a deterministic policy on a tabular environment.

    Parameters
    ----------
    env : object
        Exposes ``nS`` and ``P[s][a]``, a list of
        ``(prob, nextstate, reward, is_terminal)`` tuples.
    gamma : float
        Discount factor.
    policy : indexable
        ``policy[s]`` is the single action taken in state ``s``.
    value_func : array-like
        Initial value estimate (warm start). It is copied, so the caller's
        array is never modified.
    max_iterations : int
        Maximum number of sweeps over all states.
    tol : float
        Stop once the largest per-state change in a sweep is below this.

    Returns
    -------
    tuple
        ``(v, iterations)``: the evaluated state values (a new float array) and
        the number of sweeps performed.
    """
    iterations = 0
    # Private float copy: avoids mutating the caller's array through aliasing
    # and avoids truncating values if an integer array was passed in.
    v = np.array(value_func, dtype=float)

    for _ in range(max_iterations):
        v_old = v.copy()
        iterations += 1
        delta = 0
        for s in range(env.nS):
            a = policy[s]  # a deterministic policy has exactly one action per state
            expected_value = 0.0
            for prob, nextstate, reward, is_terminal in env.P[s][a]:
                # Transitions flagged terminal contribute no future value.
                future = 0 if is_terminal else v_old[nextstate]
                expected_value += prob * (reward + gamma * future)

            v[s] = expected_value
            delta = max(delta, abs(v[s] - v_old[s]))

        # Converged
        if delta < tol:
            break
    return v, iterations

def improve_policy(env, gamma, value_function, policy):
    """Make ``policy`` greedy with respect to ``value_function``, in place.

    For every state the one-step expected return of each action is computed
    from ``env.P[s][a]`` (transitions flagged terminal contribute no future
    value). The first action reaching the maximum is written into ``policy[s]``.

    Returns ``(policy_stable, policy)``: ``policy_stable`` is true when no
    state's action changed, and ``policy`` is the same object that was passed in.
    """
    changed = False
    for s in range(env.nS):
        previous_action = policy[s]
        best_value, best_action = None, previous_action
        for a in range(env.nA):
            q = 0.0
            for prob, nextstate, reward, is_terminal in env.P[s][a]:
                future = 0 if is_terminal else value_function[nextstate]
                q += prob * (reward + gamma * future)
            # Strict comparison keeps the lowest-indexed action on ties.
            if best_value is None or q > best_value:
                best_value, best_action = q, a
        policy[s] = best_action
        if previous_action != best_action:
            changed = True
    return not changed, policy

def policy_iteration(env, gamma, max_iterations=int(1e3), tol=1e-3):
    """Alternate policy evaluation and greedy improvement until the policy is stable.

    Starts from all-zero state values and the policy that takes action 0 in
    every state. Each round calls ``evaluate_policy`` (warm-started with the
    previous values, and given the same ``max_iterations`` and ``tol``) followed
    by ``improve_policy``. The loop ends when an improvement step changes no
    action, or after ``max_iterations`` rounds.

    Returns ``(policy, value_func, improve_iteration, evalue_iteration)``, where
    the last two are the number of improvement rounds and the total number of
    evaluation sweeps across all rounds.
    """
    value_func = np.zeros(env.nS)
    policy = np.zeros(env.nS, dtype='int')  # initial policy: action 0 in every state

    improve_iteration = evalue_iteration = 0

    for _ in range(max_iterations):
        value_func, sweeps = evaluate_policy(
            env, gamma, policy, value_func, max_iterations, tol
        )
        evalue_iteration += sweeps

        policy_stable, policy = improve_policy(env, gamma, value_func, policy)
        improve_iteration += 1

        if policy_stable:
            break

    return policy, value_func, improve_iteration, evalue_iteration

def cal_policy_iteration(env, gamma=0.9):
    """Solve ``env`` with policy iteration, print the policy, and replay one episode.

    Prints the number of improvement rounds and the total number of evaluation
    sweeps reported by ``policy_iteration``, then the resulting policy as a
    square grid, and finally rolls the policy out with ``run_optimal_policy``
    (which renders every step). Returns nothing.

    ``gamma`` is used both for planning and for discounting the rollout, so the
    two always agree.
    """
    policy, value_func, improve_iteration, evalue_iteration = policy_iteration(env, gamma)
    print("Policy Improve Iternation:%d" % improve_iteration)
    print("Policy Evaluate Iternation:%d" % evalue_iteration)
    print("Show me the policy:")
    # Side length of the square grid, derived from the state count
    # (4 for the 16-state map) instead of being hard-coded.
    plot_policy(policy, int(round(env.nS ** 0.5)))
    print("")
    run_optimal_policy(env, policy, gamma=gamma)

In [10]:
# Policy iteration on the deterministic (is_slippery=False) 4x4 map: prints the
# iteration counts and resulting policy, then renders one rollout of it.
cal_policy_iteration(env)

Policy Improve Iternation:7
Policy Evaluate Iternation:14
Show me the policy:
DRDL
DLDL
RDDL
LRRL


[41mS[0mFFF
FHFH
FFFH
HFFG
  (Down)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Down)
SFFF
FHFH
[41mF[0mFFH
HFFG
  (Right)
SFFF
FHFH
F[41mF[0mFH
HFFG
  (Down)
SFFF
FHFH
FFFH
H[41mF[0mFG
  (Right)
SFFF
FHFH
FFFH
HF[41mF[0mG
  (Right)
SFFF
FHFH
FFFH
HFF[41mG[0m


In [11]:
# Register the slippery variant of the same 4x4 map and instantiate it.
register(
    id='Stochastic-4x4-FrozenLake-v0',
    entry_point='gym.envs.toy_text.frozen_lake:FrozenLakeEnv',
    kwargs={
        'map_name': '4x4',
        'is_slippery': True,
    },
)

env_slippery = gym.make('Stochastic-4x4-FrozenLake-v0')

[2023-01-16 22:16:36,589] Making new env: Stochastic-4x4-FrozenLake-v0
  result = entry_point.load(False)


In [14]:
# Value iteration on the slippery (is_slippery=True) 4x4 map: prints the sweep
# count and greedy policy, then renders one rollout of that policy.
cal_value_iteration(env_slippery)

Value Iternation:27
Show me the policy:
LULU
LLLL
UDLL
LRDL


[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
SFFF
FHFH
[41mF[0mFFH
HFFG
  (Up)
SFFF
FHFH
[41mF[0mFFH
HFFG
  (Up)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
SFFF
FHFH
[41mF[0mFFH
HFFG
  (Up)
SFFF
FHFH
F[41mF[0mFH
HFFG
  (Down)
SFFF
FHFH
FFFH
H[41mF[0mFG
  (Right)
SFFF
FHFH
FFFH
HF[41mF[0mG
  (Down)
SFFF
FHFH
FFFH
H[41mF[0mFG
  (Right)
SFFF
FHFH
FFFH
H[41mF[0mFG
  (Right)
SFFF
FHFH
FFFH
HF[41mF[0mG
  (Down)
SFFF
FHFH
FFFH
HF[41mF[0mG
  (Down)
SFFF
FHFH
FFFH
H[41mF[0mFG
  (Right)
SFFF
FHFH
FFFH
H[41mF[0mFG
  (Right)
SFFF
FHFH
FFFH
H[41mF[0mFG
  (Right)
SFFF
FHFH
FFFH
H[41mF[0mFG
  (Right)
SFFF
FHFH
F[41mF[0mFH
HFFG
  (Down)
SFFF
FHFH
[41mF[0mFFH
HFFG
  (Up)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
SFFF
FHFH
[41mF[0mFFH
HFF

In [15]:
# Policy iteration on the slippery (is_slippery=True) 4x4 map: prints the
# iteration counts and resulting policy, then renders one rollout of it.
cal_policy_iteration(env_slippery)

Policy Improve Iternation:6
Policy Evaluate Iternation:54
Show me the policy:
LULU
LLLL
UDLL
LRDL


[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
SFFF
FHFH
[41mF[0mFFH
HFFG
  (Up)
SFFF
FHFH
[41mF[0mFFH
HFFG
  (Up)
SFFF
FHFH
[41mF[0mFFH
HFFG
  (Up)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (

In [None]:
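# Observation: on the slippery map, policy iteration needed only 6 improvement
# rounds where value iteration needed 27 sweeps, but those 6 rounds cost 54
# policy-evaluation sweeps in total. Both methods produced the same policy, so
# whether policy iteration is "more efficient" on the more complex environment
# depends on which of these costs is being counted.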