<a href="https://colab.research.google.com/github/AlexCergeev/Analog-of-AlphaZero/blob/main/Analog-of-AlphaZero.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Start on GPU (Runtime shape High-RAM)**

# Import required libraries

You need to upload 3 files: 

1.   data.pgn
2.   stockfish.csv
3.   stockfish_13_linux_x64_bmi2.zim



In [None]:
!pip install python-chess==0.31.3

In [None]:
!unzip /content/stockfish_13_linux_x64_bmi2.zip
!chmod +x /content/stockfish_13_linux_x64_bmi2/stockfish_13_linux_x64_bmi2

In [None]:
import chess
import chess.engine
import chess.pgn
import random
from PIL import Image, ImageFilter
import numpy as np
import requests
from io import BytesIO
import matplotlib.pyplot as plt
from scipy import signal
import tensorflow as tf
import time
import csv

In [None]:
import tensorflow.keras.models as models
import tensorflow.keras.layers as layers
import tensorflow.keras.utils as utils
import tensorflow.keras.optimizers as optimizers
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Conv2D, \
                                    Dense, \
                                    MaxPool2D,\
                                    Dropout, \
                                    Flatten, \
                                    BatchNormalization,\
                                    Input
from tensorflow.keras.datasets import mnist
import tensorflow.keras.callbacks as callbacks

# GPU Optimization

In [None]:
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
  try:
    # Currently, memory growth needs to be the same across GPUs
    for gpu in gpus:
      tf.config.experimental.set_memory_growth(gpu, True)
    logical_gpus = tf.config.experimental.list_logical_devices('GPU')
    print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
  except RuntimeError as e:
    # Memory growth must be set before GPUs have been initialized
    print(e)

In [None]:
from google.colab import drive
drive.mount('/content/drive')


In [None]:
# this function will create our f(x) (score)
def stockfish(board):
  with chess.engine.SimpleEngine.popen_uci('/content/stockfish_13_linux_x64_bmi2/stockfish_13_linux_x64_bmi2') as sf:
    for i in range(19, 0, -1):    
      result = sf.analyse(board, chess.engine.Limit(depth=i))
      score = result['score'].white().score()
      if score is not None:
        break
    return score

In [None]:
board = chess.Board()
board

**The AI will learn how to give a accurate prediction of *f(x)* when we present a *x* never seen before.**

*board -> score*

# Creating the dataset

Now we need to convert the board representation to something meaningful.

A 3d matrix of sizes **14 x 8 x 8**



In [None]:
squares_index = {
  'a': 0,
  'b': 1,
  'c': 2,
  'd': 3,
  'e': 4,
  'f': 5,
  'g': 6,
  'h': 7
}


# example: h3 -> 17
def square_to_index(square):
  letter = chess.square_name(square)
  return 8 - int(letter[1]), squares_index[letter[0]]


def split_dims(board):
  # this is the 3d matrix
  board3d = np.zeros((14, 8, 8), dtype=np.int8)

  # here we add the pieces's view on the matrix
  for piece in chess.PIECE_TYPES:
    for square in board.pieces(piece, chess.WHITE):
      idx = np.unravel_index(square, (8, 8))
      board3d[piece - 1][7 - idx[0]][idx[1]] = 1
    for square in board.pieces(piece, chess.BLACK):
      idx = np.unravel_index(square, (8, 8))
      board3d[piece + 5][7 - idx[0]][idx[1]] = 1

  # add attacks and valid moves too
  # so the network knows what is being attacked
  aux = board.turn
  board.turn = chess.WHITE
  for move in board.legal_moves:
      i, j = square_to_index(move.to_square)
      board3d[12][i][j] = 1
  board.turn = chess.BLACK
  for move in board.legal_moves:
      i, j = square_to_index(move.to_square)
      board3d[13][i][j] = 1
  board.turn = aux

  return board3d

#This function was made by owner of chennel "Digital Secrets"
#(original code: https://colab.research.google.com/drive/1GSeBQdyZH_nHvl52XW0uhXV3Cslho24O)

