In [1]:
import gymnasium as gym
import numpy as np
import random
import imageio
import os
import tqdm
from tqdm.notebook import tqdm

Create the FrozenLake-v1 environment from gymnasium: [FrozenLake-v1](https://gymnasium.farama.org/environments/toy_text/frozen_lake/)


In [2]:
# Create the environment: a non-slippery 4x4 FrozenLake.
# The rendered frames are needed later to build a video, hence render_mode='rgb_array'.
env = gym.make(
    'FrozenLake-v1',
    map_name='4x4',
    is_slippery=False,
    render_mode='rgb_array',
)

# Inspect the state (observation) space.
print(
    "------------------------Observation space------------------------",
    f'obeservation space: {env.observation_space}',
    f'size of observation space: {env.observation_space.n}',
    f'Sample observation: {env.observation_space.sample()}',
    sep="\n",
)

# Inspect the action space.
print(
    "------------------------Action space------------------------",
    f'action space: {env.action_space}',
    f'number of possible actions: {env.action_space.n}',
    f'sample action: {env.action_space.sample()}',
    sep="\n",
)


------------------------Observation space------------------------
obeservation space: Discrete(16)
size of observation space: 16
Sample observation: 2
------------------------Action space------------------------
action space: Discrete(4)
number of possible actions: 4
sample action: 3


![SegmentLocal](frozen_lake.gif "segment")

The goal of the agent is to reach the goal tile (the last cell of the grid) by walking only over frozen tiles while avoiding the holes.
<br>
Action space: 
* 0: left
* 1: down
* 2: right
* 3: up

Reward:
* reach goal +1
* reach hole 0
* reach frozen 0

In [3]:


def initialize_qtable(n_states, n_actions):
    """
    Build the initial Q-table.
    : param n_states: number of rows (one per state)
    : param n_actions: number of columns (one per action)
    : return: numpy array of shape (n_states, n_actions) filled with zeros
    """
    table_shape = (n_states, n_actions)
    return np.zeros(table_shape)

# Policies: Q-learning is off-policy - actions are taken with the epsilon-greedy policy, while the Q-value update uses the greedy policy.
def greedy_policy(Qtable, state):
    """
    Exploitation: pick the action with the highest Q-value for `state`.
    : param Qtable: table indexed as Qtable[state] -> per-action values
    : param state: index of the current state
    : return: index of the largest value in that row (np.argmax returns the first one on ties)
    """
    action_values = Qtable[state]
    return np.argmax(action_values)

def epsilon_greedy_policy(Qtable, state, epsilon):
    """
    Choose an action for `state` with the epsilon-greedy rule.
    : param Qtable: table of shape (n_states, n_actions)
    : param state: index of the current state
    : param epsilon: exploration threshold; a uniform draw in [0, 1) that is
                     greater than epsilon exploits, otherwise the agent explores
    : return: index of the chosen action
    """
    # get a random number between 0-1
    rand_num = np.random.uniform(0, 1)
    # if it is greater than epsilon use greedy: exploitation
    if rand_num > epsilon:
        return greedy_policy(Qtable, state)
    # else take random action: exploration.
    # The number of actions is read from the Q-table itself rather than from a
    # global `env`, so the policy always matches the table it is given.
    n_actions = np.shape(Qtable)[1]
    return np.random.randint(n_actions)


In [4]:
# The Q-table dimensions are read from the environment's discrete spaces.
n_states = env.observation_space.n   # number of states
n_actions = env.action_space.n       # number of actions

# Start from an all-zero Q-table.
Qtable = initialize_qtable(n_states=n_states, n_actions=n_actions)

Let's implement the Q-learning algorithm as shown below:
<br>
<br>
![](Q-learning.jpg)

In [5]:
def training(num_training_episodes, learning_rate, decay_rate, gamma, max_epsilon, min_epsilon, max_steps, env, Qtable):
    """
    Train a Q-table with tabular Q-learning. The table is updated in place and also returned.
    : param num_training_episodes: number of episodes to run
    : param learning_rate: step size applied to the temporal-difference error
    : param decay_rate: rate of the exponential epsilon schedule
    : param gamma: discount rate
    : param max_epsilon: epsilon used for episode 0
    : param min_epsilon: value the schedule approaches as episodes grow (for a positive decay_rate)
    : param max_steps: max steps in an episode
    : param env: gymnasium environment
    : param Qtable: table of shape (n_states, n_actions) to update
    """
    epsilon_span = max_epsilon - min_epsilon
    for episode in tqdm(range(num_training_episodes)):
        # exploration probability for this episode
        epsilon = min_epsilon + epsilon_span*np.exp(-decay_rate*episode)
        # every episode starts from a freshly reset environment
        state, _ = env.reset()
        for _ in range(max_steps):
            # act with the epsilon-greedy policy
            action = epsilon_greedy_policy(Qtable, state, epsilon)
            next_state, reward, terminated, truncated, _ = env.step(action)

            # the update bootstraps from the best value available in the next state
            best_next_value = np.max(Qtable[next_state, :])
            td_target = reward + gamma*best_next_value
            Qtable[state, action] += learning_rate*(td_target - Qtable[state, action])

            # the episode is over once the environment reports termination or truncation
            if terminated or truncated:
                break

            state = next_state
    return Qtable

In [6]:
# training params
num_training_episodes = 10000
learning_rate = 0.7

# environment parameters
env_id = 'FrozenLake-v1' # Name of the env
max_steps = 99           # max steps for each episode
gamma = 0.95             # discount rate
eval_seed = []           # Evaluation seed of the environment.

# exploration-exploitation parameters
max_epsilon = 1.0   # exploration probability max at the beginning.
min_epsilon = 0.05  # exploration probability the schedule decays towards
decay_rate  = 0.0005    # exponential decay rate for exploration probability

# Train from a fresh all-zero table: training() updates the table it receives in
# place, so feeding the previous result back in would make every re-run of this
# cell continue from an already-trained table instead of reproducing the run.
Qtable = training(
    num_training_episodes=num_training_episodes,
    learning_rate=learning_rate,
    decay_rate=decay_rate,
    gamma=gamma,
    max_epsilon=max_epsilon,
    min_epsilon=min_epsilon,
    max_steps=max_steps,
    env=env,
    Qtable=initialize_qtable(n_states, n_actions),
)

  0%|          | 0/10000 [00:00<?, ?it/s]

In [8]:
# Show the learned Q-table: one row per state, one column per action.
print(Qtable) # after training

[[0.73509189 0.77378094 0.77378094 0.73509189]
 [0.73509189 0.         0.81450625 0.77378094]
 [0.77378094 0.857375   0.77378094 0.81450625]
 [0.81450625 0.         0.77378094 0.77378094]
 [0.77378094 0.81450625 0.         0.73509189]
 [0.         0.         0.         0.        ]
 [0.         0.9025     0.         0.81450625]
 [0.         0.         0.         0.        ]
 [0.81450625 0.         0.857375   0.77378094]
 [0.81450625 0.9025     0.9025     0.        ]
 [0.857375   0.95       0.         0.857375  ]
 [0.         0.         0.         0.        ]
 [0.         0.         0.         0.        ]
 [0.         0.9025     0.95       0.857375  ]
 [0.9025     0.95       1.         0.9025    ]
 [0.         0.         0.         0.        ]]


Let's evaluate the method.

In [9]:
def evaluate_agent(env, max_steps, n_eval_episodes, Qtable, seed):
    """
    Run the greedy policy of a trained Q-table and summarise the episode returns.
    : param env: gymnasium environment
    : param max_steps: max steps in an episode.
    : param n_eval_episodes: number of episodes to evaluate the agent
    : param Qtable: Qtable after training
    : param seed: per-episode reset seeds, indexed by episode number; when empty
                  (falsy) the environment is reset without a seed
    : return: (mean, standard deviation) of the total reward per episode
    """
    returns_per_episode = []
    for episode_idx in tqdm(range(n_eval_episodes)):
        # only pass a seed to reset() when seeds were supplied
        reset_kwargs = {"seed": seed[episode_idx]} if seed else {}
        state, _ = env.reset(**reset_kwargs)

        episode_return = 0
        for _ in range(max_steps):
            # evaluation is purely greedy: no exploration
            chosen_action = greedy_policy(Qtable, state)
            upcoming_state, reward, terminated, truncated, _ = env.step(chosen_action)
            episode_return += reward
            if terminated or truncated:
                break
            state = upcoming_state
        returns_per_episode.append(episode_return)

    return np.mean(returns_per_episode), np.std(returns_per_episode)

In [11]:
n_eval_episodes = 100
eval_seed = []   # empty list: every evaluation episode is reset without a seed

# Evaluate the trained agent with the greedy policy
mean_reward, std_reward = evaluate_agent(
    env=env,
    max_steps=max_steps,
    n_eval_episodes=n_eval_episodes,
    Qtable=Qtable,
    seed=eval_seed,
)
print(f"Mean_reward={mean_reward:.2f} +/- {std_reward:.2f}")

  0%|          | 0/100 [00:00<?, ?it/s]

Mean_reward=1.00 +/- 0.00


In [12]:
def record_video(env, Qtable, out_directory, fps=1):
    """
    Generate a replay video of the agent acting greedily for one episode.
    :param env: environment whose render() returns an image frame
    :param Qtable: Qtable of our agent
    :param out_directory: path of the video FILE to write (despite the name, it is
                          handed to imageio.mimsave as the output target)
    :param fps: how many frame per seconds (with taxi-v3 and frozenlake-v1 we use 1)
    """
    # Make sure the folder that will hold the video exists before writing to it.
    target_dir = os.path.dirname(out_directory)
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)

    terminated = False
    truncated = False
    state, info = env.reset(seed=random.randint(0, 500))
    # first frame: the freshly reset environment
    images = [env.render()]
    # Stop on EITHER end signal. The parentheses matter: `not terminated or truncated`
    # would keep looping forever once the episode is truncated.
    while not (terminated or truncated):
        # Take the action (index) that have the maximum expected future reward given that state
        action = np.argmax(Qtable[state][:])
        state, reward, terminated, truncated, info = env.step(action) # We directly put next_state = state for recording logic
        images.append(env.render())
    imageio.mimsave(out_directory, [np.array(img) for img in images], fps=fps)

In [15]:
# Resolve the output folder to an absolute path and record one greedy episode into it.
abs_path = os.path.abspath('trained_model_videos')
video_path = os.path.join(abs_path, 'frozen_lake_v1.mp4')
record_video(env=env, Qtable=Qtable, out_directory=video_path, fps=1)

Trained Model

In [16]:
from IPython.display import HTML

# Embed the recorded episode in the notebook output. The relative `src` below has to
# resolve to the file written by record_video.
# NOTE(review): assumes the notebook is served from the directory that contains
# trained_model_videos/ - confirm.
HTML("""
    <video alt="test" controls>
        <source src="trained_model_videos/frozen_lake_v1.mp4" type="video/mp4">
    </video>
""")