In [None]:
# Use the public IPython.display module (the same one the results cell of this
# notebook imports) instead of the internal IPython.core.display path.
from IPython.display import HTML

# Read the stylesheet that sits next to the notebook and make it the cell
# output, so the front end renders it.
with open('style.css') as file:
    css = file.read()
HTML(css)

In [None]:
# Autoreload Python modules by default
%load_ext autoreload
%autoreload 2

# Convert notebooks to Python, so they can be loaded efficiently
from utils.jupyter_loader import JupyterLoader

loader = JupyterLoader()
loader.load_all()

In [None]:
from converted_notebooks.s04_engine_interface import Engine, ScoredMove
from converted_notebooks.s12_simplified_evaluation import DetailedMove, IncrementalEvaluator
from converted_notebooks.s11_iterative_deepening import AlphaBetaCache, NodeType, cache_alpha_beta

import chess
import chess.polyglot
import chess.gaviota
import random
import logging
import math
from chess.engine import Score, Cp, Mate, PovScore, MateGiven


class PrototypeV2Engine(Engine):
    """Chess engine using iterative deepening with a principal variation search.

    ``_value`` searches the first move of a node with the full alpha-beta
    window and probes every later move with ``_zero_width_search``; a move is
    only re-searched with the full window when the probe lands strictly
    between alpha and beta. Leaf nodes are resolved by a capture-only
    quiescence search. Moves are ordered with the results stored in
    ``self.cache`` and a polyglot opening book is consulted before searching.
    """

    def __init__(
        self,
        evaluator: IncrementalEvaluator,
        max_look_ahead_depth,
        opening_book,
        use_static_exchange_evaluation=False
    ):
        """Store the configuration and create an empty search cache.

        :param evaluator: Evaluator that is kept in sync with the board through
            ``push``/``pop`` while searching.
        :param max_look_ahead_depth: Deepest iteration of the iterative deepening.
        :param opening_book: Path passed to ``chess.polyglot.open_reader``.
        :param use_static_exchange_evaluation: If true, losing captures in the
            quiescence search are detected with a static exchange evaluation,
            otherwise with a plain "is the target square attacked" test.
        """
        self.evaluator = evaluator
        self.max_look_ahead_depth = max_look_ahead_depth
        self.opening_book = opening_book
        self.use_static_exchange_evaluation = use_static_exchange_evaluation
        self.cache = AlphaBetaCache()

    @cache_alpha_beta
    def _value(
        self, board: chess.Board, depth: int, alpha: Score, beta: Score
    ) -> PovScore:
        """Search ``board`` to ``depth`` plies inside the window (alpha, beta).

        :param board: Position to search; it is modified and restored in place.
        :param depth: Remaining search depth in plies.
        :param alpha: Lower bound from the view of the side to move.
        :param beta: Upper bound from the view of the side to move.
        :return: The evaluator's result if it already knows the position,
            otherwise the search result wrapped for ``board.turn``.
        """
        if (score := self.evaluator.evaluate(board)) is not None:
            return score
        # `<=` rather than `==`: a negative depth must still end in the leaf
        # handling instead of recursing without bound.
        if depth <= 0:
            return self._quiescence(board, alpha, beta)

        search_principal_variation = True
        for move in self._get_moves(board, depth):
            detailed_move = DetailedMove(board, move)

            self.evaluator.push(detailed_move)
            board.push(move)

            # After the push `board.turn` is the opponent, so `not board.turn`
            # converts the child's score back to the side that just moved.
            if search_principal_variation:
                value = self._value(board, depth - 1, -beta,
                                    -alpha).pov(not board.turn)
            else:
                value = self._zero_width_search(board, depth - 1,
                                                -alpha).pov(not board.turn)
                if alpha < value < beta:
                    value = self._value(board, depth - 1, -beta,
                                        -alpha).pov(not board.turn)

            board.pop()
            self.evaluator.pop()

            # A mate found one ply deeper is one step further away from here.
            if value.is_mate():
                value = self._rise_mate(value)

            if value >= beta:
                return PovScore(value, board.turn)

            if value > alpha:
                alpha = value
                search_principal_variation = False

        return PovScore(alpha, board.turn)

    def _get_moves(self, board: chess.Board, depth: int):
        """Return the legal moves, cached ones first.

        Moves whose resulting position is found in the cache are sorted
        ascending by the cached ``relative`` score (presumably the score of the
        side to move after the move, so the opponent's worst replies come
        first - verify against ``AlphaBetaCache``). Moves without a cache
        entry follow in generation order.
        """
        cached_moves = []
        uncached_moves = []
        for move in board.legal_moves:
            board.push(move)
            cache_key = self.cache.get_key(board)
            board.pop()

            try:
                # depth - 2 as we are looking at one move further and at the previous iteration with depth - 1
                _, value = self.cache.load_cache(cache_key, depth - 2)
                cached_moves.append((value.relative, move))
            except KeyError:
                uncached_moves.append(move)

        cached_moves.sort(reverse=False, key=lambda x: x[0])
        return [cached_move[1] for cached_move in cached_moves] + uncached_moves

    def _zero_width_search(
        self, board: chess.Board, depth: int, beta: Score
    ) -> PovScore:
        """Test whether the position reaches ``beta`` using a minimal window.

        The lower bound is the score directly below ``beta``. The search either
        fails high (returns a value >= beta) or fails low (returns that lower
        bound).
        """
        if beta.is_mate():
            if beta > Cp(0):
                # Winning mate: a mate that takes one move longer is the next
                # lower score.
                alpha = Mate(beta.mate() + 1)
            else:
                # Losing mate: being mated sooner is the next lower score, so
                # the counter moves towards zero. The previous `mate() - 1`
                # produced a bound *above* beta, which made the caller treat
                # improving moves as fail-lows. Mate(0) is already the lowest
                # score, hence the clamp.
                alpha = Mate(min(beta.mate() + 1, 0))
        else:
            alpha = Cp(beta.score() - 1)

        if (score := self.evaluator.evaluate(board)) is not None:
            return score
        # See `_value`: guard against negative depths as well.
        if depth <= 0:
            return self._quiescence(board, alpha, beta)

        for move in board.generate_legal_moves():
            detailed_move = DetailedMove(board, move)

            self.evaluator.push(detailed_move)
            board.push(move)
            value = self._zero_width_search(board, depth - 1,
                                            -alpha).pov(not board.turn)
            board.pop()
            self.evaluator.pop()

            if value.is_mate():
                value = self._rise_mate(value)

            if value >= beta:
                return PovScore(value, board.turn)
        return PovScore(alpha, board.turn)

    def _quiescence(
        self, board: chess.Board, alpha: Score, beta: Score
    ) -> PovScore:
        """Search captures only until the position is quiet.

        The evaluator's current score is used as "stand pat" value: if it
        already reaches ``beta`` the node is cut off, otherwise it raises
        ``alpha`` before the remaining captures are examined.
        """
        stand_pat = self.evaluator.get_score().relative

        if stand_pat >= beta:
            return PovScore(beta, board.turn)
        alpha = max(alpha, stand_pat)

        for detailed_move in self._get_quiescence_detailed_moves(
            board, stand_pat, alpha, beta
        ):
            self.evaluator.push(detailed_move)
            board.push(detailed_move.move)
            value = self._quiescence(board, -beta, -alpha).pov(not board.turn)
            board.pop()
            self.evaluator.pop()

            if value.is_mate():
                value = self._rise_mate(value)

            if value >= beta:
                return PovScore(value, board.turn)
            alpha = max(alpha, value)
        return PovScore(alpha, board.turn)

    def _get_quiescence_detailed_moves(
        self, board: chess.Board, stand_pat: Score, alpha: Score, beta: Score
    ):
        """Return the captures worth searching in the quiescence search.

        Non-promoting captures are dropped when they can be delta pruned or are
        classified as bad captures. The result is ordered by MVV-LVA.
        ``beta`` is accepted for signature compatibility but not used.
        """
        moves = []
        for move in board.generate_legal_captures():
            detailed_move = DetailedMove(board, move)

            captured_piece_value = self.evaluator.piece_values[
                detailed_move.capturedPiece.piece.piece_type]

            moved_piece_value = self.evaluator.piece_values[
                detailed_move.movedPiece.piece.piece_type]

            # Promotions are never pruned.
            if move.promotion is None:
                if self._canDeltaPrune(stand_pat, alpha, captured_piece_value):
                    continue

                if self._bad_capture(
                    board, detailed_move, captured_piece_value,
                    moved_piece_value
                ):
                    continue

            moves.append(
                (captured_piece_value, moved_piece_value, detailed_move)
            )

        # Order by MVV-LVA (most valuable victim first, least valuable attacker second)
        moves.sort(reverse=True, key=lambda x: (x[0], -x[1]))

        return [move[2] for move in moves]

    def _bad_capture(
        self,
        board: chess.Board,
        detailedMove: DetailedMove,
        capturedPieceValue: int,
        movedPieceValue: int
    ):
        """Return whether a capture is expected to lose material.

        Capturing a piece that is worth at least as much as the capturing piece
        is never bad. Otherwise the capture is bad if the static exchange
        evaluation is negative (when enabled) or, without SEE, if the target
        square is attacked by the opponent.
        """
        value = capturedPieceValue - movedPieceValue
        if value >= 0:
            return False

        if self.use_static_exchange_evaluation:
            return self._seeCapture(
                board, detailedMove, capturedPieceValue, movedPieceValue
            ) < 0
        return board.is_attacked_by(
            not board.turn, detailedMove.placedPiece.square
        )

    def _canDeltaPrune(
        self, stand_pat: Score, alpha: Score, capturedPieceValue: int
    ):
        """Return whether a capture cannot raise ``alpha`` even optimistically.

        The optimistic value is the stand pat score plus the captured piece's
        value plus a fixed positional margin.
        """
        POTENTIAL_POSITION_ADVANTAGE = 200
        # A mate score has no centipawn value (`score()` is None); never prune
        # in that case instead of failing on the addition below.
        if stand_pat.is_mate():
            return False
        best_alpha = Cp(
            stand_pat.score() + capturedPieceValue +
            POTENTIAL_POSITION_ADVANTAGE
        )
        return best_alpha < alpha

    def _seeCapture(
        self,
        original_board: chess.Board,
        detailedMove: DetailedMove,
        capturedPieceValue: int,
        movedPieceValue: int
    ):
        """Static exchange evaluation of a capture.

        :return: Value of the captured piece minus what the opponent can win
            back on the target square, in the evaluator's piece value units.
        """
        # Work on a copy so the caller's board is left untouched; the copy is
        # discarded afterwards, so the removed piece needs no restoring.
        board = original_board.copy()
        board.remove_piece_at(detailedMove.movedPiece.square)

        return capturedPieceValue - self._see(
            board,
            detailedMove.placedPiece.square,
            movedPieceValue,
            not board.turn
        )

    def _rise_mate(self, score: Score) -> Score:
        """Return the mate score ``score`` moved one step further away.

        A positive mate in ``n`` becomes a mate in ``n + 1``; any other mate
        score has its counter decreased by one.
        """
        moves = score.mate()
        if score > Cp(0):
            return Mate(moves + 1)
        return Mate(moves - 1)

    def _see(
        self,
        board: chess.Board,
        square: chess.Square,
        attacked_piece_value: int,
        turn: chess.Color
    ):
        """Return the material ``turn`` can win by capturing on ``square``.

        The least valuable attacker always captures first and either side may
        stop capturing, so the result is never negative. ``board`` is modified
        during the recursion and restored before returning.

        :param attacked_piece_value: Value of the piece currently standing on
            ``square``.
        """
        attackers = [
            (
                attacker_square,
                self.evaluator.piece_values[
                    board.piece_type_at(attacker_square)]
            )
            for attacker_square in board.attackers(turn, square)
        ]
        if not attackers:
            return 0

        attacker_square, attacker_piece_value = min(
            attackers, key=lambda x: x[1]
        )

        piece = board.remove_piece_at(attacker_square)
        value = max(
            0,
            attacked_piece_value -
            self._see(board, square, attacker_piece_value, not turn)
        )
        board.set_piece_at(attacker_square, piece)
        return value

    def _evaluate_move(
        self, board: chess.Board, move: chess.Move, depth: int
    ) -> ScoredMove:
        """Play ``move``, search the resulting position and undo the move.

        :param depth: Search depth counted from ``board``; the position after
            the move is searched with ``depth - 1``.
        :return: The move together with its score from White's point of view.
        """
        self.evaluator.push(DetailedMove(board, move))
        board.push(move)
        # NOTE(review): the root window is passed as floats although `_value`
        # compares it with `Score` objects; presumably `cache_alpha_beta`
        # normalises it - confirm before changing.
        score = self._value(board, depth - 1, -math.inf, math.inf)
        board.pop()
        self.evaluator.pop()

        return ScoredMove(score=score.white(), move=move)

    def _evaluate_moves(self, board: chess.Board) -> list[ScoredMove]:
        """Score every legal move using iterative deepening.

        Only the scores of the deepest iteration are returned; the shallower
        iterations are run before it with the same cache. The list is empty
        if ``max_look_ahead_depth`` is smaller than 1.
        """
        logging.info(f"Max depth: {self.max_look_ahead_depth}")
        self.cache.clear()
        self.evaluator.init(board)

        # Defined up front so that a depth below 1 yields an empty result
        # instead of an UnboundLocalError.
        scored_moves = []
        for depth in range(1, self.max_look_ahead_depth + 1):
            scored_moves = [
                self._evaluate_move(board, move, depth)
                for move in board.legal_moves
            ]

            logging.info(f"Depth {depth}")

        return scored_moves

    def analyse(self, board: chess.Board) -> list[ScoredMove]:
        """Return all legal moves with their scores, best move first.

        The search runs on a copy of ``board``.
        """
        next_moves = self._evaluate_moves(board.copy())

        # Scores are from White's view: descending for White, ascending for Black.
        whites_turn = board.turn is chess.WHITE
        next_moves.sort(reverse=whites_turn)

        return next_moves

    def _find_move(self, board: chess.Board, score: PovScore) -> chess.Move:
        """Return a move whose cached result equals the root ``score``.

        Every legal move's resulting position is looked up in the cache at
        ``max_look_ahead_depth - 1``. A move qualifies if its entry is exact
        and its score, after moving a mate one step further away, matches
        ``score`` from White's point of view.

        :raises AssertionError: If no move qualifies.
        """
        best_moves = []
        for move in board.legal_moves:
            board.push(move)
            cache_key = self.cache.get_key(board)
            board.pop()

            try:
                node_type, value = self.cache.load_cache(
                    cache_key, self.max_look_ahead_depth - 1
                )
            except KeyError:
                continue

            # Build a new PovScore instead of assigning to `value.relative`:
            # `value` is the object stored in the cache and must not change.
            relative = value.relative
            if relative.is_mate():
                relative = self._rise_mate(relative)
            candidate = PovScore(relative, value.turn)

            if node_type == NodeType.EXACT and candidate.white() == score.white():
                best_moves.append(move)

        assert best_moves, f"No best move found with the given score {score.white()}"
        return best_moves[0]

    def _find_best_move(self, board: chess.Board) -> chess.Move:
        """Search ``board`` with iterative deepening and return the best move."""
        logging.info(f"Max depth: {self.max_look_ahead_depth}")
        self.cache.clear()
        self.evaluator.init(board)

        for depth in range(self.max_look_ahead_depth + 1):
            score = self._value(board, depth, -math.inf, math.inf)
            logging.info(f"Depth {depth}")

            # Research in case we found an entry directly in the endgame book
            if score.is_mate() and len(chess.SquareSet(board.occupied)) <= 5:
                # The first iteration runs with depth 0; evaluating the moves
                # needs at least depth 1, otherwise the positions after the
                # moves would be searched with a negative depth.
                scored_moves = [
                    self._evaluate_move(board, move, max(depth, 1))
                    for move in board.legal_moves
                ]
                return max(
                    scored_moves
                ).move if board.turn is chess.WHITE else min(scored_moves).move

        return self._find_move(board, score)

    def _get_opening_move(self, board: chess.Board) -> "chess.Move | None":
        """Return the highest weighted opening book move for ``board``.

        Only book entries with a weight of at least 50 are considered.

        :return: The move, or ``None`` if the book has no such entry.
        """
        MIN_BOOK_WEIGHT = 50
        with chess.polyglot.open_reader(self.opening_book) as reader:
            moves = [
                ScoredMove(entry.weight, entry.move)
                for entry in reader.find_all(board)
                if entry.weight >= MIN_BOOK_WEIGHT
            ]

        if not moves:
            return None

        return max(moves).move

    def play(self, board: chess.Board) -> chess.engine.PlayResult:
        """Choose a move for ``board``: from the opening book if possible,
        otherwise by searching. ``board`` itself is not modified.
        """
        board_copy = board.copy()

        best_move = self._get_opening_move(board_copy) or self._find_best_move(
            board_copy
        )
        return chess.engine.PlayResult(move=best_move, ponder=None)

