# Shared stuff

Mount Google Drive

In [None]:
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)

Shared code for encoding & decoding chess stuff

In [None]:
import chess

x = 0
MOVES = {}
REVERSE_MOVES = {}

for i in range(64):
  for j in range(64):
    MOVES[x] = (i, j)
    REVERSE_MOVES[(i, j)] = x
    x += 1

POSSIBLE_MOVE_COUNT = x

def piece_to_int(piece):
  PIECE_MAP = {
      "p" : 1,
      "b" : 2,
      "n" : 3,
      "r" : 4,
      "q" : 5,
      "k" : 6
  }

  return 0 if piece is None else PIECE_MAP[piece.symbol().lower()]

def encode_board(board):
  features = []

  for i in range(64): #7 features
    piece = board.piece_at(i)
    features.append(piece_to_int(piece))

  features.append(7 if board.turn else 8) #2 features here
  features.append(9 if board.has_legal_en_passant() else 10) #2 features here

  features.append(11 if board.has_kingside_castling_rights(board.turn) else 12) #2 features here
  features.append(13 if board.has_queenside_castling_rights(board.turn) else 14) #2 features here

  features = tf.one_hot(features, 7 + 2 + 2 + 2 + 2, dtype=np.uint8)

  return features

def encode_move(move):
  return tf.one_hot([ REVERSE_MOVES[(move.from_square, move.to_square)] ], POSSIBLE_MOVE_COUNT + 1, dtype=np.uint8)

# For generating data

Download dependencies

In [None]:
!pip install python-chess

!wget https://cdn.lbryplayer.xyz/api/v4/streams/free/Lichess-Elite-Database/b0f01856c521a5f782f8ce4ec6275054e71cf664/3a71ac?download=true -O db.7z

Unzip chess games

In [None]:
!mkdir db
!7z e db.7z -odb/

Generate training data

In [None]:
!mkdir TRAIN

import gc
import tensorflow as tf
import numpy as np
import os
import chess.pgn

gc.collect() #Run GC

PATH = './db/'
SAVE_BATCH_SIZE = 8000
MAX_GAME_COUNT = 500000

files = os.listdir(PATH)

game_count = 0
current_batch = 0
X = []
Y = []

def process_file(f):
  global game_count, current_batch, X, Y

  while True:
    game_count += 1

    if game_count % SAVE_BATCH_SIZE == 0 and game_count != 0: #Save batch
      current_batch += 1
      save_batch(X, Y, current_batch)
      print(f'Saving batch #{current_batch}')

      #Reset
      X = [] 
      Y = []
      gc.collect() #Run GC

    
    if game_count >= MAX_GAME_COUNT:
      return

    game = chess.pgn.read_game(f)
    
    if game is None:
      break

    print('Processing game', game_count)

    board = game.board()
    
    for move in game.main_line():
      X.append(encode_board(board))
      Y.append(encode_move(move))
      board.push(move)

def save_batch(X, Y, batch):
  np.savez_compressed(f'TRAIN/{batch}', x=X, y=Y)


for file in files:
  if not file.endswith('.pgn'):
    continue

  with open(PATH + file, 'r') as f:
    if game_count >= MAX_GAME_COUNT:
      break

    print('Processing file', file)
    process_file(f)

print('Process done')

Copy training data to Google Drive

In [None]:
!zip train.zip -5 -r TRAIN/
!mv train.zip /content/gdrive/MyDrive

# For training

Fetch training data from Google Drive then unzip it.

In [None]:
!cp /content/gdrive/MyDrive/train.zip .
!unzip -o train.zip

And run training code (can take some time)

In [None]:
%tensorflow_version 2.x
import tensorflow as tf
import numpy as np
import os
import gc

gc.collect() #Run GC

train_data = os.listdir('TRAIN/')

print("TensorFlow version: {}".format(tf.__version__))
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

loss_object = tf.keras.losses.BinaryCrossentropy(from_logits=True)
model = tf.keras.Sequential([
  tf.keras.layers.Dense(1020, activation=tf.nn.leaky_relu, input_shape=(1020, )), 
  tf.keras.layers.Dense(750, activation=tf.nn.leaky_relu),
  tf.keras.layers.Dense(750, activation=tf.nn.leaky_relu),
  tf.keras.layers.Dense(500, activation=tf.nn.leaky_relu),
  tf.keras.layers.Dense(4097)
])

model.compile(optimizer='Adam', loss=loss_object)

def loss(model, x, y, training):
  y_ = model(x, training=training)
  return loss_object(y_true=y, y_pred=y_)

def train_batch(X, Y):
  global model
  model.fit(x=X, y=Y)
  pass

X = []
Y = []

for file in train_data:
  gc.collect() #Run GC
  
  print('Loading data...')
  data = np.load('TRAIN/' + file)
  X, Y = data['x'], data['y']

  X = X.reshape((len(X), 1020, ))
  Y = Y.reshape((len(Y), 4097, ))

  print('Training for file', file)

  train_batch(X, Y)

print('Training done, saving model')
model.save('trained_model', save_format='h5', overwrite=True)




Zip model and copy it to Google Drive

In [None]:
!zip model.zip trained_model
!mv model.zip /content/gdrive/MyDrive/

# Playing with Model

Fetch and unzip the model

In [None]:
!cp /content/gdrive/MyDrive/model.zip .
!unzip model.zip

Archive:  model.zip
  inflating: trained_model           


Load model

In [None]:
%tensorflow_version 2.x
import tensorflow as tf
import numpy as np
import chess
import time

model = tf.keras.models.load_model('trained_model', custom_objects={'leaky_relu': tf.nn.leaky_relu})

model.summary()

In [None]:
def predict(board):
  X = np.array(encode_board(board))
  X = X.reshape((1, 1020, ))
  org_pred = list(model.predict(X)[0])

  pred = list(org_pred)

  pred.sort()
  pred = pred[::-1] #reverse array

  i = 0
  for p in pred:
    if i >= len(MOVES):
      continue

    move_from, move_to = MOVES[org_pred.index(p)]
    move = chess.Move(move_from, move_to)

    if move in board.legal_moves:
      break

    i += 1

  return move

In [None]:
import time
from IPython.display import clear_output, display

board = chess.Board()

while True:
  if board.is_game_over():
    print("Game over")
    break
  board.push(predict(board))
  display(board)
  m = input("Move:")
  board.push(chess.Move.from_uci(m))
  clear_output(wait=True)
  
