# Monte Carlo Tree Search (MCTS)

Este é o modelo que constitui o cerne deste projeto. O algoritmo pode ser dividido em 4 fases:

1. **Seleção:** A partir da raiz, percorre-se a árvore selecionando sucessivamente os nós filhos de acordo com uma política (por exemplo, UCT) até chegar a um nó folha.

2. **Expansão:** Se o nó folha não representa um estado terminal, um ou mais filhos são criados e adicionados à árvore.

3. **Simulação:** A partir do novo nó, realiza-se uma simulação (jogadas aleatórias) até que seja alcançado um estado terminal.

4. **Retropropagação:** O resultado da simulação é propagado de volta pela árvore, atualizando os nós visitados de acordo com a estatística selecionada.

Esses passos repetidos um número predefinidos de vezes.

## Aplicação Prática

No contexto deste projeto, cada nó representa uma posição do tabuleiro, e cada aresta uma jogada válida, tendo portanto cada nó como seus filhos os estados que resultam de jogadas válidas. Assim sendo, o modelo é chamado uma vez por jogada, dado o tabuleiro atual, simula uma quantidade consideravél de jogos possíveis e retorna a coluna à qual estiver associada o maior score UCB (estatística explicada mais abaixo).

Segue-se o código da classe MCTS como descrito [neste ficheiro](MCTS/MCTS.py), e uma breve descrição do seu funcionamento:

In [1]:
import math
import random
from typing import Tuple, Any, List, Union

import utils.config as config

from Game.ConnectFour import ConnectFour
from MCTS.node import Node


class MonteCarlo_Single(object):
    """
    Monte Carlo Tree Search algorithm.

    Methods
    -------
    search(root: Node) -> int
        Search the best move from the root node.
     selection(node: Node, turn: int) -> (Node, int)
        Select the best node to expand.
    expansion(node: Node) -> Node
        Expand the node by adding a new child.
    simulation(state_init: ConnectFour, turn: int) -> float
        Simulate a random game from the initial state.
    backpropagation(node: Node, reward: float, turn: int) -> None
        Backpropagate the reward of the simulation to the root node.
    best_child(node: Node) -> Node
        Return the best child of the node.
    """
    def __init__(self, iteration: int = config.ITERATION, exploration: float = config.EXPLORATION, debug: bool = False) -> None:
        """
        Initialize the Monte Carlo Tree Search algorithm.
        """
        self.iteration = iteration
        self.exploration = exploration
        if debug:
            print(f"Monte Carlo Tree Search: iteration={iteration}, exploration={exploration}")

    def search(self, root: Node) -> tuple[Any, list[Any]]:
        """
        Search the best move from the root node.

        Parameters
        ----------
        root: the root node of the search tree

        Returns
        -------
        int: the best move
        """
        for _ in range(self.iteration):
            node, turn = self.selection(root, -1)
            reward = self.simulation(node.state, turn)
            self.backpropagation(node, reward, turn)

        prob = []
        for child in root.children:
            prob.append(child.visits / root.visits)

        ans = max(root.children, key=lambda c: c.visits)
        return ans.state.last_move[1], prob

    def selection(self, node: Node, turn: int) -> tuple[Node, int]:
        """
        Select the best node to expand.

        Parameters
        ----------
        node: the node to start the selection from
        turn: the turn of the player who played the move leading to this node

        Returns
        -------
        node: the node to expand
        turn: the turn of the player who played the move leading to this node
        """
        while not node.is_terminal():
            if not node.fully_explored():
                return self.expansion(node), -1 * turn
            else:
                node = self.best_child(node)
                turn *= -1

        return node, turn

    @staticmethod
    def expansion(node: Node) -> Node:
        """
        Expand the node by adding a new child.

        Parameters
        ----------
        node: the node to expand

        Returns
        -------
        node: the new child
        """
        free_cols = node.state.legal_moves()

        for col in free_cols:
            if col not in node.children_move:
                new_state = node.state.copy()
                new_state.play(col)
                node.add_child(new_state, col)
                break

        return node.children[-1]

    @staticmethod
    def simulation(state_init: ConnectFour, turn: int) -> float:
        """
        Simulate a random game from the initial state.

        Parameters
        ----------
        state_init: the initial state of the game
        turn: the turn of the player who played the move leading to this node

        Returns
        -------
        reward: the reward of the simulated game
        """
        state = state_init.copy()

        while not state.is_over():
            state.play(random.choice(state.legal_moves()))
            turn *= -1

        reward_bool = state.is_over()

        if reward_bool and turn == -1:
            reward = 1.0
        elif reward_bool and turn == 1:
            reward = -1.0
        else:
            reward = 0.0
        return reward

    @staticmethod
    def backpropagation(node: Node, reward: float, turn: int) -> None:
        """
        Backpropagate the reward of the simulation to the root node.

        Parameters
        ----------
        node: the node to start the backpropagation from
        reward: the reward of the simulation
        turn: the turn of the player who played the move leading to this node

        Returns
        -------
        none
        """
        while node is not None:
            node.visits += 1
            node.reward -= turn * reward
            node = node.parent
            turn *= -1

    def best_child(self, node: Node) -> Node:
        """
        Return the best child of the node.

        Parameters
        ----------
        node: the node to select the best child from

        Returns
        -------
        node: the best child
        """
        best_score = -float("inf")
        best_children = None

        for child in node.children:
            exploitation = child.reward / child.visits
            exploration = math.sqrt(math.log2(node.visits) / child.visits)
            score = exploitation + self.exploration * exploration

            if score == best_score:
                if child.visits > best_children.visits:
                    best_children = child
            elif score > best_score:
                best_score = score
                best_children = child

        return best_children

