# Reinforcement Learning 
## Implementing on-policy and off-policy algorithms to learn the optimal policy for exiting a maze

In [1]:
import random
import numpy as np
import matplotlib.pyplot as plt
import pygame
import time

pygame 2.6.1 (SDL 2.28.4, Python 3.12.4)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
# Side length of the square maze; cells are stored in a flat list of N**2 entries.
N = 3

# Markers written into the maze list for the agent and for the exit.
player = "P"
goal = "G"

# The agent starts in the first cell and has to reach the last one.
initial_state = 0
final_state = N**2 - 1

# Moves the agent can attempt from any cell.
actions = ["up", "down", "left", "right"]

# Empty cells hold 0; only the start and the exit carry a marker.
maze = [0] * (N**2)
maze[initial_state] = player
maze[final_state] = goal

# Cells the agent has already stepped on during the current episode.
visited_states = [initial_state]

# Snapshots of the maze, one per move, starting with the initial layout.
learning_history = [list(maze)]

In [3]:
def print_maze(maze):
    """Print the flat ``maze`` list as N rows of N cells, framed by blank lines.

    Every row is printed by the same loop, so a 1x1 maze prints its single
    row once instead of once as "first row" and again as "last row".

    Args:
        maze: flat list of cells; row ``r`` is the slice ``[r*N, (r+1)*N)``.
    """
    print("")
    for row in range(N):
        print(maze[row * N:(row + 1) * N])
    print("")


def draw_maze(maze, description):
    """Draw the maze grid and a text log next to it in the pygame window.

    Visited cells are filled with GRAY, the player's cell with BLACK and the
    goal cell with RED; every cell gets a one-pixel GRAY outline. The log is
    rendered line by line to the right of the grid, and the display is
    refreshed after each line so the text appears progressively.

    Args:
        maze: flat list of N**2 cells holding the ``player`` / ``goal``
            markers.
        description: list of strings to render, one per text line. May be
            empty, in which case only the grid is shown.
    """
    # Clear the screen
    screen.fill(WHITE)

    # Draw the maze
    for row in range(N):
        for col in range(N):
            index = row * N + col
            cell_rect = (col * CELL_SIZE, row * CELL_SIZE, CELL_SIZE, CELL_SIZE)
            if index in visited_states:
                pygame.draw.rect(screen, GRAY, cell_rect)
            if maze[index] == player:
                pygame.draw.rect(screen, BLACK, cell_rect)
            if maze[index] == goal:
                pygame.draw.rect(screen, RED, cell_rect)
            # Draw grid lines
            pygame.draw.rect(screen, GRAY, cell_rect, 1)

    if not description:
        # Nothing to animate: still present the freshly drawn grid, otherwise
        # the frame would never reach the screen.
        pygame.display.flip()
        return

    # Draw the description, starting 20 pixels to the right of the maze
    description_x = MAZE_WIDTH + 20
    description_y = 10
    # Short logs pause after every line; longer logs only pause after the
    # lines past index 6, so the leading lines appear without delay.
    pause_every_line = len(description) < 7
    for i, line in enumerate(description):
        text_surface = font.render(line, True, BLACK)
        screen.blit(text_surface, (description_x, description_y))
        description_y += size + 4  # Move down for the next line
        pygame.display.flip()  # Update the display
        if pause_every_line or i > 6:
            time.sleep(0.2)

In [4]:
def reset_position(maze, visited_states, learning_history):
    """Return a brand-new episode state: maze, visited cells and history.

    The incoming arguments are not read; fresh objects are built from the
    module-level ``N``, ``initial_state``, ``final_state``, ``player`` and
    ``goal`` and handed back for the caller to rebind.

    Returns:
        A ``(maze, visited_states, learning_history)`` tuple where the maze
        has the player on the start cell and the goal on the final cell, the
        visited list holds only the start cell, and the history holds one
        copy of the initial maze.
    """
    fresh_maze = [0] * (N**2)
    fresh_maze[initial_state] = player
    fresh_maze[final_state] = goal
    return fresh_maze, [initial_state], [list(fresh_maze)]

In [5]:
# Show the starting layout of the maze.
print("maze:")
print_maze(maze)

maze:

['P', 0, 0]
[0, 0, 0]
[0, 0, 'G']



In [6]:
# Temporal-difference hyperparameters: discount factor and learning rate.
gamma = 0.9
alpha = 0.5

# Rewards, from the cheapest outcome to the best one.
wall_reward = -1            # the move did not change the position
repeated_state_reward = -1  # the move led to an already visited cell
step_reward = -0.1          # the move led to a new, non-goal cell
goal_reward = 1             # the move reached the goal