In [None]:
from converted_notebooks.s12_simplified_evaluation import IncrementalPieceSquareEvaluator


class IncrementalEndgamePieceSuqareEvaluator(IncrementalPieceSquareEvaluator):
    """Piece-square evaluator that additionally probes Gaviota endgame tablebases.

    Positions with at most five pieces that the base evaluator cannot decide
    are looked up in the tablebase stored in ``endgame_book_directory``.
    """

    def __init__(
        self,
        piece_values,
        piece_square_values,
        endgame_piece_square_values,
        check_endgame,
        endgame_book_directory="../../data/gaviota"
    ):
        """Forward the evaluation tables to the base class and remember the
        tablebase directory.

        :param endgame_book_directory: Directory passed to
            ``chess.gaviota.open_tablebase``.
        """
        super().__init__(
            piece_values,
            piece_square_values,
            endgame_piece_square_values,
            check_endgame
        )
        self.endgame_book_directory = endgame_book_directory

    def evaluate(self, board: chess.Board) -> "PovScore | None":
        """Return a definite score for ``board`` if one is known.

        :return: The base evaluator's result if it has one. Otherwise, for
            positions with at most five pieces, a score derived from the
            tablebase's depth-to-mate value (``Cp(0)`` for a value of 0, else
            ``Mate(depth_to_mate)``), relative to the side to move. ``None``
            if there are more pieces or the probe fails.
        """
        if (score := super().evaluate(board)) is not None:
            return score

        if len(chess.SquareSet(board.occupied)) > 5:
            return None

        # NOTE(review): the tablebase is opened and closed for every probe;
        # consider keeping it open if this shows up in profiles.
        with chess.gaviota.open_tablebase(
            self.endgame_book_directory
        ) as tablebase:
            try:
                depth_to_mate = tablebase.probe_dtm(board)
            except Exception:
                # A failed probe is treated as "position unknown". Unlike a
                # bare `except:` this lets KeyboardInterrupt and SystemExit
                # through, so a running search can still be interrupted.
                return None

        if depth_to_mate == 0:
            return PovScore(Cp(0), board.turn)
        return PovScore(Mate(depth_to_mate), board.turn)

