In [1]:
import random

import imageio
import numpy as np
import matplotlib.pyplot as plt
import gymnasium as gym
from tqdm.notebook import tqdm

from implementation import SoftmaxPolicy, Qlearning, DoubleQlearning

In [2]:
# Initialise the environment
env = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=True, render_mode="rgb_array")

# Reset the environment to generate the first observation
observation, info = env.reset(seed=42)
# The action space keeps its own random generator, which reset(seed=...) does not touch.
# Seed it too so the random rollout below is reproducible from a fresh kernel.
env.action_space.seed(42)
for _ in range(100):
    # this is where you would insert your policy
    action = env.action_space.sample()

    # step (transition) through the environment with the action
    # receiving the next observation, reward and if the episode has terminated or truncated
    observation, reward, terminated, truncated, info = env.step(action)
    env.render()

    # If the episode has ended then we can reset to start a new episode
    if terminated or truncated:
        observation, info = env.reset()

# NOTE(review): `env` is reused by the training, evaluation and recording cells further down
# after this close() -- confirm the environment stays usable, or move close() to the end.
env.close()

# Making Q-learning method

### init Q-table

In [3]:
# Sizes of the discrete observation and action spaces of the environment
state_space = env.observation_space.n
action_space = env.action_space.n

print("There are ", state_space, " possible states")
print("There are ", action_space, " possible actions")

def initialize_q_table(state_space, action_space):
  """
  Build an all-zero Q-table.
  :param state_space: Number of rows (one per state)
  :param action_space: Number of columns (one per action)
  :return: NumPy array of zeros with shape ``(state_space, action_space)``
  """
  table_shape = (state_space, action_space)
  return np.zeros(table_shape)

There are  16  possible states
There are  4  possible actions


### Create policies

In [4]:
# class Policy:
#     def __call__(Qtable, state):
#         pass

#     def update():
#         None

<h5> Greedy policy

In [5]:
# class GreedyPolicy(Policy):
#     def __init__(self):
#         super().__init__()
    
#     def __call__(self, Qtable, state):
#         return np.argmax(Qtable[state][:])

<h5> Epsilon greedy policy

In [6]:
# class EpsilonGreedyPolicy(Policy):
#     def __init__(self):
#         super().__init__()
#         self.greedy = GreedyPolicy()
    
#     def __call__(self, Qtable, state):
#         random_num = np.random.binomial(1, 0.2)
#         # if random_num > greater than epsilon --> exploitation
#         if random_num:
#             # Take the action with the highest value given a state
#             # np.argmax can be useful here
#             action = self.greedy(Qtable, state)
#             # else --> exploration
#         else:
#             action = env.action_space.sample()
#         return action

<h5> Softmax policy

In [7]:
# def softmax(seq):
#   a = np.exp(seq)
#   return a / (1e-10 + a.sum())

# class SoftmaxPolicy(Policy):
#     def __init__(self):
#         super().__init__()

#     def __call__(self, Qtable, state): # take action with prob. of it's value
#         return np.random.choice([0, 1, 2, 3], p=softmax(Qtable[state][:]))
        

### Q-learning algorithm

In [8]:
# class Qlearning:
#     def __init__(self, env, train_policy):
#         self.env = env
#         self.nS = env.observation_space.n
#         self.nA = env.action_space.n
#         self.Qtable = initialize_q_table(self.nS, self.nA)
#         self.train_policy = train_policy

#     def train(self, n_training_episodes=10, max_steps=10, lr=0.7, gamma=0.99):
#         for episode in tqdm(range(n_training_episodes)):
#             # self.train_policy.step()
#             state, info = env.reset()
#             for step in range(max_steps):

#                 action = self.train_policy(self.Qtable, state)
#                 new_state, reward, terminated, truncated, info = self.env.step(action)

#                 done = terminated or truncated # if we dont want to use next ep.

#                 td_error = reward + gamma * np.max(self.Qtable[new_state]) - self.Qtable[state][action]
#                 self.Qtable[state][action] = self.Qtable[state][action] + \
#                 lr * td_error

#                 # If terminated or truncated finish the episode
#                 if terminated or truncated:
#                     break

                
#                 # Our next state is the new state
#                 state = new_state 
#         return self.Qtable


### Double Q-learning algorithm

In [9]:
# class DoubleQlearning:
#     def __init__(self, env, train_policy):
#         self.env = env
#         self.nS = env.observation_space.n
#         self.nA = env.action_space.n
#         self.Qtable = initialize_q_table(self.nS, self.nA)
#         self.train_policy = train_policy

