In [14]:
import numpy as np
import gym

In [15]:
# Build the FrozenLake-v1 environment that the planning and simulation code below operates on.
env = gym.make('FrozenLake-v1')

In [16]:
def policy_evaluation(policy, env, discount_factor, theta, max_iterations):
    """Estimate the state-value function of ``policy`` by iterative sweeps.

    Args:
        policy: Indexable by state; ``policy[s]`` yields the probability of
            each action in state ``s``.
        env: Object exposing ``observation_space.n`` and a transition table
            ``P[s][a]`` that iterates over
            ``(prob, next_state, reward, done)`` tuples.
        discount_factor: Multiplier applied to the value of the next state.
        theta: A sweep whose largest per-state change is below this value
            ends the evaluation.
        max_iterations: Upper bound on the number of sweeps.

    Returns:
        A numpy array with one value estimate per state.

    Values are updated in place during a sweep, so later states in the same
    sweep already see the new estimates of earlier states. The ``done`` flag
    of a transition is not consulted.
    """
    n_states = env.observation_space.n
    V = np.zeros(n_states)
    for _ in range(max_iterations):
        largest_change = 0
        for state in range(n_states):
            # Expected one-step return under the policy, bootstrapped from the
            # current estimates.
            backed_up = sum(
                pi_sa * p * (r + discount_factor * V[successor])
                for action, pi_sa in enumerate(policy[state])
                for p, successor, r, _ in env.P[state][action]
            )
            largest_change = max(largest_change, np.abs(V[state] - backed_up))
            V[state] = backed_up
        if largest_change < theta:
            break
    return V

def policy_improvement(V, env, discount_factor):
    """Build the deterministic policy that is greedy with respect to ``V``.

    Args:
        V: Indexable by state; current state-value estimates.
        env: Object exposing ``observation_space.n``, ``action_space.n`` and
            a transition table ``P[s][a]`` that iterates over
            ``(prob, next_state, reward, done)`` tuples.
        discount_factor: Multiplier applied to the value of the next state.

    Returns:
        A ``(n_states, n_actions)`` numpy array with a single 1.0 per row on
        the action with the highest one-step lookahead value. Ties go to the
        lowest action index because ``np.argmax`` returns the first maximum.
    """
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    greedy = np.zeros([n_states, n_actions])
    for state in range(n_states):
        # One-step lookahead value of every action from this state.
        action_values = [
            sum(
                p * (r + discount_factor * V[successor])
                for p, successor, r, _ in env.P[state][action]
            )
            for action in range(n_actions)
        ]
        greedy[state, np.argmax(action_values)] = 1.0
    return greedy

def policy_iteration(policy, env, discount_factor, theta, max_iterations):
    """Alternate policy evaluation and greedy improvement until stable.

    Args:
        policy: Starting policy, indexable as ``policy[s][a]`` with action
            probabilities.
        env: Environment passed through to ``policy_evaluation`` and
            ``policy_improvement``.
        discount_factor: Discount passed through to both helpers.
        theta: Convergence threshold passed to ``policy_evaluation``.
        max_iterations: Sweep limit passed to ``policy_evaluation`` (it does
            not bound the number of improvement rounds).

    Returns:
        ``(policy, V)``: the last evaluated policy and its value estimates.

    The loop ends when improvement returns the policy that was just
    evaluated. Because evaluation is only approximate (it stops at ``theta``
    or ``max_iterations``), near-tied actions can make successive greedy
    policies alternate instead of settling. To guarantee termination the
    loop also stops as soon as improvement proposes a policy that has
    already been evaluated, returning the current policy and its values.
    """
    # Raw-byte snapshots of every policy evaluated so far, for cycle detection.
    evaluated = set()
    while True:
        V = policy_evaluation(policy, env, discount_factor, theta, max_iterations)
        new_policy = policy_improvement(V, env, discount_factor)
        if (new_policy == policy).all():
            break
        evaluated.add(np.asarray(policy, dtype=float).tobytes())
        if np.asarray(new_policy, dtype=float).tobytes() in evaluated:
            # Evaluation and improvement are deterministic, so revisiting a
            # policy means the loop would repeat the same cycle forever.
            break
        policy = new_policy
    return policy, V