In [None]:
from converted_notebooks.s12_simplified_evaluation import incremental_simplified_evaluator

# Tablebase-aware evaluator that reuses the piece values, piece-square tables
# and endgame check of the plain simplified evaluator. The tablebase directory
# is left at the constructor's default.
incremental_endgame_simplified_evaluator = IncrementalEndgamePieceSuqareEvaluator(
    piece_values=incremental_simplified_evaluator.piece_values,
    piece_square_values=incremental_simplified_evaluator.piece_square_values,
    endgame_piece_square_values=incremental_simplified_evaluator.
    endgame_piece_square_values,
    check_endgame=incremental_simplified_evaluator.check_endgame
)

In [None]:
from converted_notebooks.s12_simplified_evaluation import incremental_simplified_evaluator
from converted_notebooks.s06_play import play_game

# Seed Python's random module with a fixed value before creating the engine.
random.seed(42)

# Depth-4 engine using the tablebase-aware evaluator and the ProDeo opening
# book; losing captures are detected without static exchange evaluation.
engine = PrototypeV2Engine(
    evaluator=incremental_endgame_simplified_evaluator,
    max_look_ahead_depth=4,
    opening_book="../../data/polyglot/ProDeo.bin",
    use_static_exchange_evaluation=False
)

# Standard starting position. The commented lines below switch to a
# king-and-rook versus king position and start a self-play game.
board = chess.Board()
# board = chess.Board("8/8/8/3k4/8/8/5R2/3K4 w - - 0 1")
# play_game(board, engine, engine, display_board=True, log_moves=True)

