In [None]:
import concurrent.futures
import itertools
import math
import os
import random
import threading
import time
import warnings
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache

import chess
import numpy as np
import tensorflow as tf
from IPython.display import clear_output
from tensorflow.keras import layers, mixed_precision
from tensorflow.keras.losses import Huber

# Set the global Keras mixed-precision policy to 'mixed_float16'. This must run
# before any model is built so that the layers pick the policy up.
policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)

In [None]:
# Board and Input Conversion Functions
def board_to_input(board, history=1):
    """Encode a board (and optionally its recent history) as one-hot piece planes.

    Args:
        board: Position to encode. It must provide ``piece_at``, ``copy`` and,
            when ``history > 1``, ``move_stack`` and ``pop``.
        history: Number of positions to encode, counting the current one.

    Returns:
        A float32 array of shape ``(1, 8, 8, 12 * history)``. The first spatial
        axis is the file index and the second the rank index, as passed to
        ``chess.square``. Planes are grouped in blocks of 12 (white P, N, B, R,
        Q, K followed by black p, n, b, r, q, k), oldest position first and the
        current position last. History slots older than the start of the game
        stay all-zero.

    Raises:
        ValueError: If ``history`` is smaller than 1.
    """
    if history < 1:
        raise ValueError("history must be at least 1")

    pieces = {
        "P": 0,
        "N": 1,
        "B": 2,
        "R": 3,
        "Q": 4,
        "K": 5,
        "p": 6,
        "n": 7,
        "b": 8,
        "r": 9,
        "q": 10,
        "k": 11,
    }

    input_tensor = np.zeros((1, 8, 8, 12 * history), dtype=np.float32)

    board_states = [board]
    if history > 1:
        # Rewind a private copy one ply per slot. Popping and re-pushing on the
        # caller's board would yield the same one-ply-back position for every
        # slot and would mutate a board that other threads may be reading.
        cursor = board.copy()
        for _ in range(history - 1):
            if len(cursor.move_stack) > 0:
                cursor.pop()
                board_states.insert(0, cursor.copy())
            else:
                board_states.insert(0, None)

    for h, state in enumerate(board_states):
        if state is None:
            continue
        for file_idx in range(8):
            for rank_idx in range(8):
                piece = state.piece_at(chess.square(file_idx, rank_idx))
                if piece:
                    input_tensor[0, file_idx, rank_idx, pieces[str(piece)] + 12 * h] = 1

    return input_tensor

def moves_to_array(moves):
    """Build a 4672-long indicator vector marking the given moves.

    Each move is mapped to slot ``from_square * 73 + to_square``; marked slots
    hold 1 and every other slot holds 0.
    """
    encoded = np.zeros(4672)
    slots = (mv.from_square * 73 + mv.to_square for mv in moves)
    for slot in slots:
        encoded[slot] = 1
    return encoded

def array_to_move(board, move_array):
    """Pick the legal move with the highest score in ``move_array``.

    Args:
        board: Position providing an iterable ``legal_moves``.
        move_array: Sequence indexed by ``from_square * 73 + to_square``.

    Returns:
        The highest-scoring legal move (the first one on ties), or ``None``
        when the position has no legal moves.
    """
    legal_moves = list(board.legal_moves)
    if not legal_moves:
        # Must be checked before argmax: np.argmax raises on an empty sequence.
        return None

    legal_move_probs = [
        move_array[move.from_square * 73 + move.to_square] for move in legal_moves
    ]
    return legal_moves[int(np.argmax(legal_move_probs))]

# Evaluation and Backpropagation Functions
def material_balance(board):
    """Return the material count of ``board`` from White's point of view.

    White pieces add their value, black pieces subtract it, and kings count
    as zero.
    """
    white_values = {'P': 1, 'N': 3, 'B': 3, 'R': 5, 'Q': 9, 'K': 0}
    piece_values = dict(white_values)
    # Black pieces use the lowercase symbol and the negated value.
    piece_values.update({symbol.lower(): -worth for symbol, worth in white_values.items()})

    occupants = (board.piece_at(square) for square in chess.SQUARES)
    return sum(piece_values[str(piece)] for piece in occupants if piece)

