# Evaluation Notebook for the AIZero Chess Bot

- Bot prediction from FEN strings
- Bot plays against itself
- Bot vs Human
- Bot vs Baseline Bot

In [None]:
%pip install torch chess

In [None]:
import time

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Literal, Optional
from chess import Board, Move, WHITE, BLACK, Color


TIME_TO_THINK = 0.5  # seconds


class ChessBot(ABC):
    """
    Abstract base class for every player (engine or human) that can take part in a game.

    Besides the ``think`` hook it provides a small per-move clock: the caller invokes
    ``restart_clock`` right before ``think`` and the bot can poll ``time_is_up`` to
    respect the ``TIME_TO_THINK`` budget.
    """

    def __init__(self, name: str) -> None:
        """Initializes the bot with a name."""
        self.name = name
        # Clock reading taken by the last ``restart_clock`` call; 0.0 until the clock is started.
        self.start_time = 0.0

    @abstractmethod
    def think(self, board: Board) -> Move:
        """This method is called when it's the bot's turn to move. It should return the move that the bot wants to make."""
        raise NotImplementedError('Subclasses must implement this method')

    @property
    def time_elapsed(self) -> float:
        """Returns the time in seconds elapsed since the clock was last restarted."""
        # A monotonic clock is used so that wall-clock adjustments (NTP, DST, manual changes)
        # cannot shorten or lengthen the thinking time.
        return time.monotonic() - self.start_time

    @property
    def time_remaining(self) -> float:
        """
        Determines the time remaining for the bot to think.

        :return: The time remaining in seconds (negative once the budget is exceeded).
        """
        return TIME_TO_THINK - self.time_elapsed

    @property
    def time_is_up(self) -> bool:
        """Determines if the bot has run out of time to think."""
        return self.time_remaining <= 0

    def restart_clock(self) -> None:
        """Restarts the clock for the bot."""
        self.start_time = time.monotonic()


@dataclass
class GameResult:
    """
    Outcome of a game derived from a board position.

    ``winner`` is the colour that delivered checkmate, or ``None`` when nobody won.
    ``result`` is the PGN-style result token, or ``'unfinished'`` while the game is still running.
    """

    winner: Optional[Color]
    result: Literal['1-0', '0-1', '1/2-1/2', 'unfinished']

    @staticmethod
    def from_board(board: Board) -> 'GameResult':
        """
        Builds the result for the current state of ``board``.

        :param board: The board to inspect.
        :return: A decisive result on checkmate, a draw for any other finished game,
                 and ``'unfinished'`` otherwise.
        """
        if board.is_checkmate():
            # The side to move is the one that has been mated, so the winner is the other colour.
            winner = not board.turn
            # python-chess colours are booleans (WHITE is True), so a truthy winner means white won.
            return GameResult(winner, '1-0' if winner else '0-1')

        if board.is_game_over():
            return GameResult(None, '1/2-1/2')

        return GameResult(None, 'unfinished')


class GameManager:
    """Runs a single game between a white and a black ``ChessBot``."""

    def __init__(self, white: ChessBot, black: ChessBot) -> None:
        """Stores the two players of the game."""
        self.white, self.black = white, black

    def play_game(self, verify_moves=True) -> GameResult:
        """
        Plays one game from the initial position until the board reports that it is over.

        :param verify_moves: When true, a move that is not legal in the current position raises.
        :return: The result derived from the final board.
        :raises ValueError: If ``verify_moves`` is set and a player returns an illegal move.
        """
        board = Board()

        while True:
            if board.is_game_over():
                return GameResult.from_board(board)

            mover = self.white if board.turn == WHITE else self.black

            # The thinking budget starts right before the player is asked for a move.
            mover.restart_clock()
            chosen = mover.think(board)

            if verify_moves and chosen not in board.legal_moves:
                raise ValueError(f'Invalid move {chosen} for player {mover.name}')

            board.push(chosen)

In [None]:
from chess import Board, Move
import chess.svg
from IPython.display import display, clear_output


