In [None]:
# %tensorflow_version 1.x
# !pip install h5py==2.10.0 --force-reinstall
import os

In [None]:
from google.colab import drive
drive.mount("/content/drive")

In [None]:
# model_path = "/content/drive/My Drive/deepChess/model_densenet.h5"
model_path = "/content/drive/My Drive/deepChess/model_densenet_tf2.h5"

In [None]:
!cp "/content/drive/My Drive/deepChess/dataset.7z" /content/dataset.7z
!7z e dataset.7z
!rm dataset.7z

In [None]:
!cp "/content/drive/My Drive/deepChess/stockfish_14.1_linux_x64.zip" /content/stockfish_14.1_linux_x64.zip
!unzip stockfish_14.1_linux_x64.zip
!rm stockfish_14.1_linux_x64.zip
!cp /content/stockfish_14.1_linux_x64/stockfish_14.1_linux_x64 /content/stockfish
!chmod +x stockfish

In [None]:
!pip install python-chess==0.31.3

In [None]:
import random
import numpy
import chess
import chess.engine


def champion(board, depth):
  with chess.engine.SimpleEngine.popen_uci('/content/stockfish') as ch:
    analysis = ch.analyse(board, chess.engine.Limit(depth=depth))
    score = analysis['score'].white().score()
    return score

def board_random(depth_max=200):
  board = chess.Board()
  current_depth = random.randrange(0, depth_max)
  # depth = depth_max

  for _ in range(current_depth):
    legal_moves = list(board.legal_moves)
    move = random.choice(legal_moves)
    board.push(move)
    if board.is_game_over():
      break
  return board

In [None]:
import random
random.seed(0)

In [None]:
chess.STARTING_FEN='r7/8/n7/2P4p/Kbkp2NP/8/4n3/8 w - - 0 1'

In [None]:
chess.STARTING_BOARD_FEN= 'r7/8/n7/2P4p/Kbkp2NP/8/4n3/8'

In [None]:
board = board_random()
#board = chess.Board('rn6/8/8/2P4p/Kbkp2NP/8/4n3/8 w - - 0 1')
board

In [None]:
print(champion(board, 1))

# Building the dataset

We need to represent the board in a way that can be trained so we are going to build a 3D matrix of shape (14, 8, 8). Any chess board has a 8x8 dimension, and to be trainable we are adding a new dimmension of size 14 that will hold 14 boards, 6 for the 6 types of white pieces, 6 for black pieces, and 2 more boards each one of them for the attacked cells for black and white respectebly.

In [None]:
coordinates = {'a': 0, 'b': 1, 'c': 2, 'd': 3, 'e': 4, 'f': 5, 'g': 6, 'h': 7}

def board_to_index(square):
  # for instance: h3 -> 17
  letter = chess.square_name(square)
  return 8 - int(letter[1]), coordinates[letter[0]]

def trainable_board(board):
  # the 3d matrix
  board_3d = numpy.zeros((14, 8, 8), dtype=numpy.int8)

  for piece in chess.PIECE_TYPES:
    for square in board.pieces(piece, chess.WHITE):
      idx = numpy.unravel_index(square, (8, 8))
      board_3d[piece - 1][7 - idx[0]][idx[1]] = 1
    for square in board.pieces(piece, chess.BLACK):
      idx = numpy.unravel_index(square, (8, 8))
      board_3d[piece + 5][7 - idx[0]][idx[1]] = 1

  # add attacked cells and valid moves
  temp = board.turn
  board.turn = chess.WHITE
  for move in board.legal_moves:
      i, j = board_to_index(move.to_square)
      board_3d[12][i][j] = 1
  board.turn = chess.BLACK
  for move in board.legal_moves:
      i, j = board_to_index(move.to_square)
      board_3d[13][i][j] = 1
  board.turn = temp

  return board_3d

In [None]:
trainable_board(board)

# Define the training model
We are using the densenet deep learning model:

In [None]:
import tensorflow.keras.models as models
import tensorflow.keras.layers as layers
import tensorflow.keras.utils as utils
import tensorflow.keras.optimizers as optimizers

In [None]:
import tensorflow.keras as K

In [None]:
def dense_block(X, nb_filters, growth_rate, layers):
    '''Builds a dense block as described in Densely Connected
    Convolutional Networks
    Args:
        X is the output from the previous layer
        nb_filters is an integer representing the number of filters in X
        growth_rate is the growth rate for the dense block
        layers is the number of layers in the dense block
    Important: applied bottleneck layers used for DenseNet-B
    Returns: The concatenated output of each layer within the Dense Block
             and the number of filters within the concatenated outputs,
             respectively
    '''
    w = K.initializers.he_normal()
    concat = X

    for i in range(layers):
        l1 = K.layers.BatchNormalization()(concat)
        l1 = K.layers.Activation("relu")(l1)

        l2 = K.layers.Conv2D(filters=(4 * growth_rate),
                             kernel_size=(1, 1),
                             padding="same",
                             data_format="channels_first",
                             kernel_initializer=w)(l1)

        l3 = K.layers.BatchNormalization()(l2)
        l3 = K.layers.Activation("relu")(l3)

        l4 = K.layers.Conv2D(filters=growth_rate,
                             kernel_size=(3, 3),
                             padding="same",
                             data_format="channels_first",
                             kernel_initializer=w)(l3)

        nb_filters += growth_rate
        concat = K.layers.Concatenate(axis=1)([concat, l4])

    return concat, nb_filters