def backpropagate(node, value):
    """Walk from ``node`` up to the root, updating each node along the way.

    The sign of ``value`` flips at every level, so each ancestor receives the
    negation of what its child received.
    """
    current, signed_value = node, value
    while current is not None:
        current.update(signed_value)
        current, signed_value = current.parent, -signed_value

@lru_cache(maxsize=None)
def cached_evaluate_board(board_fen):
    """Memoised material balance of the position described by ``board_fen``.

    The cache is unbounded and keyed on the FEN string.
    """
    return material_balance(chess.Board(board_fen))

@tf.function(reduce_retracing=True)
def model_predict(chess_model, board_input):
    """Run ``chess_model`` on ``board_input`` after converting it to a float32 tensor."""
    input_tensor = tf.convert_to_tensor(board_input, dtype=tf.float32)
    return chess_model(input_tensor)

def evaluate(self, board):
    """Return the model's scalar value estimate for ``board``.

    Written as an unbound method: ``self`` is the model, and the function is
    attached to a model instance with ``evaluate.__get__(model)``.
    """
    encoded_board = board_to_input(board)
    # The model returns (move probabilities, value); only the value is used here.
    _, value_estimate = model_predict(self, encoded_board.reshape(1, 8, 8, 12))
    return value_estimate.numpy().flatten()[0]

# Smoke test: encode a freshly constructed board with the default history of 1
# and print the resulting tensor shape, which board_to_input builds as
# (1, 8, 8, 12).
board = chess.Board()
input_tensor = board_to_input(board)
print(input_tensor.shape)

