<a href="https://colab.research.google.com/github/Melvinchen0404/Chess_engine/blob/main/sapientia_v1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**STEP 1**: Create openings book

In [1]:
!pip install datasets
import chess
from datasets import load_dataset

# Load the Lichess openings dataset
def load_openings_book():
    dset = load_dataset("Lichess/chess-openings")
    train_data = dset['train']

    # Create an opening book dictionary where the key is the opening name and the value is the UCI moves
    openings_book = {entry["name"]: entry["uci"] for entry in train_data}
    return openings_book

# Convert moves to UCI notation
def moves_to_uci(move_list):
    return [move.uci() for move in move_list]

# Modified openingMoves function
def openingMoves(board, valid_moves):
    openings_book = load_openings_book()
    current_moves_uci = moves_to_uci(board.move_stack)  # Get UCI moves from the move stack

    # Check for any opening that matches the beginning of the current moves
    for opening_name, opening_uci in openings_book.items():
        opening_moves = opening_uci.split()  # Split the UCI moves into a list

        # If the current moves match the opening up to the length of the opening UCI
        if current_moves_uci[:len(opening_moves)] == opening_moves:
            # Return the valid moves that follow the opening
            return valid_moves[len(opening_moves):]  # Return the rest of the moves from this opening

    # If no matching opening, return all valid moves
    return valid_moves

# Function to evaluate the board with prioritized opening moves
def scoreBoard(board, valid_moves):
    # Prioritize opening moves
    prioritized_moves = openingMoves(board, valid_moves)

    # Add your normal evaluation function here (position scoring, material, etc.)
    # For now, we just return the list of prioritized moves as a placeholder
    print(f"Prioritized moves: {prioritized_moves}")

    # For simplicity, returning the count of valid moves for now
    return len(prioritized_moves)



**STEP 2**: Test openings book

In [2]:
# Test the function
fen = "rnbqkbnr/pppp1ppp/8/8/4P3/8/PPPP1PPP/RNBQKBNR b KQkq - 0 1"  # FEN after 1. e2e4
board = chess.Board(fen)

valid_moves = list(board.legal_moves)
print(f"Valid moves: {valid_moves}")

# Call the scoreBoard function
score = scoreBoard(board, valid_moves)
print(f"Evaluated score (number of prioritized moves): {score}")

Valid moves: [Move.from_uci('g8e7'), Move.from_uci('g8h6'), Move.from_uci('g8f6'), Move.from_uci('f8e7'), Move.from_uci('f8d6'), Move.from_uci('f8c5'), Move.from_uci('f8b4'), Move.from_uci('f8a3'), Move.from_uci('e8e7'), Move.from_uci('d8e7'), Move.from_uci('d8f6'), Move.from_uci('d8g5'), Move.from_uci('d8h4'), Move.from_uci('b8c6'), Move.from_uci('b8a6'), Move.from_uci('h7h6'), Move.from_uci('g7g6'), Move.from_uci('f7f6'), Move.from_uci('d7d6'), Move.from_uci('c7c6'), Move.from_uci('b7b6'), Move.from_uci('a7a6'), Move.from_uci('h7h5'), Move.from_uci('g7g5'), Move.from_uci('f7f5'), Move.from_uci('d7d5'), Move.from_uci('c7c5'), Move.from_uci('b7b5'), Move.from_uci('a7a5')]


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Prioritized moves: [Move.from_uci('g8e7'), Move.from_uci('g8h6'), Move.from_uci('g8f6'), Move.from_uci('f8e7'), Move.from_uci('f8d6'), Move.from_uci('f8c5'), Move.from_uci('f8b4'), Move.from_uci('f8a3'), Move.from_uci('e8e7'), Move.from_uci('d8e7'), Move.from_uci('d8f6'), Move.from_uci('d8g5'), Move.from_uci('d8h4'), Move.from_uci('b8c6'), Move.from_uci('b8a6'), Move.from_uci('h7h6'), Move.from_uci('g7g6'), Move.from_uci('f7f6'), Move.from_uci('d7d6'), Move.from_uci('c7c6'), Move.from_uci('b7b6'), Move.from_uci('a7a6'), Move.from_uci('h7h5'), Move.from_uci('g7g5'), Move.from_uci('f7f5'), Move.from_uci('d7d5'), Move.from_uci('c7c5'), Move.from_uci('b7b5'), Move.from_uci('a7a5')]
Evaluated score (number of prioritized moves): 29


