In [1]:
import gymnasium as gym
import numpy as np
import time
from IPython import display
from gymnasium.envs.toy_text.frozen_lake import FrozenLakeEnv
from gymnasium.envs.toy_text.taxi import TaxiEnv
import time

In [2]:
def play(env, policy, render=False):
    state, _ = env.reset()
    total_reward = 0
    steps = 0
    done = False
    while not done:
        action = policy[state]
        next_state, reward, done, info, _ = env.step(action)
        total_reward += reward
        steps += 1
        if render:
            print(env.render())
            time.sleep(0.5)
            if not done:
                display.clear_output(wait=True)
        state = next_state

    return (total_reward, steps)

def play_multiple_times(env, policy, max_episodes):
    success = 0
    list_of_steps = []
    for i in range(max_episodes):
        total_reward, steps = play(env, policy)

        if total_reward > 0:
            success += 1
            list_of_steps.append(steps)

    print(f'Number of successes: {success}/{max_episodes}')
    print(f'Average number of steps: {np.mean(list_of_steps)}')

In [3]:
def value_iteration(env, max_iters=500, gamma=0.9):
    # initialize
    v_values = np.zeros(env.observation_space.n)

    for i in range(max_iters):
        prev_v_values = np.copy(v_values)

        # update the v-value for each state
        for state in range(env.observation_space.n):
            q_values = []

            # compute the q-value for each action that we can perform at the state
            for action in range(env.action_space.n):
                q_value = 0
                # loop through each possible outcome
                for prob, next_state, reward, done in env.P[state][action]:
                    q_value += prob * (reward + gamma * prev_v_values[next_state])

                q_values.append(q_value)

            # select the max q-values
            best_action = np.argmax(q_values)
            v_values[state] = q_values[best_action]

        # check convergence
        if np.all(np.isclose(v_values, prev_v_values)):
            print(f'Converged at {i}-th iteration.')
            break

    return v_values

In [4]:
def policy_extraction(env, v_values, gamma=0.9):
    # initialize
    policy = np.zeros(env.observation_space.n, dtype=np.int32)

    # loop through each state in the environment
    for state in range(env.observation_space.n):
        q_values = []
        # loop through each action
        for action in range(env.action_space.n):
            q_value = 0
            # loop each possible outcome
            for prob, next_state, reward, done in env.P[state][action]:
                q_value += prob * (reward + gamma * v_values[next_state])

            q_values.append(q_value)

        # select the best action
        best_action = np.argmax(q_values)
        policy[state] = best_action

    return policy

In [5]:
import gymnasium as gym
import numpy as np

def policy_evaluation(policy, env, gamma=0.9, theta=1e-8):
    V = np.zeros(env.observation_space.n)
    while True:
        delta = 0
        for s in range(env.observation_space.n):
            v = 0
            a = policy[s]
            for prob, next_state, reward, done in env.P[s][a]:
                v += prob * (reward + gamma * V[next_state])
            delta = max(delta, abs(v - V[s]))
            V[s] = v
        if delta < theta:
            break
    return V

def policy_improvement(env, V, gamma=0.9):
    policy = np.zeros(env.observation_space.n, dtype=int)
    for s in range(env.observation_space.n):
        q_values = np.zeros(env.action_space.n)
        for a in range(env.action_space.n):
            for prob, next_state, reward, done in env.P[s][a]:
                q_values[a] += prob * (reward + gamma * V[next_state])
        policy[s] = np.argmax(q_values)
    return policy

def policy_iteration(env, gamma=0.9, theta=1e-8):
    policy = np.random.choice(env.action_space.n, size=(env.observation_space.n,))
    while True:
        V = policy_evaluation(policy, env, gamma, theta)
        new_policy = policy_improvement(env, V, gamma)
        if np.array_equal(policy, new_policy):
            break
        policy = new_policy
    return policy, V


In [6]:
env_names = ['FrozenLake-v1', 'FrozenLake8x8-v1', 'Taxi-v3']
max_episodes = 1000

for env_name in env_names:
    start = time.time()
    print(f"Running Value Iteration on {env_name}")

    if 'FrozenLake' in env_name:
        env_train = gym.make(env_name, is_slippery=True).unwrapped
    else:
        env_train = gym.make(env_name).unwrapped

    # Run value iteration
    v_values = value_iteration(env_train, gamma=0.9)

    # Extract policy from value function
    policy = policy_extraction(env_train, v_values, gamma=0.9)

    # Evaluate with wrapper (keep TimeLimit etc.)
    env_eval = gym.make(env_name)
    play_multiple_times(env_eval, policy, max_episodes)
    end = time.time()
    runtime = end - start
    print(f"Runtime: {runtime}")
    print("-" * 50)