#     def train(self, n_training_episodes=10, max_steps=10, lr=0.7, gamma=0.99):
#         Q1 = np.zeros_like(self.Qtable)
#         Q2 = np.zeros_like(self.Qtable)
        
#         for episode in tqdm(range(n_training_episodes)):
#             # self.train_policy.step()
#             state, info = env.reset()
#             for step in range(max_steps):

#                 action = self.train_policy((Q1 + Q2) / 2.0, state)
#                 new_state, reward, terminated, truncated, info = self.env.step(action)

#                 done = terminated or truncated # if we dont want to use next ep.

#                 if np.random.randint(2):
                
#                     argmax = np.argmax(Q1[state][:])
#                     td_error = reward + gamma * Q2[new_state][argmax] - Q1[state][action]
#                     Q1[state][action] = Q1[state][action] + \
#                     learning_rate * td_error
                
#                 else:
#                     argmax = np.argmax(Q2[state][:])
#                     td_error = reward + gamma * Q1[new_state][argmax] - Q2[state][action]
#                     Q2[state][action] = Q2[state][action] + \
#                     learning_rate * td_error

#                 # If terminated or truncated finish the episode
#                 if terminated or truncated:
#                     break

#                 # Our next state is the new state
#                 state = new_state

#         self.Qtable = (Q1 + Q2) / 2.0
#         return self.Qtable
    

### Dyna-Q

In [10]:
# class DynaQ:
#     def __init__(self, n_state, n_action, train_policy):
#         self.env = env
#         self.nS = self.env.observation_space.n
#         self.nA = self.env.action_space.n

#         self.Qtable = initialize_q_table(self.nS, self.nA)
#         self.train_policy = train_policy

#     def train(self, env, n_training_episodes=10, max_steps=10, lr=0.7, gamma=0.99):
        
#         T_count = np.zeros((self.nS, self.nA, self.nS), dtype=np.int)
#         R_model = np.zeros((self.nS, self.nA, self.nS), dtype=np.float64)

#         for e in tqdm(n_training_episodes):

#             state, info = env.reset()

#             for step in range(max_steps):
                
#                 # If terminated or truncated finish the episode
#                 if done:
#                     break

#                 action = self.train_policy(self.Qtable, state)
#                 new_state, reward, terminated, truncated, info = env.step(action)

#                 done = terminated or truncated # if we dont want to use next ep.

#                 T_count[state, action, new_state] += 1
#                 r_diff = reward - R_model[state, action, new_state]
#                 R_model[state, action, new_state] += (r_diff / T_count[state, action, new_state])

                
#                 td_error = reward + gamma * np.max(self.Qtable[new_state]) * (not done) - self.Qtable[state][action]
#                 self.Qtable[state][action] = self.Qtable[state][action] + \
#                 learning_rate * td_error

#                 # Our next state is the new state
#                 state = new_state

### Learning 

<h5> Learning parameters

In [3]:
# Training parameters
n_training_episodes = 30000  # Number of episodes passed to train()
learning_rate = 0.7          # Learning rate passed to train()

# Evaluation parameters
n_eval_episodes = 1000        # Number of episodes run by evaluate_agent

# Environment parameters
env_id = "FrozenLake-v1"     # Name of the environment (the gym.make call above uses a literal string, not this variable)
max_steps = 99               # Max steps per episode, used for both training and evaluation
gamma = 0.95                 # Discounting rate passed to train()
eval_seed = []               # Per-episode evaluation seeds; an empty list makes evaluate_agent reset without a seed

# Exploration parameters
# NOTE(review): none of these three values is referenced by a visible cell (training uses
# SoftmaxPolicy) -- presumably left over from an epsilon-greedy setup; confirm before removing.
max_epsilon = 1.0             # Exploration probability at start
min_epsilon = 0.05            # Minimum exploration probability
decay_rate = 0.0005            # Exponential decay rate for exploration prob

In [4]:
# Train on the FrozenLake environment with DoubleQlearning, passing a SoftmaxPolicy instance
# as its policy. Both classes are imported from the local `implementation` module.
policy = SoftmaxPolicy()
qlearning = DoubleQlearning(env, policy)
# The value returned by train() is used as the Q-table by the evaluation cell below.
Q_frozenlake = qlearning.train(n_training_episodes, max_steps, learning_rate, gamma)

  0%|          | 0/30000 [00:00<?, ?it/s]

In [5]:
# Display the learned Q-table (indexed as Q[state][action] by evaluate_agent)
Q_frozenlake

