In [18]:
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import random
import imageio
import os
from tqdm.notebook import tqdm
import cv2

In [19]:
env = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=False, render_mode="rgb_array")

In [20]:
observation_size, action_size = env.observation_space.n, env.action_space.n

In [21]:
print("*********************************")
print("******* Observation Space *******")
print("*********************************")
print("Observation Space:", env.observation_space)
print("Sample observation:", env.observation_space.sample())

*********************************
******* Observation Space *******
*********************************
Observation Space: Discrete(16)
Sample observation: 15


In [22]:
print("*********************************")
print("********* Action Space **********")
print("*********************************")
print("Action Space:", env.observation_space)
print("Sample action:", env.action_space.sample())

*********************************
********* Action Space **********
*********************************
Action Space: Discrete(16)
Sample action: 1


In [23]:
def greedy_policy(Q, state):
    return np.argmax(Q[state])

In [24]:
def epsilon_greedy_policy(Q, state, epsilon):
    if random.uniform(0, 1) <= epsilon:
        action = env.action_space.sample()
    else:
        action = greedy_policy(Q, state)
    return action

In [25]:
# Training parameters

n_training_episodes = 1000  # Total training episodes
alpha = 0.7  # Learning rate

# Evaluation parameters
n_eval_episodes = 100  # Total number of test episodes

# Environment parameters
env_id = "FrozenLake-v1"  # Name of the environment
max_steps = 99  # Max steps per episode
gamma = 0.95  # Discounting rate
eval_seed = []  # The evaluation seed of the environment

# Exploration parameters
max_epsilon = 1.0  # Exploration probability at start
min_epsilon = 0.05  # Minimum exploration probability
decay_rate = 0.0005  # Exponential decay rate for exploration prob

In [26]:
def train_Q_Learning_Agent(env, n_training_episodes, max_epsilon, min_epsilon, alpha, max_steps, gamma, decay_rate):    
    
    Q = np.zeros((env.observation_space.n, env.action_space.n))
    return_per_episode = []

    for episode in range(n_training_episodes):
        epsilon = min_epsilon + (max_epsilon - min_epsilon) * np.exp(-decay_rate * episode)
        state, _ = env.reset()
        cum_reward = 0

        for step in range(max_steps):
            action = epsilon_greedy_policy(Q, state, epsilon)
            next_state, reward, done, truncated, _ = env.step(action)
            Q[state, action] = Q[state, action] + alpha * (reward + gamma * np.max(Q[next_state]) - Q[state, action])
            cum_reward += reward

            if done or truncated:
                break
            state = next_state
        return_per_episode.append(cum_reward)
        
    return Q, return_per_episode

In [27]:
Q, return_per_episode = train_Q_Learning_Agent(env, n_training_episodes, max_epsilon, min_epsilon, alpha, max_steps, gamma, decay_rate)

In [28]:
Q

array([[0.73509189, 0.77378094, 0.77378094, 0.73509189],
       [0.73509189, 0.        , 0.81450625, 0.77378094],
       [0.77378094, 0.857375  , 0.77378094, 0.81450625],
       [0.81450625, 0.        , 0.77378094, 0.77378094],
       [0.77378094, 0.81450625, 0.        , 0.73509189],
       [0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.9025    , 0.        , 0.81450625],
       [0.        , 0.        , 0.        , 0.        ],
       [0.81450625, 0.        , 0.857375  , 0.77378094],
       [0.81450625, 0.9025    , 0.9025    , 0.        ],
       [0.857375  , 0.95      , 0.        , 0.857375  ],
       [0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.90249986, 0.95      , 0.857375  ],
       [0.9025    , 0.95      , 1.        , 0.9025    ],
       [0.        , 0.        , 0.        , 0.        ]])

In [29]:
def record_video(env, Q, out_directory):
    """
    Generate a replay video of the agent
    :param env
    :param Qtable: Qtable of our agent
    :param out_directory
    :param fps: how many frame per seconds (with taxi-v3 and frozenlake-v1 we use 1)
    """
    images = []
    done = False
    truncated = False
    state, info = env.reset(seed=random.randint(0, 500))
    img = env.render()
    cv2.imwrite(f"{out_directory}/img_init.JPG", img)
    i=0
    while not done or truncated:
        # Take the action (index) that have the maximum expected future reward given that state
        action = np.argmax(Q[state])
        state, reward, done, truncated, info = env.step(
            action
        )  # We directly put next_state = state for recording logic
        img = env.render()
        cv2.imwrite(f"{out_directory}/img{i}.JPG", img)
        i += 1

In [30]:
record_video(env, Q, "QL_Images")

In [31]:
import glob
from PIL import Image

def make_gif(frame_folder):
    frames = [Image.open(image) for image in glob.glob(f"{frame_folder}/*.JPG")]
    frame_one = frames[0]
    frame_one.save("Q_Agent.gif", format="GIF", append_images=frames,
               save_all=True, duration=750, loop=0)

In [32]:
make_gif("QL_Images")