Running Value Iteration on FrozenLake-v1
Converged at 79-th iteration.
Number of successes: 787/1000
Average number of steps: 45.00381194409149
Runtime: 0.22332525253295898
--------------------------------------------------
Running Value Iteration on FrozenLake8x8-v1
Converged at 117-th iteration.
Number of successes: 731/1000
Average number of steps: 74.12311901504788
Runtime: 0.4404423236846924
--------------------------------------------------
Running Value Iteration on Taxi-v3
Converged at 116-th iteration.
Number of successes: 1000/1000
Average number of steps: 13.109
Runtime: 0.3302128314971924
--------------------------------------------------


In [7]:
env_names = ['FrozenLake-v1', 'FrozenLake8x8-v1', 'Taxi-v3']

for env_name in env_names:
    start = time.time()
    print(f"Running Policy Iteration on {env_name}")
    
    if 'FrozenLake' in env_name:
        env = gym.make(env_name, is_slippery=True).unwrapped
    else:
        env = gym.make(env_name).unwrapped

    policy, V = policy_iteration(env)
    
    print("Optimal Policy:")
    if 'FrozenLake' in env_name:
        size = int(np.sqrt(env.observation_space.n))
        print(policy.reshape((size, size)))
        play_multiple_times(env, policy, 1000)
    else:
        print(policy)
        play_multiple_times(env, policy, 1000)
    end = time.time()
    runtime = end - start
    print(f"Runtime: {runtime}")
    print("-" * 50)


Running Policy Iteration on FrozenLake-v1
Optimal Policy:
[[0 3 0 3]
 [0 0 0 0]
 [3 1 0 0]
 [0 2 1 0]]
Number of successes: 786/1000
Average number of steps: 43.157760814249365
Runtime: 0.17296195030212402
--------------------------------------------------
Running Policy Iteration on FrozenLake8x8-v1
Optimal Policy:
[[3 2 2 2 2 2 2 2]
 [3 3 3 3 2 2 2 1]
 [3 3 0 0 2 3 2 1]
 [3 3 3 1 0 0 2 1]
 [3 3 0 0 2 1 3 2]
 [0 0 0 1 3 0 0 2]
 [0 0 1 0 0 0 0 2]
 [0 1 0 0 1 1 1 0]]
Number of successes: 736/1000
Average number of steps: 74.64673913043478
Runtime: 0.28209447860717773
--------------------------------------------------
Running Policy Iteration on Taxi-v3
Optimal Policy:
[4 4 4 4 0 0 0 0 0 0 0 0 0 0 0 0 5 0 0 0 3 3 3 3 0 0 0 0 0 0 0 0 0 0 0 0 3
 0 0 0 0 0 0 0 2 2 2 2 0 0 0 0 0 0 0 0 0 2 0 0 0 0 0 0 2 2 2 2 0 0 0 0 0 0
 0 0 0 2 0 0 0 0 0 0 4 4 4 4 0 0 0 0 0 0 0 0 0 5 0 0 1 1 1 1 0 0 0 0 0 0 0
 0 0 0 0 0 1 0 0 0 1 1 1 1 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 2 2 2 2
 0 0 0 0 0 0 0 0 0 2 0 0

policy iteration nhanh hơn ở các môi trường nhỏ hoặc vừa, do đánh giá chính sách không quá tốn kém.

value Iteration nhannh hơn khi số lượng trạng thái lớn( policy evaluation của policy iterationn tốn kém khi số lượng trạng thái lớn).

FrozenLake-v1 và FrozenLake8x8-v1
tỉ lệ thành công gần như giống nhau (786/787 và 736/731).
số bước trung bình tương đương (khác biệt rất nhỏ).
Policy Iteration chạy nhanh hơn.

Nhận xét:

Với môi trường nhỏ và không quá nhiều trạng thái, Policy Iteration tận dụng được ưu điểm là ít vòng lặp hơn.
Đặc biệt với FrozenLake8x8, tốc độ của Policy Iteration nhanh gần gấp đôi Value Iteration.

Taxi-v3
Cả hai đều đạt 100% thành công và số bước tương đương.
Nhưng Policy Iteration tốn hơn gấp đôi thời gian so với Value Iteration.

Nhận xét:
Với môi trường lớn hơn (Taxi có 500 states), chi phí tính toán trong mỗi vòng Policy Evaluation tăng lên.
Value Iteration cho kết quả tốt hơn về thời gian do mỗi vòng lặp rẻ hơn, dù cần nhiều vòng hơn.