In [None]:
from converted_notebooks.s09_minimax_engine import middlegame_board
from converted_notebooks.s12_simplified_evaluation import incremental_simplified_evaluator

# Seed Python's random module with a fixed value before creating the engine.
random.seed(42)

# Depth-4 engine with the plain simplified evaluator (no tablebase lookups).
# NOTE: this rebinds the notebook-global `engine` created in an earlier cell.
engine = PrototypeV2Engine(
    evaluator=incremental_simplified_evaluator,
    max_look_ahead_depth=4,
    opening_book="../../data/polyglot/ProDeo.bin",
    use_static_exchange_evaluation=False
)

# Time a single move decision on the shared middlegame position.
%timeit engine.play(middlegame_board)

# Recorded result of an earlier run:
# 1.47 s ± 7.66 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [None]:
from converted_notebooks.s15_performance import test_engine

from pathlib import Path


def create_PrototypeV2Engine_engine(
    depth: int,
    use_static_exchange_evaluation: bool = False,
    opening_book: str = "../../data/polyglot/ProDeo.bin"
):
    """Create a ``PrototypeV2Engine`` based on the simplified evaluator.

    :param depth: Maximum look-ahead depth of the engine.
    :param use_static_exchange_evaluation: Whether the engine detects losing
        captures by static exchange evaluation. Defaults to ``False``, the
        value this factory always used.
    :param opening_book: Path of the polyglot opening book. Defaults to the
        ProDeo book this factory always used.
    :return: The configured engine.
    """
    return PrototypeV2Engine(
        evaluator=incremental_simplified_evaluator,
        max_look_ahead_depth=depth,
        opening_book=opening_book,
        use_static_exchange_evaluation=use_static_exchange_evaluation
    )