### Explicação das Funções da Classe

- **`__init__`**  
    Inicializa o algoritmo MCTS, definindo o número de iterações e o parâmetro de exploração.

- **`search`**  
    Executa o ciclo principal do MCTS: seleção, expansão, simulação e retropropagação, retornando a melhor jogada e as probabilidades associadas a cada filho da raiz.

- **`selection`**  
    Percorre a árvore a partir de um nó, seguindo a política de seleção (ex: UCB), até encontrar um nó folha ou não totalmente explorado.

- **`expansion`**  
    Expande um nó folha, criando e adicionando um novo filho correspondente a uma jogada ainda não explorada.

- **`simulation`**  
    Realiza uma simulação (jogo aleatório) a partir do estado atual até um estado terminal, retornando a recompensa do resultado.

- **`backpropagation`**  
    Propaga o resultado da simulação de volta pela árvore, atualizando as estatísticas dos nós visitados.

- **`best_child`**  
    Seleciona o melhor filho de um nó com base no critério de exploração/exploração (UCB), para guiar a busca nas próximas iterações.

Ou seja, a escolha de uma jogada (executada pela função `search`) executa as 4 fases mencionadas acima, iteradas tantas vezes quantas as definidas no ficheiro [config.py](utils/config.py).

### UCB
Utilizamos como métrica para avaliação dos nodes o UCB, a estatística mais comum em implementações de MCTS, e que pode ser descrita pela seguinte fórmula:

$$
UCB = \frac{w_i}{s_i} + c \sqrt{\frac{\ln s_p}{s_i}}
$$

onde:

- $w_i$: número de vitórias do nó $i$
- $s_i$: número de simulações do nó $i$
- $s_p$: número de simulações do nó pai
- $c$: parâmetro de *exploitation*

Em que $c$ constitui uma constante que equilibra o quanto a *exploration* é favorecida sobre a *exploitation*. Esta constante, neste projeto está também definida no ficheiro de [configuração](utils/config.py) e toma o valor de $1.414$, isto é, aproximadamente $\sqrt{2}$, também um valor muito usado por defeito nestas áreas.

Em específico, o UCB é calculado nestas linhas do ficheiro [MCTS.py](MCTS/MCTS.py), na função `best_child`:

```python
exploitation = child.reward / child.visits
exploration = math.sqrt(math.log2(node.visits) / child.visits)
score = exploitation + self.exploration * exploration
```