**STEP 3**: Define Sapientia chess engine

In [13]:
import random
import threading
import chess
import queue
from endgames_test import fetch_wdl_and_dtz_from_lichess

"""
This is my updated version of the Sepentia chess engine: https://github.com/EuclidStellar/Sepentia-ChessEngine/blob/main/src/ChessAI.py
It is called Sapientia (Latin for 'wisdom, discernment, memory')

Additions include:
An openings book developed from the Lichess openings dataset;
Updated piece value scores;
Additional heuristics for static evaluation of board position (king safety, pawn structure, control of the centre);
Endgame information (WDL, DTZ) from the Lichess Syzygy tablebase
"""

# Piece value scores

piece_score = {
    "K": 7000, "Q": 950, "R": 500, "B": 330, "N": 320, "P": 100,  # White
    "k": -7000, "q": -950, "r": -500, "b": -330, "n": -320, "p": -100  # Black
}

# Piece-square tables
"""
Pawns are most valuable in the centre and the middle game
Bishops are stronger in the centre and on the long diagonals
Rooks are most powerful on the seventh rank
Queens are most effective in the centre
Knights are most valuable in the centre and weakest at the edges and corners
"""
pawn_scores = [[0, 0, 0, 0, 0, 0, 0, 0],
               [78, 83, 86, 73, 102, 82, 85, 90],
               [7, 29, 21, 44, 40, 31, 44, 7],
               [-17, 16, -2, 15, 14, 0, 15, -13],
               [-26, 3, 10, 9, 6, 1, 0, -23],
               [-22, 9, 5, -11, -10, -2, 3, -19],
               [-31, 8, -7, -37, -36, -14, 3, -31],
               [0.0, 0.1, 0.2, 0.2, 0.2, 0.2, 0.1, 0.0]]

bishop_scores = [[-59, -78, -82, -76, -23, -107, -37, -50],
                 [-11, 20, 35, -42, -39, 31, 2, -22],
                 [-9, 39, -32, 41, 52, -10, 28, -14],
                 [25, 17, 20, 34, 26, 25, 15, 10],
                 [13, 10, 17, 23, 17, 16, 0, 7],
                 [14, 25, 24, 15, 8, 25, 20, 15],
                 [19, 20, 11, 6, 7, 6, 20, 16],
                 [-7, 2, -15, -12, -14, -15, -10, -10]]

rook_scores = [[35, 29, 33, 4, 37, 33, 56, 50],
               [55, 29, 56, 67, 55, 62, 34, 60],
               [19, 35, 28, 33, 45, 27, 25, 15],
               [0, 5, 16, 13, 18, -4, -9, -6],
               [-28, -35, -16, -21, -13, -29, -46, -30],
               [-42, -28, -42, -25, -25, -35, -26, -46],
               [-53, -38, -31, -26, -29, -43, -44, -53],
               [-30, -24, -18, 5, -2, -18, -31, -32]]

queen_scores = [[6, 1, -8, -104, 69, 24, 88, 26],
                [14, 32, 60, -10, 20, 76, 57, 24],
                [-2, 43, 32, 60, 72, 63, 43, 2],
                [1, -16, 22, 17, 25, 20, -13, -6],
                [-14, -15, -2, -5, -1, -10, -20, -22],
                [-30, -6, -13, -11, -16, -11, -16, -27],
                [-36, -18, 0, -19, -15, -15, -21, -38],
                [-39, -30, -31, -13, -31, -36, -34, -42]]

knight_scores = [[-66, -53, -75, -75, -10, -55, -58, -70],
                 [-3, -6, 100, -36, 4, 62, -4, -14],
                 [10, 67, 1, 74, 73, 27, 62, -2],
                 [24, 24, 45, 37, 33, 41, 25, 17],
                 [-1, 5, 31, 21, 22, 35, 2, 0],
                 [-18, 10, 13, 22, 18, 15, 11, -14],
                 [-23, -15, 2, 0, 2, 0, -23, -20],
                 [-66, -53, -75, -75, -10, -55, -58, -70]]

