# Frozen lake

In [1]:
import numpy as np
import gymnasium as gym
import random
import time
import imageio
import os

In [2]:
# Alternative setup: a small 4x4 map with `is_slippery=False` (deterministic moves).
#desc=["SFFF", "FHFH", "FFFH", "HFFG"]
#env = gym.make('FrozenLake-v1', desc=desc, is_slippery=False)

# Custom 8x8 map, one string per grid row (tile letters per the Gymnasium
# FrozenLake docs: S = start, F = frozen, H = hole, G = goal).
desc8x8 = ["SFFFFFFF", "FFFFFFFF", "FFFHFFFF", "FFFFFHFF", "FFFHFFFF", "FHHFFFHF", "FHFFHFHF", "FFFHFFFG"]
# The environment used by every later cell; note that it is created with is_slippery=True.
env = gym.make('FrozenLake8x8-v1', desc=desc8x8, is_slippery=True)

In [3]:
# Inspect the observation space of the environment created with gym.make("<name_of_the_environment>").
# Note: that environment uses `is_slippery=True`, so the deterministic behaviour of
# `is_slippery=False` (the agent always moves in the intended direction) does NOT apply here.
print("_____OBSERVATION SPACE_____ \n")
print("Observation Space", env.observation_space)
print("Sample observation", env.observation_space.sample())  # Get a random observation

_____OBSERVATION SPACE_____ 

Observation Space Discrete(64)
Sample observation 36


In [4]:
# Inspect the action space. The value labelled "Shape" is `action_space.n`,
# i.e. the number of discrete actions, not an array shape.
print("\n _____ACTION SPACE_____ \n")
print("Action Space Shape", env.action_space.n)
print("Action Space Sample", env.action_space.sample())  # Take a random action


 _____ACTION SPACE_____ 

Action Space Shape 4
Action Space Sample 3


In [5]:
# Number of discrete states; used as the row count of the Q, N and alpha tables.
state_space = env.observation_space.n
print("There are ", state_space, " possible states")

# Number of discrete actions; used as the column count of the Q, N and alpha tables.
action_space = env.action_space.n
print("There are ", action_space, " possible actions")

There are  64  possible states
There are  4  possible actions


In [6]:
def initialize_q_table(state_space, action_space):
    """
    Build the initial Q-table with every state-action value set to 0.
    :param state_space: Number of states (rows of the table)
    :param action_space: Number of actions (columns of the table)
    :return: A NumPy array of zeros with shape ``(state_space, action_space)``
    """
    table_shape = (state_space, action_space)
    return np.zeros(table_shape)

# Fresh all-zero Q-table for the environment; the training cell later reassigns this name
# to the table returned by `train`.
Qtable_frozenlake = initialize_q_table(state_space, action_space)
# Bare expression so the notebook displays the table.
Qtable_frozenlake

