![Logo](../assets/logo.png)

Made by **Domonkos Nagy**

[<img src="https://colab.research.google.com/assets/colab-badge.svg">](https://colab.research.google.com/github/Fortuz/rl_education/blob/main/7.%20Planning%20and%20Learning/maze_solution.ipynb)

# Maze (solution)

In this notebook, we consider a simple maze-solving problem where the agent has to find the shortest path from the start (top left corner) to the exit (bottom right corner).

<img src="assets/maze.gif" width="400"/>

This maze environment was originally made with the old `gym` library, but we apply an API compatibility layer, so it behaves exactly like
a `gymnasium` environment. The states are the x, y coordinates of the agent (which we transform into a single integer), and the actions are the 4 directions: 'N', 'S', 'E' and 'W'. The reward
is -0.1/(number of cells) for each step, and a reward of +1 is received for reaching the goal.
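
As a rough illustration of the state encoding and reward scale described above, the sketch below flattens an (x, y) coordinate into a single integer and computes the per-step penalty for a 10x10 maze. The helper names `encode_state` and `decode_state` are hypothetical and not part of `gym-maze`; the row-major layout is an assumption chosen to match the transform applied later in this notebook.

```python
# Hypothetical helpers for illustration only; not part of gym-maze.
MAZE_WIDTH = 10   # assumed maze width in cells
MAZE_HEIGHT = 10  # assumed maze height in cells

def encode_state(x, y, width=MAZE_WIDTH):
    """Flatten an (x, y) coordinate into one integer (row-major)."""
    return int(y * width + x)

def decode_state(state, width=MAZE_WIDTH):
    """Recover the (x, y) coordinate from a flattened state."""
    y, x = divmod(state, width)
    return x, y

# Per-step penalty as described above: -0.1 / (number of cells)
step_reward = -0.1 / (MAZE_WIDTH * MAZE_HEIGHT)

print(encode_state(3, 4))  # 43
print(decode_state(43))    # (3, 4)
print(step_reward)
```

With 100 cells the step penalty is small compared to the +1 goal reward, so even a long detour still leaves reaching the goal worthwhile.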

The maze is randomly generated each time the environment is created. This notebook uses *prioritized sweeping* to approximate the optimal policy in the (third-party) `maze-random-10x10-v0` environment.

- This notebook is based on Chapter 8 of the book *Reinforcement Learning: An Introduction (2nd ed.)* by R. Sutton & A. Barto, available at http://incompleteideas.net/book/the-book-2nd.html
- Documentation for the Maze environment: https://github.com/MattChanTK/gym-maze

In [1]:
# Install dependencies if running in Colab
import sys
IN_COLAB = 'google.colab' in sys.modules

if IN_COLAB:
    !pip install gymnasium==0.29.0
    !pip install setuptools==58.2.0 
    !pip install shimmy[gym-v26]
    !git clone https://github.com/MattChanTK/gym-maze gym-maze
    %cd gym-maze
    !python3 setup.py install
    %cd ..
    !rm -r gym* 

In [2]:
import gym
import gym_maze
import matplotlib.pyplot as plt
import numpy as np
from IPython import display
from gymnasium.wrappers import TransformObservation, RecordVideo
import ipywidgets as widgets
import pickle
import warnings
warnings.filterwarnings('ignore')

In [3]:
# Hyperparameters
N_EPISODES = 5  # Number of training episodes
N_UPDATES_PER_STEP = 200  # Number of planning updates per interaction with the environment
EPSILON = 0.01  # Exploration rate
ALPHA = 0.7  # Learning rate
GAMMA = 1  # Discount factor
THETA = 0.01  # Priority threshold
N_RECORDINGS = 3  # Number of episodes to record
REC_EPISODES = np.linspace(1, N_EPISODES-1, num=N_RECORDINGS, dtype=int)  # Episodes to record (the first episode is not recorded)

In [4]:
# Initialize gym environment with gymnasium compatibility
base_env = gym.make("maze-random-10x10-v0", apply_api_compatibility=True, render_mode='rgb_array')
# Transform observation representation from array to int: e.g. [3, 4] -> 43
base_env = TransformObservation(base_env, lambda obs: int(obs[1] * (base_env.observation_space.high + 1)[0] + obs[0]))
# Wrap environment to record videos throughout the learning process 
trigger = lambda ep: ep in REC_EPISODES
env = RecordVideo(base_env, video_folder="./videos", episode_trigger=trigger, disable_logger=True)

In [5]:
# Initialize Q-table
action_space_size = env.action_space.n
observation_space_size = (env.observation_space.high + 1)[0] * \
    (env.observation_space.high + 1)[1]
q_table_shape = observation_space_size, action_space_size
q_table = np.zeros(q_table_shape)

# Initialize priority 'queue'
priorities = np.zeros(q_table_shape)

In [6]:
# Argmax function that breaks ties randomly
def argmax(arr):
    arr_max = np.max(arr)
    return np.random.choice(np.where(arr == arr_max)[0])

## The Model

In addition to the Q-table, the agent also learns a model of the environment. Since the maze is deterministic, the model is simple:
for each state-action pair, it stores the reward and the next state: $\text{Model}(S_t, A_t) = (R_{t+1}, S_{t+1})$. The `add` method
adds new information to the model, while the `get` method returns the stored next state and reward (in that order) for a given state-action pair.
The `get_leading` method returns all state-action pairs that lead to a given state: $\text{get} \textunderscore \text{leading}(S_t) = \{(s, a)\in\mathcal{S}\times\mathcal{A}\ |\ \text{Model}(s, a)_2 = S_t\}$.
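
To make the set definition concrete, here is a minimal sketch of a forward and a reverse lookup built from plain dictionaries. The three-state chain and the function names are hypothetical, chosen only for illustration.

```python
# Toy illustration with hypothetical states 0 -> 1 -> 2 (2 is the goal).
forward = {}   # (state, action) -> (reward, next_state)
reverse = {}   # next_state -> list of (state, action) pairs leading to it

def add(state, action, reward, next_state):
    """Record one observed transition in both lookup tables."""
    forward[(state, action)] = (reward, next_state)
    pairs = reverse.setdefault(next_state, [])
    if (state, action) not in pairs:
        pairs.append((state, action))

def get_leading(state):
    """Return all known pairs leading to 'state' (empty if none are known)."""
    return reverse.get(state, [])

add(0, 'E', -0.001, 1)
add(1, 'E', 1.0, 2)
add(1, 'N', -0.001, 1)  # bumping into a wall keeps the agent in place

print(get_leading(1))  # [(0, 'E'), (1, 'N')]
print(get_leading(0))  # []
```

Keeping the reverse table up to date on every `add` makes the predecessor lookup a single dictionary access instead of a scan over all state-action pairs.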

In [7]:
# Class representing a model of the environment
class Model:
    def __init__(self, shape):
        self.transitions = np.zeros(shape, dtype=int) - 1
        self.rewards = np.zeros(shape)
        self.transitions_reverse = {}

    # Add new information to the model:
    # Taking 'action' in 'obs' produces 'reward'
    # and transfers the agent to 'new_obs'
    def add(self, obs, action, reward, new_obs):
        self.transitions[obs, action] = new_obs
        self.rewards[obs, action] = reward

        if new_obs in self.transitions_reverse:
            if not (obs, action) in self.transitions_reverse[new_obs]:
                self.transitions_reverse[new_obs].append((obs, action))
        else:
            self.transitions_reverse[new_obs] = [(obs, action)]

    # Get information from the model:
    def get(self, obs, action):
        new_obs = self.transitions[obs, action]
        reward = self.rewards[obs, action]
        return new_obs, reward

    # Get all obs-action pairs that lead the agent to 'obs'
    # (an empty list if no such pair has been observed yet)
    def get_leading(self, obs):
        return self.transitions_reverse.get(obs, [])

## Prioritized Sweeping

Prioritized sweeping works similarly to Q-learning, but in addition to updates based on real experience (*learning*) it also utilizes updates based on simulated
experience (*planning*). To be able to do this, the algorithm records each state transition, forming a model of the environment.

A simpler, related algorithm is *Dyna-Q*: it is essentially Q-learning that records state transitions in a model and, after each step, updates randomly selected
Q-values using information from the model.
Prioritized sweeping improves on Dyna-Q by updating the state-action pairs with the largest temporal-difference (TD) errors first. Prioritizing updates by the magnitude of the error directs computation to the parts of the environment where value estimates most need refinement, which speeds up convergence.
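
For comparison, the planning phase of Dyna-Q can be sketched as follows. This is a simplified illustration under stated assumptions, not the implementation used in this notebook: the function name `dyna_q_planning`, the dictionary-based model and the three-state chain are all hypothetical, and the world is assumed to be deterministic.

```python
import numpy as np

rng = np.random.default_rng(0)

def dyna_q_planning(q, model, n_updates, alpha=0.7, gamma=1.0):
    """Apply Q-learning updates to uniformly sampled, previously seen pairs.

    'model' maps (state, action) -> (reward, next_state).
    """
    seen = list(model.keys())
    for _ in range(n_updates):
        s, a = seen[rng.integers(len(seen))]  # no prioritization: uniform choice
        r, s_next = model[(s, a)]
        q[s, a] += alpha * (r + gamma * np.max(q[s_next]) - q[s, a])
    return q

# Hypothetical chain 0 -> 1 -> 2 (terminal) with a single action 0
q = np.zeros((3, 1))
model = {(0, 0): (0.0, 1), (1, 0): (1.0, 2)}
dyna_q_planning(q, model, n_updates=50)
print(q.ravel())
```

Because pairs are drawn uniformly, many updates land on pairs whose values cannot change yet; this is the waste that prioritized sweeping aims to avoid.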

Since state-action pairs that lead to a state with a high error are likely to have a high TD error themselves, after each update, the errors of state-action pairs that lead to the newly updated state are recalculated. This leads to a quick backpropagation of rewards: for example, in the maze environment, after the goal state is reached, prioritized sweeping will
first update the state-action pair leading to the goal state, then the state-action pairs leading to the state just before the goal state, and so on, spreading backwards from the
goal to the starting state.
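
The backward spread can be reproduced on a tiny example. The sketch below assumes a hypothetical five-state corridor with a single "move right" action and uses a learning rate of 1 (instead of this notebook's 0.7) so that the numbers stay clean; the priority of a pair is the absolute TD error $|R + \gamma \max_a Q(S', a) - Q(S, A)|$.

```python
import numpy as np

# Hypothetical corridor: states 0..4, one action (move right), state 4 is the goal.
N_STATES, GOAL = 5, 4
ALPHA, GAMMA, THETA = 1.0, 1.0, 0.01  # ALPHA = 1 is an assumption for clean numbers

q = np.zeros((N_STATES, 1))
priorities = np.zeros((N_STATES, 1))
next_state = {s: s + 1 for s in range(GOAL)}                        # forward model
reward = {s: (1.0 if s + 1 == GOAL else 0.0) for s in range(GOAL)}
predecessors = {s + 1: [s] for s in range(GOAL)}                    # reverse model

def td_error(s):
    """Absolute TD error of the single action available in state s."""
    return abs(reward[s] + GAMMA * np.max(q[next_state[s]]) - q[s, 0])

# The agent has just experienced the transition 3 -> 4 (goal)
priorities[3, 0] = td_error(3)

order = []  # order in which states get updated
while priorities.max() > 0:
    s = int(np.argmax(priorities[:, 0]))
    priorities[s, 0] = 0
    order.append(s)
    q[s, 0] += ALPHA * (reward[s] + GAMMA * np.max(q[next_state[s]]) - q[s, 0])
    # Re-evaluate every pair that leads to the state just updated
    for p in predecessors.get(s, []):
        prio = td_error(p)
        if prio > THETA:
            priorities[p, 0] = max(priorities[p, 0], prio)

print(order)      # [3, 2, 1, 0]
print(q.ravel())  # [1. 1. 1. 1. 0.]
```

A single real transition into the goal is enough to update the whole corridor in four planning steps, one state further from the goal each time.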

In this example, the agent learns to solve the maze in only a few episodes, with 200 planning updates after each step. Due to the nature of this environment, the agent will wander around the maze aimlessly in the first episode until it stumbles upon the goal. From that moment, the newly gained information propagates backwards rapidly, and the agent's performance improves dramatically in the second episode.

***

### **Your Task**

Implement this algorithm! The block below only contains code necessary for logging the episode length and queue size after each episode. The algorithm itself is up to you! Pseudocode for this algorithm is shown in the box below.

<img src="assets/prioritized_sweeping.png" width="700"/>

*Pseudocode from page 170 of the Sutton & Barto book*

#### **Hints:**

- For simplicity and readability, it's recommended to separate the "planning" phase (*(g)* in the pseudocode) into its own function.
- Instead of 0-3, the actions in this environment are the strings 'N', 'S', 'E' and 'W'.
- The priority queue is represented by a NumPy array (`priorities`), as Python's built-in implementations don't allow modifying the priority of an element. 
- Truncation is not needed for this environment.
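
The array-based priority queue from the hints can be sketched like this. The `push` and `pop` helpers and the small 3x2 table are hypothetical names and sizes used only for illustration; a priority of 0 is taken to mean "not queued".

```python
import numpy as np

# Hypothetical table for 3 states and 2 actions; 0 means "not queued"
priorities = np.zeros((3, 2))

def push(state, action, priority, theta=0.01):
    """Queue the pair, keeping the larger priority if it is already queued."""
    if priority > theta:
        priorities[state, action] = max(priorities[state, action], priority)

def pop():
    """Remove and return the highest-priority pair, or None if the queue is empty."""
    state, action = np.unravel_index(np.argmax(priorities), priorities.shape)
    if priorities[state, action] == 0:
        return None
    priorities[state, action] = 0
    return int(state), int(action)

push(0, 1, 0.5)
push(2, 0, 0.9)
push(0, 1, 0.2)    # lower than the stored 0.5, so the stored value is kept
push(1, 1, 0.001)  # below theta, so it is never queued

print(pop())  # (2, 0)
print(pop())  # (0, 1)
print(pop())  # None
```

Finding the maximum costs a full scan of the array, which is acceptable for a 10x10 maze with 4 actions; larger problems would probably call for a heap that supports priority updates.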

***

In [8]:
# Planning function
def plan():
    for _ in range(N_UPDATES_PER_STEP):
        # Get state-action pair with highest priority
        obs, action = np.unravel_index(np.argmax(priorities), priorities.shape)
        
        # If the highest priority is 0, there is nothing to do
        if priorities[obs, action] == 0:
            break

        # Reset priority
        priorities[obs, action] = 0

        # Get new state and reward from model
        new_obs, reward = model.get(obs, action)

        # Update Q-table
        q_table[obs, action] += ALPHA * (reward + GAMMA * np.max(q_table[new_obs]) - q_table[obs, action])

        # Add leading states to queue
        for prev_obs, prev_action in model.get_leading(obs):
            _, prev_reward = model.get(prev_obs, prev_action)
            priority = abs(prev_reward + GAMMA * np.max(q_table[obs]) - q_table[prev_obs, prev_action])

            if priorities[prev_obs, prev_action] < priority and priority > THETA:
                priorities[prev_obs, prev_action] = priority

In [9]:
# Initialize environment, Q-table, model and priority queue
env = RecordVideo(base_env, video_folder="./videos", episode_trigger=trigger, disable_logger=True)
q_table = np.zeros(q_table_shape)
model = Model(q_table_shape)
priorities = np.zeros(q_table_shape)

# Training loop
for episode in range(N_EPISODES):
    obs, _ = env.reset()
    terminated = False
    n_steps = 0

    while not terminated:
        n_steps += 1
        ############## CODE HERE ###################
        
        # Epsilon-greedy action selection
        if np.random.rand() > EPSILON:
            action = argmax(q_table[obs])
        else:
            action = env.action_space.sample()

        # Take selected action
        new_obs, reward, terminated, truncated, info = env.step(['N', 'S', 'E', 'W'][action])
        
        # Add information to the model
        model.add(obs, action, reward, new_obs)

        # Add state-action pair to queue
        priority = abs(reward + GAMMA * np.max(q_table[new_obs]) - q_table[obs, action])
        if priorities[obs, action] < priority and priority > THETA:
            priorities[obs, action] = priority

        # Store new state
        obs = new_obs

        # Start a planning phase
        plan()

        ############################################
        
    # Log results
    print(f'Episode {episode+1:,}:\n\tSteps: {n_steps:,}\n\tQueue size: {np.count_nonzero(priorities):,}')
    
# Save Q-table
with open('q_table.bin', 'wb') as f:
    pickle.dump(q_table, f)

Episode 1:
	Steps: 1,498
	Queue size: 59
Episode 2:
	Steps: 23
	Queue size: 92
Episode 3:
	Steps: 22
	Queue size: 31
Episode 4:
	Steps: 22
	Queue size: 2
Episode 5:
	Steps: 22
	Queue size: 0


## Results

You can watch the videos recorded throughout the training process here:

*(Note that the first episode is not recorded due to its excessive length)*

In [10]:
# Display recordings
children = [widgets.Video.from_file(f'./videos/rl-video-episode-{episode}.mp4', autoplay=False, loop=False, width=500) for episode in REC_EPISODES]
tab = widgets.Tab()
tab.children = children
titles = tuple([f'Episode {episode + 1:,}' for episode in REC_EPISODES])
for i in range(len(children)):
    tab.set_title(i, titles[i])
display.display(tab)