In [17]:
def value_iteration(env, discount_factor, theta, max_iterations):
    """Run value iteration and extract a greedy policy from the result.

    Args:
        env: Object exposing ``observation_space.n``, ``action_space.n`` and
            a transition table ``P[s][a]`` that iterates over
            ``(prob, next_state, reward, done)`` tuples.
        discount_factor: Multiplier applied to the value of the next state.
        theta: A sweep whose largest per-state change is below this value
            ends the iteration.
        max_iterations: Upper bound on the number of sweeps.

    Returns:
        ``(policy, V)`` where ``V`` holds one value per state and ``policy``
        is a ``(n_states, n_actions)`` array. Each row spreads probability
        uniformly over the actions whose lookahead value is ``np.isclose``
        to the best one, so tied actions share the probability mass.

    Values are updated in place during a sweep, so later states in the same
    sweep already see the new estimates of earlier states.
    """
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    V = np.zeros(n_states)

    def lookahead(s):
        # One-step lookahead value of every action from state s, using the
        # current contents of V.
        return [
            sum(prob * (reward + discount_factor * V[next_state])
                for prob, next_state, reward, _ in env.P[s][a])
            for a in range(n_actions)
        ]

    for _ in range(max_iterations):
        delta = 0
        for s in range(n_states):
            previous = V[s]
            V[s] = max(lookahead(s))
            delta = max(delta, np.abs(previous - V[s]))
        if delta < theta:
            break

    policy = np.zeros([n_states, n_actions])
    for s in range(n_states):
        # Compute the action values once and reuse them for both the maximum
        # and the tie test.
        q = lookahead(s)
        best_action_value = np.max(q)
        best_actions = [a for a in range(n_actions)
                        if np.isclose(q[a], best_action_value)]
        policy[s, best_actions] = 1.0 / len(best_actions)

    return policy, V


In [18]:
def simulate_episodes(policy, env, num_episodes=1000):
    """Roll out ``policy`` greedily and summarise the outcomes.

    Args:
        policy: Indexable by state; the action taken in a state is the
            ``np.argmax`` of ``policy[state]``.
        env: Environment with ``reset()`` and ``step(action)``. Both step
            conventions are accepted: four values
            ``(state, reward, done, info)`` or five values
            ``(state, reward, terminated, truncated, info)``. ``reset()`` may
            return either the state or a ``(state, info)`` tuple.
        num_episodes: Number of episodes to run.

    Returns:
        ``(wins, average_return)`` where ``wins`` counts episodes whose final
        step had a reward equal to 1 and ``average_return`` is the summed
        reward divided by ``num_episodes``.
    """
    wins = 0
    total_reward = 0
    for _ in range(num_episodes):
        state = env.reset()
        # Some gym versions return (observation, info) from reset().
        if isinstance(state, tuple):
            state = state[0]
        done = False
        while not done:
            action = np.argmax(policy[state])
            outcome = env.step(action)
            if len(outcome) == 5:
                # Five-value API: the episode is over on either flag.
                state, reward, terminated, truncated, _ = outcome
                done = terminated or truncated
            else:
                state, reward, done, _ = outcome
            if done and reward == 1:
                wins += 1
            total_reward += reward
    average_return = total_reward / num_episodes
    return wins, average_return

# Problem size, read from the environment's discrete spaces.
num_states = env.observation_space.n
num_actions = env.action_space.n

# Both planners get the same settings so their results are comparable.
planner_settings = dict(discount_factor=0.99, theta=0.0001, max_iterations=1000)

# Policy iteration starts from the uniform random policy.
initial_policy = np.ones([num_states, num_actions]) / num_actions
pi_policy, _ = policy_iteration(initial_policy, env, **planner_settings)
pi_wins, pi_average_return = simulate_episodes(pi_policy, env)

# Value iteration needs no starting policy.
vi_policy, _ = value_iteration(env, **planner_settings)
vi_wins, vi_average_return = simulate_episodes(vi_policy, env)

# NOTE(review): no seed is set anywhere in this cell, so the win counts below
# are expected to differ from run to run.
print(f"Policy Iteration: {pi_wins} wins, Average Return: {pi_average_return}")
print(f"Value Iteration: {vi_wins} wins, Average Return: {vi_average_return}")

def print_policy(policy, env):
    """Print the grid with each non-terminal tile replaced by a policy arrow.

    Args:
        policy: Indexable by state; the arrow shown for a tile is chosen by
            ``np.argmax`` over ``policy[state]``.
        env: Object exposing ``desc`` (2-D array of tile letters), ``nrow``
            and ``ncol``. States are numbered row by row:
            ``state = row * ncol + col``.

    Tiles marked ``'H'`` or ``'G'`` keep their letter; every other tile shows
    the arrow for action index 0-3 as left, down, right, up. ``env.desc`` is
    copied before being edited, so the environment is not modified.
    """
    arrows = '←↓→↑'
    grid = env.desc.astype(str)
    n_cols = env.ncol

    for state in range(env.nrow * n_cols):
        r, c = divmod(state, n_cols)
        greedy = np.argmax(policy[state])
        if grid[r, c] in ('H', 'G'):
            continue
        grid[r, c] = arrows[greedy]

    rendered = ["".join(cells) for cells in grid]
    print("\n".join(rendered))


# Render both learned policies on the lake grid, one after the other.
for heading, learned_policy in (
    ("Policy Iteration Policy Map:", pi_policy),
    ("\nValue Iteration Policy Map:", vi_policy),
):
    print(heading)
    print_policy(learned_policy, env)


Policy Iteration: 739 wins, Average Return: 0.739
Value Iteration: 757 wins, Average Return: 0.757
Policy Iteration Policy Map:
←↑↑↑
←H←H
↑↓←H
H→↓G

Value Iteration Policy Map:
←↑↑↑
←H←H
↑↓←H
H→↓G