In [None]:
# MCTS Functions and Classes
class MCTSNode:
    """A node of the search tree: one position plus its search statistics.

    Attributes:
        board: Position represented by this node.
        parent: Parent node, or ``None`` for the root.
        move: Move that led from the parent's position to this one.
        children: Child nodes created so far.
        visits: Number of times the node was updated or evaluated.
        value: Accumulated value (``update``) or model value (``evaluate``).
        prior: Prior probability assigned to ``move``.
        eval_value: Result of ``evaluate_board()`` at construction time.
    """

    def __init__(self, board, parent=None, move=None, prior=0):
        self.board = board
        self.parent = parent
        self.move = move
        self.children = []
        self.visits = 0
        self.value = 0
        self.prior = prior
        self.eval_value = self.evaluate_board()

    def add_child(self, child_node):
        """Append ``child_node`` to this node's children."""
        self.children.append(child_node)

    def update(self, value):
        """Record one visit and add ``value`` to the accumulated value."""
        self.visits += 1
        self.value += value

    def expand_children(self, chess_model):
        """Create and evaluate one child per legal move, sequentially.

        Children are appended in legal-move order. Nothing happens when the
        position has no legal moves.
        """
        for move in list(self.board.legal_moves):
            self.children.append(self._expand_child(move, chess_model))

    def expand_children_parallel(self, chess_model, num_threads=4):
        """Create and evaluate one child per legal move using a thread pool.

        Args:
            chess_model: Object exposing ``evaluate(board)``.
            num_threads: Worker threads used to evaluate the children.

        Children are appended in legal-move order, whatever order the workers
        finish in. Nothing happens when the position has no legal moves.
        """
        legal_moves = list(self.board.legal_moves)
        if not legal_moves:
            return

        with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
            for child in executor.map(
                lambda move: self._expand_child(move, chess_model), legal_moves
            ):
                self.children.append(child)

    def _expand_child(self, move, chess_model):
        """Build the child reached by ``move`` and evaluate it with the model."""
        child_board = self.board.copy()
        child_board.push(move)
        # type(self) keeps subclasses (e.g. a cached-evaluation node) intact.
        child_node = type(self)(child_board, self, move)
        child_node.evaluate(chess_model)
        return child_node

    def fully_expanded(self):
        """Return True when there is one child per legal move."""
        return len(self.children) == self.board.legal_moves.count()

    def evaluate_board(self):
        """Static evaluation of this node's position (material balance)."""
        return material_balance(self.board)

    def select_child(self, temperature):
        """
        Select a child using UCB1 (Upper Confidence Bound) scores.

        Each child is scored as its mean value plus the exploration term
        ``sqrt(2 * ln(parent_visits + 1) / child_visits)``. With
        ``temperature == 0`` the best-scoring child is returned. Otherwise a
        child is sampled from a softmax over ``score / temperature``, which
        stays well defined when scores are negative.

        Returns ``None`` when the node has no children.
        """
        # Snapshot: other threads may append children while we score them.
        children = list(self.children)
        if not children:
            return None

        # +1 keeps the logarithm non-negative for an unvisited parent.
        log_parent_visits = math.log(self.visits + 1)
        ucb_scores = [
            child.value / (child.visits + 1e-10)
            + math.sqrt(2 * log_parent_visits / (child.visits + 1e-10))
            for child in children
        ]

        if temperature == 0:
            best_child_index = ucb_scores.index(max(ucb_scores))
        else:
            scaled = np.asarray(ucb_scores, dtype=np.float64) / temperature
            scaled -= scaled.max()  # numerical stability of exp()
            weights = np.exp(scaled)
            probabilities = weights / weights.sum()
            best_child_index = np.random.choice(len(children), p=probabilities)

        return children[best_child_index]

    def additional_rewards(self, child_node):
        """Heuristic bonus for the move leading from this node to ``child_node``.

        ``child_node`` must be a child of this node: the capture test is made
        on this node's board, i.e. the position before the move was played.

        Bonus components:
            * +100 if the move delivers checkmate;
            * for captures, the change in ``eval_value`` counted in favour of
              the side that moved;
            * +0.5 if the move gives check.
        """
        rewards = 0

        # Reward checkmate
        if child_node.board.is_checkmate():
            rewards += 100

        # Reward capturing material. The test has to run on the position
        # before the move; on the child's board the move is already played.
        if self.board.is_capture(child_node.move):
            # eval_value is White-positive; flip it when Black made the move.
            material_gain = child_node.eval_value - self.eval_value
            rewards += material_gain if self.board.turn else -material_gain

        # Reward putting king in check
        if child_node.board.is_check():
            rewards += 0.5

        return rewards

    def is_leaf(self):
        """Return True when the node has no children yet."""
        return len(self.children) == 0

    def evaluate(self, chess_model):
        """Count one visit and overwrite ``value`` with the model's estimate."""
        self.visits += 1
        self.value = chess_model.evaluate(self.board)

    def best_child(self, c_param=1, eval_weight=0, temperature=1, noise=0, last_move=None):
        """Return the child with the highest combined weight, or ``None``.

        The weight is the mean value, plus a prior-driven exploration term
        scaled by ``c_param`` (with optional Gaussian ``noise`` on the prior),
        plus ``eval_weight * eval_value``; that sum is divided by
        ``temperature`` and ``additional_rewards`` is added on top. A child
        whose move equals ``last_move`` is penalised by 100.
        """
        if not self.children:
            return None

        choices_weights = [
            ((c.value / (c.visits + 1e-8) + c_param * (c.prior + np.random.randn() * noise) * np.sqrt(self.visits) / (1 + c.visits) + eval_weight * c.eval_value) / temperature)
            + self.additional_rewards(c)
            for c in self.children
        ]

        if last_move:
            for i, c in enumerate(self.children):
                if c.move == last_move:
                    choices_weights[i] -= 100
        return self.children[np.argmax(choices_weights)]

