In [None]:
import numpy as np
import gymnasium as gym
import random
import imageio
import os
import tqdm
import time
import pickle 
from tqdm.notebook import tqdm

# Environment 

In [None]:
# Gymnasium environment id; the rendering and video cells pass it to gym.make
env_id = "FrozenLake-v1"
# Label for this agent; not referenced elsewhere in the visible notebook
# NOTE(review): presumably intended for saving/publishing the Q-table - confirm
model_name = "Q_FrozenLakeSlippery"

In [None]:
# Create the slippery Frozen Lake Gymnasium environment used for training and evaluation.
# The id is taken from `env_id` so this env and the rendering/video envs built
# later cannot drift apart.
# NOTE(review): `success_rate` and `reward_schedule` are only accepted by recent
# Gymnasium releases - confirm the installed version supports them.
env = gym.make(
    env_id,
    desc=None,                  # no custom layout; the named map below is used
    map_name="4x4",
    is_slippery=True,           # stochastic transitions
    success_rate=1.0/3.0,
    reward_schedule=(1, 0, 0)
)

In [None]:
# Inspect the observation space (the set of states) and draw one random observation
obs_space = env.observation_space
print("_____OBSERVATION SPACE_____ \n")
print("Observation Space", obs_space)
print("Sample observation", obs_space.sample())  # random draw from the space

In [None]:
# Inspect the action space: how many discrete actions exist, plus one random action
print("\n _____ACTION SPACE_____ \n")
print("Action Space Size", env.action_space.n)  # `.n` is a count of actions, not a shape
print("Action Space Sample", env.action_space.sample())  # Take a random action

In [None]:
# Sizes of the state and action spaces; they give the Q-table its dimensions
state_space, action_space = env.observation_space.n, env.action_space.n

print("There are ", state_space, " possible states")
print("There are ", action_space, " possible actions")

# Q-Learning

### Table initialization

In [None]:
# Build the Q-table: one row per state, one column per action, every entry starting at 0
def initialize_q_table(state_space, action_space):
    """Return a zero-filled array of shape ``(state_space, action_space)``.

    :param state_space: number of rows (states)
    :param action_space: number of columns (actions)
    :return: the new ``np.zeros`` array
    """
    return np.zeros(shape=(state_space, action_space))

In [None]:
# Fresh all-zero Q-table sized for this environment
Qtable_frozenlake = initialize_q_table(state_space=state_space, action_space=action_space)

### Policies 

In [None]:
# Greedy action selection, used for evaluation (Q-learning is off-policy)
def greedy_policy(Qtable, state):
    """Return the index of the highest-valued action for ``state``.

    :param Qtable: table indexed as ``Qtable[state]``, giving one value per action
    :param state: index of the current state
    :return: ``np.argmax`` over that row (the first index wins on ties)
    """
    action_values = Qtable[state]
    return np.argmax(action_values)

In [None]:
# Behavior policy used during training (trades off exploration and exploitation)
def epsilon_greedy_policy(Qtable, state, epsilon, action_sampler=None):
    """Pick an action epsilon-greedily.

    A uniform random number in [0, 1) is drawn; if it is strictly greater than
    ``epsilon`` the greedy action is returned, otherwise a random action is sampled.

    :param Qtable: the Q-table, forwarded to ``greedy_policy``
    :param state: index of the current state
    :param epsilon: exploration threshold; values >= 1 always explore
    :param action_sampler: optional zero-argument callable returning a random
        action (e.g. ``some_env.action_space.sample``). When omitted, the
        notebook-level ``env`` is sampled, which is what existing callers get.
    :return: the selected action
    """
    # Exploitation: follow the current value estimates
    if random.random() > epsilon:
        return greedy_policy(Qtable, state)

    # Exploration: fall back to the global env only when no sampler was supplied,
    # so the function can also be used with an environment other than `env`
    if action_sampler is None:
        action_sampler = env.action_space.sample
    return action_sampler()

### Training parameters 

In [None]:
# Hyperparameters

# --- Training ---
n_training_episodes = 300_000  # total number of training episodes
learning_rate = 0.6            # step size of the Q-learning update

# --- Evaluation ---
n_eval_episodes = 100          # number of test episodes

# --- Environment ---
max_steps = 150                # cap on steps per episode
gamma = 0.95                   # discount rate
eval_seed = []                 # per-episode evaluation seeds; empty means unseeded resets

# --- Exploration (epsilon decays exponentially over episodes) ---
max_epsilon = 1.0              # exploration probability at the start
min_epsilon = 0.05             # floor for the exploration probability
decay_rate = 5e-5              # exponential decay rate of epsilon

### Training algorithm (Q-learning)

In [None]:
#Training loop method