# Run the shared engine test for the variant without static exchange
# evaluation. The arguments are a name, a JSON path under ./results and the
# engine factory; presumably the path is where test_engine stores or reloads
# its results - confirm in s15_performance.
s17_PrototypeV2Engine_results = test_engine(
    "s17_PrototypeV2Engine",
    Path("./results/s17_PrototypeV2Engine.json"),
    create_PrototypeV2Engine_engine
)

In [None]:
from converted_notebooks.s15_performance import test_engine


def create_PrototypeV2Engine_see_engine(depth: int):
    """Create a ``PrototypeV2Engine`` of the given maximum look-ahead depth
    with static exchange evaluation switched on.
    """
    engine_options = {
        "evaluator": incremental_simplified_evaluator,
        "max_look_ahead_depth": depth,
        "opening_book": "../../data/polyglot/ProDeo.bin",
        "use_static_exchange_evaluation": True,
    }
    return PrototypeV2Engine(**engine_options)


# Run the shared engine test for the variant with static exchange evaluation.
# NOTE(review): `Path` is imported in the previous test cell, so this cell
# relies on that cell having been executed first.
s17_PrototypeV2Engine_see_results = test_engine(
    "s17_PrototypeV2Engine_see",
    Path("./results/s17_PrototypeV2Engine_see.json"),
    create_PrototypeV2Engine_see_engine
)