class OptimizedMCTSNode(MCTSNode):
    """MCTSNode variant whose static evaluation goes through the FEN-keyed cache."""

    def evaluate_board(self):
        """Look up the material balance for this position's FEN."""
        position_fen = self.board.fen()
        return cached_evaluate_board(position_fen)

def expand(node, chess_model):
    """Add one child per legal move to ``node`` and return a random child.

    Each child's prior is the model's policy output at slot
    ``from_square * 73 + to_square``.

    Args:
        node: Node to expand; new children are created with ``type(node)`` so
            subclasses are preserved.
        chess_model: Model passed to ``model_predict``.

    Returns:
        One of ``node``'s children chosen uniformly at random, or ``None`` when
        the position has no legal moves (the model is not evaluated then).
    """
    legal_moves = list(node.board.legal_moves)
    if not legal_moves:
        # Terminal position: nothing to add, and np.random.choice(0) would raise.
        return None

    board_input = board_to_input(node.board).reshape(1, 8, 8, 12)
    move_probs, _ = model_predict(chess_model, board_input)
    move_probs = move_probs.numpy().flatten()

    node_cls = type(node)
    for move in legal_moves:
        new_board = node.board.copy()
        new_board.push(move)
        prior = move_probs[move.from_square * 73 + move.to_square]
        node.add_child(node_cls(new_board, parent=node, move=move, prior=prior))

    return node.children[np.random.choice(len(node.children))]

def mcts(board, chess_model, num_simulations, temperature=1.0, noise=0.0, parallel_sims=True):
    """Search from ``board`` and return the move of the root's best child.

    Each simulation descends from the root with ``select_child`` until it
    reaches a leaf and, unless the game is over there, expands that leaf.

    Args:
        board: Position to search from (not modified).
        chess_model: Model used to evaluate newly created children.
        num_simulations: Number of simulations to run.
        temperature: Forwarded to ``MCTSNode.select_child``.
        noise: Currently unused.
            NOTE(review): probably meant for ``best_child(noise=...)`` - confirm.
        parallel_sims: Run the simulations on a thread pool when true,
            otherwise one after another in the calling thread.

    Returns:
        The selected move, or ``None`` when the root ended up without children
        (no legal moves, or no simulation managed to expand it).

    Notes:
        * Expansion is serialised by a lock so concurrent simulations reaching
          the same leaf do not each append a full set of children. Threads
          therefore overlap only in selection and inside the expansion pool.
        * In parallel mode, failed simulations are reported through a
          ``RuntimeWarning`` instead of being dropped silently. Sequential
          mode lets exceptions propagate.
        * NOTE(review): no value is backpropagated after expansion, so node
          statistics come only from ``MCTSNode.evaluate``.
    """
    # The root has no parent. Passing the model as the second positional
    # argument would store it in the node's ``parent`` slot.
    root = MCTSNode(board)
    expansion_lock = threading.Lock()

    def simulate(expand_threads=None):
        current_board = board.copy()
        node = root
        while not node.is_leaf():
            node = node.select_child(temperature)
            current_board.push(node.move)
        if current_board.is_game_over():
            return
        with expansion_lock:
            # Re-check under the lock: another simulation may have expanded
            # this node while we were waiting.
            if not node.is_leaf():
                return
            if expand_threads is None:
                node.expand_children_parallel(chess_model)
            else:
                node.expand_children_parallel(chess_model, num_threads=expand_threads)

    if not parallel_sims:
        for _ in range(num_simulations):
            simulate(expand_threads=1)
    elif num_simulations > 0:  # ThreadPoolExecutor rejects max_workers=0
        with ThreadPoolExecutor(max_workers=num_simulations) as executor:
            futures = [executor.submit(simulate) for _ in range(num_simulations)]
        # The pool has shut down, so every future is finished here.
        failures = [
            error
            for error in (future.exception() for future in futures)
            if error is not None
        ]
        if failures:
            warnings.warn(
                f"{len(failures)} of {num_simulations} MCTS simulations failed; "
                f"first error: {failures[0]!r}",
                RuntimeWarning,
            )

    best = root.best_child()
    return best.move if best is not None else None