piece_position_scores = {"wN": knight_scores,
                         "bN": knight_scores[::-1],
                         "wB": bishop_scores,
                         "bB": bishop_scores[::-1],
                         "wQ": queen_scores,
                         "bQ": queen_scores[::-1],
                         "wR": rook_scores,
                         "bR": rook_scores[::-1],
                         "wp": pawn_scores,
                         "bp": pawn_scores[::-1]}

CHECKMATE = 10000
STALEMATE = 0
DEPTH = 4

# Order moves based on captures and high-value targets
def orderMoves(moves, game_state):
    capture_moves = [move for move in moves if game_state.board[move.end_row][move.end_col] != "--"]
    quiet_moves = [move for move in moves if move not in capture_moves]

    capture_moves.sort(key=lambda move: piece_score[game_state.board[move.end_row][move.end_col][1]], reverse=True)
    quiet_moves.sort(key=lambda move: piece_position_scores.get(game_state.board[move.start_row][move.start_col], [[0]*8]*8)[move.end_row][move.end_col], reverse=True)

    ordered_moves = capture_moves + quiet_moves
    return ordered_moves

def scoreBoard(board):
    """
    Score the board. A positive score is good for white, a negative score is good for black.
    """
    if board.is_checkmate():
        return -CHECKMATE if board.turn else CHECKMATE
    if board.is_stalemate():
        return STALEMATE

    score = 0
    for square in chess.SQUARES:
        piece = board.piece_at(square)
        if piece:
            piece_value = piece_score.get(piece.symbol().lower(), 0)
            piece_position_score = piece_position_scores.get(piece.symbol(), [[0] * 8] * 8)[square // 8][square % 8]

            # Calculate the score for the piece itself
            if piece.color == chess.WHITE:
                score += piece_value + piece_position_score
            else:
                score -= piece_value + piece_position_score

    # Call tablebase function to fetch WDL and DTZ
    fen = board.fen()
    tablebase_info = fetch_wdl_and_dtz_from_lichess(fen)

    if "WDL_numeric" in tablebase_info:
      # If WDL info is available, add it to the score
      score += tablebase_info["WDL_numeric"] * 1000  # Scale the WDL impact (you can adjust the multiplier)
      print(f"WDL information available: {tablebase_info['WDL_numeric']}, modifying score accordingly.")
    else:
      print("No WDL information available.")

    if "DTZ_numeric" in tablebase_info:
      dtz = tablebase_info["DTZ_numeric"]

      if dtz < 0:
        # If DTZ is negative, it indicates that the game is in a losing position.
        # Penalize more as the game approaches a loss.
        score -= abs(dtz) * 100  # Penalize for losing positions (you can adjust the multiplier)
        print(f"DTZ is negative: {dtz}, resulting in penalization for a losing position.")
      elif dtz < 10:
        # If DTZ is less than 10, we are near checkmate/draw
        score += 500  # Reward positions near the endgame (you can adjust this value)
        print(f"DTZ is close to zero: {dtz}, resulting in a reward for positions near the endgame.")
      else:
        # DTZ is relatively large, so it doesn't have a significant effect on the score.
        print(f"DTZ is much larger from zero: {dtz}, resulting in minimal impact on the score.")
    else:
        print("No DTZ information available.")

    # Optionally, handle the case where neither WDL nor DTZ is available by defaulting to a neutral or fallback score adjustment
    if "WDL_numeric" not in tablebase_info and "DTZ_numeric" not in tablebase_info:
        print("Neither WDL nor DTZ information available.")

    # King Safety (a simple heuristic based on the distance of the king from the center and the possibility of castling)
    score += 10 * (4 - abs(white_king_square // 8 - 3) - abs(white_king_square % 8 - 3))
    score -= 10 * (4 - abs(black_king_square // 8 - 3) - abs(black_king_square % 8 - 3))

    # Check if castling is available for either side (still possible)
    if not board.has_kingside_castling_rights(chess.WHITE) and not board.has_queenside_castling_rights(chess.WHITE):
        score -= 30  # Penalize if white can't castle
    if not board.has_kingside_castling_rights(chess.BLACK) and not board.has_queenside_castling_rights(chess.BLACK):
        score += 30  # Penalize if black can't castle

    # If castling rights are still available, reward some points
    if board.has_kingside_castling_rights(chess.WHITE) or board.has_queenside_castling_rights(chess.WHITE):
        score += 15  # Reward for white if castling is still possible
    if board.has_kingside_castling_rights(chess.BLACK) or board.has_queenside_castling_rights(chess.BLACK):
        score -= 15  # Reward for black if castling is still possible

    # Pawn Structure (pass the method to evaluate pawn structure)
    score += evaluate_pawn_structure(board)

    # Control of the Center (just a simple evaluation of central pawn squares)
    center_squares = [27, 28, 35, 36, 45, 46, 53, 54]
    score += sum(1 for sq in center_squares if board.piece_at(sq) and board.piece_at(sq).color == chess.WHITE)
    score -= sum(1 for sq in center_squares if board.piece_at(sq) and board.piece_at(sq).color == chess.BLACK)

    return score

def evaluate_pawn_structure(board):
    """
    A simple evaluation of pawn structure: penalizes isolated, doubled, and backward pawns,
    rewards passed pawns.
    """
    score = 0
    for square in chess.SQUARES:
        piece = board.piece_at(square)
        if piece and piece.piece_type == chess.PAWN:
            if piece.color == chess.WHITE:
                score += evaluate_white_pawn_structure(board, square)
            else:
                score -= evaluate_black_pawn_structure(board, square)
    return score

def evaluate_white_pawn_structure(board, square):
    """ Evaluates white pawn structure for isolated, doubled, backward, and passed pawns. """
    score = 0
    col = square % 8

    # Isolated pawns (no pawns on adjacent files)
    if col == 0 or not board.piece_at(square - 1) or board.piece_at(square - 1).piece_type != chess.PAWN:
        if col == 7 or not board.piece_at(square + 1) or board.piece_at(square + 1).piece_type != chess.PAWN:
            score -= 10  # Penalize isolated pawn

    # Passed pawns (no opposing pawns blocking them on the same file or adjacent files)
    if not board.piece_at(square + 8) and (col == 0 or not board.piece_at(square + 7) or board.piece_at(square + 7).piece_type != chess.PAWN):
        score += 20  # Reward passed pawns

    return score

def evaluate_black_pawn_structure(board, square):
    """ Evaluates black pawn structure for isolated, doubled, backward, and passed pawns. """
    score = 0
    col = square % 8

    # Isolated pawns (no pawns on adjacent files)
    if col == 0 or not board.piece_at(square - 1) or board.piece_at(square - 1).piece_type != chess.PAWN:
        if col == 7 or not board.piece_at(square + 1) or board.piece_at(square + 1).piece_type != chess.PAWN:
            score += 10  # Penalize isolated pawn

    # Passed pawns (no opposing pawns blocking them on the same file or adjacent files)
    if not board.piece_at(square - 8) and (col == 0 or not board.piece_at(square - 7) or board.piece_at(square - 7).piece_type != chess.PAWN):
        score -= 20  # Penalize passed pawns for black

    return score

# NegaMax with alpha-beta pruning
def findMoveNegaMaxAlphaBeta(board, valid_moves, depth, alpha, beta, turn_multiplier):
    global next_move

    # Base case: if we reach the maximum depth or the game ends (checkmate, stalemate)
    if depth == 0 or board.is_checkmate() or board.is_stalemate() or board.is_insufficient_material() or board.is_seventyfive_moves() or board.is_variant_draw():
        return turn_multiplier * scoreBoard(board)

    max_score = -CHECKMATE

    # Order moves based on some heuristic (for better performance)
    ordered_moves = list(valid_moves)
    ordered_moves.sort(key=lambda move: scoreBoard(board.push(move)), reverse=True)  # Sort by score

    for move in ordered_moves:
        board_copy = copy.deepcopy(board)  # Make a copy of the board
        board_copy.push(move)
        score = -findMoveNegaMaxAlphaBeta(board_copy, board_copy.legal_moves, depth - 1, -beta, -alpha, -turn_multiplier)

        if score > max_score:
            max_score = score
            if depth == DEPTH:
                next_move = move

        alpha = max(alpha, score)
        if alpha >= beta:
            break

    return max_score

# Find the best move
def findBestMove(game_state, valid_moves, return_queue):
    global next_move
    next_move = None
    findMoveNegaMaxAlphaBeta(game_state, valid_moves, DEPTH, -CHECKMATE, CHECKMATE, 1 if game_state.white_to_move else -1)
    return_queue.put(next_move)

# Control center moves (for positional play)
def controlCenterMoves(game_state, valid_moves):
    center_moves = [move for move in valid_moves if move.start_row in [1, 6] and abs(move.start_col - move.end_col) <= 1 or move.start_col == 4 or move.end_col == 4]
    return center_moves

# Global cache for openings_book
openings_book = {}

# Function to load openings dataset from Lichess and cache it globally
def load_openings_dataset():
    global openings_book

    # Check if openings_book is already populated
    if openings_book:
        print("Using cached openings book.")
        return openings_book

    # Load the Lichess openings dataset from Hugging Face
    dset = load_dataset("Lichess/chess-openings")

    # Extract the training data (which contains the opening names and their UCI moves)
    train_data = dset['train']

    # Build a dictionary mapping opening names to their UCI move sequences
    openings_book = {entry["name"]: entry["uci"] for entry in train_data}

    print("Openings book loaded and cached.")
    return openings_book

# Modified openingMoves function
def openingMoves(board, valid_moves, openings_book):
    # Convert the current move stack to UCI notation as a string
    current_moves_uci = " ".join([move.uci() for move in board.move_stack]).strip()

    # If no moves have been played yet, we can skip opening matching logic
    if not current_moves_uci:
        print("No moves played yet, skipping opening check.")
        return valid_moves[0]  # Return the first valid move if no moves are played

    for opening_name, opening_moves in openings_book.items():
        opening_sequence = opening_moves.strip().split()

        num_moves_played = len(board.move_stack)
        partial_opening_sequence = " ".join(opening_sequence[:num_moves_played]).strip()

        if partial_opening_sequence == current_moves_uci:
            print(f"Match found from the openings book of Sapientia engine: {opening_name}")

            # Get the next move in the opening sequence after the matched ones
            next_move_uci = opening_sequence[num_moves_played]

            # Filter valid moves to only include the one that matches the next move in the opening
            opening_moves_valid = [move for move in valid_moves if move.uci() == next_move_uci]

            if opening_moves_valid:
                best_move = opening_moves_valid[0]
                return best_move

    # If no match found in the opening book, proceed to tablebase analysis for best move
    print("No match found in openings book. Using tablebase to select the best move.")

    # Get the best move based on DTZ or WDL using the tablebase (assuming `get_best_move_based_on_dtz` function)
    fen = board.fen()
    best_move = get_best_move_based_on_dtz(fen)

    if best_move:
        print(f"Best move selected from tablebase: {best_move}")
        return best_move

    # If no move found via tablebase, return the first valid move as a fallback
    print("No best move found from tablebase. Selecting first valid move.")
    return valid_moves[0]

# Function for deriving best move from the openings book
def get_best_move_from_opening(uci_sequence):
    try:
        # Initialize the board from the UCI sequence
        board = chess.Board()
        print(f"Board initialized:\n{board}")

        # Apply each move from the UCI sequence
        for move in uci_sequence.split():
            board.push_uci(move)
        print(f"Board after moves:\n{board}")

        # Load the opening book
        openings_book = load_openings_dataset()

        # Get the valid moves from the current position
        valid_moves = list(board.legal_moves)  # Convert generator to a list

        # Get the best move (either from the opening book or first valid move)
        best_move = openingMoves(board, valid_moves, openings_book)

        # Output the result
        print(f"Best move from openings book of Sapientia engine: {best_move}")

    except Exception as e:
        print(f"Error: {e}")

# Function for analyzing endgames with the tablebase
def get_best_move_from_tablebase(fen):
    """
    Fetches the best move from the tablebase and includes additional information like WDL and DTZ.

    Args:
    - fen (str): The FEN string representing the board position.

    Returns:
    - dict: A dictionary containing the best move (UCI), WDL, and DTZ.
    """
    # Construct the URL with the provided FEN
    url = f"https://tablebase.lichess.ovh/standard?fen={fen}"

    # Send a GET request to the tablebase API
    response = requests.get(url)

    if response.status_code == 200:
        # Parse the JSON response
        data = response.json()

        # Check if the 'moves' key exists and the list is not empty
        if 'moves' in data and len(data['moves']) > 0:
            # Extract the first UCI move
            first_move_uci = data['moves'][0]['uci']

            # Extract WDL (win, draw, loss) and DTZ (Distance to Zero)
            wdl = data.get('category', 'unknown')  # Category could be win, loss, or draw
            dtz = data.get('dtz', None)  # Distance to zero
            precise_dtz = data.get('precise_dtz', None)  # Precise distance to zero

            # Validate the UCI move string
            try:
                best_move = chess.Move.from_uci(first_move_uci)
                if best_move in chess.Board(fen).legal_moves:
                    # Return the best move and additional details
                    return {
                        'best_move': first_move_uci,
                        'wdl': wdl,
                        'dtz': dtz,
                        'precise_dtz': precise_dtz
                    }
                else:
                    return {"error": f"Best move (UCI) {first_move_uci} is not legal for this position."}
            except ValueError:
                return {"error": f"Error in parsing UCI move: {first_move_uci}"}
        else:
            return {"error": "No valid moves found in the response."}
    else:
        return {"error": f"Failed to fetch data. HTTP Status code: {response.status_code}"}

# For thread-safe handling of the best move selection
def findMove(game_state, valid_moves, return_queue):
    global next_move
    next_move = None
    for depth in range(1, DEPTH + 1):
        findMoveNegaMaxAlphaBeta(game_state, valid_moves, depth, -CHECKMATE, CHECKMATE, 1 if game_state.white_to_move else -1)
        if next_move is not None:
            break
    return_queue.put(next_move)

# Utility function to get a random move
def findRandomMove(valid_moves):
    return random.choice(valid_moves)

**STEP 4**: Test Sapientia against an opening phase position

In [14]:
# Test the engine on a specific UCI sequence (example sequence after 1. e2e4 g8f6)
uci_sequence = "e2e4 g8f6"
get_best_move_from_opening(uci_sequence)

Board initialized:
r n b q k b n r
p p p p p p p p
. . . . . . . .
. . . . . . . .
. . . . . . . .
. . . . . . . .
P P P P P P P P
R N B Q K B N R
Board after moves:
r n b q k b . r
p p p p p p p p
. . . . . n . .
. . . . . . . .
. . . . P . . .
. . . . . . . .
P P P P . P P P
R N B Q K B N R
Openings book loaded and cached.
Match found from the openings book of Sapientia engine: Alekhine Defense
Best move from openings book of Sapientia engine: e4e5


**STEP 5**: Test Sapientia against an endgame position

In [15]:
import requests

# Define the test FEN
test_fen = "8/8/2k1PK2/8/8/8/8/8 w - - 0 1"
board = chess.Board(test_fen)
print("Board display:\n", board)
valid_moves = list(board.legal_moves)
best_move_uci = get_best_move_from_tablebase(test_fen)
move_details = get_best_move_from_tablebase(test_fen)

if move_details:
    print(f"Best move (UCI): {move_details['best_move']}")
    print(f"Result (WDL): {move_details['wdl']}")
    print(f"Distance to Zero (DTZ): {move_details['dtz']}")

Board display:
 . . . . . . . .
. . . . . . . .
. . k . P K . .
. . . . . . . .
. . . . . . . .
. . . . . . . .
. . . . . . . .
. . . . . . . .
Best move (UCI): e6e7
Result (WDL): win
Distance to Zero (DTZ): 1