class HumanPlayer(ChessBot):
    """Player whose moves are typed in by a person in the notebook."""

    def __init__(self) -> None:
        """Registers the player under the fixed name ``'Human'``."""
        super().__init__('Human')

    def think(self, board: Board) -> Move:
        """
        Renders the board, lists the legal moves and prompts until a legal UCI move is entered.

        :param board: The current position.
        :return: The legal move chosen by the user.
        """
        rendered = chess.svg.board(board, size=350)
        clear_output(wait=True)
        display(rendered)

        print('Legal moves:', [legal.uci() for legal in board.legal_moves])
        while True:
            text = input('Enter your move: ')
            try:
                candidate = Move.from_uci(text)
                accepted = candidate in board.legal_moves
            except ValueError:
                # Not parseable as UCI; treated exactly like an illegal move.
                accepted = False

            if accepted:
                return candidate
            print('Invalid move. Try again.')
        

In [None]:
import torch
import numpy as np
import torch.nn.functional as F

from numpy.typing import NDArray
from torch import nn, Tensor, softmax


ROW_COUNT = 8
COLUMN_COUNT = 8
NUM_RES_BLOCKS = 8
NUM_HIDDEN = 256

ENCODING_CHANNELS = 6 + 6  # 6 channels for the pieces of the current player and 6 channels for the pieces of the opponent
ACTION_SIZE = 1968


