# Projet - Reinforcement Learning

## 1 - Librairies 

In [None]:
# Calculs numériques, algèbre, aléatoire
import numpy as np 
from random import sample
from numpy.random import dirichlet

# Arbres
from anytree import NodeMixin

# Keras (réseau de neurones)
from keras.engine.training import Model

# Échecs
import chess

# Affichage 
from IPython.display import display

# Utilitaires
import json
import os

## 2 - Fonctions utilitaires

In [None]:
"""
Function format_input_NN(Chess.Board()) :

Format Chess.Board() objects as an input for the Neural Network
"""


def format_input_NN(chess_board):

    """
    :return: a representation of the board using an (18, 8, 8) shape, good as input to a policy / value network
    """

    pieces_order = 'KQRBNPkqrbnp' # 12x8x8
    castling_order = 'KQkq'       # 4x8x8
    # fifty-move-rule             # 1x8x8
    # en en_passant               # 1x8x8

    ind = {pieces_order[i]: i for i in range(12)}

    def canon_input_planes(fen):
        """

        :param fen:
        :return : (18, 8, 8) representation of the game state
        """
        fen = maybe_flip_fen(fen, is_black_turn(fen))
        return all_input_planes(fen)

    def all_input_planes(fen):
        current_aux_planes = aux_planes(fen)

        history_both = to_planes(fen)

        ret = np.vstack((history_both, current_aux_planes))
        assert ret.shape == (18, 8, 8)
        return ret

    def to_planes(fen):
        board_state = replace_tags_board(fen)
        pieces_both = np.zeros(shape=(12, 8, 8), dtype=np.float32)
        for rank in range(8):
            for file in range(8):
                v = board_state[rank * 8 + file]
                if v.isalpha():
                    pieces_both[ind[v]][rank][file] = 1
        assert pieces_both.shape == (12, 8, 8)
        return pieces_both

    def aux_planes(fen):
        foo = fen.split(' ')

        en_passant = np.zeros((8, 8), dtype=np.float32)
        if foo[3] != '-':
            eps = alg_to_coord(foo[3])
            en_passant[eps[0]][eps[1]] = 1

        fifty_move_count = int(foo[4])
        fifty_move = np.full((8, 8), fifty_move_count, dtype=np.float32)

        castling = foo[2]
        auxiliary_planes = [np.full((8, 8), int('K' in castling), dtype=np.float32),
                            np.full((8, 8), int('Q' in castling), dtype=np.float32),
                            np.full((8, 8), int('k' in castling), dtype=np.float32),
                            np.full((8, 8), int('q' in castling), dtype=np.float32),
                            fifty_move,
                            en_passant]

        ret = np.asarray(auxiliary_planes, dtype=np.float32)
        assert ret.shape == (6, 8, 8)
        return ret

    def replace_tags_board(board_san):
        board_san = board_san.split(" ")[0]
        board_san = board_san.replace("2", "11")
        board_san = board_san.replace("3", "111")
        board_san = board_san.replace("4", "1111")
        board_san = board_san.replace("5", "11111")
        board_san = board_san.replace("6", "111111")
        board_san = board_san.replace("7", "1111111")
        board_san = board_san.replace("8", "11111111")
        return board_san.replace("/", "")

    def is_black_turn(fen):
        return fen.split(" ")[1] == 'b'

    def alg_to_coord(alg):
        rank = 8 - int(alg[1])        # 0-7
        file = ord(alg[0]) - ord('a') # 0-7
        return rank, file

    def maybe_flip_fen(fen, flip = False):
        if not flip:
            return fen
        foo = fen.split(' ')
        rows = foo[0].split('/')
        def swapcase(a):
            if a.isalpha():
                return a.lower() if a.isupper() else a.upper()
            return a
        def swapall(aa):
            return "".join([swapcase(a) for a in aa])
        return "/".join([swapall(row) for row in reversed(rows)]) \
            + " " + ('w' if foo[1] == 'b' else 'b') \
            + " " + "".join(sorted(swapall(foo[2]))) \
            + " " + foo[3] + " " + foo[4] + " " + foo[5]

    return canon_input_planes(chess_board.fen())



"""
Function flipped_uci_labels() :


"""


def flipped_uci_labels():
    """
    Seems to somehow transform the labels used for describing the universal chess interface format, putting
    them into a returned list.
    :return:
    """
    def repl(x):
        return "".join([(str(9 - int(a)) if a.isdigit() else a) for a in x])

    return [repl(x) for x in create_uci_labels()]


