<a href="https://colab.research.google.com/github/MarcelloCeresini/ChessBreaker/blob/main/main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
username = 'MarcelloCeresini'
repository = 'ChessBreaker'

In [2]:
# COLAB ONLY CELLS
try:
    import google.colab
    IN_COLAB = True
    !nvidia-smi             # Check which GPU has been chosen for us
    !rm -rf logs
    #from google.colab import drive
    #drive.mount('/content/drive')
    #%cd /content/drive/MyDrive/GitHub/
    !git clone https://github.com/{username}/{repository}.git
    %cd {repository}
    %ls
    !pip3 install anytree
except:
    IN_COLAB = False

NVIDIA-SMI has failed because it couldn't communicate with the NVIDIA driver. Make sure that the latest NVIDIA driver is installed and running.

fatal: destination path 'ChessBreaker' already exists and is not an empty directory.
/content/ChessBreaker
easy_model.py  model.py    [0m[01;34m__pycache__[0m/  requirements.txt
main.ipynb     [01;34mold_stuff[0m/  README.md     utils.py
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [3]:
import numpy as np
import tensorflow as tf
import chess
from anytree import Node
from time import time
import matplotlib.pyplot as plt
from tqdm import tqdm

import utils
from utils import plane_dict, Config, x_y_from_position
from model import ResNet

conf = Config()
board = chess.Board()

# legal_moves = board.legal_moves
# for move in legal_moves:
#     print(move.uci())  
# print(legal_moves)


In [4]:
def uniform_tensor(x):
    return tf.fill(conf.BOARD_SHAPE, x)

def special_input_planes(board):                                    # not repeated planes
    return tf.transpose(tf.vectorized_map(                          # vectorized_map = map_fn but in parallel (just a tad faster) 
            uniform_tensor,
            tf.constant([
                int(board.turn                                 ),   # whose turn it is
                int(board.fullmove_number-1                    ),   # don't know why but it starts from 1 on move 1, just reduce it by one and now it's right (MAX 255, using uint8!!)
                int(board.has_kingside_castling_rights(True)   ),   # True for White
                int(board.has_queenside_castling_rights(True)  ),
                int(board.has_kingside_castling_rights(False)  ),   # False for Black
                int(board.has_queenside_castling_rights(False) ),
                int(board.halfmove_clock                       )    # number of moves from last capture / pawn move --> reaching 50 means draw
            ], dtype=conf.PLANES_DTYPE)
        ), [1,2,0])                                                 # transpose to have plane number last --> in order to concat them


def update_planes(current, board, board_history):

    if current == None: # root, initialize to zero
        current = tf.zeros([*conf.BOARD_SHAPE, conf.TOTAL_PLANES], dtype=conf.PLANES_DTYPE)
    
    planes = [] # since we cannot "change" a tensor after creating it, we create them one by one in a list and then stack them

    for color in range(2):                                                                                                  # for each color
        for piece_type in range(1, conf.N_PIECE_TYPES+1):                                                                   # for each piece type
            indices = []                                                                                                    # --> we save the position on the board in a list
            for position in list(board.pieces(piece_type, color)):                                                          # for each piece of that type
                indices.append(x_y_from_position(position))                                                                 # the function transforms a number (1-64) into a tuple (1-8, 1-8)
            if len(indices) == 0:
                tensor = uniform_tensor(tf.constant(0, dtype=conf.PLANES_DTYPE))
            else:
                values = np.array([1]*len(indices), dtype=conf.PLANES_DTYPE_NP) # simply "1" in a list with unit8 dtype
                tensor = tf.sparse.to_dense(tf.SparseTensor(dense_shape=[*conf.BOARD_SHAPE], indices=indices, values=values))   ### created as sparse because it's easier, needed as dense afterwards
            planes.append(tensor)
        planes.append(uniform_tensor(tf.constant(board_history.count(board_history[-1]), dtype=conf.PLANES_DTYPE)))         # adding a "repetition plane" for each color (simply count how many times the current (last) position has been encountered)

    # 1 stack
    current_planes = tf.transpose(tf.stack(planes), [1,2,0])                                                                # transpose them to have the planes as last dimension
    # 7 stacks (total 8 repetitions)
    old_planes = tf.slice(current, begin=[0,0,0], size=[*conf.BOARD_SHAPE, (conf.PAST_TIMESTEPS-1)*conf.REPEATED_PLANES])   # take the first 7 repetitions, slice them and paste them at the end of the new planes (last is discarded, as are special planes)
    
    return tf.concat([current_planes, old_planes, special_input_planes(board)], axis=-1)    # also concat the special planes


In [5]:
class MyNode(Node): # subclassing Node from Anytree to add some methods

    def update_action_value(self, new_action_value):                                                        # used during backtracking to update action value if the simulation reached the end through that node
        self.action_value += (new_action_value-self.action_value)/(self.visit_count+1)                      # simply the mean value, but computed iteratively

    def calculate_upper_confidence_bound(self, num_total_iterations=1):                                     # Q + U --> U proportional to P/(1+N) --> parameter decides exploration vs. exploitation
        return self.action_value + conf.expl_param(num_total_iterations)*self.prior/(1+self.visit_count)

    def calculate_move_probability(self, num_total_iterations=1):                                           # N^(1/tau) --> tau is a temperature parameter (exploration vs. exploitation)
        return self.visit_count**(1/conf.temp_param(num_total_iterations))


def MTCS(model, root_node, max_depth, num_restarts):
    '''
        As it is written, the search descends until it finds a leaf to be evalued, then restarts until it gathers a batch of evaluations
        The descent, however, does not restart from the just evalued leafs, but always from the beginning (easier implementation)
        This should IMPROVE exploration because the probability of entering a node is inversely proportional to the number of visits of that node
    '''

    print(root_node.name)
    INIT_ROOT = root_node
    # for i in tqdm(range(num_restarts)):                                                                           # number of times to explore up until max_depth
    flag_restart = True # true if you haven't reached the number of total restarts
    flag_batch = True   # true if the number of visited nodes is not BATCH_DIM
    flag_max_depth = True
    i = 0

    leaf_node_batch = []
    legal_moves_batch = []
    
    while flag_restart or flag_batch:
        if i >= num_restarts: flag_restart = False
        root_node = INIT_ROOT
        
        while root_node.depth <= max_depth and (flag_restart or flag_batch or flag_max_depth):  # while depth < max --> descend BUT STOP when you finished num_restarts AND the batch is empty AND you reach an already explored end of the tree with a subsequent descent
            legal_moves = list(root_node.board.legal_moves)
            # TODO: implement "complete game" inside MTCS --> stop the descent and give 1 as outcome (1/-1 depends)
            assert root_node.depth >= 0 and root_node.depth <= max_depth, "depth is wrong"          

            if root_node.is_leaf:                                                                           # if it's leaf --> need to pass the position (planes) through the model, to get priors (action_values) and outcome (state_value)
                flag_batch = True           # if we enter in a new leaf it means we are not at max depth, so we start batching --> we don't want to stop until the evaluation has been done

                leaf_node_batch.append(root_node)
                legal_moves_batch.append(legal_moves)
                
                if len(leaf_node_batch) == conf.BATCH_DIM:
                    flag_batch = False

                    plane_list = [root_node.planes for root_node in leaf_node_batch]
                    planes = tf.stack(plane_list)
                    full_moves_batch, outcome_batch = model(planes)
                    print(tf.shape(full_moves_batch))

                    fm_try, o_try = model(tf.expand_dims(plane_list[0], axis=0))
                    print(tf.shape(fm_try))
                    
                    for root_node, full_moves, outcome, legal_moves in zip(leaf_node_batch, tf.unstack(full_moves_batch), tf.unstack(outcome_batch), legal_moves_batch):
                        
                        print(tf.shape(full_moves), len(legal_moves))

                        priors = tf.boolean_mask(full_moves, utils.mask_moves(legal_moves))                         # boolean mask returns a tensor of only the values that were masked (as a list let's say)

                        root_node.action_value = outcome                                                            # the activation value of a leaf node is the state_value computed by the network

                        for move, prior in zip(legal_moves, priors):                                                # creating children
                            root_board_fen = root_node.board.fen()
                            new_board = chess.Board()
                            new_board.set_fen(root_board_fen)
                                                                                  # each with their board (by pushing the move)
                            new_board.push(move)
                            new_board_history = root_node.board_history.copy()                                      # and board history! (copy because list are pointers)
                            new_board_history.append(new_board.fen()[:-6])
                            MyNode(
                                move, 
                                parent = root_node,                                                                 # very important to build the tree
                                prior = prior,                                                                      # prior is the "initial" state_value of a node
                                visit_count = 0,                                                                    # initialize visit_count to 0
                                action_value = 0,
                                board = new_board, 
                                board_history = new_board_history,                                                  
                                planes = update_planes(root_node.planes, new_board, new_board_history)              # update the planes --> each node stores its input planes!
                            )

                    leaf_node_batch = []
                    legal_moves_batch = []

            else: # if it does not need to be evalued because it already has children 
                if root_node.depth < max_depth:                                                                 # if we are normally descending
                    print(root_node.depth)
                    children = root_node.children                                                               # get all the children (always != [])
                    
                    values = [child.calculate_upper_confidence_bound() for child in children]
                    root_node = children[np.argmax(values)]
                    root_node.visit_count += 1                                                                  # add 1 to the visit count of the chosen child
                else:
                    flag_max_depth = False
                    outcome = root_node.action_value    # needed for when depth=max_depth AND NOT LEAF (that means, already visited leaf) --> don't REDO the evaluation, it would give the same result, simply copy it from before
                    break                               # it will leave the while, max depth is reached
           
        print("num_restarts", i, " - flag_r", flag_restart, " - flag_b", flag_batch, " - flag_md", flag_max_depth)
        # barckpropagation of action value through the tree
        while root_node.parent != INIT_ROOT:
            # root node should be an already evalued leaf, at max depth (so OUTCOME has been set)
            assert root_node.depth >= 0 and root_node.depth <= max_depth, "depth is wrong"
            print
            root_node = root_node.parent
            root_node.update_action_value(outcome)
    
    return INIT_ROOT


def choose_move(root_node):
    children = root_node.children
    assert root_node.children != [], "No children, cannot choose move"
    p = [child.calculate_move_probability() for child in children] 
    p_norm = [i/sum(p) for i in p] # normalize probabilities
    root_node = np.random.choice(
        children, 
        p = p_norm  # choose the child proportionally to the number of times it has been visited (exponentiated by a temperature parameter)
    ) 
        
    root_node.parent = None # To detach the subtree and restart with the next move search

    return root_node


def complete_game(model):
    move_list = []
    board = chess.Board()
    board_history = [board.fen()[:-6]]                           # we remove the "en passant", "halfmove clock" and "fullmove number" from the fen --> position will be identical even if those values differ
    root_node = MyNode(
        "",                                                     # no name needed for initial position
        board = board,
        board_history = board_history,
        planes = update_planes(None, board, board_history),    # start from empty planes and fill them (usually you need previous planes to fill them)
        action_value=0)

    while not root_node.board.is_game_over(claim_draw=True) and root_node.board.fullmove_number <= conf.MAX_MOVE_COUNT:
        
        root_node = MTCS(model, root_node, max_depth = conf.MAX_DEPTH, num_restarts=conf.NUM_RESTARTS)                          # though the root node you can access all the tree
        root_node = choose_move(root_node)
        move_list.append(root_node.name)
    
    return move_list


In [6]:
model = ResNet()
print(tf.shape(conf.DUMMY_INPUT))
fm, ac = model(conf.DUMMY_INPUT)
print("full moves shape", fm.shape)
print("action values shape", ac.shape)
model.summary()

tf.Tensor([  8   8   8 119], shape=(4,), dtype=int32)
full moves shape (8, 8, 8, 73)
action values shape (8, 1)
Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 8, 8, 119)]  0           []                               
                                                                                                  
 ResB1 (ResNetBlock)            (None, 8, 8, 128)    358784      ['input_1[0][0]']                
                                                                                                  
 ResB2 (ResNetBlock)            (None, 8, 8, 256)    1377536     ['ResB1[0][0]']                  
                                                                                                  
 ResB3 (ResNetBlock)            (None, 8, 8, 512)    5507584     ['ResB2[0][0]'] 

In [None]:
moves = complete_game(model)


tf.Tensor([ 8  8  8 73], shape=(4,), dtype=int32)
tf.Tensor([ 1  8  8 73], shape=(4,), dtype=int32)
tf.Tensor([ 8  8 73], shape=(3,), dtype=int32) 20
tf.Tensor([ 8  8 73], shape=(3,), dtype=int32) 20
tf.Tensor([ 8  8 73], shape=(3,), dtype=int32) 20
tf.Tensor([ 8  8 73], shape=(3,), dtype=int32) 20
tf.Tensor([ 8  8 73], shape=(3,), dtype=int32) 20
tf.Tensor([ 8  8 73], shape=(3,), dtype=int32) 20
tf.Tensor([ 8  8 73], shape=(3,), dtype=int32) 20
tf.Tensor([ 8  8 73], shape=(3,), dtype=int32) 20
0
tf.Tensor([ 8  8  8 73], shape=(4,), dtype=int32)
tf.Tensor([ 1  8  8 73], shape=(4,), dtype=int32)
tf.Tensor([ 8  8 73], shape=(3,), dtype=int32) 20
tf.Tensor([ 8  8 73], shape=(3,), dtype=int32) 20
tf.Tensor([ 8  8 73], shape=(3,), dtype=int32) 20
tf.Tensor([ 8  8 73], shape=(3,), dtype=int32) 20
tf.Tensor([ 8  8 73], shape=(3,), dtype=int32) 20
tf.Tensor([ 8  8 73], shape=(3,), dtype=int32) 20
tf.Tensor([ 8  8 73], shape=(3,), dtype=int32) 20
tf.Tensor([ 8  8 73], shape=(3,), dtype=int32) 

  result = getattr(asarray(obj), method)(*args, **kwds)


[1;30;43mOutput streaming troncato alle ultime 5000 righe.[0m
num_restarts 0  - flag_r True  - flag_b False  - flag_md False
0
1
num_restarts 0  - flag_r True  - flag_b False  - flag_md False
0
1
num_restarts 0  - flag_r True  - flag_b False  - flag_md False
0
1
num_restarts 0  - flag_r True  - flag_b False  - flag_md False
0
1
num_restarts 0  - flag_r True  - flag_b False  - flag_md False
0
1
num_restarts 0  - flag_r True  - flag_b False  - flag_md False
0
1
num_restarts 0  - flag_r True  - flag_b False  - flag_md False
0
1
num_restarts 0  - flag_r True  - flag_b False  - flag_md False
0
1
num_restarts 0  - flag_r True  - flag_b False  - flag_md False
0
1
num_restarts 0  - flag_r True  - flag_b False  - flag_md False
0
1
num_restarts 0  - flag_r True  - flag_b False  - flag_md False
0
1
num_restarts 0  - flag_r True  - flag_b False  - flag_md False
0
1
num_restarts 0  - flag_r True  - flag_b False  - flag_md False
0
1
num_restarts 0  - flag_r True  - flag_b False  - flag_md False
0


In [None]:
moves2 = moves.copy()
board = chess.Board()

In [None]:
board.push(moves2.pop(0))
board