In [1]:
from pprint import pprint
import numpy as np
import gym
from gym.envs.toy_text import frozen_lake

In [17]:
ENV = gym.make('FrozenLake8x8-v0')
MAP =  ['S-------',
        '--------',
        '---H----',
        '-----H--',
        '---H----',
        '-HH---H-',
        '-H--H-H-',
        '---H---G']
frozen_lake.MAPS['8x8'] = MAP
ENV = gym.make('FrozenLake8x8-v0', is_slippery=False)

ACTION_MAPPING = { 0: '←', 1: '↓', 2: '→', 3: '↑'}

In [18]:
def evaluate_policy(gym_env,
                    policy,
                    discount_factor = 0.9,
                    eps = 1e-6,
                    max_iter = 9999) -> np.ndarray:
    
    V = np.zeros(gym_env.nS)

    for i in range(1, max_iter + 1):
        is_converged = True
        # For all state
        for state in range(gym_env.nS):
            state_value = 0
            # For all actions that can be selected by the policy under the current state
            for action, action_prob in enumerate(policy[state]):
                for state_prob, next_state, reward, terminated in gym_env.P[state][action]:
                    state_value += action_prob * state_prob * (reward + discount_factor * V[next_state])

                if abs(V[state] - state_value) > eps:
                    is_converged = False
                V[state] = state_value

        if is_converged:
            print(f'POLICY EVALUATION: V(s) converged after {i} iterations\n')
            return V

    print(f'POLICY EVALUATION: reached max number of iterations ({max_iter})\n')
    return V

In [19]:
def policy_iteration(gym_env,
                     discount_factor = 0.9,
                     max_iter = 9999) -> (np.ndarray, np.ndarray):
    
    # Init policy with equal prob for all actions
    policy = np.ones([gym_env.nS, gym_env.nA]) / gym_env.nA
    
    for i in range(1, max_iter + 1):
        is_stable = True
        #First step is evaluating current policy
        V = evaluate_policy(gym_env, policy)
        
        for state in range(gym_env.nS):
            current_action = np.argmax(policy[state])
            
            # See if can find any action that is better than current action
            Q = np.zeros(gym_env.nA)
            for action in range(gym_env.nA):
                for prob, next_state, reward, terminated in gym_env.P[state][action]:
                    Q[action] += prob * (reward + discount_factor * V[next_state])

            best_action = np.argmax(Q)

            if current_action != best_action:
                is_stable = False
            
            # Update current policy greedily
            policy[state] = np.zeros(gym_env.nA)
            policy[state][best_action] = 1.0
        
        if is_stable:
            print(f'Policy converged after {i} iterations\n')
            return policy, V
    
    print(f'Policy iteration reached max number of iterations ({max_iter})\n')
    return policy, V

In [20]:
def print_state_value_func(V):
    print(' V(s):')
    print(np.round_(V, 2).reshape(8, 8), '\n')

def print_policy(policy: np.ndarray):
    print(' POLICY: ')
    temp_policy = np.argmax(policy, axis = 1)
    temp_map = ''.join(MAP)
    string_map = list()
    for idx, action in enumerate(temp_policy):
        if temp_map[idx] == 'H':
            string_map.append('□')
        else:
            string_map.append(ACTION_MAPPING[action])
    string_map = np.array(string_map).reshape((8, 8))
    print(string_map, '\n')

In [21]:
policy, V = policy_iteration(ENV)

print_state_value_func(V)
print_policy(policy)

POLICY EVALUATION: reached max number of iterations (9999)

POLICY EVALUATION: reached max number of iterations (9999)

POLICY EVALUATION: reached max number of iterations (9999)

POLICY EVALUATION: reached max number of iterations (9999)

Policy converged after 4 iterations

 V(s):
[[0.25 0.28 0.31 0.35 0.39 0.43 0.48 0.53]
 [0.28 0.31 0.35 0.39 0.43 0.48 0.53 0.59]
 [0.31 0.35 0.39 0.   0.48 0.53 0.59 0.66]
 [0.35 0.39 0.43 0.48 0.53 0.   0.66 0.73]
 [0.31 0.35 0.39 0.   0.59 0.66 0.73 0.81]
 [0.28 0.   0.   0.59 0.66 0.73 0.   0.9 ]
 [0.31 0.   0.48 0.53 0.   0.81 0.   1.  ]
 [0.35 0.39 0.43 0.   0.81 0.9  1.   0.  ]] 

 POLICY: 
[['↓' '↓' '↓' '↓' '↓' '↓' '↓' '↓']
 ['↓' '↓' '↓' '→' '↓' '↓' '↓' '↓']
 ['↓' '↓' '↓' '□' '↓' '→' '↓' '↓']
 ['→' '→' '→' '→' '↓' '□' '↓' '↓']
 ['→' '→' '↑' '□' '↓' '↓' '→' '↓']
 ['↓' '□' '□' '→' '→' '↓' '□' '↓']
 ['↓' '□' '→' '↑' '□' '↓' '□' '↓']
 ['→' '→' '↑' '□' '→' '→' '→' '←']] 



In [22]:
def play(gym_env, policy, num_episodes):
    total_win, total_reward = 0, 0
    
    for episode in range(num_episodes):
        state = gym_env.reset()
        terminated = False
        while not terminated:
            action = np.argmax(policy[state])
            next_state, reward, terminated, info = gym_env.step(action)
            state = next_state
            total_reward += reward
            if terminated and reward == 1.0:
                total_win += 1
    avg_reward = total_reward / num_episodes
    
    print(f'NUM_EPISODES: {num_episodes}')
    print(f'   TOTAL_WIN: {total_win}')
    print(f'  AVG REWARD: {avg_reward}')

In [23]:
NUM_EPISODES = 1000

play(ENV, policy, NUM_EPISODES)

NUM_EPISODES: 1000
   TOTAL_WIN: 1000
  AVG REWARD: 1.0