def train(n_training_episodes, min_epsilon, max_epsilon, decay_rate, env, max_steps, Qtable, lr=None, discount=None):
    """Run tabular Q-learning and return the updated Q-table.

    ``Qtable`` is updated in place and also returned.

    :param n_training_episodes: number of episodes to run
    :param min_epsilon: lower bound of the exploration probability
    :param max_epsilon: exploration probability at episode 0
    :param decay_rate: exponential decay rate applied to epsilon per episode
    :param env: environment exposing ``reset()`` and ``step(action)``
    :param max_steps: maximum number of steps per episode
    :param Qtable: table indexed as ``Qtable[state][action]``
    :param lr: optional learning rate; when omitted the notebook-level
        ``learning_rate`` is used (the behavior existing callers rely on)
    :param discount: optional discount factor; when omitted the notebook-level
        ``gamma`` is used
    :return: ``Qtable``
    """
    # Resolve the update hyperparameters once, keeping the global fallback explicit
    alpha = learning_rate if lr is None else lr
    discount_factor = gamma if discount is None else discount

    for episode in tqdm(range(n_training_episodes)):
        # Reduce epsilon (because we need less and less exploration)
        epsilon = min_epsilon + (max_epsilon - min_epsilon) * np.exp(-decay_rate * episode)

        state, info = env.reset()

        for _ in range(max_steps):
            # Choose the action At using the epsilon-greedy behavior policy
            action = epsilon_greedy_policy(Qtable, state, epsilon)

            # Take action At and observe Rt+1 and St+1
            new_state, reward, terminated, truncated, info = env.step(action)

            # A terminal state has no future return, so never bootstrap from it.
            # Truncation (time limit) is not terminal, so it still bootstraps.
            future_value = 0.0 if terminated else np.max(Qtable[new_state])

            # Q(s,a) := Q(s,a) + lr * [R + gamma * max_a' Q(s',a') - Q(s,a)]
            td_target = reward + discount_factor * future_value
            Qtable[state][action] += alpha * (td_target - Qtable[state][action])

            # If terminated or truncated finish the episode
            if terminated or truncated:
                break

            # Our next state is the new state
            state = new_state
    return Qtable

### Train it 

In [None]:
# Train the agent; `train` updates the table in place and returns it

Qtable_frozenlake = train(
    n_training_episodes=n_training_episodes,
    min_epsilon=min_epsilon,
    max_epsilon=max_epsilon,
    decay_rate=decay_rate,
    env=env,
    max_steps=max_steps,
    Qtable=Qtable_frozenlake,
)

In [None]:
# Q-table visualization: display the learned table (one row per state, one column per action)

Qtable_frozenlake

### Evaluate 

In [None]:
#Evaluation method

def evaluate_agent(env, max_steps, n_eval_episodes, Q, seed):
    """
    Run the greedy policy for ``n_eval_episodes`` episodes and summarize the returns.

    :param env: The evaluation environment
    :param max_steps: Maximum number of steps per episode
    :param n_eval_episodes: Number of episodes to evaluate the agent
    :param Q: The Q-table
    :param seed: Sequence of per-episode reset seeds, indexed by episode number;
        when empty/falsy the environment is reset without a seed
    :return: ``(mean, std)`` of the per-episode total rewards
    """
    returns_per_episode = []
    for ep_idx in tqdm(range(n_eval_episodes)):
        # Seed the reset only when seeds were supplied
        reset_kwargs = {"seed": seed[ep_idx]} if seed else {}
        state, _info = env.reset(**reset_kwargs)

        ep_return = 0
        for _step in range(max_steps):
            # Always act greedily with respect to the Q-table
            chosen = greedy_policy(Q, state)
            state, reward, terminated, truncated, _info = env.step(chosen)
            ep_return += reward

            if terminated or truncated:
                break
        returns_per_episode.append(ep_return)

    return np.mean(returns_per_episode), np.std(returns_per_episode)

In [None]:
# Evaluation results: run the greedy policy and report mean/std of the episode rewards

mean_reward, std_reward = evaluate_agent(
    env=env,
    max_steps=max_steps,
    n_eval_episodes=n_eval_episodes,
    Q=Qtable_frozenlake,
    seed=eval_seed,
)
print(f"Mean_reward={mean_reward:.8f} +/- {std_reward:.8f}")

### Watch it play 

In [None]:
#Watch it play !

# Make a NEW env with human rendering, using the same map and slipperiness
# settings that the training env was created with
play_env = gym.make(env_id, map_name="4x4", is_slippery=True, render_mode="human")

try:
    state, info = play_env.reset()

    for _ in range(200):
        # Greedy action from the Q-table (same helper the evaluation uses)
        action = int(greedy_policy(Qtable_frozenlake, state))

        state, reward, terminated, truncated, info = play_env.step(action)

        time.sleep(0.25)  # slow it down so you can see it

        # Start a new episode as soon as the current one ends
        if terminated or truncated:
            state, info = play_env.reset()
finally:
    # Always release the environment, even if the loop above is interrupted or raises
    play_env.close()


### Save a video 

In [None]:
#Record a video

from gymnasium.wrappers import RecordVideo

# Roll out the trained Q-table (Qtable_frozenlake) greedily for one episode,
# recording every episode into the "videos" folder
env_video = gym.make(env_id, map_name="4x4", is_slippery=True, render_mode="rgb_array")
env_video = RecordVideo(
    env_video,
    video_folder="videos",
    episode_trigger=lambda ep: True,
    name_prefix="q_FrozenLake")

try:
    state, info = env_video.reset()

    done = False
    while not done:
        action = int(greedy_policy(Qtable_frozenlake, state))

        state, reward, terminated, truncated, info = env_video.step(action)

        done = terminated or truncated
finally:
    # Close the wrapper even if the rollout fails, so the env is always released
    env_video.close()

# `reward` is the reward of the final step of the episode
print("Video saved, reward = ", reward)