## Setup a Virtual Display
To generate a replay video of agent and environment.

In [None]:
%%capture
!pip install pyglet==1.5.1
!apt install python-opengl
!apt install ffmpeg
!apt install xvfb
!pip3 install pyvirtualdisplay

# Virtual display
from pyvirtualdisplay import Display

virtual_display = Display(visible=0, size=(1400, 900))
virtual_display.start()

## Install dependencies

In [None]:
%%capture
!pip install gym==0.24
!pip install pygame
!pip install numpy

!pip install imageio imageio_ffmpeg

## Import the packages

In [None]:
import numpy as np
import gym
import random
import imageio
from tqdm.notebook import trange

## Frozen Lake

In [None]:
# Create the FrozenLake-v1 environment using 4x4 map and non-slippery version
env = gym.make("FrozenLake-v1",map_name="4x4",is_slippery=False)

### Understanding the FrozenLake environment

In [None]:
print("_____OBSERVATION SPACE_____ \n")
print("Observation Space", env.observation_space)
print("Sample observation", env.observation_space.sample()) # Get a random observation

In [None]:
print("\n _____ACTION SPACE_____ \n")
print("Action Space Shape", env.action_space.n)
print("Action Space Sample", env.action_space.sample()) # Take a random action

In [None]:
map_description = [[str(cell, 'utf-8') for cell in row] for row in env.desc]

print("Enviroment map:")
for row in map_description:
    print("".join(row))

## Create and Initialize the Q-table

In [None]:
state_space = env.observation_space.n
print("There are ", state_space, " possible states")

action_space = env.action_space.n
print("There are ", action_space, " possible actions")

In [None]:
def initialize_q_table(state_space, action_space):
  Qtable = np.zeros((state_space, action_space))
  return Qtable

In [None]:
Qtable_frozenlake = initialize_q_table(state_space, action_space)

## Define the epsilon-greedy policy

In [None]:
def epsilon_greedy_policy(Qtable, state, epsilon):
  random_int = random.uniform(0,1)
  if random_int > epsilon:
    action = np.argmax(Qtable[state])
  else:
    action = env.action_space.sample()

  return action

## Define the greedy policy

In [None]:
def greedy_policy(Qtable, state):
  action = np.argmax(Qtable[state])

  return action

## Define the hyperparameters

In [None]:
n_training_episodes = 10000
learning_rate = 0.7

n_eval_episodes = 100

env_id = "FrozenLake-v1"
max_steps = 99
gamma = 0.95
eval_seed = []

max_epsilon = 1.0
min_epsilon = 0.05
decay_rate = 0.0005

## Training the model

In [None]:
def train(n_training_episodes, min_epsilon, max_epsilon, decay_rate, env, max_steps, Qtable):
  for episode in trange(n_training_episodes):
    epsilon = min_epsilon + (max_epsilon - min_epsilon)*np.exp(-decay_rate*episode)
    state = env.reset()
    step = 0
    done = False

    for step in range(max_steps):
      action = epsilon_greedy_policy(Qtable, state, epsilon)

      new_state, reward, done, info = env.step(action)

      Qtable[state][action] = Qtable[state][action] + learning_rate * (reward + gamma * np.max(Qtable[new_state]) - Qtable[state][action])

      if done:
        break

      state = new_state
  return Qtable

In [None]:
Qtable_frozenlake = train(n_training_episodes, min_epsilon, max_epsilon, decay_rate, env, max_steps, Qtable_frozenlake)

## Tranined Q-Learning table

In [None]:
Qtable_frozenlake

## Model evaluation

In [None]:
def evaluate_agent(env, max_steps, n_eval_episodes, Q, seed):
  """
  Evaluate the agent for ``n_eval_episodes`` episodes and returns average reward and std of reward.
  :param env: The evaluation environment
  :param n_eval_episodes: Number of episode to evaluate the agent
  :param Q: The Q-table
  :param seed: The evaluation seed array (for taxi-v3)
  """
  episode_rewards = []
  for episode in range(n_eval_episodes):
    if seed:
      state = env.reset(seed=seed[episode])
    else:
      state = env.reset()
    step = 0
    done = False
    total_rewards_ep = 0

    for step in range(max_steps):

      action = np.argmax(Q[state][:])
      new_state, reward, done, info = env.step(action)
      total_rewards_ep += reward

      if done:
        break
      state = new_state
    episode_rewards.append(total_rewards_ep)
  mean_reward = np.mean(episode_rewards)
  std_reward = np.std(episode_rewards)

  return mean_reward, std_reward

In [None]:
mean_reward, std_reward = evaluate_agent(env, max_steps, n_eval_episodes, Qtable_frozenlake, eval_seed)
print(f"Mean_reward={mean_reward:.2f} +/- {std_reward:.2f}")

## Visualizing the results

In [None]:
def record_video(env, Qtable, out_directory, fps=1):
  images = []
  done = False
  state = env.reset(seed=random.randint(0,500))
  img = env.render(mode='rgb_array')
  images.append(img)
  while not done:
    action = np.argmax(Qtable[state][:])
    state, reward, done, info = env.step(action)
    img = env.render(mode='rgb_array')
    images.append(img)
  imageio.mimsave(out_directory, [np.array(img) for i, img in enumerate(images)], fps=fps)

Saving animated file as gif with 1 frame per second

In [None]:
video_path="/content/replay.gif"
video_fps=1

In [None]:
record_video(env, Qtable_frozenlake, video_path, video_fps)

In [None]:
from IPython.display import Image
Image('./replay.gif')