### Escolha da próxima jogada
Acabadas as iterações do MCTS, cada jogada válida tem a si associada dois valores:

* **Visited (V):** Representa a quantidade de simulações feitas sobre esse nó nas iterações de de MCTS. Vai ser, portanto, $n \mid n \in \mathbb{N},\ 0 \leq n \leq \text{iter}$ em que $iter$ é o número de iterações do MCTS.
* **Reward:** Representa o somatório dos rewards das iterações de MTCS. Um reward em cada iteração é definido por:
    * **1** se o próprio ganha;
    * **0** se há um empate;
    * **-1** se o adversário ganha.

Resta apenas definir o critério de escolha, isto é, se consideramos o melhor lance aquele que foi o mais visitado, o mais recompensado ou um equilíbrio das duas características.

Testando as várias hipóteses, no entanto, verificamos que a melhor performance é obtida ao priorizar pelos visitados. Isto provavelmente deve-se a um nó ser mais visitado significar que obteve mais score UCB mais vezes, provando ser o mais promissor.

### Tempo de execução
Ao constatar, naturalmente, que um número superior de iterações significa uma melhor performance, tentou-se executar modelos com mais e mais iterações. No entanto, neste processo o modelo começou também a demorar cada vez mais tempo a calcular a sua próxima jogada, o que piora a experiência de jogo.

No sentido de corrigir isto, implementou-se uma outra classe de MCTS que utiliza *paralel processing*, executando várias iterações ao mesmo tempo, e sendo por isso significativamente mais rápido que a sua contraparte, que utiliza apenas um *core* para todas as tarefas.

Eis essa outra implementação, denominada `MonteCarlo` no ficheiro [MCTS_optimized.py](MCTS/MCTS_optimized.py):

In [1]:
import math
import random
import os
from concurrent.futures import ProcessPoolExecutor
from typing import Tuple, Any, Dict

import utils.config as config
from Game.ConnectFour import ConnectFour
from MCTS.node import Node


def worker_mcts(state: ConnectFour, iterations: int, exploration: float) -> Dict[int, Tuple[float, int]]:
    """
    Each worker runs its own mini-MCTS rooted at the same state.
    Returns: {move: (total_reward, total_visits)}
    """
    root = Node(state.copy())

    def selection(node: Node, turn: int) -> Tuple[Node, int]:
        while not node.is_terminal():
            if not node.fully_explored():
                return expansion(node), -1 * turn
            node = best_child(node)
            turn *= -1
        return node, turn

    def expansion(node: Node) -> Node:
        for col in node.state.legal_moves():
            if col not in node.children_move:
                new_state = node.state.copy()
                new_state.play(col)
                node.add_child(new_state, col)
                break
        return node.children[-1]

    def simulation(state: ConnectFour, turn: int, max_depth: int = 20) -> float:
        state = state.copy()
        moves = 0
        while not state.is_over() and moves < max_depth:
            legal = state.legal_moves()
            if 3 in legal:
                state.play(3)
            else:
                state.play(random.choice(legal))
            turn *= -1
            moves += 1

        if state.is_over():
            return 1.0 if turn == -1 else -1.0
        return 0.0

    def backpropagation(node: Node, reward: float, turn: int) -> None:
        while node is not None:
            node.visits += 1
            node.reward -= turn * reward
            node = node.parent
            turn *= -1

    def best_child(node: Node) -> Node:
        best_score = -float("inf")
        best_node = None
        log_visits = math.log(node.visits + 1)
        for child in node.children:
            exploit = child.reward / (child.visits + 1e-8)
            explore = math.sqrt(log_visits / (child.visits + 1e-8))
            score = exploit + exploration * explore

            if score == best_score:
                if child.visits >= best_node.visits:
                    best_node = child
            elif score > best_score:
                best_score = score
                best_node = child
        return best_node

    for _ in range(iterations):
        node, turn = selection(root, -1)
        reward = simulation(node.state, turn)
        backpropagation(node, reward, turn)

    move_stats = {}
    for _, child in enumerate(root.children):
        move = child.state.last_move[1]
        move_stats[move] = (child.reward, child.visits)

    return move_stats


