In [17]:
import gym
import numpy as np
import time
from IPython import display

In [18]:
# Build the gym environment that every later cell receives as `env`.
env = gym.make('FrozenLake-v0')

In [33]:
def policy_extraction(env, v_values, gamma=0.9):
    """Derive the greedy policy for a table of state values.

    For every state, the expected one-step return of each action is computed
    from the transition table and the action with the highest return is kept.

    Args:
        env: Object exposing ``observation_space.n``, ``action_space.n`` and
            ``P[state][action]`` as an iterable of
            ``(prob, next_state, reward, done)`` tuples.
        v_values: Sequence indexable by state that holds the state values.
        gamma: Discount factor applied to the value of the next state.

    Returns:
        Integer NumPy array with one action index per state. Ties go to the
        lowest action index (``np.argmax`` behaviour).
    """
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    # Builtin `int` replaces the `np.int` alias, which NumPy deprecated in
    # 1.20 and removed in 1.24 (it was only ever an alias of `int`).
    policy = np.zeros(n_states, dtype=int)

    for state in range(n_states):
        q_values = []
        for action in range(n_actions):
            # Expected return of taking `action` in `state`, summed over
            # every possible outcome of the transition.
            q_value = 0
            for prob, next_state, reward, done in env.P[state][action]:
                q_value += prob * (reward + gamma * v_values[next_state])
            q_values.append(q_value)

        policy[state] = np.argmax(q_values)

    return policy

In [19]:
def policy_evaluation(env, policy, gamma=0.9, term=1e-3):
    """Iteratively estimate the state values obtained by following `policy`.

    Each sweep recomputes every state's value from a copy of the previous
    sweep's values, so all states are updated from the same snapshot. Sweeps
    repeat until the summed absolute change across states is at most `term`.

    Args:
        env: Object exposing ``observation_space.n`` and ``P[state][action]``
            as an iterable of ``(prob, next_state, reward, done)`` tuples.
        policy: Sequence indexable by state giving the action to take.
        gamma: Discount factor applied to the value of the next state.
        term: Convergence threshold on the summed absolute value change.

    Returns:
        NumPy float array holding the estimated value of each state.
    """
    n_states = env.observation_space.n
    p_value = np.zeros(n_states)

    while True:
        pre_p_value = np.copy(p_value)
        for state in range(n_states):
            # The policy may be stored as floats (np.zeros defaults to
            # float64); cast so it is always a valid action key.
            policy_action = int(policy[state])
            q_value = 0
            # Read the table through `env.P`, the same access used by
            # policy_extraction and value_iteration, instead of `env.env.P`,
            # which only works when `env` is wrapped.
            for prob, next_state, reward, done in env.P[state][policy_action]:
                q_value += prob * (reward + gamma * pre_p_value[next_state])
            p_value[state] = q_value
        if np.sum(np.fabs(pre_p_value - p_value)) <= term:
            return p_value

In [25]:
def policy_improvement(env, value, gamma=0.9):
    """Return the policy that is greedy with respect to `value`.

    Args:
        env: Object exposing ``observation_space.n``, ``action_space.n`` and
            ``P[state][action]`` as an iterable of
            ``(prob, next_state, reward, done)`` tuples.
        value: Sequence indexable by state that holds the state values.
        gamma: Discount factor applied to the value of the next state.

    Returns:
        Integer NumPy array with one action index per state. Ties go to the
        lowest action index (``np.argmax`` behaviour).
    """
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    # Integer dtype so the entries can be used directly as action indices.
    policy = np.zeros(n_states, dtype=int)

    for state in range(n_states):
        q_value = np.zeros(n_actions)
        for action in range(n_actions):
            # Read the table through `env.P`, the same access used by
            # policy_extraction and value_iteration.
            for prob, state_, reward, _ in env.P[state][action]:
                # Accumulate over all outcomes. The previous code assigned
                # `sum(<scalar>)`, which raises TypeError and would also have
                # kept only the last outcome.
                q_value[action] += prob * (reward + gamma * value[state_])
        policy[state] = np.argmax(q_value)

    # Return only after every state has been processed (the return used to
    # sit inside the state loop and fired after the first state).
    return policy