array([[2.14313937e-03, 1.14521880e-03, 4.97506484e-03, 1.59943686e-03],
       [4.89849169e-03, 7.10473108e-03, 2.87439902e-03, 8.08939346e-03],
       [1.35333665e-02, 1.38516186e-02, 2.52904783e-02, 1.99679533e-02],
       [1.66371108e-02, 1.61953823e-03, 2.53628583e-03, 1.31565580e-02],
       [5.13494125e-02, 2.72283755e-03, 2.45282946e-03, 9.26799514e-04],
       [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
       [1.48687359e-03, 1.43527816e-02, 1.07749095e-01, 1.76172022e-04],
       [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
       [1.66324298e-03, 8.77174546e-02, 2.63618564e-03, 1.44721511e-01],
       [7.70830344e-02, 1.97412049e-01, 6.88534443e-02, 9.13493595e-02],
       [8.03899388e-02, 2.43185458e-01, 4.49521180e-02, 1.67535930e-02],
       [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
       [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
       [6.35612718e-02, 1.96050936e-01, 3.65876053e

In [6]:
def evaluate_agent(env, max_steps, n_eval_episodes, Q, seed):
  """
  Evaluate the greedy policy derived from ``Q`` for ``n_eval_episodes`` episodes and
  return the average and standard deviation of the per-episode total reward.
  :param env: The evaluation environment
  :param max_steps: Maximum number of steps per episode
  :param n_eval_episodes: Number of episodes to evaluate the agent
  :param Q: The Q-table, indexed as ``Q[state][action]``
  :param seed: Sequence of per-episode reset seeds (list or array, at least
    ``n_eval_episodes`` long); ``None`` or an empty sequence resets without a seed
  :return: Tuple ``(mean_reward, std_reward)``
  """
  # Explicit length check instead of truthiness: ``if seed:`` raises on a NumPy array
  # holding more than one element.
  use_seeds = seed is not None and len(seed) > 0

  episode_rewards = []
  for episode in tqdm(range(n_eval_episodes)):
    if use_seeds:
      # int() normalises NumPy integer seeds to plain Python ints
      state, info = env.reset(seed=int(seed[episode]))
    else:
      state, info = env.reset()
    total_rewards_ep = 0

    for _ in range(max_steps):
      # Take the action (index) that has the maximum expected future reward given that state
      action = np.argmax(Q[state][:])
      new_state, reward, terminated, truncated, info = env.step(action)
      total_rewards_ep += reward

      if terminated or truncated:
        break
      state = new_state
    episode_rewards.append(total_rewards_ep)
  mean_reward = np.mean(episode_rewards)
  std_reward = np.std(episode_rewards)

  return mean_reward, std_reward

In [7]:
# Evaluate our Agent: run the greedy policy derived from the learned Q-table.
# eval_seed is an empty list, so evaluate_agent resets every episode without an explicit seed.
mean_reward, std_reward = evaluate_agent(env, max_steps, n_eval_episodes, Q_frozenlake, eval_seed)
print(f"Mean_reward={mean_reward:.2f} +/- {std_reward:.2f}")

  0%|          | 0/1000 [00:00<?, ?it/s]

Mean_reward=0.16 +/- 0.37


In [207]:
def record_video(env, Qtable, out_directory, fps=1):
  """
  Roll out one greedy episode and save the rendered frames as a replay video.
  :param env: Environment to roll out; assumed to be created with a render mode whose
    ``render()`` returns an image frame (e.g. ``rgb_array``)
  :param Qtable: Qtable of our agent, indexed as ``Qtable[state][action]``
  :param out_directory: Output path handed to ``imageio.mimsave``
  :param fps: how many frame per seconds (with taxi-v3 and frozenlake-v1 we use 1)
  """
  frames = []
  terminated = False
  truncated = False
  state, info = env.reset(seed=random.randint(0,500))
  frames.append(env.render())
  # Stop as soon as the episode ends for either reason. The parentheses matter:
  # ``not terminated or truncated`` parses as ``(not terminated) or truncated``,
  # which never stops on truncation and keeps looping once truncated is set.
  while not (terminated or truncated):
    # Take the action (index) that has the maximum expected future reward given that state
    action = np.argmax(Qtable[state][:])
    state, reward, terminated, truncated, info = env.step(action) # We directly put next_state = state for recording logic
    frames.append(env.render())
  imageio.mimsave(out_directory, [np.array(frame) for frame in frames], fps=fps)

In [208]:
# Record one greedy episode with the trained Q-table
record_video(env, Q_frozenlake, 'record.mp4')

: 