# Q-Learning
_By Anton Nikolaychuk_

In this project, I implement grid world and frozen lake, two basic reinforcement learning environments, and train an agent to navigate them using Q-learning.

We start by importing the required libraries. NumPy is used for the Q-table and for random number generation, while pygame is used to display the maze and the training process.

In [1]:
import numpy as np
from time import time, sleep
import pygame

# Prevent showing Q-values in scientific format, for better readability
np.set_printoptions(suppress=True, precision=4)

pygame 2.6.1 (SDL 2.28.4, Python 3.13.3)
Hello from the pygame community. https://www.pygame.org/contribute.html


## Q-Learning Implementation
The following class implements all functions required for the training process.

In [2]:
class QMaze():
    def __init__(self, maze_base):
        """Initialize all future required variables as well as the maze and Q-table"""

        self.maze_base = maze_base

        small_points = 0.2
        goal_points = 1
        self.step_cost = -0.03
        self.field_mapping = {"S": self.step_cost, "W": 0, "G": goal_points, "T": -goal_points, "o": self.step_cost, "+": small_points, "-": -small_points}
        self.action_mapping = {0: "left", 1: "down", 2: "right", 3: "up"}

        self.maze = []
        self.column_count = None
        self.row_count = None
        self.start_field = None
        self.used_boni = []

        self.q_table = None

        self.epsilon = 0.15
        self.slip_chance = 0.2

        self.cur_state = None # int between 0 and number of fields in the maze

        self.construct_maze()
        self.construct_q_table()

    def construct_maze(self):
        """Get the maze fields as a list of characters. Set some variables containing information about the maze layout"""

        rows = self.maze_base.split("\n")
        fields_str = "".join(rows)

        for i in range(len(fields_str)):
            self.maze.append(fields_str[i])

            if fields_str[i] == "S": 
                self.start_field = i
                self.cur_state = i

        self.column_count = len(rows[0])
        self.row_count = len(rows)

    def construct_q_table(self, method="zeros"):
        """Initializes the Q-table with 4 values for each field"""

        if method == "zeros":
            self.q_table = np.zeros((self.row_count*self.column_count, 4))
        elif method=="random":
            np.random.seed(1)
            self.q_table = np.random.uniform(-0.3, 0.3, (self.row_count*self.column_count, 4))
        
    def update_q_table(self, learning_rate, discount_rate):
        """Choose an action, update its Q-value for the current field using Q-learning, then move the agent"""

        # Choose action to perform from current field
        intended_action, slip_action = self.choose_action()

        # Get the current Q-value for that action on that field
        old_q_val = self.q_table[self.cur_state, intended_action]

        # Get the new field the agent would be in after performing the chosen action
        new_state = self.get_new_state(slip_action)

        # Get the immediate reward
        immediate_reward = self.field_mapping[self.maze[new_state]]
        if new_state in self.used_boni: # if the reward/punishment of a bonus was already collected this episode, don't give it again
            immediate_reward = self.step_cost

        # Get the maximum Q-value of the actions that can be performed in the new field
        max_future_q_val = max(self.q_table[new_state])

        # Calculate the new Q-value for the chosen action for the current field based on TD
        new_q_val = old_q_val + learning_rate * (immediate_reward + discount_rate * max_future_q_val - old_q_val)

        # Update the Q-table with the value
        self.q_table[self.cur_state, intended_action] = new_q_val

        # Update the state to perform the action
        self.update_state(new_state)

    def action_possible(self, action):
        """Determine if a given action is possible at the current state"""

        new_state = self.get_new_state(action)

        # Prevent moving out of bounds of the maze grid
        if new_state < 0 or new_state >= len(self.maze):
            return False
        # Prevent moving from end of one row to start of next or vice versa by moving right/left
        # (moving left from field 0 is already rejected by the bounds check above)
        if ((self.cur_state % self.column_count == 0 and action == 0) or # start of row
            (self.cur_state % self.column_count == self.column_count - 1 and action == 2)): # end of row
            return False
        # Prevent moving onto walls
        if self.maze[new_state] == "W":
            return False
        
        return True
    
    def get_possible_actions(self) -> np.ndarray:
        """Get a list of possible actions"""

        all_actions = np.arange(4)
        pos_actions = []

        for act in all_actions:
            if self.action_possible(act): 
                pos_actions.append(act)

        return np.array(pos_actions)

    def get_new_state(self, action):
        """Get the state the agent will be in after performing a given action"""

        if action == 0: # Move left
            new_state = self.cur_state - 1
        elif action == 1: # Move down
            new_state = self.cur_state + self.column_count
        elif action == 2: # Move right
            new_state = self.cur_state + 1
        elif action == 3: # Move up
            new_state = self.cur_state - self.column_count
        else:
            raise KeyError(f"Invalid action: {action}")

        return new_state

    def update_state(self, new_state):
        """Update the current state based on the new state"""

        # Update state
        self.cur_state = new_state

        next_maze_field = self.maze[new_state]

        # If the agent lands on a bonus field, keep track of it until the end of the episode in order to prevent using bonus multiple times
        if next_maze_field == "+" or next_maze_field == "-":
            self.used_boni.append(new_state)

        # If the agent lands on a goal/trap field, reset his position to the start field and forget about the boni
        if next_maze_field == "G" or next_maze_field == "T":
            self.cur_state = self.start_field
            self.used_boni = []

    def choose_action(self):
        """Choose action using epsilon-greedy policy and incorporate slip chance"""

        pos_actions = self.get_possible_actions()
        intended_action = None
        slip_action = None

        if np.random.random() < 1-self.epsilon: # Choose best action based on highest Q-value
            cur_q_vals = self.q_table[self.cur_state]
            pos_q_vals = cur_q_vals[pos_actions]

            max_q_val = np.max(pos_q_vals)
            best_actions = np.where(cur_q_vals == max_q_val)[0]
            best_actions = [act for act in best_actions if act in pos_actions]
            
            intended_action = np.random.choice(best_actions)
        else: # Choose random action
            intended_action = pos_actions[np.random.randint(0, len(pos_actions))]

        slip_action = intended_action

        # There is a random chance for the agent to "slip" and perform an action different from the one he intended to perform
        if np.random.random() < self.slip_chance:
            slip_action = pos_actions[np.random.randint(0, len(pos_actions))]
        
        return intended_action, slip_action
    
    def reset_random_seed(self):
        """Function to reintroduce randomness after random seed has been set once. Required if Q-table initialization is random and a random seed is set for reproducibility"""

        t = 1000 * time()
        np.random.seed(int(t) % 2**32)
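
For reference, the core of `update_q_table` is the standard Q-learning temporal-difference update: Q(s, a) ← Q(s, a) + α · (r + γ · max Q(s', a') − Q(s, a)), with learning rate α and discount rate γ. The following is a minimal standalone sketch of that single step, so the arithmetic can be checked by hand. The helper `td_update` and the toy two-state table are assumptions made for illustration and are not part of the class above.

```python
import numpy as np

def td_update(q_table, state, action, reward, new_state, learning_rate, discount_rate):
    """Apply one Q-learning update in place and return the new Q-value (hypothetical helper)."""
    old_q_val = q_table[state, action]
    # Best value obtainable from the next state according to the current table
    max_future_q_val = np.max(q_table[new_state])
    # Move the old estimate a fraction (learning_rate) of the way towards the TD target
    td_target = reward + discount_rate * max_future_q_val
    q_table[state, action] = old_q_val + learning_rate * (td_target - old_q_val)
    return float(q_table[state, action])

# Toy table: 2 states, 4 actions, all zeros except one known value in state 1
toy_q = np.zeros((2, 4))
toy_q[1, 2] = 0.5

# Step from state 0 to state 1 via action 2 (right), paying the step cost of -0.03
new_val = td_update(toy_q, state=0, action=2, reward=-0.03, new_state=1,
                    learning_rate=0.05, discount_rate=0.9)
# By hand: 0 + 0.05 * (-0.03 + 0.9 * 0.5 - 0) = 0.021
print(round(new_val, 4))
```

With a small learning rate such as 0.05, each visit moves the estimate only 5% of the way towards the target, which is why many steps are needed before the goal reward propagates back to the start field.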

## Setup
In the following cell, you can adjust the layout of the maze by changing the "maze_base" variable. The field mappings are as follows: 
- "o" - empty field
- "W" - wall/obstacle
- "S" - start field
- "G" - goal field
- "T" - trap/hole in ice
- "+" - small reward
- "-" - small punishment
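
Since the class does not check the layout itself, a small validation step can catch typos before training. The sketch below uses a hypothetical helper `validate_maze` (not part of `QMaze`). It assumes that every row must have the same length, that only the seven characters listed above are allowed, and that exactly one start field is wanted. The last point is an assumption; the class itself simply uses the last "S" it finds.

```python
def validate_maze(maze_base):
    """Check a maze string against the layout rules (hypothetical helper, not part of QMaze)."""
    allowed = set("oWSGT+-")
    rows = maze_base.split("\n")

    # All rows must have the same number of fields
    if len(set(len(row) for row in rows)) != 1:
        raise ValueError("all rows must have the same length")

    # Only the documented field characters may appear
    for row in rows:
        unknown = set(row) - allowed
        if unknown:
            raise ValueError(f"unknown field characters: {sorted(unknown)}")

    # Assumption: exactly one start field
    if maze_base.count("S") != 1:
        raise ValueError("the maze needs exactly one start field 'S'")

    return len(rows), len(rows[0])  # (row_count, column_count)

example = "ooo+ooo\nooooooo\nSoTTToo\nooooooG"
print(validate_maze(example))  # (4, 7)
```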

In [None]:
# Choose the maze layout here (all rows must have the same length; row and column counts do not have to match. Do not add whitespace)
maze_base = """
ooo+ooo
ooooooo
SoTTToo
ooooooG
"""[1:-1]

# Build the maze and its Q-table from the layout
q = QMaze(maze_base)


### Visualization

#### Pygame

In [None]:
# Initialize pygame window
pygame.init()
CELL_SIZE = 80
WINDOW_WIDTH = q.column_count * CELL_SIZE
WINDOW_HEIGHT = q.row_count * CELL_SIZE
screen = pygame.display.set_mode((WINDOW_WIDTH, WINDOW_HEIGHT))
clock = pygame.time.Clock()
pygame.display.set_caption("Q-Learning Maze")

# Color mapping for pygame maze display
colors = {
    "S": (0, 120, 255),     # Start - Bright Blue
    "W": (60, 60, 60),      # Wall - Dark Gray
    "G": (0, 200, 0),       # Goal - Vivid Green
    "T": (180, 90, 200),    # Trap/hole in ice - Purple
    "o": (220, 220, 220),   # Empty - Light Gray
    "+": (255, 165, 0),     # Small reward - Orange
    "-": (200, 0, 0)        # Small punishment - Deep Red
}

def display_maze():
    """Display maze using pygame"""

    # Stop running if pygame window is closed
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            pygame.quit()
            exit()

    # Draw maze
    for idx, tile in enumerate(q.maze):
        row = idx // q.column_count
        col = idx % q.column_count

        rect = pygame.Rect(col * CELL_SIZE, row * CELL_SIZE, CELL_SIZE, CELL_SIZE)

        color = colors.get(tile, (200, 200, 200))  # Default is light gray
        pygame.draw.rect(screen, color, rect)

        # Draw agent
        if idx == q.cur_state:
            pygame.draw.circle(screen, (0, 0, 0), rect.center, CELL_SIZE // 3)

        pygame.draw.rect(screen, (50, 50, 50), rect, 1)  # Grid lines

    pygame.display.flip()

    # 30 FPS
    clock.tick(30)
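
The drawing loop above relies on the fact that the maze is stored as a flat list, so a state index maps to a grid position through integer division and modulo. As a minimal sketch, the two hypothetical helpers below (not part of the notebook) make that mapping explicit in both directions.

```python
def state_to_cell(state, column_count):
    """Convert a flat state index into (row, col); hypothetical helper."""
    return divmod(state, column_count)

def cell_to_state(row, col, column_count):
    """Convert (row, col) back into the flat state index; hypothetical helper."""
    return row * column_count + col

# In the 4x7 example maze, the start field "S" is the first field of the third row
print(state_to_cell(14, 7))    # (2, 0)
# The goal field "G" sits in the bottom-right corner
print(cell_to_state(3, 6, 7))  # 27
```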

#### Terminal

In [5]:
import os

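def print_maze():
    """Print maze in terminal with the agent's position marked as A"""

    # Clear the terminal ("cls" on Windows, "clear" elsewhere)
    os.system("cls" if os.name == "nt" else "clear")
    fields = "".join(q.maze)
    cur_maze = fields[:q.cur_state] + "A" + fields[q.cur_state+1:]
    # Split the flat string back into rows of column_count fields each
    rows = [cur_maze[j:j+q.column_count] for j in range(0, len(cur_maze), q.column_count)]
    print("\n".join(rows))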

## Training

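Finally, here you can decide how long the model should be trained and with which hyperparameters. Note that each loop iteration performs a single agent step (one Q-value update), so `episodes` counts steps rather than complete runs from the start field to a goal or trap.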

In [6]:
# Training
episodes = 4000
for i in range(episodes):
    # Chooses an action for current state, adjusts Q-values, then performs that action
    q.update_q_table(learning_rate=0.05, discount_rate=0.9)
    
    # Display maze using pygame
    display_maze()

pygame.quit()
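
Once training has finished, the learned behaviour can be summarized by taking the highest-valued action for each field. The following is a minimal sketch using a hypothetical helper `greedy_policy_grid` and a toy corridor. It assumes the same action order as `action_mapping` (0 left, 1 down, 2 right, 3 up). As a simplification, it takes the argmax over all four actions, so it ignores blocked moves and resolves ties towards the lowest action index.

```python
import numpy as np

ARROWS = {0: "←", 1: "↓", 2: "→", 3: "↑"}  # same action order as action_mapping

def greedy_policy_grid(q_table, maze, column_count):
    """Render the greedy action per field as text; hypothetical helper."""
    lines = []
    for start in range(0, len(maze), column_count):
        row = ""
        for state in range(start, start + column_count):
            if maze[state] in "WGT":
                # Walls and terminal fields have no meaningful action; show the field itself
                row += maze[state]
            else:
                row += ARROWS[int(np.argmax(q_table[state]))]
        lines.append(row)
    return "\n".join(lines)

# Toy 1x3 corridor: start, empty field, goal; "right" has the highest value
toy_maze = list("SoG")
toy_q = np.zeros((3, 4))
toy_q[0, 2] = 0.5
toy_q[1, 2] = 0.9
print(greedy_policy_grid(toy_q, toy_maze, column_count=3))  # →→G
```

For the trained agent, the equivalent call would be `greedy_policy_grid(q.q_table, q.maze, q.column_count)`.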

Let's also print out the Q-table at the end of training to see which Q-values the agent has learned for each field.

In [7]:
print("Maze Layout: ")
print(maze_base, end="\n\n")

print("Final Q-table |||     Actions:    Left | Down | Right | Up")
q_table = q.q_table
for i in range(q.row_count):
    for j in range(i * q.column_count, (i+1) * q.column_count):
        print(f"Q-Value at row {i+1} for column {j % q.column_count + 1}: {q_table[j]}")


Maze Layout: 
ooo+ooo
ooooooo
SoTTToo
ooooooG

Final Q-table |||     Actions:    Left | Down | Right | Up
Q-Value at row 1 for column 1: [ 0.    -0.03  -0.029  0.   ]
Q-Value at row 1 for column 2: [-0.0301 -0.0355 -0.0172  0.    ]
Q-Value at row 1 for column 3: [-0.0118 -0.0413  0.0048  0.    ]
Q-Value at row 1 for column 4: [-0.0342 -0.0555 -0.0324  0.    ]
Q-Value at row 1 for column 5: [-0.0316 -0.0372 -0.0289  0.    ]
Q-Value at row 1 for column 6: [-0.0222  0.0178 -0.0219  0.    ]
Q-Value at row 1 for column 7: [-0.0126  0.0006  0.      0.    ]
Q-Value at row 2 for column 1: [ 0.     -0.0351 -0.0434 -0.032 ]
Q-Value at row 2 for column 2: [-0.0356 -0.0362 -0.0428 -0.0329]
Q-Value at row 2 for column 3: [-0.1336 -0.2262 -0.124  -0.0475]
Q-Value at row 2 for column 4: [-0.1024 -0.0983 -0.0836 -0.0903]
Q-Value at row 2 for column 5: [-0.0725 -0.1855 -0.0084 -0.0576]
Q-Value at row 2 for column 6: [-0.0174  0.1861 -0.0076 -0.011 ]
Q-Value at row 2 for column 7: [-0.0026  0.1326  0.  