"""
Function create_uci_labels() :


"""

def create_uci_labels():
    """
    Creates the labels for the universal chess interface into an array and returns them
    :return:
    """
    labels_array = []
    letters = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']
    numbers = ['1', '2', '3', '4', '5', '6', '7', '8']
    promoted_to = ['q', 'r', 'b', 'n']

    for l1 in range(8):
        for n1 in range(8):
            destinations = [(t, n1) for t in range(8)] + \
                           [(l1, t) for t in range(8)] + \
                           [(l1 + t, n1 + t) for t in range(-7, 8)] + \
                           [(l1 + t, n1 - t) for t in range(-7, 8)] + \
                           [(l1 + a, n1 + b) for (a, b) in
                            [(-2, -1), (-1, -2), (-2, 1), (1, -2), (2, -1), (-1, 2), (2, 1), (1, 2)]]
            for (l2, n2) in destinations:
                if (l1, n1) != (l2, n2) and l2 in range(8) and n2 in range(8):
                    move = letters[l1] + numbers[n1] + letters[l2] + numbers[n2]
                    labels_array.append(move)
    for l1 in range(8):
        l = letters[l1]
        for p in promoted_to:
            labels_array.append(l + '2' + l + '1' + p)
            labels_array.append(l + '7' + l + '8' + p)
            if l1 > 0:
                l_l = letters[l1 - 1]
                labels_array.append(l + '2' + l_l + '1' + p)
                labels_array.append(l + '7' + l_l + '8' + p)
            if l1 < 7:
                l_r = letters[l1 + 1]
                labels_array.append(l + '2' + l_r + '1' + p)
                labels_array.append(l + '7' + l_r + '8' + p)
    return labels_array

## 3 - Chargement du réseau de neurones

In [None]:
def load_model():

    config_path = "/content/Reinforcement-Learning-AlphaZero/codes/model_config.json"
    weight_path = "/content/Reinforcement-Learning-AlphaZero/codes/model_weights.h5"

    with open(config_path, "rt") as f:
        model = Model.from_config(json.load(f))
        model.load_weights(weight_path)

    return model



def evaluate_position(model, position):

    input = np.array([format_input_NN(position)])
    p,v = model.predict(input)

    if not position.turn : # on prend toujours la perspective des blancs pour simplifier MCTS
        v = -v

    return p, v

## 4 - Monte Carlo Tree Search (MCTS)

In [None]:
'''
MCTS_nn.py

Contient les classes node_nn et mcts_nn qui permettent d'implémenter 
une recherche arborescente Monte Carlo améliorée par un réseau de neurone 
pré-entraîné qui fournit une politique d'expert.
'''

class node_nn(NodeMixin):

    '''
    Classe node_nn

    Cette classe correspond aux noeuds de l'arbre. Elle est caractérisée par 5 attributs :

        - move : chess.Move, coup correspondant au noeud
        - N : entier, nombre de fois où le noeud a été visité
        - V : réel, somme des valuations de la position
        - prob : réel, probabilité de choisir le coup correspondant au noeud sachant qu'on était dans la position précédente
        - parent : le noeud correspondant à la position précédente
    '''

    def __init__(self,move=None,parent=None,prob=0,V=0):

        self.move = move 
        self.N = 0
        self.V = V
        self.prob = prob
        self.parent = parent

    '''
    Fonction score(node_nn(), white_to_play)

    Argument :
        - white_to_play : booléen, vrai si c'est aux blancs de jouer

    Sortie :
        - Score qui permet de choisir quel noeud choisir lors de la phase de sélection de l'algorithme MCTS
    '''

    def score(self,white_to_play):

        if white_to_play:
            relative_V = self.V

        else:
            relative_V = -self.V

        return relative_V/(self.N or 1) + 1.5 * self.prob * np.sqrt(self.parent.N) / (1 + self.N)
        