class Network(nn.Module):
    """
    The neural network model for the AlphaZero bot.

    The architecture is based on the AlphaZero paper, but with less layers.

    We use a residual neural network with 8 residual blocks.
    The input to the network is a 12x8x8 tensor representing the board state with 6 channels for the pieces of the current player and 6 channels for the pieces of the opponent.
    The output of the network is a policy over all possible moves and a value for the current board state.

    The amount of parameters in the network is ~13.5 million (13.591.258).
    """

    def __init__(self) -> None:
        """Builds the layers and moves the module to CUDA when available, otherwise to the CPU."""
        super().__init__()

        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        # NOTE: the attribute names below are part of the checkpoint format (state_dict keys).
        self.startBlock = nn.Sequential(
            nn.Conv2d(ENCODING_CHANNELS, NUM_HIDDEN, kernel_size=3, padding=1),
            nn.BatchNorm2d(NUM_HIDDEN),
            nn.ReLU(),
        )

        self.backBone = nn.ModuleList([ResBlock(NUM_HIDDEN) for _ in range(NUM_RES_BLOCKS)])

        self.policyHead = nn.Sequential(
            nn.Conv2d(NUM_HIDDEN, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(32 * ROW_COUNT * COLUMN_COUNT, ACTION_SIZE),
        )

        self.valueHead = nn.Sequential(
            nn.Conv2d(NUM_HIDDEN, 3, kernel_size=3, padding=1),
            nn.BatchNorm2d(3),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(3 * ROW_COUNT * COLUMN_COUNT, 1),
            nn.Tanh(),
        )

        self.to(self.device)

    def forward(self, x: Tensor) -> tuple[Tensor, Tensor]:
        """
        Runs the shared trunk followed by both heads.

        :param x: A batch of encoded boards.
        :return: The raw policy logits and the tanh-squashed value, in that order.
        """
        x = self.startBlock(x)
        for resBlock in self.backBone:
            x = resBlock(x)
        policy = self.policyHead(x)
        value = self.valueHead(x)
        return policy, value

    def inference(self, x: Tensor) -> tuple[NDArray[np.float32], NDArray[np.float32]]:
        """
        Evaluates a batch and returns NumPy arrays instead of tensors.

        :param x: A batch of encoded boards, already on ``self.device``.
        :return: The softmax-normalised policy (one row per board) and the value per board.
        """
        # Gradients are never needed here, and ``Tensor.numpy()`` refuses tensors that
        # still require grad, so the whole evaluation runs with autograd disabled.
        with torch.no_grad():
            policy_logits, value = self(x)
            policy = softmax(policy_logits, dim=1).cpu().numpy()
            value = value.squeeze(1).cpu().numpy()
        return policy, value


class ResBlock(nn.Module):
    """Residual block: two 3x3 convolutions with batch norm and an identity skip connection."""

    def __init__(self, num_hidden: int) -> None:
        """
        :param num_hidden: Number of input and output channels of both convolutions.
        """
        super().__init__()
        self.conv1 = nn.Conv2d(num_hidden, num_hidden, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(num_hidden)
        self.conv2 = nn.Conv2d(num_hidden, num_hidden, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(num_hidden)

    def forward(self, x: Tensor) -> Tensor:
        """Returns ``relu(bn2(conv2(relu(bn1(conv1(x))))) + x)``."""
        out = self.conv1(x)
        out = self.bn1(out)
        out = F.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        # Skip connection: add the unmodified input back before the final activation.
        return F.relu(out + x)

In [None]:
def get_best_model_path(config_path: str = 'models/last_training_config.pt') -> str:
    """
    Reads the path of the best model from the last training config.

    The config is a plain-text file with one ``key=value`` entry per line. Lines without
    an ``=`` (for example blank lines) are ignored, and only the first ``=`` separates
    key from value, so values may themselves contain ``=``. If a key occurs more than
    once, the last occurrence wins.

    :param config_path: Location of the config file.
    :return: The value stored under the ``model`` key, stripped of surrounding whitespace.
    :raises KeyError: If the file has no ``model`` entry.
    """
    items: dict[str, str] = {}
    with open(config_path) as f:
        for line in f:
            key, separator, value = line.partition('=')
            if separator:
                items[key.strip()] = value.strip()
    return items['model']

In [None]:
from chess import PIECE_TYPES, SQUARES, square_rank, square_file, WHITE, BLACK, QUEEN, ROOK, BISHOP, KNIGHT, PAWN, PieceType, Square, square

def encode_board(board: Board) -> Tensor:
    """
    Encodes a board as a stack of binary piece planes.

    Each (colour, piece type) pair gets its own plane at index ``color * 6 + piece - 1``;
    a cell is 1 where such a piece stands, indexed as ``[plane][rank][file]``.
    NOTE(review): the planes are assigned by absolute colour, not by side to move — confirm
    this matches the encoding the checkpoint was trained with.

    :param board: The board to encode.
    :return: A float32 tensor of shape (ENCODING_CHANNELS, ROW_COUNT, COLUMN_COUNT).
    """
    planes = torch.zeros((ENCODING_CHANNELS, ROW_COUNT, COLUMN_COUNT), dtype=torch.float32)

    for color in (WHITE, BLACK):
        for piece in PIECE_TYPES:
            occupancy = board.pieces_mask(piece, color)
            if not occupancy:
                # No piece of this kind on the board; the plane stays all zeros.
                continue

            channel = color * 6 + piece - 1
            for sq in SQUARES:
                if (occupancy >> sq) & 1:
                    planes[channel, square_rank(sq), square_file(sq)] = 1

    return planes

def encode_boards(boards: list[Board]) -> Tensor:
    """Encodes every board with ``encode_board`` and stacks the results along a new first axis."""
    encoded = list(map(encode_board, boards))
    return torch.stack(encoded)


def filter_policy_then_get_moves_and_probabilities(
    policy: NDArray[np.float32], board: Board
) -> list[tuple[Move, float]]:
    """
    Restricts a policy to the legal moves of ``board`` and lists what remains.

    The policy is first masked and renormalised over the legal moves; every entry that
    still has a positive probability is then decoded into a ``(move, probability)`` pair.

    :param policy: 1D array with one probability per move index.
    :param board: The position whose legal moves act as the filter.
    :return: The moves with positive probability together with their probabilities.
    """
    return __map_policy_to_moves(__filter_policy_with_legal_moves(policy, board))


def encode_move(move: Move) -> int:
    """
    Encodes a chess move into a move index.

    :param move: The move to encode.
    :return: The encoded move index.
    :raises ValueError: If the (from, to, promotion) combination has no index.
    """
    by_promotion = __MOVE_MAPPINGS[move.from_square][move.to_square]

    # ``None`` is a valid key (a move without promotion) but never a stored value,
    # so a ``None`` lookup result can only mean that the key is absent.
    index = by_promotion.get(move.promotion)
    if index is None:
        raise ValueError(f'Error: move.promotion not in MOVE_MAPPINGS[move.from_square][move.to_square]: {move}')

    return index


def decode_move(move_index: int) -> Move:
    """
    Turns a move index back into the chess move it stands for.

    :param move_index: The index to look up in the reverse move table.
    :return: The move built from the stored (from, to, promotion) triple.
    """
    entry = __REVERSE_MOVE_MAPPINGS[move_index]
    return Move(entry[0], entry[1], promotion=entry[2])


def decode_moves(move_indices: NDArray[np.int32]) -> list[Move]:
    """
    Turns an array of move indices into the corresponding chess moves.

    :param move_indices: The indices to look up in the reverse move table.
    :return: One move per index, in the same order.
    """
    # Resolve every index first so that a bad index fails before any Move is built.
    triples = [__REVERSE_MOVE_MAPPINGS[index] for index in move_indices]

    decoded: list[Move] = []
    for origin, target, promotion in triples:
        decoded.append(Move(origin, target, promotion=promotion))
    return decoded


def __precalculate_move_mappings() -> tuple[list[list[dict[PieceType | None, int]]], int]:
    """
    Enumerates every (from square, to square, promotion) combination and gives each a unique index.

    Indices are handed out in enumeration order: squares are visited rank by rank, and for
    each square knight jumps come first, then rook lines, then bishop lines, then pawn
    promotions. Changing that order changes the meaning of every index.

    :return: A 64x64 table of ``{promotion: index}`` dicts (empty for unreachable square
             pairs) and the total number of indices handed out.
    """
    # Note: in every offset pair below the first component is applied to the row (rank)
    # and the second to the column (file).
    KNIGHT_MOVES = [(-2, -1), (-2, 1), (-1, -2), (-1, 2), (1, -2), (1, 2), (2, -1), (2, 1)]
    ROOK_MOVES = [(0, 1), (0, -1), (1, 0), (-1, 0)]
    BISHOP_MOVES = [(1, 1), (1, -1), (-1, 1), (-1, -1)]

    move_mappings: list[list[dict[PieceType | None, int]]] = [[{} for _ in range(64)] for _ in range(64)]
    index = 0

    def add_move(from_square: Square, to_square: Square, promotion_type: PieceType | None) -> None:
        """Registers one move under the next free index."""
        nonlocal index
        move_mappings[from_square][to_square][promotion_type] = index
        index += 1

    def add_promotion_moves(from_square: Square, col: int, to_row: int) -> None:
        """Registers all four promotion choices for the straight push and both diagonal captures."""
        for offset in (-1, 0, 1):
            if 0 <= col + offset < 8:
                to_square = square(col + offset, to_row)
                add_move(from_square, to_square, QUEEN)
                add_move(from_square, to_square, ROOK)
                add_move(from_square, to_square, BISHOP)
                add_move(from_square, to_square, KNIGHT)

    for row in range(8):
        for col in range(8):
            from_square = square(col, row)

            # Calculate knight moves from this square
            for dx, dy in KNIGHT_MOVES:
                if 0 <= row + dx < 8 and 0 <= col + dy < 8:  # Check if move is within bounds
                    to_square = square(col + dy, row + dx)
                    add_move(from_square, to_square, None)

            # Calculate rook moves from this square
            for dx, dy in ROOK_MOVES:
                for i in range(1, 8):
                    if 0 <= row + i * dx < 8 and 0 <= col + i * dy < 8:
                        to_square = square(col + i * dy, row + i * dx)
                        add_move(from_square, to_square, None)

            # Calculate bishop moves from this square
            for dx, dy in BISHOP_MOVES:
                for i in range(1, 8):
                    if 0 <= row + i * dx < 8 and 0 <= col + i * dy < 8:
                        to_square = square(col + i * dy, row + i * dx)
                        add_move(from_square, to_square, None)

            # Calculate pawn promotion moves from this square
            # (row 1 promotes downwards onto row 0, row 6 promotes upwards onto row 7)
            if row == 1:
                add_promotion_moves(from_square, col, row - 1)
            elif row == 6:
                add_promotion_moves(from_square, col, row + 1)

    return move_mappings, index


def __precalculate_reverse_move_mappings(
    move_mappings: list[list[dict[PieceType | None, int]]],
) -> list[tuple[Square, Square, PieceType | None]]:
    """
    Inverts the move table so that a move index leads back to its move description.

    :param move_mappings: The 64x64 table of ``{promotion: index}`` dicts.
    :return: A list of length ``ACTION_SIZE`` whose entry ``i`` is the
             (from square, to square, promotion) triple registered under index ``i``.
    """
    table: list[tuple[Square, Square, PieceType | None]] = [None] * ACTION_SIZE  # type: ignore

    for origin in range(len(move_mappings)):
        for target, by_promotion in enumerate(move_mappings[origin]):
            for promotion_type in by_promotion:
                table[by_promotion[promotion_type]] = (origin, target, promotion_type)

    return table


# Build both lookup tables once, when the cell runs. Note that this rebinds ACTION_SIZE to the
# number of indices actually generated, replacing the literal assigned earlier in the notebook.
__MOVE_MAPPINGS, ACTION_SIZE = __precalculate_move_mappings()
__REVERSE_MOVE_MAPPINGS = __precalculate_reverse_move_mappings(__MOVE_MAPPINGS)


def __encode_legal_moves(board: Board) -> NDArray[np.int8]:
    """
    Builds a 0/1 mask over all move indices marking the legal moves of a board.

    The mask has ``ACTION_SIZE`` entries; entry ``i`` is 1 when the move with index ``i``
    is legal in the given position and 0 otherwise.

    :param board: The chess board to encode.
    :return: A 1D int8 array representing the encoded legal moves.
    """
    legal_indices = [encode_move(move) for move in board.legal_moves]

    mask = np.zeros(ACTION_SIZE, dtype=np.int8)
    if legal_indices:
        mask[legal_indices] = 1
    return mask


def __filter_policy_with_legal_moves(policy: NDArray[np.float32], board: Board) -> NDArray[np.float32]:
    """
    Filters a policy with the legal moves of a chess board.

    The policy is a 1D numpy array with one probability per move index. Entries of illegal
    moves are zeroed and the remainder is renormalised so that it sums to 1. The input
    array is left untouched; a new array is returned.

    If the legal moves carry no usable probability mass (all zero, or the sum is NaN),
    the result falls back to a uniform distribution over the legal moves instead of
    dividing by zero. If there are no legal moves at all, an all-zero array is returned.

    :param policy: The policy to filter.
    :param board: The chess board to filter the policy with.
    :return: The filtered policy.
    """
    legal_moves_encoded = __encode_legal_moves(board)

    filtered = policy * legal_moves_encoded
    total = filtered.sum()
    if total > 0:
        return filtered / total

    legal_count = int(legal_moves_encoded.sum())
    if legal_count == 0:
        return np.zeros_like(policy)

    return (legal_moves_encoded / legal_count).astype(policy.dtype)


def __map_policy_to_moves(policy: NDArray[np.float32]) -> list[tuple[Move, float]]:
    """
    Lists every move of a filtered policy that has a positive probability.

    :param policy: 1D array with one probability per move index.
    :return: ``(move, probability)`` pairs in ascending move-index order.
    """
    # Indices of all entries with probability > 0
    positive = (policy > 0).nonzero()[0]

    # Decode them in one go and attach the matching probabilities
    return list(zip(decode_moves(positive), policy[positive]))

In [None]:
from __future__ import annotations


class AlphaMCTSNode:
    """
    Node of the search tree used by the AlphaZero-style bot.

    A node only stores the move that leads to it; its board is created lazily by
    ``init`` the first time the node is selected. Per-child statistics are mirrored
    in NumPy arrays on the parent so that child selection can be vectorised.
    """

    def __init__(
        self, policy: float, move_to_get_here: Move, parent: AlphaMCTSNode | None, num_played_moves: int
    ) -> None:
        self.policy = policy
        self.move_to_get_here = move_to_get_here
        self.parent = parent
        self.num_played_moves = num_played_moves  # Number of moves played from the root to reach this node
        self.children: list[AlphaMCTSNode] = []
        self.board: Board = None  # type: ignore
        self.number_of_visits = 0.0001  # Prevent division by zero
        self.result_score = -1.0

    @classmethod
    def root(cls, board: Board) -> AlphaMCTSNode:
        """Creates the root node for ``board``; it starts with the board attached and one visit."""
        node = cls(policy=1.0, move_to_get_here=Move.null(), parent=None, num_played_moves=0)
        node.number_of_visits = 1.0
        node.board = board
        return node

    def init(self) -> None:
        """Creates this node's board from the parent's board if it doesn't have one yet."""
        if self.board:
            return

        parent = self.parent
        if not (parent and parent.board):
            raise ValueError('Parent node must have a board')

        # The move stack is not copied; only the resulting position matters here.
        board = parent.board.copy(stack=False)
        board.push(self.move_to_get_here)
        self.board = board

    @property
    def is_terminal_node(self) -> bool:
        """True when the node has a board and that board reports the game as over."""
        return self.board is not None and self.board.is_game_over()

    @property
    def is_fully_expanded(self) -> bool:
        """True as soon as the node has at least one child."""
        return len(self.children) > 0

    def expand(self, moves_with_scores: list[tuple[Move, float]]) -> None:
        """Creates one child per (move, prior) pair and sets up the per-child statistic arrays."""
        child_depth = self.num_played_moves + 1
        offspring = []
        for move, score in moves_with_scores:
            offspring.append(AlphaMCTSNode(score, move, parent=self, num_played_moves=child_depth))
        self.children = offspring

        # Mirror the children's statistics in NumPy arrays
        self.children_number_of_visits = np.array([node.number_of_visits for node in offspring], dtype=np.float32)
        self.children_result_scores = np.array([node.result_score for node in offspring], dtype=np.float32)
        self.children_policies = np.array([node.policy for node in offspring], dtype=np.float32)

    def back_propagate(self, result: float) -> None:
        """
        Adds one visit and ``result`` to this node and to every ancestor up to the root.

        NOTE(review): the same ``result`` is added at every level without a sign flip, while
        ``best_child`` inverts the children's scores — confirm this is the intended convention.
        """
        node = self
        while True:
            node.number_of_visits += 1.0
            node.result_score += result

            parent = node.parent
            if not parent:
                return

            # Keep the parent's mirrored arrays in sync with the child that was just updated.
            slot = parent.children.index(node)
            parent.children_number_of_visits[slot] += 1.0
            parent.children_result_scores[slot] += result
            node = parent

    def best_child(self, c_param: float = 0.1) -> AlphaMCTSNode:
        """
        Picks the child with the highest selection score and initializes it before returning it.

        The score is the inverted mean result of the child mapped to [0, 1], plus the child's
        prior weighted by ``c_param * sqrt(parent visits) / (1 + child visits)``.
        """
        visits = self.children_number_of_visits

        exploitation = 1 - ((self.children_result_scores / visits) + 1) / 2
        exploration = c_param * np.sqrt(self.number_of_visits) / (1 + visits)

        scores = exploitation + self.children_policies * exploration

        chosen = self.children[np.argmax(scores)]
        chosen.init()
        return chosen

    def __repr__(self) -> str:
        return f"""AlphaMCTSNode(
{self.board}
visits: {self.number_of_visits}
depth: {self.num_played_moves}
score: {self.result_score:.2f}
policy: {self.policy:.2f}
move: {self.move_to_get_here}
children: {len(self.children)}
)"""


In [None]:

class AlphaZeroBot(ChessBot):
    """Bot that picks its moves with a network-guided Monte Carlo tree search."""

    def __init__(self, network_model_file_path) -> None:
        """
        Creates the network and loads the weights stored at ``network_model_file_path``.

        :param network_model_file_path: Path of a checkpoint holding the network's state dict.
        """
        super().__init__('Alpha MCTS Bot')
        self.model = Network()
        # map_location lets a checkpoint saved on a GPU machine load on a CPU-only one.
        self.model.load_state_dict(torch.load(network_model_file_path, map_location=self.model.device))
        # Inference only: batch norm must use its running statistics, not those of the current batch.
        self.model.eval()

    def think(self, board: Board) -> Move:
        """
        Searches from ``board`` until the thinking time is used up and returns the chosen move.

        :param board: The current position.
        :return: The move leading to the root child with the best score.
        :raises ValueError: If the search produced no candidate moves (e.g. the game is already over).
        """
        root = AlphaMCTSNode.root(board)

        # Always search at least once so that the root has children even when the clock
        # has already run out (or was never restarted) by the time we get here.
        self.iterate(root)
        while not self.time_is_up:
            self.iterate(root)

        if not root.children:
            raise ValueError('Cannot pick a move: the search found no moves for this position')

        best_child = root.best_child(c_param=0.0)

        print('---------------------- Alpha Zero Best Move ----------------------')
        print(f'Best child has {best_child.number_of_visits:.4f} visits')
        print(f'Best child has {best_child.result_score:.4f} result_score')
        print(f'Best child has {best_child.policy:.4f} policy')
        print('------------------------------------------------------------------')

        return best_child.move_to_get_here

    def iterate(self, root: AlphaMCTSNode) -> None:
        """
        Runs one search iteration: descend to a leaf, evaluate it and back-propagate the value.

        :param root: The root of the search tree.
        """
        current_node = root

        while not current_node.is_terminal_node:
            if current_node.is_fully_expanded:
                current_node = current_node.best_child()
            else:
                moves_with_scores, result = self.evaluation(current_node.board)
                current_node.expand(moves_with_scores)
                current_node.back_propagate(result)
                return

        # A finished game has nothing to expand, but it still has to be scored: otherwise the
        # node keeps its "unvisited" statistics and is selected again on every iteration.
        # The value is taken from the point of view of the side to move at this node:
        # being checkmated is a loss (-1), any other ending counts as a draw (0).
        terminal_value = -1.0 if current_node.board.is_checkmate() else 0.0
        current_node.back_propagate(terminal_value)

    @torch.no_grad()
    def evaluation(self, board: Board) -> tuple[list[tuple[Move, float]], float]:
        """
        Evaluates a single position with the network.

        :param board: The position to evaluate.
        :return: The legal moves with their renormalised prior probabilities, and the value estimate.
        """
        policy, value = self.model.inference(encode_boards([board]).to(self.model.device))

        moves = filter_policy_then_get_moves_and_probabilities(policy[0], board)

        # Convert the NumPy scalar so that node statistics accumulate as plain Python floats.
        return moves, float(value[0])

In [None]:
import chess.engine

class BaselineBot(ChessBot):
    """Bot that delegates move selection to an external UCI engine."""

    def __init__(self, engine_path: str, skill: int) -> None:
        """
        Starts the engine process and configures its strength.

        :param engine_path: Path of the UCI engine executable.
        :param skill: Value for the engine's 'Skill Level' option
                      (0 to 20, 0 being the weakest and 20 the strongest).
        """
        super().__init__(f'Baseline Bot ({engine_path})')
        self.engine = chess.engine.SimpleEngine.popen_uci(engine_path)
        self.engine.configure({'Skill Level': skill})

    def think(self, board: Board) -> Move:
        """Asks the engine for a move within the ``TIME_TO_THINK`` limit."""
        limit = chess.engine.Limit(time=TIME_TO_THINK)
        move = self.engine.play(board, limit).move
        if move is not None:
            return move
        raise ValueError('The engine returned a None move')

    def stop(self) -> None:
        """Shuts the engine process down."""
        self.engine.quit()

## Bot prediction from FEN strings

In [None]:
# Download the model file
%mkdir -p models
%cd models
!wget https://github.com/official-stockfish/Stockfish/releases/download/sf_16/stockfish-ubuntu-x86-64-modern.tar
%tar -xvf stockfish-ubuntu-x86-64-modern.tar
%mv stockfish-ubuntu-x86-64-modern stockfish
%rm stockfish-ubuntu-x86-64-modern.tar
%cd ..

In [None]:
import chess.engine

# Load the best checkpoint for inference. map_location lets a checkpoint saved on a GPU
# machine load on a CPU-only one; eval() makes batch norm use its running statistics.
network = Network()
network.load_state_dict(torch.load(get_best_model_path(), map_location=network.device))
network.eval()

stockfish = chess.engine.SimpleEngine.popen_uci('models/stockfish')

mid_game_fens = [
    "r1bqkb1r/pp2pppp/2n2n2/2pp4/3PP3/2N2N2/PPP2PPP/R1BQKB1R w KQkq - 0 1",
    "r1bq1rk1/ppp1bppp/2np1n2/3Np3/1PP1P3/2N5/PB3PPP/R2QKB1R b KQ - 0 1",
    "r1bq1rk1/pppn1pbp/3p1np1/4p3/2PP4/2N2N2/PP2BPPP/R1BQ1RK1 w - - 0 1",
    "r2q1rk1/ppp1bppp/2np1n2/1B2p3/1bP1P3/2N2N2/PP1QBPPP/R4RK1 w - - 0 1",
    "2kr3r/ppp2ppp/2npb3/q7/3NP3/2N5/PPP1QPPP/R1B1K2R b KQ - 0 1",
    "r1bq1rk1/1pp1npbp/p2p1np1/3Pp3/2P1P3/2N1BN2/PP2BPPP/R2Q1RK1 w - - 0 1",
    "r2q1rk1/1bpp1ppp/p1n2n2/1p1pp3/3P1B2/2PBPN2/PP3PPP/R2QK2R w KQ - 0 1",
    "r2q1rk1/ppp2ppp/2nbpn2/3p4/3P1B2/2NBPN2/PPP2PPP/R2QK2R b KQ - 0 1",
    "rnbq1rk1/pp3pbp/3p1np1/2pPp3/2P1P3/2N2N2/PP3PPP/R1BQRBK1 b - - 0 1",
    "r1bq1rk1/pp2ppbp/2np1np1/8/2PNP3/2N5/PP3PPP/R1BQKB1R w KQ - 0 1"
]

try:
    # The outputs are converted to NumPy arrays, which is not possible for tensors that
    # still track gradients, so the forward pass runs with autograd disabled.
    with torch.no_grad():
        policy, value = network.inference(encode_boards([Board(fen) for fen in mid_game_fens]).to(network.device))

    for fen, p, v in zip(mid_game_fens, policy, value):
        board = Board(fen)
        moves_with_probabilities = filter_policy_then_get_moves_and_probabilities(p, board)
        stockfish_result = stockfish.analyse(board, chess.engine.Limit(time=0.1))

        print(f'FEN: {fen}')
        print(f'Value: {v:.4f}')
        print(f'Stockfish evaluation: {stockfish_result["score"]}')
        print(f'Moves with probabilities: {len(moves_with_probabilities)}')

        for move, probability in moves_with_probabilities:
            print(f'{move}: {probability:.4f}')

            # Let Stockfish judge the position after the move, then restore the board.
            board.push(move)
            stockfish_result = stockfish.analyse(board, chess.engine.Limit(time=0.1))
            print(f'Stockfish evaluation of new Board state: {stockfish_result["score"]}')
            board.pop()
finally:
    # Always terminate the engine process, even if the analysis above failed.
    stockfish.quit()

## Bot plays against itself

In [None]:
# Self-play: two separately loaded copies of the best model; bot1 is passed as the white player.
bot1 = AlphaZeroBot(get_best_model_path())
bot2 = AlphaZeroBot(get_best_model_path())

game_manager = GameManager(bot1, bot2)
game_manager.play_game()

## Bot vs Human

In [None]:
# The bot is passed as the white player, so the human plays black and is prompted for UCI moves.
bot = AlphaZeroBot(get_best_model_path())
human = HumanPlayer()

game_manager = GameManager(bot, human)
game_manager.play_game()

## Bot vs Baseline Bot

In [None]:
def bot_vs_baseline(model_path: str, repetitions: int, skill: int) -> None:
    """
    Plays the AlphaZero bot (white) against the baseline engine (black) and prints a summary.

    :param model_path: Checkpoint to load into the AlphaZero bot.
    :param repetitions: Number of games to play.
    :param skill: Skill level passed to the baseline engine.
    """
    bot = AlphaZeroBot(model_path)
    baseline = BaselineBot('models/stockfish', skill=skill)

    try:
        game_manager = GameManager(bot, baseline)
        results = [game_manager.play_game() for _ in range(repetitions)]
    finally:
        # Always shut the engine process down, even if one of the games raised.
        baseline.stop()

    wins, draws, losses = 0, 0, 0

    for j, result in enumerate(results):
        print(f'Game {model_path}-{j}: {result.result}')

        # The bot always plays white here, so '1-0' is a win for the bot and '0-1' a loss.
        if result.result == '1-0':
            wins += 1
        elif result.result == '0-1':
            losses += 1
        else:
            draws += 1

    print(f'Wins: {wins}, Draws: {draws}, Losses: {losses}')

# Ten games with the current best checkpoint against the baseline engine at skill level 1.
bot_vs_baseline(get_best_model_path(), repetitions=10, skill=1)

## Bot vs Baseline Bot for every saved checkpoint

In [None]:
import os

# The iteration number is the text after the last underscore of the best model's path,
# with the '.pt' extension removed.
current_best_model_path = get_best_model_path()
best_model_iteration = int(
    current_best_model_path.split('_')[-1].replace('.pt', '')
)

# Evaluate every checkpoint up to and including the best one; missing files are skipped.
for i in range(1, best_model_iteration + 1):
    model_path = f'models/model_{i}.pt'
    if os.path.exists(model_path):
        bot_vs_baseline(model_path, repetitions=10, skill=1)