In [None]:
def transition_layer(X, nb_filters, compression):
    '''Builds a transition layer as described in Densely Connected
    Convolutional Networks
    Args:
        X is the output from the previous layer
        nb_filters is an integer representing the number of filters in X
        compression is the compression factor for the transition layer
    Returns: The output of the transition layer and the number of filters
             within the output, respectively
    '''
    w = K.initializers.he_normal()

    filters = int(compression * nb_filters)

    l1 = K.layers.BatchNormalization()(X)
    l1 = K.layers.Activation("relu")(l1)

    l2 = K.layers.Conv2D(filters=filters,
                         kernel_size=(1, 1),
                         padding="same",
                         data_format="channels_first",
                         kernel_initializer=w)(l1)

    l3 = K.layers.AveragePooling2D(pool_size=(2, 2),
                                   strides=2,
                                   data_format="channels_first",
                                   padding="valid")(l2)

    return l3, filters

In [None]:
def custom_densenet(growth_rate=32, compression=1.0):
    '''Builds the DenseNet-121 architecture as described in Densely
    Connected Convolutional Networks
    Args:
        growth_rate is the growth rate
        compression is the compression factor
    Important: Input data should have shape (224, 224, 3)
    Returns: the keras model
    '''
    w = K.initializers.he_normal()
    inputs = K.Input(shape=(14, 8, 8))

    sub_layers = [16]

    # l1 = K.layers.BatchNormalization()(inputs)
    # l1 = K.layers.Activation("relu")(l1)

    # l2 = K.layers.Conv2D(filters=64,
    #                      kernel_size=(7, 7),
    #                      strides=2,
    #                      padding="same",
    #                      kernel_initializer=w)(l1)

    # l3 = K.layers.MaxPool2D(pool_size=(3, 3),
    #                         strides=2,
    #                         padding="same")(l2)

    l4 = inputs
    filters = 14
    for i in range(1):
        l4, filters = dense_block(l4, filters, growth_rate, sub_layers[i])
        # if i < 3:
        #     l4, filters = transition_layer(l4, filters, compression)

    l5 = K.layers.AveragePooling2D(pool_size=(8, 8),
                                   strides=1,
                                   data_format="channels_first",
                                   padding="valid")(l4)

    l6 = K.layers.Flatten()(l5)

    output = K.layers.Dense(units=1,
                            activation="sigmoid",
                            kernel_initializer=w)(l6)

    return K.Model(inputs=inputs, outputs=output)

In [None]:
model = custom_densenet(14, 0.5)
utils.plot_model(model, to_file='model_plot.png', show_shapes=True, show_layer_names=False)

# Training

In [None]:
import tensorflow.keras.callbacks as callbacks
from sklearn.preprocessing import minmax_scale

In [None]:
def get_dataset():
    container = numpy.load('dataset.npz')
    bd, v = container['b'], container['v']
    arr = numpy.isnan(v)
    print(numpy.count_nonzero(arr))
    print(v)
    print(v.max())
    print(v.min())
    # v = numpy.asarray((v - v.min()) / (v.max() - v.min() + 1e-4), dtype=numpy.float32) # normalization (0 - 1)
    # normalize [a, b]
    # a = -1
    # b = 1
    # v = numpy.asarray(((b - a) * (v - v.min()) / (v.max() - v.min())) + a, dtype=numpy.float32) # normalization (a - b)
    v = minmax_scale(v, feature_range=(0, 1))  # 0-1 scaling
    print(v)
    print(v.max())
    print(v.min())
    return bd, v

x_train, y_train = get_dataset()
x_split = int(len(x_train) * 0.8)
y_split = int(len(y_train) * 0.8)
x_train, x_valid = x_train[:x_split,:,:], x_train[x_split:,:,:]
y_train, y_valid = y_train[:y_split], y_train[y_split:]
print(x_train.shape)
print(x_valid.shape)
print(y_train.shape)
print(y_valid.shape)

In [None]:
def optimize_model(network, alpha, beta1, beta2):
    '''sets up Adam optimization for a keras model with categorical
    crossentropy loss and accuracy metrics
    Args:
        network is the model to optimize
        alpha is the learning rate
        beta1 is the first Adam optimization parameter
        beta2 is the second Adam optimization parameter
    Returns: None
    '''
    optimizer = K.optimizers.Adam(lr=alpha,
                                  beta_1=beta1,
                                  beta_2=beta2)
    network.compile(optimizer=optimizer,
                    loss="mean_squared_error",
                    metrics=["accuracy"])

