# The algorithm

<b>Input</b>:<br>
	$\quad\quad$ none<br>
<b>Algorithm parameters</b>:<br>
	$\quad\quad$ discount factor $\gamma$<br>
	$\quad\quad$ step size $\alpha \in (0,1]$<br>
	$\quad\quad$ small $\epsilon > 0$<br><br>

<b>FOR EACH</b> episode<br>
	$\quad$ $\mathbf{s} \leftarrow \text{env.reset}()$<br>
	$\quad$ <b>DO</b> <br>
		$\quad\quad$ $\mathbf{a} \leftarrow \epsilon\text{-greedy}(\mathbf{s}, \hat{Q}_{\mathbf{\omega}})$<br>
		$\quad\quad$ $r, \mathbf{s'} \leftarrow \text{env.step}(\mathbf{a})$<br>
		$\quad\quad$ $y \leftarrow
		\begin{cases}
			r & \text{for terminal } \mathbf{s'}\\
			r + \gamma \max_{\mathbf{a}^\star \in A} \hat{Q}_{\mathbf{\omega}}(\mathbf{s'})_{\mathbf{a}^\star} & \text{for non-terminal } \mathbf{s'}
		\end{cases}$<br>
		$\quad\quad$ $\mathbf{\omega} \leftarrow \mathbf{\omega} + \alpha \left[ y - \hat{Q}_{\mathbf{\omega}}(\mathbf{s})_{\mathbf{a}} \right] ~ \nabla_{\mathbf{\omega}} \hat{Q}_{\mathbf{\omega}}(\mathbf{s})_{\mathbf{a}}$ <br>
		$\quad\quad$ $\mathbf{s} \leftarrow \mathbf{s'}$ <br>
	$\quad$ <b>UNTIL</b> $\mathbf{s}$ is terminal<br><br>
<b>RETURN</b> $\mathbf{\omega}$ <br>

---


In [43]:
import torch

import matplotlib.pyplot as plt
import numpy as np

import seaborn as sns
from tqdm.notebook import tqdm # pour les barres de progression
import random # pour epsilon greedy

import collections # pour le relay buffer
from typing import Callable, cast, List, Tuple, Union

In [37]:
import sys
import os

# Make the parent of the current working directory importable.
# NOTE(review): presumably this is where the `qoridor` and `agents` packages
# imported below live — confirm against the repository layout.
sys.path.append(os.path.abspath(os.pardir))


from qoridor.game import QoridorGame
from qoridor.move import Move, MoveType
from qoridor.board import WallOrientation
from qoridor.visualization import QoridorVisualizer
from agents.random_agent import RandomAgent
from qoridor.environment import QoridorEnv

In [14]:
# Game instance used by the exploratory cells below, created with
# board_size=5 and num_walls=3.
game = QoridorGame(board_size=5, num_walls=3)
# Visualizer object from the project's visualization module.
visualizer = QoridorVisualizer()

In [18]:
# Display the observation dictionary returned for the current game state.
game.state.get_observation()

{'board_size': 5,
 'player_positions': {1: (0, 2), 2: (4, 2)},
 'walls_left': {1: 3, 2: 3},
 'horizontal_walls': array([[False, False, False, False],
        [False, False, False, False],
        [False, False, False, False],
        [False, False, False, False]]),
 'vertical_walls': array([[False, False, False, False],
        [False, False, False, False],
        [False, False, False, False],
        [False, False, False, False]]),
 'current_player': 1,
 'done': False,
 'winner': None}

In [33]:
def state_to_vector(state) -> np.ndarray:
    """Flatten a game state into a one-dimensional integer feature vector.

    The observation dictionary is read once through
    ``state.get_observation()`` so that every field comes from the same
    snapshot. The resulting vector is laid out as follows:

    * indices 0-1: the two components of player 1's position
    * indices 2-3: the two components of player 2's position
    * index 4: walls left for player 1
    * index 5: walls left for player 2
    * index 6: current player
    * then the ``horizontal_walls`` grid, cast to 0/1 and flattened
    * then the ``vertical_walls`` grid, cast to 0/1 and flattened

    Args:
        state: Object exposing ``get_observation()``, which must return a
            mapping with the keys ``'player_positions'``, ``'walls_left'``,
            ``'current_player'``, ``'horizontal_walls'`` and
            ``'vertical_walls'``. The two wall entries must be NumPy arrays.

    Returns:
        np.ndarray: 1-D array of length ``7 + horizontal_walls.size +
        vertical_walls.size``.
    """
    # Fetch the observation a single time instead of once per field.
    observation = state.get_observation()

    positions = observation['player_positions']  # {player: (coord_0, coord_1)}
    walls_left = observation['walls_left']  # {player: remaining wall count}
    first_pos, second_pos = positions[1], positions[2]

    scalar_features = np.array([
        first_pos[0],
        first_pos[1],
        second_pos[0],
        second_pos[1],
        walls_left[1],
        walls_left[2],
        observation['current_player'],
    ])

    # Boolean presence grids become 0/1 integers before being flattened.
    horizontal_features = observation['horizontal_walls'].astype(int).flatten()
    vertical_features = observation['vertical_walls'].astype(int).flatten()

    return np.concatenate((scalar_features, horizontal_features, vertical_features))




In [34]:
# Display the flat vector encoding of the current game state.
state_to_vector(game.state)

array([0, 2, 4, 2, 3, 3, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

<div class="alert alert-block alert-warning">  
<b>Warning:</b> The network architecture is still open for discussion (fully connected, CNN, etc.).
</div>

In [35]:
class QNetwork(torch.nn.Module):
    """
    Fully connected Q-network built with PyTorch.

    The network stacks three linear layers; the first two are followed by a
    ReLU and the last one is left without activation, so the outputs are
    unbounded values (one per action).

    Attributes
    ----------
    layer1 : torch.nn.Linear
        Linear layer from `n_observations` to `nn_l1` units.
    layer2 : torch.nn.Linear
        Linear layer from `nn_l1` to `nn_l2` units.
    layer3 : torch.nn.Linear
        Linear layer from `nn_l2` to `n_actions` units.

    Methods
    -------
    forward(x: torch.Tensor) -> torch.Tensor
        Compute the Q-values for the given input.
    """

    def __init__(self, n_observations: int, n_actions: int, nn_l1: int, nn_l2: int):
        """
        Build the three linear layers of the network.

        Parameters
        ----------
        n_observations : int
            Number of input features.
        n_actions : int
            Number of output values (one per action).
        nn_l1 : int
            Width of the first hidden layer.
        nn_l2 : int
            Width of the second hidden layer.
        """
        super().__init__()
        linear = torch.nn.Linear
        # Layers are instantiated from input to output.
        self.layer1 = linear(n_observations, nn_l1)
        self.layer2 = linear(nn_l1, nn_l2)
        self.layer3 = linear(nn_l2, n_actions)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Run the forward pass.

        Parameters
        ----------
        x : torch.Tensor
            Input tensor whose last dimension has `n_observations` entries.

        Returns
        -------
        torch.Tensor
            Output of the last linear layer (the Q-values).
        """
        hidden = x
        for hidden_layer in (self.layer1, self.layer2):
            hidden = torch.relu(hidden_layer(hidden))
        # The output layer is purely linear: no activation is applied.
        return self.layer3(hidden)

In [41]:
class EpsilonGreedy:
    """
    An epsilon-greedy action-selection policy.

    With probability `epsilon` a random action is drawn from the legal actions
    reported by the environment; otherwise the action with the highest
    predicted Q-value is returned.

    Attributes
    ----------
    epsilon : float
        The current probability of choosing a random action (starts at
        `epsilon_start`).
    epsilon_min : float
        The lower bound below which `epsilon` is never decayed.
    epsilon_decay : float
        The multiplicative factor applied to `epsilon` by `decay_epsilon`.
    env : QoridorEnv
        The environment, queried for the legal actions when exploring.
    q_network : torch.nn.Module
        The Q-Network used to estimate action values.

    Methods
    -------
    __call__(state) -> int
        Select an action for the given state using the epsilon-greedy policy.
    decay_epsilon()
        Apply one decay step to the epsilon value.
    """

    def __init__(
        self,
        epsilon_start: float,
        epsilon_min: float,
        epsilon_decay: float,
        env: QoridorEnv,
        q_network: torch.nn.Module,
    ):
        """
        Initialize a new instance of EpsilonGreedy.

        Parameters
        ----------
        epsilon_start : float
            The initial probability of choosing a random action.
        epsilon_min : float
            The minimum probability of choosing a random action.
        epsilon_decay : float
            The factor by which epsilon is multiplied at each decay step.
        env : QoridorEnv
            The environment providing `get_legal_actions()`.
        q_network : torch.nn.Module
            The Q-Network used to estimate action values.
        """
        self.epsilon = epsilon_start
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.env = env
        self.q_network = q_network

    def __call__(self, state) -> int:
        """
        Select an action for `state` using the epsilon-greedy policy.

        Parameters
        ----------
        state
            A game-state object accepted by `state_to_vector` (it must expose
            `get_observation()`); it is NOT an already-encoded vector.

        Returns
        -------
        int
            The selected action. In the exploration branch this is whatever
            element type `env.get_legal_actions()` contains — presumably an
            integer action index; TODO confirm.
        """
        if random.random() < self.epsilon:
            # Exploration: uniform draw among the actions the env reports as legal.
            legal_actions = self.env.get_legal_actions()
            return random.choice(legal_actions)

        # Exploitation: index of the largest predicted Q-value.
        # NOTE(review): the argmax runs over every network output and is not
        # restricted to `env.get_legal_actions()`, so an illegal action may be
        # returned — confirm whether masking is needed.
        with torch.no_grad():
            encoded_state = torch.tensor(
                state_to_vector(state), dtype=torch.float32
            ).unsqueeze(0)  # add a leading batch dimension of size 1
            q_values = self.q_network(encoded_state)
            best_action = torch.argmax(q_values, dim=1).item()
        return best_action

    def decay_epsilon(self):
        """
        Apply one decay step to the epsilon value.

        The new epsilon value is the maximum of `epsilon_min` and the product of the current
        epsilon value and `epsilon_decay`. How often this is called (per action or per
        episode) is decided by the caller.
        """
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)


<div class="alert alert-block alert-warning">  
<b>Warning:</b> We still need to review <code>env.step</code> more closely and revisit the rewards (currently +1 for a win, -1 for a loss).
</div>

In [44]:
class ReplayBuffer:
    """
    A fixed-capacity replay buffer of transitions.

    Transitions are stored as `(state, action, reward, next_state, done)`
    tuples. Once `capacity` transitions are stored, adding a new one discards
    the oldest.

    Attributes
    ----------
    buffer : collections.deque
        A double-ended queue (bounded by `capacity`) where the transitions are stored.

    Methods
    -------
    add(state: np.ndarray, action: int, reward: float, next_state: np.ndarray, done: bool)
        Add a new transition to the buffer.
    sample(batch_size: int) -> Tuple[np.ndarray, Tuple[int, ...], Tuple[float, ...], np.ndarray, Tuple[bool, ...]]
        Sample a batch of transitions from the buffer.
    __len__()
        Return the current size of the buffer.
    """

    def __init__(self, capacity: int):
        """
        Initializes a ReplayBuffer instance.

        Parameters
        ----------
        capacity : int
            The maximum number of transitions that can be stored in the buffer.
        """
        self.buffer: collections.deque = collections.deque(maxlen=capacity)

    def add(
        self,
        state: np.ndarray,
        action: int,
        reward: float,
        next_state: np.ndarray,
        done: bool,
    ):
        """
        Add a new transition to the buffer.

        Parameters
        ----------
        state : np.ndarray
            The state vector of the added transition.
        action : int
            The action of the added transition.
        reward : float
            The reward of the added transition.
        next_state : np.ndarray
            The next state vector of the added transition.
        done : bool
            Whether the transition ended the episode.
        """
        self.buffer.append((state, action, reward, next_state, done))

    def sample(
        self, batch_size: int
    ) -> Tuple[np.ndarray, Tuple[int, ...], Tuple[float, ...], np.ndarray, Tuple[bool, ...]]:
        """
        Sample a batch of distinct transitions from the buffer.

        Parameters
        ----------
        batch_size : int
            The number of transitions to sample.

        Returns
        -------
        Tuple[np.ndarray, Tuple[int, ...], Tuple[float, ...], np.ndarray, Tuple[bool, ...]]
            `(states, actions, rewards, next_states, dones)`. `states` and
            `next_states` are arrays stacking the `batch_size` sampled vectors
            along the first axis; `actions`, `rewards` and `dones` are tuples
            of `batch_size` elements.

        Raises
        ------
        ValueError
            If `batch_size` is larger than the number of stored transitions
            (raised by `random.sample`), or if it is 0 (there is nothing to
            unpack).
        """
        # `random.sample` returns a list of `batch_size` transitions, each a
        # `(state, action, reward, next_state, done)` tuple, drawn without
        # replacement.
        batch = random.sample(self.buffer, batch_size)
        # `zip(*batch)` transposes that list into five tuples, one per field.
        states, actions, rewards, next_states, dones = zip(*batch)
        return np.array(states), actions, rewards, np.array(next_states), dones

    def __len__(self):
        """
        Return the current size of the buffer.

        Returns
        -------
        int
            The number of transitions currently stored.
        """
        return len(self.buffer)