In [113]:
%env PATH=$PATH:/opt/X11/bin
from pyvirtualdisplay.display import Display

virtual_display = Display(visible=False, size=(1400, 900))
virtual_display.start()

env: PATH=$PATH:/opt/X11/bin


<pyvirtualdisplay.display.Display at 0x11b5043d0>

In [114]:
import numpy as np
import gymnasium as gym
import random
import imageio
import os

import pickle
from tqdm import tqdm

In [115]:
env = gym.make('FrozenLake-v1', map_name='8x8',
               is_slippery=False, render_mode='rgb_array')
print(f'observation space: {env.observation_space}')
print(f'sample observation: {env.observation_space.sample()}')
print(f'action space shape: {env.action_space}')
print(f'action space sample: {env.action_space.sample()}')

observation space: Discrete(64)
sample observation: 3
action space shape: Discrete(4)
action space sample: 3


In [117]:
# Initialize Q-table.
def initialize_q_table(state_space, action_space):
    return np.zeros((state_space, action_space))


q_table = initialize_q_table(env.observation_space.n, env.action_space.n)
q_table

array([[0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],


In [149]:
# Define policies.
def argmax(arr):
    arr_max = np.max(arr)
    return np.random.choice(np.where(arr == arr_max)[0])


def greedy_policy(q_table, state):
    return argmax(q_table[state][:])


def epsilon_greedy_policy(env: gym.Env, q_table: np.ndarray, state: int,
                          epsilon: float):
    random_num = random.uniform(0, 1)
    # Exploit.
    if random_num > epsilon:
        return greedy_policy(q_table, state)
    # Explore.
    else:
        return env.action_space.sample()

In [188]:
# Training params.
n_training_episodes = 1_000_000
learning_rate = 0.1
n_eval_episodes = 100
env_id = 'FrozenLake-v1'
max_steps = 99
gamma = 0.95
eval_seed = []
max_epsilon = 1.0
min_epsilon = 0.001
decay_rate = (2 * max_epsilon) / n_training_episodes

In [189]:
def train(env: gym.Env, q_table: np.ndarray, n_training_episodes: int,
          max_steps: int, learning_rate: float, min_epsilon: float,
          max_epsilon: float, decay_rate: float, gamma: float):
    _q_table = q_table.copy()
    for episode in tqdm(range(n_training_episodes)):
        # Reduce epsilon progressively.
        epsilon = min_epsilon + (max_epsilon - min_epsilon) * \
            np.exp(-decay_rate * episode)
        state, _ = env.reset()
        terminated = False
        truncated = False
        for _ in range(max_steps):
            # Choose action At using epsilon greedy policy.
            action = epsilon_greedy_policy(env, _q_table, state, epsilon)
            new_state, reward, terminated, truncated, _ = env.step(action)
            _q_table[state][action] += learning_rate * \
                (float(reward) + gamma *
                 np.max(_q_table[state]) - _q_table[state][action])
            if terminated or truncated:
                break
            state = new_state
    return _q_table

In [190]:
trained_q_table = train(env, q_table, n_training_episodes, max_steps,
                        learning_rate, min_epsilon, max_epsilon, decay_rate,
                        gamma)

100%|██████████| 1000000/1000000 [05:26<00:00, 3064.02it/s]


In [195]:
trained_q_table

array([[ 0.        ,  0.        ,  0.        ,  0.        ],
       [ 0.        ,  0.        ,  0.        ,  0.        ],
       [ 0.        ,  0.        ,  0.        ,  0.        ],
       [ 0.        ,  0.        ,  0.        ,  0.        ],
       [ 0.        ,  0.        ,  0.        ,  0.        ],
       [ 0.        ,  0.        ,  0.        ,  0.        ],
       [ 0.        ,  0.        ,  0.        ,  0.        ],
       [ 0.        ,  0.        ,  0.        ,  0.        ],
       [ 0.        ,  0.        ,  0.        ,  0.        ],
       [ 0.        ,  0.        ,  0.        ,  0.        ],
       [ 0.        ,  0.        ,  0.        ,  0.        ],
       [ 0.        ,  0.        ,  0.        ,  0.        ],
       [ 0.        ,  0.        ,  0.        ,  0.        ],
       [ 0.        ,  0.        ,  0.        ,  0.        ],
       [ 0.        ,  0.        ,  0.        ,  0.        ],
       [ 0.        ,  0.        ,  0.        ,  0.        ],
       [ 0.        ,  0.

In [196]:
def evaluate_agent(env, max_steps, n_eval_episodes, q_table, seed):
    episode_rewards = []
    for episode in tqdm(range(n_eval_episodes)):
        state, info = env.reset(seed=seed[episode] if seed else None)
        step = 0
        truncated = False
        terminated = False
        total_reward_ep = 0
        for step in range(max_steps):
            action = greedy_policy(q_table, state)
            new_state, reward, terminated, truncated, _ = env.step(action)
            total_reward_ep += reward
            if terminated or truncated:
                break
            state = new_state
        episode_rewards.append(total_reward_ep)
    mean_reward = np.mean(episode_rewards)
    std_reward = np.std(episode_rewards)
    return mean_reward, std_reward

In [197]:
mean_reward, std_reward = evaluate_agent(env, max_steps, n_eval_episodes,
                                         trained_q_table, eval_seed)
print(f'mean_reward={mean_reward}; std={std_reward}')

100%|██████████| 100/100 [00:00<00:00, 2025.52it/s]

mean_reward=0.0; std=0.0





In [198]:
def record_video(env: gym.Env, q_table: np.ndarray, out_dir, fps=1):
    images = []
    terminated = False
    truncated = False
    state, _ = env.reset(seed=random.randint(0, 500))
    img = env.render()
    images.append(img)
    while True:
        action = np.argmax(q_table[state][:])
        state, reward, terminated, truncated, _ = env.step(action)
        img = env.render()
        images.append(img)
        if terminated or truncated:
            break
    imageio.mimsave(out_dir, [np.array(img) for img in images], fps=fps)

In [199]:
record_video(env, trained_q_table, './out.mp4')