
# Q-Learning on a Custom FrozenLake Environment
In this notebook, we explore the application of Q-learning to train an agent that navigates a custom 11x11 FrozenLake environment.
We aim to help the agent learn to avoid holes and reach the goal with an optimal policy.



## Environment Setup
- **Custom Map**: A customized 11x11 Frozen Lake map with start (S), holes (H), frozen tiles (F), and goal (G).
- **Environment Initialization**: We set `is_slippery=False` to remove randomness in movements.


In [None]:
#import venv
# Create a virtual environment
#venv.create('env', with_pip=True)
#!env\Scripts\activate


In [None]:
# Activate the virtual environment and install dependencies
!pip install gymnasium gymnasium[toy_text] matplotlib "numpy<2.0"

In [None]:

import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os

# Fixed 11x11 layout, one string per row.
# Legend: S = start, F = frozen tile, H = hole, G = goal.
custom_map = [
    "SFHFFFHFFFF",
    "HFFFFFFFFFF",
    "FFFHFFHFFFF",
    "FFFFFFFFFFF",
    "FFFFHFFFFFH",
    "FHFFFFFFHFF",
    "FFFHFFFFFFF",
    "FFFFFHFFFFF",
    "FFHFFFFFFFF",
    "HFFFFFFFFFF",
    "FFFFFFFFFFG",
]

# Build the Frozen Lake environment on the custom layout.
# Movement is non-slippery, and frames are rendered as RGB arrays so they can be plotted later.
env = gym.make(
    "FrozenLake-v1",
    desc=custom_map,
    render_mode="rgb_array",
    is_slippery=False,
)



## Q-Table Initialization
- The Q-table stores learned values for each (state, action) pair.
- Initialized as zeros, representing equal preference for all actions initially.


In [None]:

# Size the Q-table from the environment: one row per state, one column per action.
state_space = env.observation_space.n
action_space = env.action_space.n

# All-zero start: no action is preferred before any learning has happened.
q_table = np.zeros(shape=(state_space, action_space), dtype=float)



## Hyperparameters Setup
- **Alpha**: Learning rate controls how much new knowledge overrides the old.
- **Gamma**: Discount factor for future rewards.
- **Epsilon**: Initial exploration rate for epsilon-greedy strategy.
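- **Epsilon Decay**: Factor that epsilon is multiplied by after every episode, reducing exploration over time; epsilon is never lowered below the minimum epsilon.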
- **Episodes & Steps**: Control the duration of training.


In [None]:

# Hyperparameters
#
# The training loop applies `epsilon = max(min_epsilon, epsilon * epsilon_decay)`
# once per episode. For that schedule to do anything, epsilon must start above
# min_epsilon and epsilon_decay must be a factor just below 1. With the values
# below, epsilon reaches its floor after roughly 2,300 episodes.
alpha = 0.8  # Learning rate
gamma = 0.95  # Discount factor
epsilon = 1.0  # Initial exploration rate (start fully exploratory)
epsilon_decay = 0.999  # Multiplicative per-episode decay factor for epsilon
min_epsilon = 0.1  # Floor that epsilon is never decayed below
episodes = 30000  # Number of episodes for training
max_steps = 300  # Max steps per episode



## Loading or Initializing Q-Table
Load an existing Q-table if available to continue learning, otherwise start fresh.


In [None]:

# Load a previously saved Q-table, if one exists, so training can continue from it.
# The saved array is only adopted when its shape matches the freshly initialised
# table; a file written for a different map size would otherwise be indexed with
# the wrong state/action layout later in the notebook.
try:
    saved_q_table = np.load("frozenlake_qtable.npy")
except FileNotFoundError:
    print("No saved Q-table found. Starting fresh.")
else:
    if saved_q_table.shape == q_table.shape:
        q_table = saved_q_table
        print("Loaded saved Q-table.")
    else:
        print(
            f"Saved Q-table has shape {saved_q_table.shape}, "
            f"expected {q_table.shape}. Starting fresh."
        )



## Training Loop
The training loop where the agent learns by interacting with the environment:
- **Epsilon-Greedy**: Chooses between exploration and exploitation.
- **Rewards Structure**: +100 for reaching the goal, -100 for falling into a hole, and otherwise a -10 step penalty plus 10 × the reduction in the (flat state-index) distance to the goal.
- **Q-Update**: Updates the Q-values based on the observed rewards and future potential rewards.


In [None]:

# Training loop
episode_rewards = []  # shaped return of each episode
states_visited = []  # every state the agent acted from (used by the distribution plot)
actions_taken = []  # every action the agent took (used by the distribution plot)

# Loop invariants: the goal is the last state index and the step penalty is constant.
goal_state = state_space - 1
step_penalty = -10

# Decay a local copy so the configured `epsilon` is left untouched; re-running this
# cell then restarts the exploration schedule from the same initial value.
exploration_rate = epsilon

for episode in range(episodes):
    state, _ = env.reset()
    total_rewards = 0

    for step in range(max_steps):
        # Epsilon-greedy: explore with probability `exploration_rate`, otherwise exploit.
        if np.random.uniform(0, 1) < exploration_rate:
            action = env.action_space.sample()
        else:
            action = np.argmax(q_table[state, :])

        # `terminated` = the episode ended inside the environment (hole or goal);
        # `truncated` = the episode was cut short from outside (e.g. a time limit).
        next_state, reward, terminated, truncated, _ = env.step(action)

        # NOTE(review): "distance" is the difference of flat state indices, not a grid
        # distance, so a move between rows is weighted by the row length — confirm
        # that this shaping is intended.
        distance_to_goal = abs(state - goal_state)
        next_distance_to_goal = abs(next_state - goal_state)

        if terminated and reward == 0:
            # Terminal state without the goal reward: treated as falling into a hole.
            reward = -100
        elif terminated and reward == 1:
            # Reached the goal.
            reward = 100
        else:
            # Ordinary step: fixed penalty plus a bonus/penalty for progress.
            reward = step_penalty + 10 * (distance_to_goal - next_distance_to_goal)

        # Q-learning update. Never bootstrap from a terminal state: no further
        # reward can be collected after the episode has ended.
        future_value = 0.0 if terminated else np.max(q_table[next_state, :])
        q_table[state, action] = q_table[state, action] + alpha * (
            reward + gamma * future_value - q_table[state, action]
        )

        states_visited.append(state)
        actions_taken.append(action)
        state = next_state
        total_rewards += reward

        # Stop on either flag: stepping an environment after it reported the end of
        # an episode is not valid, so the environment's own limit may end the
        # episode before `max_steps` is reached.
        if terminated or truncated:
            break

    exploration_rate = max(min_epsilon, exploration_rate * epsilon_decay)
    episode_rewards.append(total_rewards)

    if (episode + 1) % 500 == 0:
        print(f"Episode {episode + 1} | Epsilon: {exploration_rate:.4f} | Last Reward: {total_rewards}")



## Saving Q-Table
Saves the trained Q-table to continue learning or for future usage.


In [None]:

# Persist the learned Q-table so that a later run can load it and keep training.
np.save(file="frozenlake_qtable.npy", arr=q_table)
print("Trained Q-table saved.")



## Testing the Agent
After training, we test the agent to evaluate its performance based on the learned policy.


In [None]:

# Testing the agent
# Runs the greedy policy (always the highest-valued action, no exploration) and
# accumulates the environment's raw reward, not the shaped training reward.
test_episodes = 100
total_rewards = 0

for episode in range(test_episodes):
    state, _ = env.reset()
    for step in range(max_steps):
        action = np.argmax(q_table[state, :])
        next_state, reward, terminated, truncated, _ = env.step(action)
        total_rewards += reward
        state = next_state
        # End the episode on either flag; an environment must be reset once it
        # reports termination or truncation.
        if terminated or truncated:
            break

print(f"Average reward over {test_episodes} test episodes: {total_rewards / test_episodes}")



## Visualization of Q-Table and Policy
- **Q-Values Map**: Shows the learned policy in terms of best actions to take in each state.
- **Helper Functions**: Functions to create visualizations of the learned Q-values.


In [None]:
# Helper Functions
# Every figure produced below is written into this directory.
output_dir = "output"
# Create it up front; an already existing directory is not an error.
os.makedirs(name=output_dir, exist_ok=True)

def qtable_directions_map(q_table, map_size):
    """Return each state's best Q-value and an arrow for its greedy action.

    Parameters
    ----------
    q_table : array with one row per state and one column per action.
    map_size : side length used to reshape the per-state results into a square grid.

    Returns
    -------
    (values, arrows) : two (map_size, map_size) arrays. `values` holds the row-wise
        maximum of `q_table`. `arrows` holds the arrow of the arg-max action, or an
        empty string where that maximum does not exceed the float machine epsilon.
    """
    arrow_by_action = {0: "←", 1: "↓", 2: "→", 3: "↑"}
    best_value = q_table.max(axis=1)
    best_action = np.argmax(q_table, axis=1)

    # Machine epsilon (spacing between 1.0 and the next float), used as the
    # threshold below which a state gets no arrow.
    threshold = np.finfo(float).eps

    arrows = np.array(
        [
            arrow_by_action[action] if value > threshold else ""
            for action, value in zip(best_action, best_value)
        ],
        dtype=str,
    )
    return (
        best_value.reshape(map_size, map_size),
        arrows.reshape(map_size, map_size),
    )

def plot_q_values_map(q_table, env, map_size):
    """Save a two-panel figure: a rendered environment frame and the policy heatmap.

    The left panel shows the frame rendered right after `env.reset()`. The right
    panel is a heatmap of each state's best Q-value, annotated with the arrow of
    its best action. The figure is written to `q_values_map.png` inside
    `output_dir` and then closed; nothing is returned.
    """
    best_values, arrows = qtable_directions_map(q_table, map_size)

    # Reset before rendering, so the captured frame is the one right after the reset.
    # The environment is expected to render RGB arrays that imshow can display.
    env.reset()
    rendered_frame = env.render()

    fig, (frame_ax, policy_ax) = plt.subplots(nrows=1, ncols=2, figsize=(15, 5))

    # Left panel: the rendered frame, without axis decorations.
    frame_ax.imshow(rendered_frame)
    frame_ax.axis("off")
    frame_ax.set_title("Last Frame")

    # Right panel: best Q-value per tile, annotated with the greedy action's arrow.
    heatmap_ax = sns.heatmap(
        best_values,
        annot=arrows,
        fmt="",
        ax=policy_ax,
        cmap=sns.color_palette("Blues", as_cmap=True),
        linewidths=0.7,
        linecolor="black",
        xticklabels=[],
        yticklabels=[],
        annot_kws={"fontsize": "xx-large"},
    )
    heatmap_ax.set(title="Learned Q-values\nArrows represent best action")
    fig.tight_layout()

    # The figure is saved to disk instead of being shown.
    fig.savefig(os.path.join(output_dir, "q_values_map.png"))
    plt.close(fig)

def plot_states_actions_distribution(states, actions):
    """Save side-by-side histograms of the visited states and the chosen actions.

    Parameters
    ----------
    states : sequence of integer state indices.
    actions : sequence of integer action ids (0..3, labelled LEFT/DOWN/RIGHT/UP).

    The figure is written to `states_actions_distribution.png` inside `output_dir`
    and then closed; nothing is returned.
    """
    labels = {0: "LEFT", 1: "DOWN", 2: "RIGHT", 3: "UP"}

    fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(15, 5))

    # Plot distribution of states (bin count taken from the module-level `state_space`).
    sns.histplot(data=states, ax=ax[0], kde=True, bins=state_space)
    ax[0].set_title("State Distribution")
    ax[0].set_xlabel("State")

    # Plot distribution of actions. Action ids are integers, so use unit-width bars
    # centred on each id; four equal-width bins over [0, 3] would be 0.75 wide and
    # would not line up with the tick labels.
    sns.histplot(data=actions, ax=ax[1], discrete=True)
    ax[1].set_xticks(list(labels.keys()), labels=list(labels.values()))
    ax[1].set_title("Action Distribution")
    ax[1].set_xlabel("Action")

    fig.tight_layout()

    # Instead of showing the figure, save it and release it.
    plot_filename = os.path.join(output_dir, "states_actions_distribution.png")
    fig.savefig(plot_filename)
    plt.close(fig)

# Save the policy figure; the grid side length is the number of rows in the map.
plot_q_values_map(q_table, env, map_size=len(custom_map))

# Save the histograms of the states and actions recorded during training.
plot_states_actions_distribution(states=states_visited, actions=actions_taken)

# Print the raw Q-table for inspection.
print("Trained Q-Table:", q_table, sep="\n")