class mcts_nn():

    '''
    Classe mcts_nn

    Cette classe correspond à l'arbre dans lequel on effectue MCTS. Elle est caractérisée par 6 attributs :

        - initial_position : chess.Board(), position dans laquelle on est réellement
        - current_position : chess.Board(), variable utilisé pour stocker les différentes positions courantes rencontrées dans MCTS
        - root : node_nn(), noeud correspondant à la position initial_position
        - model : modèle keras, le réseau de neurone utilisé pour cacluler les valuations des positions et les probabilités conditionnelles
        - moves_w : tous les coups jouables pour les blancs
        - moves_b : tous les coups jouables pour les noirs
    '''

    def __init__(self,position):

        self.initial_position = position.copy() 
        self.current_position = position.copy()
        self.root = node_nn()
        self.model = load_model()
        self.moves_w = [chess.Move.from_uci(move) for move in create_uci_labels()]
        self.moves_b = [chess.Move.from_uci(move) for move in flipped_uci_labels()]

    '''
    Fonction selection(mcts_nn())

    Arguments :

    Sorties :
        - un noeud de l'arbre qui est une feuille

    Description : 
        Cette fonction permet, à partir de la racine de l'arbre, de sélectionner une feuille en parcourant les noeuds 
        ayant les scores les plus hauts. La variable current_position est parallèlement mise à jour en fonction des noeuds empruntés.
        On retourne finalement le noeud dans lequel on aboutit (c'est nécessairement une feuille).
    '''
    
    def selection(self): 

        current_node = self.root

        while current_node.is_leaf is not True: # tant qu'on est pas arrivé dans une feuille

            white_to_play = self.current_position.turn
            score = [child.score(white_to_play) for child in current_node.children] # calcul des scores des noeuds enfants du noeud courant
            index = sample([i for i, j in enumerate(score) if j == max(score)],1)[0] # si plusieurs scores sont maximaux on en tire un au hasard parmi ces noeuds
            current_node = current_node.children[index] # mise à jour du noeud courant
            self.current_position.push(current_node.move) # mise à jour de la position courante 

        leaf = current_node 

        return leaf

    '''
    Fonction expansion_backprop(mcts_nn(), leaf)

    Arguments :
        - leaf : node_nn(), noeud correspondant à la sortie de la fonction sélection(mcts_nn)

    Sorties :

    Description : 
        Cette fonction correspond à deux phases de l'algorithme MCTS : phase d'expansion/simulation et phase de rétropropagation.
        EXPANSION/SIMULATION : 
            Dans une feuille, on procède à la phase d'expansion. La position correspondant à cette feuille est évaluée par le réseau de neurones.
            On crée ensuite tous les noeuds enfants possibles (ceux correspondant à des coups légaux)
        RETROPROPAGATION :
            On fait remonter les statistiques pertinententes pour le calcul des scores dans les noeuds parents.
    '''

    def expansion_backprop(self,leaf): 

        outcome = self.current_position.outcome() # issue de la partie, None si la partie n'est pas finie

        # EXPANSION

        legal_moves = self.current_position.legal_moves # génération des coups légaux

        if outcome is None: # si la partie n'est pas terminée

            dirichlet_noise, inc = dirichlet([0.03]*legal_moves.count()), 0 # bruit tiré selon une loi de dirichlet
            p,v = evaluate_position(self.model, self.current_position) # évaluation de la position à l'aide du réseau de neurones
            p = p[0]
            v = v[0,0]
            leaf.V += v # mise à jour de la valuation de la feuille

            for move in legal_moves: # création des nouveaux noeuds correspondants aux coups légaux

                if self.current_position.turn : # si c'est aux blancs de jouer
                    prob = p[self.moves_w.index(move)]

                else : # si c'est aux noirs de jouer
                    prob = p[self.moves_b.index(move)]

                prob = 0.75 * prob + 0.25 * dirichlet_noise[inc] # ajout du bruit
                inc += 1

                node_nn(move=move,parent=leaf,prob=prob) # création des noeuds enfants

            # RETROPROPAGATION

            for ancestor in leaf.iter_path_reverse(): # rétropropagation de la valuation
                ancestor.N += 1 # mise à jour du nombre de visites des ancêtres
                ancestor.V += v # mise à jour des valuations des ancêtres

        self.current_position = self.initial_position.copy() # on initialise la position courante 

        return 

## 5 - Classes pour jouer 

In [None]:
class game():


    def __init__(self):

        self.board = chess.Board()


    def play_random(self):

        random_move = sample(list(self.board.legal_moves),1)[0]
        self.board.push(random_move)

        display(self.board)

        return


    def play_mcts_nn(self, nb_simul):

        MCTS = mcts_nn(self.board)

        for i in range(nb_simul):
            MCTS.expansion_backprop(MCTS.selection())

        N = [child.N for child in MCTS.root.children]
        index = sample([i for i, j in enumerate(N) if j == max(N)],1)[0] 
        move = [child.move for child in MCTS.root.children][index]
        self.board.push(move)

        display(self.board)

        return 

## 6 - Démonstrations