class MonteCarlo:


    def __init__(self, iteration: int = config.ITERATION, exploration: float = config.EXPLORATION, debug: bool = False):
        
        self.iteration = iteration
        self.exploration = exploration
        self.cpu_cores = max(1, os.cpu_count() or 1)
        self.debug = debug

        
        if self.debug:
            print(f"Using {self.cpu_cores} CPU cores for MCTS.")
            print(f"Iterations per worker: {self.iteration // self.cpu_cores}")
            print(f"Exploration factor: {self.exploration}")
            print(f"Total iterations: {self.iteration}")
            

        

    def search(self, root: Node) -> tuple[Any, list[Any]]:
        iterations_per_worker = self.iteration // self.cpu_cores

        with ProcessPoolExecutor(max_workers=self.cpu_cores) as executor:
            futures = [executor.submit(worker_mcts, root.state, iterations_per_worker, self.exploration)
                       for _ in range(self.cpu_cores)]

            all_stats = [f.result() for f in futures]

        merged_stats: Dict[int, Tuple[float, int]] = {}

        for stat in all_stats:
            for move, (reward, visits) in stat.items():
                if move not in merged_stats:
                    merged_stats[move] = (reward, visits)
                else:
                    r, v = merged_stats[move]
                    merged_stats[move] = (r + reward, v + visits)

        for move, (reward, visits) in merged_stats.items():
            found = False
            for child in root.children:
                if child.state.last_move[1] == move:
                    child.reward += reward
                    child.visits += visits
                    found = True
                    break
            if not found:
                new_state = root.state.copy()
                new_state.play(move)
                new_child = Node(new_state, root)
                new_child.reward = reward
                new_child.visits = visits
                root.children.append(new_child)
                root.children_move.append(move)
            root.visits += visits

        prob = [child.visits / root.visits for child in root.children]

        ans = max(root.children, key=lambda c: c.visits)
        return ans.state.last_move[1], prob

    def best_child(self, node: Node) -> Node:
        best_score = -float("inf")
        best_children = None
        log_parent_visits = math.log(node.visits + 1)

        for child in node.children:
            exploitation = child.reward / (child.visits + 1e-8)
            exploration = math.sqrt(log_parent_visits / (child.visits + 1e-8))
            score = exploitation + self.exploration * exploration

            if score == best_score:
                if child.visits >= best_children.visits:
                    best_children = child
            elif score > best_score:
                best_score = score
                best_children = child

        return best_children

Mesmo assim, provou-se que *paralel processing* não é sempre a solução mais rápida. Isto acontece porque, essencialmente cada *core* do CPU está a realizar MCTS na sua própria árvore e, apenas quando **todos os *cores*** já tenham acabado as suas tarefas é que todas as árvores podem ser unidas, o que computacionalmente é trabalhoso.

Assim, estudando este tema mais a fundo, chegou-se a conclusão que a classe `MonteCarlo` é mais eficiente que a class `MCTS` a partir das **5000 iterações**. Assim sendo, dependedo do modo de jogo selecionado, e dependendo da dificuldade selecionada (ou do número de iterações selecionadas no caso de AI vs AI) seleciona-se o modelo mais rápido para a situação.

### Visualização da árvore

Finalmente, no sentido de melhor compreender o funcionamento do modelo, e mesmo para *debugging*, criou-se um ficheiro [Visualize_MCtree.py](utils/Visualize_MCtree.py) que, ao ser implementado na [GameMain](GameMain.py) permite ao jogador ativar uma nova funcionalidade do **Modo de Debugging**.

Este modo ativa prints de debug, que contêm informações como:

- O tempo de resposta da IA
- Quantos *cores* do CPU estão a ser usados (se estiver a ser usado mais que um)
- O *score* dos vários nós
- Entre outros

Para além disso, selecionando o modo Player vs AI e jogando contra o modelo, a cada lance feito pelo MTCS, desenhamos um grafo que representa o estado atual e os próximos estados possíveis, bem como as estatísticas a eles associados.