In [1]:
!pip install python-chess

Collecting python-chess
  Downloading python_chess-1.999-py3-none-any.whl.metadata (776 bytes)
Collecting chess<2,>=1 (from python-chess)
  Downloading chess-1.10.0-py3-none-any.whl.metadata (19 kB)
Downloading python_chess-1.999-py3-none-any.whl (1.4 kB)
Downloading chess-1.10.0-py3-none-any.whl (154 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m154.4/154.4 kB[0m [31m7.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: chess, python-chess
Successfully installed chess-1.10.0 python-chess-1.999


In [1]:
import chess
from chess import Board
import tensorflow as tf
import numpy as np


In [2]:
def board_to_nn_mat(board: Board):
    mat = np.zeros(shape=(16, 8, 8))

    for x, piece in board.piece_map().items():
        layer = piece.piece_type + (0 if piece.color else 6) - 1
        mat[layer, 7 - x // 8, x % 8] = 1

    for square in chess.SQUARES:
        if board.is_attacked_by(chess.WHITE, square):
            mat[13, 7 - square // 8, square % 8] = 1

        if board.is_attacked_by(chess.BLACK, square):
            mat[14, 7 - square // 8, square % 8] = 1

    if board.turn == chess.BLACK:
        mat[15] = np.zeros(shape=(8, 8))
    else:
        mat[15] = np.ones(shape=(8, 8))


    return mat

In [3]:
# base_path = '/content/drive/My Drive/Colab Notebooks/'
base_path = '/'

In [5]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
def count_games_in_pgn(file_path):
    total_games = 0
    with open(file_path, 'r') as pgn_file:
        s = time.time()
        while True:
            if time.time() - s >= 10:
                e = time.time()
                print(total_games, e - s)
                s = e
            game = chess.pgn.read_game(pgn_file)
            if game is None:
                break
            total_games += 1
    return total_games

total_games = count_games_in_pgn("lichess_elite_2023-07/lichess_elite_2023-07.pgn")
total_games

In [None]:
import numpy as np
import chess.pgn

X = []
y = []
unique_moves = set()

def process_game(game):
    board = game.board()
    for move in game.mainline_moves():
        X.append(board_to_nn_mat(board))
        y.append(str(move))
        unique_moves.add(str(move))
        board.push(move)

saving_rate = 3000 
save_count = 0

with open(f"{base_path}lichess_elite_2023-07/lichess_elite_2023-07.pgn") as pgn_file:
    count = 0
    while True:
        if count % 1000 == 0:
            print(f"Processing game {count}...")

        game = chess.pgn.read_game(pgn_file)
        if game is None or count // (saving_rate) == 35:
            break

        process_game(game)
        count += 1

        if count % saving_rate == 0:
            save_count += 1
            np.save(f'./npy-data/data-X-{save_count}.npy', np.array(X, dtype=np.uint8))
            np.save(f'./npy-data/data-y-{save_count}.npy', np.array(y))

            X = []
            y = []

    np.save(f'./npy-data/data-u.npy', np.array(list(unique_moves)))
    if X and y:
        save_count += 1
        np.save(f'./npy-data/data-X-{save_count}.npy', np.array(X, dtype=np.uint8))
        np.save(f'./npy-data/data-y-{save_count}.npy', np.array(y))

    print("Processing complete.")


Processing game 0...
Processing game 1000...
Processing game 2000...
Processing game 3000...
Processing game 4000...
Processing game 5000...
Processing game 6000...
Processing game 7000...
Processing game 8000...
Processing game 9000...
Processing game 10000...
Processing game 11000...
Processing game 12000...
Processing game 13000...
Processing game 14000...
Processing game 15000...
Processing game 16000...
Processing game 17000...
Processing game 18000...
Processing game 19000...
Processing game 20000...
Processing game 21000...
Processing game 22000...
Processing game 23000...
Processing game 24000...
Processing game 25000...
Processing game 26000...
Processing game 27000...
Processing game 28000...
Processing game 29000...
Processing game 30000...
Processing game 31000...
Processing game 32000...
Processing game 33000...
Processing game 34000...
Processing game 35000...
Processing game 36000...
Processing game 37000...
Processing game 38000...
Processing game 39000...
Processing ga

In [None]:
count

35

In [None]:
np.save(f'./npy-data/data-u.npy', np.array(list(unique_moves)))

In [4]:
import numpy as np
from sklearn.preprocessing import LabelEncoder, OneHotEncoder

unique_moves = np.load(f'.{base_path}npy-data/data-u.npy', allow_pickle=True)

if unique_moves.ndim == 0:
    unique_moves = unique_moves.item()

unique_moves_list = list(unique_moves)

label_encoder = LabelEncoder()
label_encoder.fit(unique_moves_list)

integer_encoded_full = label_encoder.transform(unique_moves_list).reshape(-1, 1)

onehot_encoder = OneHotEncoder(sparse_output=False)
onehot_encoder.fit(integer_encoded_full)

def encode_y(y):
    integer_encoded = label_encoder.transform(y).reshape(-1, 1)

    onehot_encoded = onehot_encoder.transform(integer_encoded)

    return onehot_encoded


In [5]:
len(unique_moves)

1920

In [8]:
model = tf.keras.models.Sequential()

model.add(tf.keras.layers.InputLayer(shape=(16, 8, 8)))

model.add(tf.keras.layers.Conv2D(128, (2, 2), activation='relu'))
model.add(tf.keras.layers.BatchNormalization())

model.add(tf.keras.layers.Conv2D(128, (2, 2), activation='relu'))
model.add(tf.keras.layers.BatchNormalization())

model.add(tf.keras.layers.Conv2D(128, (2, 2), activation='relu'))
model.add(tf.keras.layers.BatchNormalization())

model.add(tf.keras.layers.Flatten())

model.add(tf.keras.layers.Dense(2048, activation='relu'))
model.add(tf.keras.layers.Dropout(0.2))
model.add(tf.keras.layers.Dense(1024, activation='relu'))
model.add(tf.keras.layers.Dropout(0.2))
model.add(tf.keras.layers.Dense(1024, activation='relu'))

model.add(tf.keras.layers.Dense(len(unique_moves), activation='softmax'))

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

model.summary()

In [9]:
def data_generator(file_indices, batch_size=32):
    while True:
        for i in file_indices:
            X = np.load(f'{base_path}npy-data/data-X-{i}.npy')
            y = np.load(f'{base_path}npy-data/data-y-{i}.npy')

            y_encoded = encode_y(y)

            for j in range(0, len(X), batch_size):
                X_batch = X[j:j + batch_size]
                y_batch = y_encoded[j:j + batch_size]
                yield X_batch, y_batch



In [12]:
file_indices = range(1, 35)  
batch_size = 1024

steps_per_epoch = sum(np.load(f'{base_path}npy-data/data-X-{i}.npy').shape[0] for i in file_indices) // batch_size

train_generator = data_generator(file_indices, batch_size=batch_size)

In [13]:
model.fit(train_generator, steps_per_epoch=steps_per_epoch, epochs=10, validation_data=None)

Epoch 1/10
[1m8584/8584[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m827s[0m 96ms/step - accuracy: 0.2355 - loss: 3.2834
Epoch 2/10
[1m8584/8584[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m668s[0m 78ms/step - accuracy: 0.2943 - loss: 2.7786
Epoch 3/10
[1m8584/8584[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m661s[0m 77ms/step - accuracy: 0.3193 - loss: 2.6144
Epoch 4/10
[1m8584/8584[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m662s[0m 77ms/step - accuracy: 0.3352 - loss: 2.5222
Epoch 5/10
[1m8584/8584[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m664s[0m 77ms/step - accuracy: 0.3460 - loss: 2.4599
Epoch 6/10
[1m8584/8584[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m663s[0m 77ms/step - accuracy: 0.3544 - loss: 2.4133
Epoch 7/10
[1m8584/8584[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m665s[0m 77ms/step - accuracy: 0.3615 - loss: 2.3758
Epoch 8/10
[1m8584/8584[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m663s[0m 77ms/step - accuracy: 0.3669 - loss: 2.3473


<keras.src.callbacks.history.History at 0x7c00a814d9f0>

In [14]:
model.save(f'{base_path}chess_model.h5')



In [6]:
model = tf.keras.models.load_model('./chess_model.h5')



In [23]:
def minimax(board, depth, alpha, beta, maximizingPlayer, model):
    if depth == 0 or board.is_game_over():
        return heuristic_evaluation(board)

    output = model.predict(board_to_nn_mat(board).reshape(1, 16, 8, 8), verbose=0)[0]
    sorted_indices = np.argsort(output)[::-1]
    predicted_moves = label_encoder.inverse_transform(sorted_indices)

    legal_moves = [move for move in predicted_moves if move in [str(m) for m in board.legal_moves]]

    if maximizingPlayer:
        maxEval = -np.inf
        for move in legal_moves:
            board.push(chess.Move.from_uci(move))
            eval = minimax(board, depth - 1, alpha, beta, False, model)
            board.pop()
            maxEval = max(maxEval, eval)
            alpha = max(alpha, eval)
            if beta <= alpha:
                break
        return maxEval
    else:
        minEval = np.inf
        for move in legal_moves:
            board.push(chess.Move.from_uci(move))
            eval = minimax(board, depth - 1, alpha, beta, True, model)
            board.pop()
            minEval = min(minEval, eval)
            beta = min(beta, eval)
            if beta <= alpha:
                break
        return minEval

def heuristic_evaluation(board):
    return sum([piece_value(piece) for piece in board.piece_map().values()])

def piece_value(piece):
    values = {chess.PAWN: 1, chess.KNIGHT: 3, chess.BISHOP: 3, chess.ROOK: 5, chess.QUEEN: 9, chess.KING: 0}
    return values[piece.piece_type] * (1 if piece.color == chess.WHITE else -1)

import chess

def get_best_move(fen, model, depth):
    board = chess.Board(fen)

    best_move = None
    best_value = -np.inf
    alpha = -np.inf
    beta = np.inf

    output = model.predict(board_to_nn_mat(board).reshape(1, 16, 8, 8), verbose=0)[0]
    sorted_indices = np.argsort(output)[::-1]
    predicted_moves = label_encoder.inverse_transform(sorted_indices)

    # Filter only legal moves
    legal_moves = [move for move in predicted_moves if move in [str(m) for m in board.legal_moves]]

    for move in legal_moves:
        board.push(chess.Move.from_uci(move))
        move_value = minimax(board, depth - 1, alpha, beta, False, model)
        board.pop()

        if move_value > best_value:
            best_value = move_value
            best_move = move

        alpha = max(alpha, best_value)
        if beta <= alpha:
            break

    return best_move

In [24]:
fen = 'r1bqkb1r/pppp1ppp/2n2n2/1B2p3/4P3/5N2/PPPP1PPP/RNBQK2R w KQkq - 4 4'
best_move = get_best_move(fen, model, depth=4)
print(f"The best move is: {best_move}")

KeyboardInterrupt: 

In [9]:
def get_best_move(board, model, label_encoder):
    output = model.predict(board_to_nn_mat(board).reshape(1, 16, 8, 8), verbose=0)[0]
    sorted_indices = np.argsort(output)[::-1]
    predicted_moves = label_encoder.inverse_transform(sorted_indices)
    legal_moves = [str(move) for move in board.legal_moves]

    for move in predicted_moves:
        if move in legal_moves:
            return move
    return None

board = chess.Board()

# Ask user for their color choice
user_color = input("Choose your color (w for white, b for black): ").strip().lower()

while not board.is_game_over():
    if board.turn == chess.WHITE:
        if user_color == 'w':
            # User's turn as white
            print(board)
            legal_moves = list(board.legal_moves)
            while True:
                player_move = input("Enter your move (in UCI format, e.g., e2e4): ").strip()
                if player_move in [move.uci() for move in legal_moves]:
                    board.push_uci(player_move)
                    break
                print("Invalid move. Please try again.")
        else:
            # Model's turn as white
            move = get_best_move(board, model, label_encoder)
            board.push_uci(move)
            print(f"Model (White) played: {move}")
    else:
        if user_color == 'b':
            # User's turn as black
            print(board)
            legal_moves = list(board.legal_moves)
            while True:
                player_move = input("Enter your move (in UCI format, e.g., e7e5): ").strip()
                if player_move in [move.uci() for move in legal_moves]:
                    board.push_uci(player_move)
                    break
                print("Invalid move. Please try again.")
        else:
            # Model's turn as black
            move = get_best_move(board, model, label_encoder)
            board.push_uci(move)
            print(f"Model (Black) played: {move}")

    print("\nCurrent board state:")
    print(board)
    print()

# Print the game result
result = board.result()
print(f"Game over: {result}")

Model (White) played: d2d4

Current board state:
r n b q k b n r
p p p p p p p p
. . . . . . . .
. . . . . . . .
. . . P . . . .
. . . . . . . .
P P P . P P P P
R N B Q K B N R

r n b q k b n r
p p p p p p p p
. . . . . . . .
. . . . . . . .
. . . P . . . .
. . . . . . . .
P P P . P P P P
R N B Q K B N R
Invalid move. Please try again.

Current board state:
r n b q k b n r
p p p . p p p p
. . . . . . . .
. . . p . . . .
. . . P . . . .
. . . . . . . .
P P P . P P P P
R N B Q K B N R

Model (White) played: c2c4

Current board state:
r n b q k b n r
p p p . p p p p
. . . . . . . .
. . . p . . . .
. . P P . . . .
. . . . . . . .
P P . . P P P P
R N B Q K B N R

r n b q k b n r
p p p . p p p p
. . . . . . . .
. . . p . . . .
. . P P . . . .
. . . . . . . .
P P . . P P P P
R N B Q K B N R

Current board state:
r n b q k b n r
p p p . p p p p
. . . . . . . .
. . . . . . . .
. . p P . . . .
. . . . . . . .
P P . . P P P P
R N B Q K B N R

Model (White) played: e2e3

Current board state:
r n b

KeyboardInterrupt: Interrupted by user