In [None]:
def train_model(network, data, labels, batch_size, epochs,
                validation_data=None, early_stopping=False,
                patience=0, learning_rate_decay=False, alpha=0.1,
                decay_rate=1, save_best=False, filepath=None,
                verbose=True, shuffle=False):
    '''trains a model using mini-batch gradient descent
    Args:
        network is the model to train
        data is a numpy.ndarray of shape (m, nx) containing the input data
        labels is a one-hot numpy.ndarray of shape (m, classes) containing
               the labels of data
        batch_size is the size of the batch used for mini-batch gradient
                   descent
        epochs is the number of passes through data for mini-batch gradient
               descent
        validation_data is the data to validate the model with, if not None
        early_stopping is a boolean that indicates whether early stopping
                       should be used
                       - only performed if validation_data exists
                       - based on validation loss
        patience is the patience used for early stopping
        learning_rate_decay is a boolean that indicates whether learning rate
                            decay should be used
                            - only performed if validation_data exists
                            - performed using inverse time decay
                            - decays in a stepwise fashion after each epoch
                            - each time the learning rate updates, Keras
                              prints a message
        alpha is the initial learning rate
        decay_rate is the decay rate
        save_best is a boolean indicating whether to save the model after each
                  epoch if it is the best
                  - a model is considered the best if its validation loss is
                    the lowest that the model has obtained
        filepath is the file path where the model should be saved
        verbose is a boolean that determines if output should be printed
                during training
        shuffle is a boolean that determines whether to shuffle the batches
                every epoch. Normally, it is a good idea to shuffle, but for
                reproducibility, we have chosen to set the default to False.
    Returns: the History object generated after training the model
    '''
    callbacks = []
    if validation_data:
        if early_stopping:
            early_stop = K.callbacks.EarlyStopping(monitor='val_loss',
                                                   patience=patience)
            callbacks.append(early_stop)
        if learning_rate_decay:
            def schedule(step):
                '''stepwise inverse time decay function'''
                return alpha * 1 / (1 + decay_rate * step)
            lr_decay = K.callbacks.LearningRateScheduler(schedule=schedule,
                                                         verbose=1)
            callbacks.append(lr_decay)
        if save_best:
            save_b = K.callbacks.ModelCheckpoint(filepath=filepath,
                                                 monitor='val_loss',
                                                 save_best_only=True)
            callbacks.append(save_b)
    return network.fit(x=data,
                       y=labels,
                       batch_size=batch_size,
                       validation_data=validation_data,
                       epochs=epochs,
                       verbose=verbose,
                       callbacks=callbacks,
                       shuffle=shuffle)

In [None]:
if os.path.isfile(model_path):
    model = K.models.load_model(model_path)

In [None]:
lambtha = 0.0001
keep_prob = 0.95
alpha = 0.001
beta1 = 0.9
beta2 = 0.999
optimize_model(model, alpha, beta1, beta2)
batch_size = 64
epochs = 1000
train_model(model, x_train, y_train, batch_size, epochs,
            validation_data=(x_valid, y_valid), early_stopping=True,
            patience=3, learning_rate_decay=True, alpha=alpha,
            save_best=True, filepath=model_path)

# Playing with the AI

In [None]:
# min_max algorithm
def check_min_max(board):
  board_3d = trainable_board(board)
  board_3d = numpy.expand_dims(board_3d, 0)
  return model.predict(board_3d)[0][0]


def min_max(board, depth, alpha, beta, maximizing_player):
  if depth == 0 or board.is_game_over():
    return check_min_max(board)
  
  if maximizing_player:
    max_eval = -numpy.inf
    for move in board.legal_moves:
      board.push(move)
      eval = min_max(board, depth - 1, alpha, beta, False)
      board.pop()
      max_eval = max(max_eval, eval)
      alpha = max(alpha, eval)
      if beta <= alpha:
        break
    return max_eval
  else:
    min_eval = numpy.inf
    for move in board.legal_moves:
      board.push(move)
      eval = min_max(board, depth - 1, alpha, beta, True)
      board.pop()
      min_eval = min(min_eval, eval)
      beta = min(beta, eval)
      if beta <= alpha:
        break
    return min_eval


# function that gets the move from the neural network
def get_move(board, depth):
  max_move = None
  max_eval = -numpy.inf

  for move in board.legal_moves:
    board.push(move)
    eval = min_max(board, depth - 1, -numpy.inf, numpy.inf, False)
    #print(move)
    #print("Eval: ", eval)
    #print("champion: ", champion(board, 1))
    board.pop()
    if eval > max_eval:
      max_eval = eval
      max_move = move
  #print(max_eval)
  return max_move

In [None]:
from IPython.display import display, Image

In [None]:
board = chess.Board()

with chess.engine.SimpleEngine.popen_uci('/content/stockfish') as engine:
  engine.configure({"Skill Level": 1})
  while True:
    move = get_move(board, 1)
    board.push(move)
    #print(f'\n{board}')
    display(board)
    if board.is_game_over():
      break

    move = engine.analyse(board, chess.engine.Limit(time=1), info=chess.engine.INFO_PV)['pv'][0]
    board.push(move)
    #print(f'\n{board}')
    display(board)
    print("\n\n\n")
    if board.is_game_over():
      break