In [None]:
# Chess Model and Training Functions
def residual_block(inputs, num_filters, kernel_size=(3, 3)):
    """Conv-BN-ReLU-Conv-BN branch added back onto ``inputs``, then a final ReLU.

    ``inputs`` must already have ``num_filters`` channels for the addition to
    be valid.
    """
    conv_options = {"padding": "same", "activation": None}

    branch = layers.Conv2D(num_filters, kernel_size, **conv_options)(inputs)
    branch = layers.BatchNormalization()(branch)
    branch = layers.ReLU()(branch)
    branch = layers.Conv2D(num_filters, kernel_size, **conv_options)(branch)
    branch = layers.BatchNormalization()(branch)

    merged = layers.Add()([inputs, branch])
    return layers.ReLU()(merged)


def create_chess_model(num_res_blocks=19, num_filters=256):
    """Build the residual policy/value network.

    Args:
        num_res_blocks: Number of residual blocks in the trunk.
        num_filters: Channel width of the trunk convolutions.

    Returns:
        A ``tf.keras.Model`` taking ``(batch, 8, 8, 12)`` inputs and returning
        ``[policy, value]``: a 4672-way softmax and a single tanh value. The
        model's ``evaluate`` attribute is replaced by a bound helper that
        returns the value estimate for a board. This shadows Keras' own
        ``Model.evaluate`` on that instance.
    """
    input_shape = (8, 8, 12)

    inputs = layers.Input(shape=input_shape, dtype=tf.float16)
    x = layers.Conv2D(num_filters, kernel_size=(3, 3), padding="same", activation="relu")(inputs)
    x = layers.BatchNormalization()(x)

    for _ in range(num_res_blocks):
        x = residual_block(x, num_filters)

    # Policy head. The name avoids shadowing the module-level `policy` object.
    policy_head = layers.Conv2D(2, kernel_size=(1, 1), padding="same", activation="relu")(x)
    policy_head = layers.BatchNormalization()(policy_head)
    policy_head = layers.Flatten()(policy_head)
    # Output layers are kept in float32: under a mixed_float16 policy a
    # float16 softmax over 4672 classes loses precision.
    policy_head = layers.Dense(4672, activation="softmax", dtype="float32")(policy_head)

    # Value head
    value_head = layers.Conv2D(1, kernel_size=(1, 1), padding="same", activation="relu")(x)
    value_head = layers.BatchNormalization()(value_head)
    value_head = layers.Flatten()(value_head)
    value_head = layers.Dense(1, activation="tanh", dtype="float32")(value_head)

    model = tf.keras.Model(inputs=inputs, outputs=[policy_head, value_head])

    def evaluate(self, board):
        """Return the scalar value estimate of ``board``."""
        board_input = board_to_input(board).reshape(1, 8, 8, 12)
        _, value_estimate = model_predict(self, board_input)
        return value_estimate.numpy().flatten()[0]

    model.evaluate = evaluate.__get__(model)
    return model

def compute_advantages(rewards, values, gamma=0.99, lambda_=0.95):
    """Generalised Advantage Estimation over one trajectory.

    Args:
        rewards: Per-step rewards, length ``T``.
        values: Value estimates. Either length ``T + 1`` (the last entry is the
            bootstrap value of the state after the final step) or length ``T``,
            in which case the bootstrap value is taken as 0, i.e. the
            trajectory is treated as ending in a terminal state.
        gamma: Discount factor.
        lambda_: GAE smoothing factor.

    Returns:
        Array of advantages with the shape of ``rewards`` and a floating-point
        dtype. Integer rewards no longer truncate the result.

    Raises:
        ValueError: If ``values`` is shorter than ``rewards``.
    """
    rewards_arr = np.asarray(rewards)
    values_arr = np.asarray(values)
    num_steps = len(rewards_arr)

    if len(values_arr) < num_steps:
        raise ValueError("values must have at least as many entries as rewards")
    if len(values_arr) == num_steps:
        # No bootstrap value supplied: assume a terminal state worth 0.
        values_arr = np.concatenate([values_arr, np.zeros_like(values_arr[:1])])

    # zeros_like(rewards) would inherit an integer dtype and truncate every
    # advantage; promote to at least float32 while keeping wider float inputs.
    out_dtype = np.result_type(rewards_arr.dtype, np.float32)
    advantages = np.zeros(rewards_arr.shape, dtype=out_dtype)

    gae = 0
    for t in reversed(range(num_steps)):
        delta = rewards_arr[t] + gamma * values_arr[t + 1] - values_arr[t]
        gae = delta + gamma * lambda_ * gae
        advantages[t] = gae
    return advantages