Now, all we have to do is call **random_board()** to create random boards, **stockfish()** to get a score for how good each board is for white.

Then we convert each board to a 3d matrix using **split_dims()**, now creating the dataset is easy!

# TensorFlow!

In [None]:
def build_model(conv_depth):
  board3d = layers.Input(shape=(14, 8, 8))

  # adding the convolutional layers
  x = board3d
  x = layers.Conv2D(conv_depth, kernel_size=(3,3), padding='same', activation='relu', data_format='channels_first')(x)
  x = layers.Conv2D(conv_depth, kernel_size=(3,3), padding='same', activation='relu',data_format='channels_first')(x)
  x = layers.MaxPool2D(pool_size=(2,2), data_format='channels_first')(x)
  x = layers.Conv2D(conv_depth*2, kernel_size=(3,3), padding='same', activation='relu',data_format='channels_first')(x)
  x = layers.Conv2D(conv_depth*2, kernel_size=(3,3), padding='same', activation='relu',data_format='channels_first')(x)
  x = layers.MaxPool2D(pool_size=(2,2), data_format='channels_first')(x)
  x = layers.Conv2D(conv_depth*4, kernel_size=(2,2), padding='same', activation='relu', data_format='channels_first')(x)
  x = layers.Conv2D(conv_depth*4, kernel_size=(2,2), padding='same', activation='relu', data_format='channels_first')(x)
  x = layers.Flatten()(x)
  x = layers.Dense(128, 'relu')(x)
  x = layers.Dropout(0.3)(x)
  x = layers.Dense(64, 'relu')(x)
  x = layers.Dense(1, 'sigmoid')(x)
  return models.Model(inputs=board3d, outputs=x)

  
model = build_model(16)
model.summary()

Skip connections (residual network) will likely improve the model for deeper connections. If you want to test the residual model, check the code below.

# It's training time!

In [None]:
def get_dataset():
	container = np.load('/content/drive/My Drive/Chess/dataset.npz')
	b, v = container['b'],container['v']
	v = np.asarray(v / abs(v).max() / 2 + 0.5, dtype=np.float32) # normalization (0 - 1)
	return b, v

x_train, y_train = get_dataset()
print(type(x_train))
print(y_train.shape)
#TODO (y_train+50)/100

In [None]:
x_train.shape

In [None]:
y_train.shape

In [None]:
i= 0
b = 0
x_train = []
count = 4126383-3

with open("/content/data.pgn") as pgn:
    while 1:    
      try:
        game = chess.pgn.read_game(pgn)
      except Exception:
        break
      board = game.board()
      for move in game.mainline_moves():
        board.push(move)
        x_train.append(split_dims(board))
        if b > count:
          break
        b = b + 1
        
      if b > count:
          break

x_train = np.asarray(x_train,dtype=np.int16)
x_train.shape

In [None]:
a = 0
count = 4126383-3
y_train = []
with open('/content/stockfish — копия.csv', newline='') as File:  
    reader = csv.reader(File)
    for row in reader:
        row1 = str(row[1]).split()
        if row1 == ['MoveScores']:
          continue 
        #print(row1)
        for i in str(row[1]).split():
           y_train.append(i)
           if a > count:
             break
           a = a + 1
        if a > count:
             break


y_train = np.asarray(y_train,dtype=np.float32)
y_train.shape

In [None]:
np.save('example/dataXtrain_lite', x_train)

In [None]:
y_train = np.asarray(y_train / abs(y_train).max() / 2 + 0.5, dtype=np.float32)

In [None]:
i = 1
All_history = []
while i < 2:
  model = build_model(14)
  model.compile(optimizer=optimizers.Adam(), loss='mean_squared_error')
  model.summary()
  history = model.fit(x_train, y_train,   # TODO x_train, y_train change to ai traing 
            batch_size=1024,
            epochs=40,
            verbose=1,
            validation_split=0.1,
            callbacks=[callbacks.ReduceLROnPlateau(monitor='loss', patience=10),
                      callbacks.EarlyStopping(monitor='loss', patience=15)])

  All_history.append(history)

  i = i + 5