In [None]:
def build_transition_model(N):
    """Return the deterministic move table of an N x N grid.

    Cells are numbered 0 .. N**2 - 1 row by row. For each cell the table
    maps every action name to the cell reached by that move; a move that
    would leave the grid maps the cell to itself.

    Args:
        N: side length of the grid.

    Returns:
        ``{position: {"up": ..., "down": ..., "right": ..., "left": ...}}``.
    """
    cell_count = N**2
    first_of_last_row = cell_count - N

    model = {}
    for position in range(cell_count):
        on_top_row = position < N
        on_bottom_row = position >= first_of_last_row
        on_left_column = position % N == 0
        on_right_column = (position + 1) % N == 0

        model[position] = {
            "up": position if on_top_row else position - N,
            "down": position if on_bottom_row else position + N,
            "right": position if on_right_column else position + 1,
            "left": position if on_left_column else position - 1,
        }

    return model

In [8]:
def build_q_table(N):
    """Return a Q-table for an N x N grid with every estimate set to 0.

    Args:
        N: side length of the grid.

    Returns:
        ``{position: {action: 0}}`` for every position in 0 .. N**2 - 1 and
        every entry of the module-level ``actions``; each position gets its
        own inner dictionary.
    """
    table = {}
    for position in range(N**2):
        table[position] = dict.fromkeys(actions, 0)
    return table

In [9]:
# Environment dynamics and action-value estimates for the N x N maze.
transition_model = build_transition_model(N=N)
q_table = build_q_table(N=N)

In [10]:
def choose_action(current_state, verbose=True, *, epsilon=0.05):
    """Pick the next action for ``current_state`` with an epsilon-greedy rule.

    The greedy action is the one with the highest Q-value; ties keep a
    randomly drawn candidate when it is among the best, otherwise the first
    best action in ``actions`` wins. When the exploration sample does not
    exceed ``epsilon``, one of the *other* actions is drawn uniformly
    instead.

    Args:
        current_state: key of the state in the module-level ``q_table``.
        verbose: when true, the returned log lists the Q-values of the state
            and the action that was picked.
        epsilon: exploration threshold compared against a uniform sample in
            [0, 1]; keyword-only, defaults to the previously fixed 0.05.

    Returns:
        ``(action, q_value, log_lines)``: the chosen action, its current
        Q-value, and a list of log strings (empty when ``verbose`` is false).
    """
    state_scores = q_table[current_state]

    log_print = []
    if verbose:
        log_print.append(f"q scores: {state_scores}")

    # Draw the exploration sample first, then the tie-break candidate: this
    # keeps the sequence of random calls stable for seeded runs.
    sample = random.uniform(0, 1)

    greedy_action = random.choice(actions)
    for action in actions:
        if state_scores[action] > state_scores[greedy_action]:
            greedy_action = action

    if sample > epsilon:
        # Exploit: keep the greedy action.
        next_action = greedy_action
        if verbose:
            log_print.append(f"greedy action: {next_action}")
    else:
        # Explore: pick any action except the greedy one. With a single
        # available action there is no alternative, so fall back to it
        # rather than calling random.choice on an empty list.
        alternatives = [action for action in actions if action != greedy_action]
        next_action = random.choice(alternatives) if alternatives else greedy_action
        if verbose:
            log_print.append(f"random action: {next_action}")

    return next_action, state_scores[next_action], log_print
    

def get_reward(current_state, new_state):
    """Return the reward for moving from ``current_state`` to ``new_state``.

    The outcomes are checked in priority order: reaching the goal, staying
    in place (the move hit a wall), stepping on an already visited cell,
    and finally a plain step onto a new cell. The last case needs no test
    of its own: once "stayed in place" is ruled out the position changed.

    Args:
        current_state: cell the agent was in before the move.
        new_state: cell the agent is in after the move.

    Returns:
        ``(reward, [message])``: the numeric reward and a one-element list
        with the matching log message.
    """
    if new_state == final_state:
        reward, message = goal_reward, "I reached the goal!"
    elif new_state == current_state:
        reward, message = wall_reward, "I hit the wall!"
    elif new_state in visited_states:
        reward, message = repeated_state_reward, "I have already visited this state!"
    else:
        reward, message = step_reward, "I made a step!"

    return reward, [message]

def move_player(current_state, new_state):
    """Move the player marker and record the move in the episode state.

    Updates the module-level ``maze`` in place, appends a snapshot of it to
    ``learning_history`` and adds ``new_state`` to ``visited_states`` the
    first time it is reached.

    Args:
        current_state: cell the player is leaving.
        new_state: cell the player is entering (may equal ``current_state``).
    """
    # Clear first, then mark: when both cells are the same the marker stays.
    maze[current_state] = 0
    maze[new_state] = player
    learning_history.append(list(maze))

    if new_state in visited_states:
        return
    visited_states.append(new_state)


def get_best_q_value(current_state):
    """Return the highest Q-value stored for ``current_state``.

    Only the entries of the module-level ``actions`` list are considered,
    looked up in the module-level ``q_table``.
    """
    state_scores = q_table.get(current_state)
    scores = [state_scores.get(action) for action in actions]

    best = scores[0]
    for score in scores[1:]:
        if score > best:
            best = score
    return best
    

In [12]:
import pygame
import time

# Initialize pygame
pygame.init()