In [None]:
import pandas as pd
import IPython.display

# Combine the results of both engine variants into one frame.
results = s17_PrototypeV2Engine_results + s17_PrototypeV2Engine_see_results
results_frame = pd.DataFrame(results)

# Show the sums per engine and depth without pandas truncating rows or columns.
with pd.option_context('display.max_rows', None, 'display.max_columns', None):
    IPython.display.display(
        results_frame.groupby(['engine', 'depth']).sum()
    )  # .drop(columns=['time'])

In [None]:
from converted_notebooks.s15_performance import match_stockfish

# Match the depth-4 engine without static exchange evaluation against
# Stockfish at three Elo settings.
stockfish_results = [
    match_stockfish(
        create_PrototypeV2Engine_engine(4),
        engine_description="4",
        stockfish_elo=elo
    ) for elo in [1400, 1600, 1800]
]

# Same matches for the depth-4 engine with static exchange evaluation; the
# results are appended to the list above.
stockfish_results += [
    match_stockfish(
        create_PrototypeV2Engine_see_engine(4),
        engine_description="see_4",
        stockfish_elo=elo,
    ) for elo in [1400, 1600, 1800]
]

In [None]:
# Each match result is indexable; its second element is concatenated here, so
# it is presumably a pandas object with the per-match summary - confirm in
# s15_performance.match_stockfish.
concatenated_result_frames = pd.concat([
    result[1] for result in stockfish_results
])
IPython.display.display(concatenated_result_frames)