array([[0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],


In [7]:
def greedy_policy(Qtable, state):
    """
    Exploitation: pick the action with the highest value for the given state.
    :param Qtable: Table of action values, indexed as ``Qtable[state][action]``
    :param state: Index of the current state
    :return: Index of the highest-valued action (``np.argmax`` returns the first one on ties)
    """
    action_values = Qtable[state]
    return np.argmax(action_values)

def epsilon_greedy_policy(Qtable, state, epsilon):
    """
    Epsilon-greedy action selection.

    Draws a uniform number in [0, 1]; if it is greater than ``epsilon`` the greedy
    action is taken (exploitation), otherwise a random action is sampled from the
    module-level ``env`` (exploration).
    :param Qtable: Table of action values, indexed as ``Qtable[state][action]``
    :param state: Index of the current state
    :param epsilon: Exploration threshold compared against the uniform draw
    :return: Index of the selected action
    """
    explore = random.uniform(0, 1) <= epsilon
    if explore:
        # Exploration: random action from the global environment's action space
        return env.action_space.sample()
    # Exploitation: highest-valued action for this state
    return greedy_policy(Qtable, state)

In [8]:
# N table for Thompson sampling
def initialize_N_table(state_space, action_space):
    """
    Build the selection-count table used by Thompson sampling, starting at 0 everywhere.
    :param state_space: Number of states (rows of the table)
    :param action_space: Number of actions (columns of the table)
    :return: A NumPy array of zeros with shape ``(state_space, action_space)``
    """
    count_shape = (state_space, action_space)
    return np.zeros(count_shape)

def modify_N_table(N, state, action):
    """
    Increment, in place, the count stored for ``(state, action)`` by one.
    :param N: Count table indexed as ``N[state][action]``
    :param state: Row index to update
    :param action: Column index to update
    """
    counts_for_state = N[state]
    counts_for_state[action] = counts_for_state[action] + 1

# Module-level count table: `train` passes it to `thompson_sampling`, which increments it
# in place, so its contents persist across calls to `train` until this cell is re-run.
N = initialize_N_table(state_space, action_space)
    
# alpha table for Thompson sampling
def initialize_alpha_table(state_space, action_space):
    """
    Build the alpha table used by Thompson sampling, starting at 1 everywhere.
    :param state_space: Number of states (rows of the table)
    :param action_space: Number of actions (columns of the table)
    :return: A NumPy array of ones with shape ``(state_space, action_space)``
    """
    table_shape = (state_space, action_space)
    return np.ones(table_shape)

def modify_alpha_table(alpha, state, action):
    """
    Increment, in place, the alpha value stored for ``(state, action)`` by one.
    :param alpha: Alpha table indexed as ``alpha[state][action]``
    :param state: Row index to update
    :param action: Column index to update
    """
    alpha_for_state = alpha[state]
    alpha_for_state[action] = alpha_for_state[action] + 1

# Module-level alpha table: `train` passes it to `thompson_sampling`, which increments it
# in place for every selected action (it is not read when the Beta samples are drawn).
alpha = initialize_alpha_table(state_space, action_space)

# Thompson sampling action selection
def thompson_sampling(Qtable, state, N, alpha):
    """
    Select an action for ``state`` by drawing one Beta sample per action.

    For every action the sample is drawn from
    ``Beta(Q[state][a] + 1, N[state][a] - Q[state][a] + 1)`` and the action with the
    largest sample is chosen. The chosen action's entries in ``N`` and ``alpha`` are
    then incremented in place.

    ``np.random.beta`` requires strictly positive shape parameters and raises
    ``ValueError`` otherwise. That happens here whenever ``Q < -1`` or ``Q > N + 1``
    (e.g. with negative rewards or rewards larger than 1), so non-positive
    parameters are replaced by a small positive floor. Parameters that are already
    valid are passed through unchanged, so the samples drawn for them are the same
    as without the floor.

    :param Qtable: NumPy array of action values, indexed as ``Qtable[state]``
    :param state: Index of the current state
    :param N: NumPy array of selection counts; updated in place for the chosen action
    :param alpha: Table updated in place for the chosen action (not used for sampling)
    :return: Index of the selected action
    """
    # Smallest shape parameter handed to np.random.beta when the formula yields <= 0.
    min_shape = 1e-6

    q_row = Qtable[state][:]
    a_param = q_row + 1
    b_param = N[state][:] - q_row + 1

    # Floor only the invalid entries so valid inputs keep their exact values.
    a_param = np.where(a_param > 0, a_param, min_shape)
    b_param = np.where(b_param > 0, b_param, min_shape)

    # Generate one random number per action from a beta distribution
    beta_dist = np.random.beta(a_param, b_param)
    # Take the action with the highest beta value
    action = np.argmax(beta_dist)

    modify_N_table(N, state, action)
    modify_alpha_table(alpha, state, action)

    return action

In [9]:
# Training parameters
n_training_episodes = 10000  # Total training episodes
learning_rate = 0.2  # Learning rate; read as a global inside `train`

# Evaluation parameters
# NOTE(review): 3 episodes is a very small sample for an environment created with
# is_slippery=True; the reported mean/std will vary a lot between runs.
n_eval_episodes = 3  # Total number of test episodes

# Environment parameters
max_steps = 500  # Max steps per episode
gamma = 0.9  # Discounting rate; read as a global inside `train`
eval_seed = []  # The evaluation seed of the environment; empty -> `evaluate_agent` resets without a seed

# Exploration parameters
# `train` computes epsilon from these values, but actions are currently chosen by
# `thompson_sampling`, so they only matter if the epsilon-greedy policy is re-enabled.
max_epsilon = 1.0  # Exploration probability at start
min_epsilon = 0.05  # Minimum exploration probability
decay_rate = 0.0005  # Exponential decay rate for exploration prob

In [10]:
def train(n_training_episodes, min_epsilon, max_epsilon, decay_rate, env, max_steps, Qtable):
    """
    Run tabular Q-learning on ``env`` and return the updated Q-table.

    Actions are chosen with ``thompson_sampling``. Besides its arguments, this function
    reads the module-level ``N``, ``alpha``, ``learning_rate`` and ``gamma``.
    ``Qtable`` is updated in place and the same object is returned.
    :param n_training_episodes: Number of episodes to run
    :param min_epsilon: Lower bound of the exponentially decayed epsilon
    :param max_epsilon: Epsilon at episode 0
    :param decay_rate: Exponential decay rate of epsilon per episode
    :param env: Environment exposing ``reset()`` and ``step(action)``
    :param max_steps: Maximum number of steps per episode
    :param Qtable: Q-table indexed as ``Qtable[state][action]``
    :return: The (in-place updated) Q-table
    """
    for episode in range(n_training_episodes):
        # Decayed exploration rate. Only the epsilon-greedy alternative
        # (epsilon_greedy_policy(Qtable, state, epsilon)) consumes it; the Thompson
        # sampling selection below does not.
        epsilon = min_epsilon + (max_epsilon - min_epsilon) * np.exp(-decay_rate * episode)

        state, info = env.reset()

        for step in range(max_steps):
            # Choose the action At
            action = thompson_sampling(Qtable, state, N, alpha)

            # Take action At and observe Rt+1 and St+1
            new_state, reward, terminated, truncated, info = env.step(action)

            # Q(s,a) := Q(s,a) + lr * [R(s,a) + gamma * max Q(s',a') - Q(s,a)]
            td_target = reward + gamma * np.max(Qtable[new_state])
            td_error = td_target - Qtable[state][action]
            Qtable[state][action] = Qtable[state][action] + learning_rate * td_error

            # The episode ends as soon as the environment reports termination or truncation
            if terminated or truncated:
                break

            # Otherwise continue from the state we just reached
            state = new_state

    return Qtable

In [13]:
# `train` updates the table it is given in place, so re-running this cell continues
# training from the current values (the global N / alpha tables also keep their counts).
# Re-run the initialization cells first for a fresh start.
Qtable_frozenlake = train(n_training_episodes, min_epsilon, max_epsilon, decay_rate, env, max_steps, Qtable_frozenlake)

# Optional quick check: the sum of all Q-values.
#print(np.sum(Qtable_frozenlake))
Qtable_frozenlake

array([[2.27528587e-02, 2.55020090e-02, 2.56161744e-02, 2.66497073e-02],
       [2.65855901e-02, 2.77777243e-02, 2.95655385e-02, 3.22339669e-02],
       [3.59115534e-02, 4.15010252e-02, 4.12807873e-02, 3.86924475e-02],
       [4.69635096e-02, 5.39584566e-02, 5.08802960e-02, 4.53132707e-02],
       [5.87556688e-02, 6.55437945e-02, 7.04444620e-02, 6.67076802e-02],
       [7.68404248e-02, 7.30497686e-02, 8.74224605e-02, 8.17001334e-02],
       [1.01498646e-01, 9.45561576e-02, 1.17180608e-01, 9.55253658e-02],
       [1.09765092e-01, 1.26419020e-01, 1.23216301e-01, 1.06154819e-01],
       [2.30630939e-02, 2.19420277e-02, 2.26169521e-02, 2.39321147e-02],
       [2.53337696e-02, 2.86268747e-02, 2.72982432e-02, 2.74487470e-02],
       [3.38046791e-02, 3.32011519e-02, 3.95412642e-02, 4.02986748e-02],
       [2.90823199e-02, 1.98525558e-02, 4.03653058e-02, 4.89356642e-02],
       [5.88350842e-02, 6.09059360e-02, 6.79233749e-02, 7.53403364e-02],
       [8.69428741e-02, 9.49633790e-02, 9.38920603e

In [14]:
# On the 8x8 Frozen Lake the agent does not always learn a useful policy, so training may need to be re-run several times.

def evaluate_agent(env, max_steps, n_eval_episodes, Q, seed):
    """
    Run the greedy policy derived from ``Q`` for ``n_eval_episodes`` episodes and
    report the mean and standard deviation of the per-episode total reward.
    :param env: The evaluation environment
    :param max_steps: Maximum number of steps per evaluation episode
    :param n_eval_episodes: Number of episodes to evaluate the agent
    :param Q: The Q-table
    :param seed: The evaluation seed array (for taxi-v3); when empty, the environment
        is reset without a seed, otherwise ``seed[episode]`` is used for each episode
    :return: Tuple ``(mean_reward, std_reward)``
    """
    returns_per_episode = []

    for episode_idx in range(n_eval_episodes):
        # Only pass a seed to reset() when one was supplied
        reset_kwargs = {"seed": seed[episode_idx]} if seed else {}
        state, info = env.reset(**reset_kwargs)

        episode_return = 0
        for _ in range(max_steps):
            # Act greedily: the action with the maximum expected future reward in this state
            action = greedy_policy(Q, state)
            next_state, reward, terminated, truncated, info = env.step(action)
            episode_return += reward

            if terminated or truncated:
                break

            state = next_state

        returns_per_episode.append(episode_return)

    return np.mean(returns_per_episode), np.std(returns_per_episode)

# Evaluate the greedy policy of the trained table on the same `env` used for training;
# `eval_seed` is empty, so each evaluation episode is reset without a seed.
mean_reward, std_reward = evaluate_agent(env, max_steps, n_eval_episodes, Qtable_frozenlake, eval_seed)
print(f"Mean_reward={mean_reward:.2f} +/- {std_reward:.2f}")

Mean_reward=0.33 +/- 0.47