# Window geometry: the maze is drawn on the left, the text log on its right.
CELL_SIZE = 100
MAZE_WIDTH = N * CELL_SIZE
DESCRIPTION_WIDTH = 400
WINDOW_SIZE = (MAZE_WIDTH + DESCRIPTION_WIDTH, max(N * CELL_SIZE, 320))
screen = pygame.display.set_mode(WINDOW_SIZE)
pygame.display.set_caption("Maze Evolution with Description")

# Colors (RGB)
WHITE = (255, 255, 255)
BLACK = (0, 0, 0)
GRAY = (200, 200, 200)
RED = (255, 0, 0)

# Fonts
size = 20
font = pygame.font.Font(None, size)

###############################################################################

# Update rule of the episode: "q-learning" (off-policy) or "sarsa" (on-policy).
algorithm = "q-learning"

# Upper bound on the number of moves in the episode.
training_time = range(0, 1000)

# Number of moves actually performed; reported once the episode is over.
steps_taken = 0

# try/finally so the pygame window is always released, even when the episode
# is interrupted by an error.
try:
    if algorithm not in ("sarsa", "q-learning"):
        raise ValueError(f"unknown algorithm: {algorithm!r}")

    maze, visited_states, learning_history = reset_position(maze, visited_states, learning_history)
    print_maze(maze)

    current_state = initial_state
    current_action, _, log_action = choose_action(current_state)

    log_print = ["timestep: 0"]
    log_print.append(f"I'm in position {current_state} and the episode just started.")
    log_print.extend(log_action)
    log_print.append("")

    draw_maze(maze, log_print)
    time.sleep(2)

    for t in training_time:  # one move per iteration
        # Apply the action chosen at the end of the previous iteration.
        log_print.append(f"timestep: {t+1}")
        new_state = transition_model.get(current_state).get(current_action)
        log_print.append(f"I moved to the position {new_state}")
        reward, log_reward = get_reward(current_state, new_state)
        log_print.extend(log_reward)
        log_print.append(f"reward: {reward}")

        move_player(current_state, new_state)
        print_maze(maze)

        # Choose the action for the next move; its estimate is SARSA's target.
        new_action, next_q_estimate, log_action = choose_action(new_state, verbose=(new_state != final_state))
        log_print.extend(log_action)

        # Read Q(s, a) from the table at update time: a value cached from the
        # previous iteration is stale when the agent stays in place and picks
        # the same action again.
        old_q_estimate = q_table[current_state][current_action]

        if algorithm == "sarsa":
            # On-policy: bootstrap from the action that will actually be taken.
            bootstrap = next_q_estimate
        else:
            # Off-policy: bootstrap from the best action of the *successor*
            # state, i.e. max over a' of Q(s', a').
            bootstrap = get_best_q_value(new_state)

        updated_q_estimate = round(old_q_estimate + alpha*(reward + gamma*bootstrap - old_q_estimate), 3)
        q_table[current_state][current_action] = updated_q_estimate

        current_state, current_action = new_state, new_action
        steps_taken = t + 1

        draw_maze(maze, log_print)
        time.sleep(2)  # Simulate time passing

        # Print before the goal check so the final move is logged as well.
        log_print.append("")
        for log in log_print:
            print(log)

        if current_state == final_state:
            break

        # Service the event queue so the window stays responsive, and stop
        # the episode when the user closes it.
        if any(event.type == pygame.QUIT for event in pygame.event.get()):
            break

        # Keep the previous move on screen for one more step, then start a
        # fresh log.
        if t % 2 == 0:
            log_print = []
finally:
    # Quit pygame
    pygame.quit()

print(f"Episode time (iterations): {steps_taken}")


['P', 0, 0]
[0, 0, 0]
[0, 0, 'G']


['P', 0, 0]
[0, 0, 0]
[0, 0, 'G']

timestep: 0
I'm in position 0 and the episode just started.
q scores: {'up': 0, 'down': 0, 'left': 0, 'right': -0.05}
greedy action: up

timestep: 1
I moved to the position 0
I hit the wall!
reward: -1
q scores: {'up': 0, 'down': 0, 'left': 0, 'right': -0.05}
greedy action: down


[0, 0, 0]
['P', 0, 0]
[0, 0, 'G']

timestep: 2
I moved to the position 3
I made a step!
reward: -0.1
q scores: {'up': 0, 'down': 0, 'left': -0.5, 'right': -0.5}
greedy action: up


['P', 0, 0]
[0, 0, 0]
[0, 0, 'G']

timestep: 2
I moved to the position 3
I made a step!
reward: -0.1
q scores: {'up': 0, 'down': 0, 'left': -0.5, 'right': -0.5}
greedy action: up

timestep: 3
I moved to the position 0
I have already visited this state!
reward: -1
q scores: {'up': -0.5, 'down': -0.05, 'left': 0, 'right': -0.05}
greedy action: left


['P', 0, 0]
[0, 0, 0]
[0, 0, 'G']

timestep: 4
I moved to the position 0
I hit the wall!
reward: -1
q scores: {'up