def ppo_loss_fn(advantages, old_probs, actions, logits, values, clip_epsilon=0.2, value_loss_coeff=0.5, entropy_coeff=0.01, old_values=None):
    """Clipped PPO objective with a value term and an entropy bonus.

    Args:
        advantages: Per-sample advantage estimates.
        old_probs: Outputs of the behaviour policy; passed through
            ``log_softmax`` exactly like ``logits``.
            NOTE(review): named "probs" but treated as logits - confirm which
            one callers pass.
        actions: One-hot encoding of the actions taken.
        logits: Outputs of the current policy.
        values: Current value predictions.
        clip_epsilon: Clipping range for the probability ratio.
        value_loss_coeff: Weight of the value loss.
        entropy_coeff: Weight of the entropy bonus.
        old_values: Value targets compared against ``values`` with a Huber
            loss. Required; the body previously read an undefined name here.

    Returns:
        Scalar tensor: surrogate loss + weighted value loss - weighted entropy.

    Raises:
        ValueError: If ``old_values`` is not provided.
    """
    if old_values is None:
        raise ValueError("ppo_loss_fn requires old_values (the value targets) to compute the value loss")

    new_log_probs = tf.nn.log_softmax(logits)
    old_log_probs = tf.stop_gradient(tf.nn.log_softmax(old_probs))

    # Ratio for every action, then keep only the entry of the action taken.
    prob_ratio = tf.exp(new_log_probs - old_log_probs)
    prob_ratio = tf.reduce_sum(prob_ratio * actions, axis=-1)
    clipped_prob_ratio = tf.clip_by_value(prob_ratio, 1 - clip_epsilon, 1 + clip_epsilon)
    surrogate_loss = -tf.reduce_mean(tf.minimum(prob_ratio * advantages, clipped_prob_ratio * advantages))

    value_loss = Huber()(values, old_values)

    # Mean policy entropy (non-negative); subtracting it rewards exploration.
    entropy = -tf.reduce_mean(tf.reduce_sum(tf.nn.softmax(logits) * new_log_probs, axis=-1))

    total_loss = surrogate_loss + value_loss_coeff * value_loss - entropy_coeff * entropy
    return total_loss

# Build a model instance and print its layer summary.
chess_model = create_chess_model()
chess_model.summary()
# create_chess_model() already binds an equivalent `evaluate` helper onto the
# model it returns; this line rebinds the attribute to the module-level
# `evaluate` function.
chess_model.evaluate = evaluate.__get__(chess_model)

In [None]:
# Create separate models for black and white players
white_chess_model = create_chess_model()
black_chess_model = create_chess_model()

white_weights_file = "weights/white_chess_model_weights.h5"
black_weights_file = "weights/black_chess_model_weights.h5"

# Resume from saved weights when they are available. Each model is loaded on
# its own so a missing or incompatible file for one side does not skip the
# other, and failures are reported rather than swallowed.
for _model, _weights_file in (
    (white_chess_model, white_weights_file),
    (black_chess_model, black_weights_file),
):
    try:
        _model.load_weights(_weights_file)
    except Exception as exc:  # best effort: fall back to fresh weights
        print(f"Could not load weights from {_weights_file} ({exc!r}); using freshly initialised weights.")