In [None]:
b = 32
for history in All_history:
  plt.xlabel('Epochs')
  plt.ylabel("Loss")
  plt.plot(history.epoch, history.history["loss"],  label='loss %s' %(b))
  plt.grid()
  plt.plot(history.epoch, history.history["val_loss"],  label='val_loss %s' %(b))
  plt.legend()
  plt.xlim([0, max(history.epoch)+20])
  b = b + 5
plt.show()

In [None]:
i = 0 #len(All_history)-1
plt.xlabel('Epochs')
plt.ylabel("Loss")
plt.plot(All_history[i].epoch, All_history[i].history["loss"],  label='loss %s' %(b))
plt.grid()
plt.plot(All_history[i].epoch, All_history[i].history["val_loss"],  label='val_loss %s' %(b))
plt.legend()
plt.xlim([0, max(All_history[i].epoch)])
b = b + 2 

# Playing with the AI

In [None]:
# used for the minimax algorithm
def minimax_eval(board):
  board3d = split_dims(board)
  board3d = np.expand_dims(board3d, 0)
  return model.predict(board3d)[0][0]


In [None]:
with chess.engine.SimpleEngine.popen_uci('stockfish-5-linux/Linux/stockfish_14053109_x64') as engine:
  while True:
    h = []
    move_answer = []
    depth = 19
    start = stockfish(board, depth)
    d = depth
    while(start is None):
      d = d - 1
      start = stockfish(board, d)
      
    for move in board.legal_moves:
      board.push(move)
      start_next = stockfish(board, depth)
      d = depth
      while (start_next is None):
        d = d - 1
        start_next = stockfish(board, d)      
      h.append(start - start_next)#minimax_eval(board))
      move_answer.append(move)
      board.pop()     
    board.push(move_answer[h.index(min(h))])
    print(f'\n{board}')
    if board.is_game_over():
      break

    move = engine.analyse(board, chess.engine.Limit(time=0.1), info=chess.engine.INFO_PV)['pv'][0]
    board.push(move)
    print(f'\n{board}')
    if board.is_game_over():
      break

In [None]:
moves

In [None]:
board = chess.Board()

In [None]:
board.variation_san(moves)

In [None]:
moves = []

In [None]:
moves.append(board.pop())

In [None]:
board


In [None]:
board.push_san("a5")

board

In [None]:
start_fish = (stockfish(board)+50)/100
h = []
move_answer = []
for move in board.legal_moves:
    board.push(move)
    h.append(start_fish - (stockfish(board)+50)/100)
    move_answer.append(move)
    #print(move,abs(start_fish - (stockfish(board)+50)/100))
    board.pop()
print(move_answer[h.index(min(h))])

In [None]:
def get_move(board):
  h = []
  move_answer = []
  start = minimax_eval(board)
  for move in board.legal_moves:
      board.push(move)
      h.append(abs(start - minimax_eval(board)))
      move_answer.append(move)
      #print(move,abs(start - minimax_eval(board)))
      board.pop()
  return move_answer[h.index(max(h))]

In [None]:
for i in range(23):
  start_time = time.time()
  print(i+1, "depth ---  ",stockfish_depth(board, i),"--- %s seconds ---" % (time.time() - start_time))
  print()

In [None]:
a = [abs]
for i in range(23):
  start_time = time.time()
  print(i+1, "depth ---  ", a.append(split_dims(board)),"--- %s seconds ---" % (time.time() - start_time))
  print()

In [None]:
while 1:
  for i in range(23):
    start_time = time.time()
    print(i+1, "depth ---  ",stockfish_depth(board, i),"--- %s seconds ---" % (time.time() - start_time))
    print()

In [None]:

h = []
move_answer = []
start = minimax_eval(board)
for move in board.legal_moves:
    start_time = time.time()
    board.push(move)
    h.append(start - minimax_eval(board))
    move_answer.append(move)
    print(move,abs(start - minimax_eval(board)) ,"--- %s seconds ---" % (time.time() - start_time))
    board.pop()
print(move_answer[h.index(min(h))])