In [31]:
def policy_iteration(env, max_iters=1000, gamma=0.9, term=1e-3):
    """Alternate policy evaluation and greedy extraction until the policy is stable.

    Starts from the all-zeros policy. Each round evaluates the current policy
    with `policy_evaluation`, then replaces it with the greedy policy from
    `policy_extraction`. The loop stops as soon as a round leaves the policy
    unchanged, or after `max_iters` rounds.

    Args:
        env: Environment passed through to `policy_evaluation` and
            `policy_extraction`; must expose ``observation_space.n``.
        max_iters: Maximum number of evaluation/extraction rounds.
        gamma: Discount factor forwarded to both helpers.
        term: Convergence threshold forwarded to `policy_evaluation`.

    Returns:
        Tuple ``(value_function, policy)``: the values computed in the last
        evaluation round and the last extracted policy.
    """
    n_states = env.observation_space.n
    value_function = np.zeros(n_states)
    # Builtin `int` replaces the `np.int` alias, which NumPy deprecated in
    # 1.20 and removed in 1.24.
    policy = np.zeros(n_states, dtype=int)

    for i in range(max_iters):
        pre_policy = np.copy(policy)
        value_function = policy_evaluation(env, policy, gamma, term)
        policy = policy_extraction(env, value_function, gamma)
        # A stable policy means the evaluated values already belong to it.
        if np.all(policy == pre_policy):
            print(f'Policy Iteration converged at {i}-th.')
            break

    return value_function, policy

In [34]:
# Solve the environment with policy iteration and print the chosen action for each state.
value_function, policy = policy_iteration(env)
print(policy)

Policy Iteration converged at 5-th.
[0 3 0 3 0 0 0 0 3 1 0 0 0 2 1 0]


In [None]:
def value_iteration(env, max_iters = 1000, gamma = 0.9):
    """Compute state values by repeated Bellman optimality backups.

    Every sweep rebuilds each state's value as the best expected one-step
    return over all actions, using a snapshot of the values from the previous
    sweep. Sweeps stop once the new values are element-wise close to the
    snapshot (``np.isclose`` default tolerances) or after `max_iters` sweeps.

    Args:
        env: Object exposing ``observation_space.n``, ``action_space.n`` and
            ``P[state][action]`` as an iterable of
            ``(prob, next_state, reward, done)`` tuples.
        max_iters: Maximum number of sweeps.
        gamma: Discount factor applied to the value of the next state.

    Returns:
        NumPy float array with one value per state.
    """
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    v_values = np.zeros(n_states)

    def expected_return(state, action, baseline):
        # Expected discounted return of one (state, action) pair.
        total = 0
        for prob, next_state, reward, _ in env.P[state][action]:
            total += prob * (reward + gamma * baseline[next_state])
        return total

    for sweep in range(max_iters):
        snapshot = np.copy(v_values)

        for state in range(n_states):
            action_returns = [
                expected_return(state, action, snapshot)
                for action in range(n_actions)
            ]
            # The state's value is the return of its best action.
            v_values[state] = np.max(action_returns)

        if np.allclose(v_values, snapshot):
            print(f'Converged at {sweep}-th iteration.')
            break

    return v_values

In [None]:
# Solve with value iteration, then read off the greedy policy for the resulting values.
# NOTE(review): the recorded output below lists 64 actions, whereas the policy-iteration
# output lists 16 — presumably this cell was last run against a larger environment than
# the one created above. Restart the kernel and run all cells to confirm.
value_function = value_iteration(env)
policy = policy_extraction(env, value_function, gamma=0.9)
print(policy)

Converged at 117-th iteration.
[3 2 2 2 2 2 2 2 3 3 3 3 2 2 2 1 3 3 0 0 2 3 2 1 3 3 3 1 0 0 2 1 3 3 0 0 2
 1 3 2 0 0 0 1 3 0 0 2 0 0 1 0 0 0 0 2 0 1 0 0 1 1 1 0]


In [None]:
def play(env, policy):
    """Run one episode, always taking the action `policy` assigns to the current state.

    Args:
        env: Environment whose ``reset()`` returns the initial state and whose
            ``step(action)`` returns ``(next_state, reward, done, info)``.
        policy: Sequence indexable by state giving the action to take.

    Returns:
        Sum of the rewards collected until the environment reports `done`.
    """
    observation = env.reset()
    episode_return = 0

    # At least one step is always taken; the episode ends when the
    # environment flags it as finished.
    while True:
        observation, reward, finished, _ = env.step(policy[observation])
        episode_return += reward
        if finished:
            return episode_return

In [None]:
# Play a single episode with the current policy; the cell's value is that episode's total reward.
play(env, policy)

1.0

In [29]:
def play_multiple_times(env, policy, max_episodes):
    """Play `max_episodes` episodes and print how many ended with a positive reward.

    Args:
        env: Environment forwarded to `play`.
        policy: Policy forwarded to `play`.
        max_episodes: Number of episodes to run.

    Returns:
        None. The success count is only printed.
    """
    # An episode counts as a success when its total reward is positive.
    wins = sum(1 for _ in range(max_episodes) if play(env, policy) > 0)
    print(f'Number of successes: {wins}/{max_episodes}')

In [30]:
# Count how many of 1000 episodes the current policy finishes with a positive reward.
# NOTE(review): the recorded output is a NameError and the execution counts are out of
# order — presumably a name this call needs (e.g. `play` or `policy`) was not defined in
# the kernel when the cell ran. Restart the kernel and run all cells to confirm.
play_multiple_times(env, policy, 1000)

NameError: ignored