optimizer = 'adam'
# NOTE(review): one loss name is passed to a model with two outputs (policy and
# value); confirm that categorical cross-entropy is intended for the value head.
loss = 'categorical_crossentropy'

white_chess_model.compile(optimizer=optimizer, loss=loss)
black_chess_model.compile(optimizer=optimizer, loss=loss)

def play_game(game_number, total_games, white_chess_model, black_chess_model, num_simulations=100, verbose=False, time_per_move=2):
    """Play one game between the two models and return the result string.

    Moves are chosen with ``mcts`` using the model of the side to move. When
    ``verbose`` is set, the board and each chosen move are printed and the
    loop pauses ``time_per_move`` seconds after every move.
    """
    board = chess.Board()

    while not board.is_game_over():
        if verbose:
            clear_output(wait = True)
            print(f"\nGame {game_number + 1} out of {total_games}")
            print(board)

        # board.turn is truthy when it is White's turn.
        model_to_move = white_chess_model if board.turn else black_chess_model
        move = mcts(board, model_to_move, num_simulations=num_simulations)

        if verbose:
            print(move)
            time.sleep(time_per_move)

        board.push(move)

    result = board.result()
    if verbose:
        print("Game over. Result:", result)
    return result

def run_multiple_games(white_chess_model, black_chess_model, num_games, num_simulations=100, num_threads=4, time_per_move=2):
    """Play ``num_games`` games on a thread pool and report running scores.

    Args:
        white_chess_model: Model playing White in every game.
        black_chess_model: Model playing Black in every game.
        num_games: Number of games to play.
        num_simulations: Simulations per move, forwarded to ``play_game``.
        num_threads: Size of the thread pool.
        time_per_move: Forwarded to ``play_game`` (display pause of the
            verbose game).

    Returns:
        List of result strings indexed by game number, so entry ``i`` belongs
        to game ``i + 1`` regardless of the order in which games finish.

    Only the first game (index 0) runs in verbose mode. Progress lines are
    printed as games complete.
    """
    game_results = [None] * num_games
    white_score = 0
    black_score = 0
    start_time = time.time()

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        # Remember which game each future belongs to; completion order is
        # arbitrary.
        future_to_game = {
            executor.submit(play_game, i, num_games, white_chess_model, black_chess_model, num_simulations, i == 0, time_per_move): i
            for i in range(num_games)
        }

        completed_games = 0
        for future in concurrent.futures.as_completed(future_to_game):
            result = future.result()
            game_results[future_to_game[future]] = result

            if result == "1-0":
                white_score += 1
            elif result == "0-1":
                black_score += 1

            completed_games += 1
            print(f"Game {completed_games} out of {num_games}")
            print(f"White score: {white_score}, Black score: {black_score}")

            elapsed_time = time.time() - start_time
            avg_time_per_game = elapsed_time / completed_games
            print(f"Elapsed time: {elapsed_time:.2f}s, Avg time per game: {avg_time_per_game:.2f}s")

    return game_results

# Run the batch: 50 games between the white and black models on 4 worker
# threads, with 100 simulations per move.
num_games = 50
game_results = run_multiple_games(white_chess_model, black_chess_model, num_games, num_simulations=100, num_threads=4, time_per_move=2)

def print_game_results(game_results):
    """Print one ``Game N: result`` line per entry, numbering from 1."""
    for game_no, outcome in enumerate(game_results, start=1):
        print(f"Game {game_no}: {outcome}")

print_game_results(game_results)

# Create the target directories first: otherwise a missing weights folder makes
# the save fail only after every game has been played. The paths are the same
# ones the weights were loaded from.
for _weights_path in (white_weights_file, black_weights_file):
    os.makedirs(os.path.dirname(_weights_path) or ".", exist_ok=True)

white_chess_model.save_weights(white_weights_file)
black_chess_model.save_weights(black_weights_file)