# **MODEL TRAINING**

## **NEURAL-NETWORK ARCHITECTURE -** 

The architecture is inspired by modern neural chess engines and enables the system to both select strong moves and evaluate positions effectively without relying on traditional search algorithms.

This model uses a residual convolutional neural network (CNN), because CNNs perform well on tasks that depend on detecting spatial patterns, such as image classification and, in this case, chess positions. The network ends in two output heads built from Dense layers: a policy head that predicts probabilities over 4672 possible chess moves (64 from-squares x 73 move types) and a value head that evaluates the position.

Residual connections allow deeper feature learning without the gradient degradation that affects plain deep networks. Chess data adds a further difficulty: common openings are played very frequently, so many near-identical positions appear in training, which can hinder the learning process. The convolutional layers learn how pieces should be placed on the board, how pieces coordinate with each other, and other recurring patterns.

The input of shape 18 x 8 x 8 is permuted to 8 x 8 x 18 (channels-last), the layout Keras convolution layers expect by default. Each filter then sees all 18 planes of a small group of neighbouring squares at once. For most of the game, pieces of the same colour stay on the same side of the board, so a filter sees same-coloured pieces together and can learn positional play.
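
As a small illustration of this layout change, the sketch below uses NumPy (rather than the Keras `Permute` layer the model itself uses) to move the 18 feature planes from the first axis to the last. The array contents are dummy values chosen only for the example.

```python
import numpy as np

# Dummy channels-first input: 18 planes of 8x8 (values are illustrative only)
planes_first = np.zeros((18, 8, 8), dtype=np.float32)
planes_first[12, :, :] = 1.0      # fill plane 12 with ones

# Move the plane axis to the end: (18, 8, 8) -> (8, 8, 18)
planes_last = np.transpose(planes_first, (1, 2, 0))

print(planes_last.shape)          # (8, 8, 18)
print(planes_last[3, 4, 12])      # 1.0: row 3, column 4 of plane 12
```

A transpose is used here instead of `reshape` because a plain reshape to (8, 8, 18) would keep the memory order and scatter each plane's squares across the board.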

In [None]:
# Core libraries used throughout this notebook

import io                 # wraps a PGN string as a file-like object
import os

import numpy as np        # board tensors and labels
import pandas as pd       # game DataFrame I/O
import chess              # python-chess: boards, moves, squares
import chess.pgn          # python-chess: PGN parsing

# List the input files available under the read-only Kaggle input directory
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

**HELPER FUNCTIONS FROM "Chess_BOT_1_Data_Processing"**

In [None]:
# Numeric representation of each piece: lowercase = black, uppercase = white
# (the FEN convention used by python-chess)

''' An earlier part of this work went out of convention and used lowercase
    for white and uppercase for black. The correct convention, 'A' for white
    and 'a' for black, is followed from here on. Because of that, the values
    in 'piece_weight' are multiplied by an extra '-1' in the code to rectify
    the error.
'''

num_piece = {'p':0,'n':1,'b':2,'r':3,'q':4,'k':5,
             'P':6,'N':7,'B':8,'R':9,'Q':10,'K':11}

piece_weight = {'p':1,'n':3,'b':3,'r':5,'q':9,'k':0,
             'P':-1,'N':-3,'B':-3,'R':-5,'Q':-9,'K':0}

In [None]:
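# extracting the position before each move as a FEN from a PGN

def pgn_to_fen(PGN):
  fen = []
  pgn = io.StringIO(PGN)
  game = chess.pgn.read_game(pgn)
  b = game.board()              # starting position of the game
  for move in game.mainline_moves():
    fen.append(b.fen())         # position in which `move` is played
    b.push(move)
  return fen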

In [None]:
# converting a FEN to a 12x8x8 matrix: 8x8 for board and x12 for each type of chess piece

def fen_to_matrix(FEN):
  matrix = np.zeros((12,8,8))
  b = chess.Board(FEN)
  for square,piece in b.piece_map().items():
    r = 7 - square//8
    c = square%8
    matrix[num_piece[str(piece)],r,c] = 1
  return matrix
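
To make the index arithmetic in `fen_to_matrix` concrete, the following sketch reproduces only the row and column computation in plain Python. It assumes the python-chess square numbering (a1 = 0, b1 = 1, ..., h8 = 63); the helper name `square_to_rc` is hypothetical and not part of the notebook.

```python
def square_to_rc(square):
    """Map a 0-63 square index to (row, col) with rank 8 on row 0."""
    r = 7 - square // 8   # rank 1 -> row 7, rank 8 -> row 0
    c = square % 8        # file a -> col 0, file h -> col 7
    return r, c

print(square_to_rc(0))    # a1 -> (7, 0)
print(square_to_rc(28))   # e4 -> (4, 4)
print(square_to_rc(63))   # h8 -> (0, 7)
```

With this orientation the matrix reads like a printed board: white's back rank is the bottom row and black's is the top row.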

In [None]:
# Calculating material points

def material_points(FEN):
  white_point = 0
  black_point = 0

  b = chess.Board(FEN)
  for square,piece in b.piece_map().items():
    if str(piece).isupper():
      white_point = white_point + piece_weight[str(piece)]
    elif str(piece).islower():
      black_point = black_point + piece_weight[str(piece)]
  return white_point,black_point

In [None]:
# additional features appended to the board matrix

def add_board(matrix,turn,FEN):
  board = chess.Board(FEN)

  # side to move: +1 for white, -1 for black
  side_plane = np.ones((1,8,8)) * turn

  # castling rights: a (has right, no right) pair of planes per colour.
  # Only kingside rights are checked; queenside rights are not encoded.
  castle = []
  for colour in (chess.WHITE, chess.BLACK):
    if board.has_kingside_castling_rights(colour):
      castle.append(np.ones((8,8)))
      castle.append(np.zeros((8,8)))
    else:
      castle.append(np.zeros((8,8)))
      castle.append(np.ones((8,8)))
  castle = np.stack(castle, axis=0)   # shape (4,8,8)

  # material points: white_point is negative (see piece_weight), so the sum
  # is black minus white; multiplying by -turn gives the side to move's advantage
  white_point,black_point = material_points(FEN)
  material_advantage = black_point + white_point
  material = np.full((1, 8, 8),material_advantage,dtype=np.float32) * (-1*turn)

  # 12 piece planes + 1 side + 4 castling + 1 material = 18 planes
  add_matrix = np.concatenate([matrix, side_plane, castle, material], axis=0)

  return add_matrix
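
The sign handling of the material plane is easy to get wrong, so here is a worked example with made-up totals. It assumes, as in `piece_weight`, that white's total is stored as a negative number and black's as a positive one; the helper `material_for_side_to_move` is hypothetical.

```python
import numpy as np

def material_for_side_to_move(white_point, black_point, turn):
    """white_point <= 0, black_point >= 0, turn = +1 (white) or -1 (black)."""
    material_advantage = black_point + white_point   # black minus white
    return material_advantage * (-1 * turn)

# White has 39 points of material (stored as -39), black has 36
print(material_for_side_to_move(-39, 36, turn=1))    # 3: white to move, up 3
print(material_for_side_to_move(-39, 36, turn=-1))   # -3: black to move, down 3

# Plane count of the final tensor: 12 piece + 1 side + 4 castling + 1 material
planes = np.concatenate([np.zeros((12, 8, 8)), np.ones((1, 8, 8)),
                         np.zeros((4, 8, 8)), np.full((1, 8, 8), 3.0)], axis=0)
print(planes.shape)                                  # (18, 8, 8)
```

The value stored in the material plane is therefore always measured from the point of view of the player whose turn it is.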

**POLICY LABEL -** Encodes the move played in a given position as a one-hot vector of length 4672: the entry at the index of the move played (0-4671) is 1 and all others are 0. Stacking one such vector per move gives an array that records every move of a game.

In [None]:
# Gives index(out of 4672) for the move played in a position

def move_played_idx(move):

  # indexing move
  from_sq = move.from_square
  to_sq = move.to_square
  from_rank = chess.square_rank(from_sq)
  from_file = chess.square_file(from_sq)
  to_rank = chess.square_rank(to_sq)
  to_file = chess.square_file(to_sq)

  d_file = to_file - from_file
  d_rank = to_rank - from_rank

  # underpromotion: checked first, because a pawn step also matches a
  # sliding direction and would otherwise return before reaching this branch.
  # Queen promotions fall through and are encoded as ordinary sliding moves.
  promo_map = {chess.KNIGHT:0,chess.BISHOP:1,chess.ROOK:2}
  if move.promotion in promo_map:
    # 0 = capture towards file a, 1 = straight push, 2 = capture towards file h
    # (left and right captures get separate indices so all 9 types 64..72 are used)
    direction = d_file + 1
    move_type = 64 + direction*3 + promo_map[move.promotion]
    return from_sq*73 + move_type

  # sliding moves
  slide_dir = [(0,1),(1,1),(1,0),(1,-1),(0,-1),(-1,-1),(-1,0),(-1,1)]

  for dir_idx,(dx,dy) in enumerate(slide_dir):
    for dist in range(1,8):
      if d_file == dx*dist and d_rank == dy*dist:
        move_type = dir_idx*7 + dist - 1
        return from_sq*73 + move_type

  # knight move
  knight_dir = [(1,2),(2,1),(2,-1),(1,-2),(-1,-2),(-2,-1),(-2,1),(-1,2)]
  for dir_idx,(dx,dy) in enumerate(knight_dir):
    if d_file == dx and d_rank == dy:
      move_type = 56 + dir_idx
      return from_sq*73 + move_type

  return None
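
As a worked example of the indexing scheme, the sketch below encodes and decodes a sliding move by hand, without python-chess. It assumes squares are numbered a1 = 0 ... h8 = 63 and uses the same direction order as `slide_dir`; the names `SLIDE_DIR`, `encode_slide` and `decode` are hypothetical.

```python
SLIDE_DIR = [(0, 1), (1, 1), (1, 0), (1, -1), (0, -1), (-1, -1), (-1, 0), (-1, 1)]

def encode_slide(from_sq, direction, dist):
    """Index of a sliding move: 73 move types per from-square."""
    dir_idx = SLIDE_DIR.index(direction)
    move_type = dir_idx * 7 + dist - 1      # 0..55
    return from_sq * 73 + move_type

def decode(idx):
    """Split a policy index back into (from_sq, move_type)."""
    return divmod(idx, 73)

# e2e4: from-square e2 = 12, direction (0, 1) = straight up the board, distance 2
idx = encode_slide(12, direction=(0, 1), dist=2)
print(idx)            # 877
print(decode(idx))    # (12, 1)
print(decode(4671))   # (63, 72): the last index
```

Because the from-square is the high-order part of the index, `divmod` by 73 is enough to turn a predicted index back into a from-square and a move type.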

In [None]:
# Policy label for a complete game

def policy_label(PGN):
    pgn = io.StringIO(PGN)
    game = chess.pgn.read_game(pgn)
    policy = []

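    for move in game.mainline_moves():
        m = np.zeros(4672, dtype=np.float32)
        idx = move_played_idx(move)
        if idx is None:
            # m[None] = 1.0 would silently set every entry to 1
            raise ValueError(f"move {move.uci()} could not be encoded")
        m[idx] = 1.0
        policy.append(m)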

    policy = np.stack(policy, axis=0)
    return policy

**VALUE LABEL -** Encodes the evaluation of a position using the result of the game: White won = +1, Black won = -1, Draw = 0. The label is a scalar, and every position in a game is assumed to have the same value as the final result, multiplied by the side to move (+1 for white, -1 for black) so that it is expressed from the perspective of the player whose turn it is. The assumption is that in high-level games the moves played are generally good and blunders are rare. This is a crude way to evaluate positions, but it can be considered acceptable at beginner level.

In [None]:
# Value label 

def value_label(result,turn):
    return result * turn
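
The effect of multiplying by `turn` is easiest to see over a few consecutive positions. The sketch below repeats the one-line `value_label` function and applies it to a made-up game that white won; the array of turns is illustrative only.

```python
import numpy as np

def value_label(result, turn):
    return result * turn

result = 1                                  # white won
turns = np.array([1, -1, 1, -1])            # white, black, white, black to move
values = value_label(result, turns)
print(values.tolist())                      # [1, -1, 1, -1]
```

Positions where the eventual winner is to move get +1 and positions where the eventual loser is to move get -1; a drawn game gives 0 everywhere.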

## **CREATING BATCHES -**
The model is trained on positions: each position is given as a matrix and mapped to its policy and value labels.

A single game contains 30+ moves, and therefore 30+ positions, so the whole dataset cannot be converted and trained on at once. Instead, the model is trained in batches of a few games each.

In [None]:
# Processing a single game: from PGN extract FENs and form policy and value label for each FEN

def process_single_game(pgn, result):
    fens = pgn_to_fen(pgn)
    policies = policy_label(pgn)

    X_list, P_list, V_list = [], [], []

    turn = 1  # +1 = white to move; assumes the game starts with white to move
    for fen, policy in zip(fens, policies):
        #board = chess.Board(fen)
        #turn = 1 if board.turn else -1

        piece_matrix = fen_to_matrix(fen)
        input_tensor = add_board(piece_matrix, turn, fen)

        value = value_label(result, turn)
        turn = -1 * turn

        X_list.append(input_tensor)
        P_list.append(policy)
        V_list.append(value)

    return (
        np.asarray(X_list, dtype=np.float32),
        np.asarray(P_list, dtype=np.float32),
        np.asarray(V_list, dtype=np.float32)
    )

In [None]:
# Creating a batch of games

def game_batch_to_position_batch(df_batch):
    X_all = []
    policy_all = []
    value_all = []

    for _, row in df_batch.iterrows():
        Xg, Pg, Vg = process_single_game(row['PGN'], row['Result'])

        X_all.append(Xg)
        policy_all.append(Pg)
        value_all.append(Vg)

    X = np.concatenate(X_all, axis=0)
    Y_policy = np.concatenate(policy_all, axis=0)
    Y_value = np.concatenate(value_all, axis=0)

    return X, Y_policy, Y_value

In [None]:
# Creating a batch of positions from multiple games

def train_generator(train_df, start_game=16, games_per_batch=8, shuffle=False):
    n = len(train_df)

    # Decide order ONCE
    if shuffle:
        df = train_df.sample(frac=1).reset_index(drop=True)
    else:
        df = train_df.reset_index(drop=True)

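    if start_game >= n:
        raise ValueError("start_game is beyond the last game in train_df")

    # Repeat the pass indefinitely so model.fit can run more than one epoch;
    # steps_per_epoch decides where each epoch ends
    while True:
        current = start_game

        while current < n:
            df_batch = df.iloc[current: current + games_per_batch]

            X, Y_policy, Y_value = game_batch_to_position_batch(df_batch)

            yield X, {
                "policy_head": Y_policy,
                "value_head": Y_value
            }

            current += games_per_batch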

## **BUILDING A MODEL**

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models, regularizers

In [None]:
# Creating Residual block

def residual_block(x, filters, l2_reg=3e-4):
    shortcut = x

    x = layers.Conv2D(
        filters,
        kernel_size=3,
        padding='same',
        use_bias=False,
        kernel_regularizer=regularizers.l2(l2_reg)
    )(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)

    x = layers.Conv2D(
        filters,
        kernel_size=3,
        padding='same',
        use_bias=False,
        kernel_regularizer=regularizers.l2(l2_reg)
    )(x)
    x = layers.BatchNormalization()(x)

    x = layers.Add()([shortcut, x])
    x = layers.ReLU()(x)

    return x
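
The idea behind the skip connection can be shown without TensorFlow. In this NumPy sketch the two convolution and batch-normalisation stages are replaced by a stand-in function `f` (an assumption made to keep the example short), which is enough to show that when the learned branch outputs zeros the block simply passes its input through.

```python
import numpy as np

def relu(x):
    return np.maximum(x, 0.0)

def residual(x, f):
    """y = relu(x + f(x)); f stands in for conv-BN-ReLU-conv-BN."""
    return relu(x + f(x))

# Non-negative input, as it would be after a preceding ReLU
x = relu(np.random.default_rng(0).normal(size=(8, 8, 4)))

# A branch that has learned nothing (all zeros) leaves the input unchanged
y_identity = residual(x, lambda t: np.zeros_like(t))
print(np.allclose(y_identity, x))        # True

# A non-zero branch adds a correction on top of the input
y_shifted = residual(x, lambda t: 0.5 * np.ones_like(t))
print(np.allclose(y_shifted, x + 0.5))   # True
```

This is why stacking more blocks is relatively safe: a block only has to learn a correction to its input, not a full transformation.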

In [None]:
# Building chess model

def build_chess_model(
    input_shape=(18, 8, 8),
    num_res_blocks=4,
    num_filters=128,
    policy_size=4672
):
    
    inputs = layers.Input(shape=input_shape)

    x = layers.Permute((2, 3, 1))(inputs)  # (8, 8, 18)

    x = layers.Conv2D(
        num_filters,
        kernel_size=3,
        padding='same',
        use_bias=False,
        kernel_regularizer=regularizers.l2(1e-4)
    )(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)

    for _ in range(num_res_blocks):
        x = residual_block(x, num_filters)

    # Policy Head

    p = layers.Conv2D(
        2,
        kernel_size=1,
        use_bias=False,
        kernel_regularizer=regularizers.l2(1e-4)
    )(x)
    p = layers.BatchNormalization()(p)
    p = layers.ReLU()(p)
    p = layers.Flatten()(p)
    p = layers.Dense(
        policy_size,
        activation='softmax',
        name='policy_head'
    )(p)

    # Value Head

    v = layers.Conv2D(
        1,
        kernel_size=1,
        use_bias=False,
        kernel_regularizer=regularizers.l2(1e-4)
    )(x)
    v = layers.BatchNormalization()(v)
    v = layers.ReLU()(v)
    v = layers.Flatten()(v)
    v = layers.Dense(
        256,
        activation='relu',
        kernel_regularizer=regularizers.l2(1e-4)
    )(v)
    v = layers.Dense(
        1,
        activation='tanh',
        name='value_head'
    )(v)

    model = models.Model(inputs=inputs, outputs=[p, v])
    return model

In [None]:
# Compiling model

model = build_chess_model(
    num_res_blocks=4,
    num_filters=128
)

model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    loss={
        "policy_head": tf.keras.losses.CategoricalCrossentropy(),
        "value_head": tf.keras.losses.MeanSquaredError()
    },
    loss_weights={
        "policy_head": 1.0,
        "value_head": 1.0
    }
)

model.summary()
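
To show what the compiled objective adds up to for one position, here is a NumPy sketch of categorical cross-entropy plus squared error with both weights set to 1.0. It uses a 4-move toy policy instead of 4672 moves and ignores the L2 regularisation terms that Keras adds to the total; all numbers are made up.

```python
import numpy as np

def categorical_crossentropy(y_true, y_pred, eps=1e-7):
    y_pred = np.clip(y_pred, eps, 1.0)       # avoid log(0)
    return float(-np.sum(y_true * np.log(y_pred)))

def squared_error(v_true, v_pred):
    return (v_true - v_pred) ** 2

y_true = np.array([0.0, 1.0, 0.0, 0.0])      # one-hot: move 1 was played
y_pred = np.array([0.1, 0.7, 0.1, 0.1])      # softmax output of the policy head
v_true, v_pred = 1.0, 0.6                    # game result vs tanh output

policy_loss = categorical_crossentropy(y_true, y_pred)   # -ln(0.7)
value_loss = squared_error(v_true, v_pred)               # 0.4 ** 2
total = 1.0 * policy_loss + 1.0 * value_loss
print(round(policy_loss, 4), round(value_loss, 4), round(total, 4))
# 0.3567 0.16 0.5167
```

With equal weights the policy term usually dominates early in training, since cross-entropy over thousands of moves starts far larger than a squared error bounded by 4.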

## **TRAINING MODEL**

Train on the first batch of games, then save and download the model.

In [None]:
# Loading DataFrame

train_df = pd.read_pickle('/kaggle/input/chess-train-dataset/train_df.pkl')

print(train_df.shape)
print(type(train_df))

In [None]:
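# Fitting model

START_GAME = 16        # same value as the train_generator default
GAMES_PER_BATCH = 8

# number of batches in one pass over the games from START_GAME onwards
steps = int(np.ceil((len(train_df) - START_GAME) / GAMES_PER_BATCH))

model.fit(
    train_generator(train_df, start_game=START_GAME, games_per_batch=GAMES_PER_BATCH),
    steps_per_epoch=steps,
    epochs=2
    )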

In [None]:
# Sanity check: shapes of the inputs and labels built from the first two games

X, Yp, Yv = game_batch_to_position_batch(train_df.iloc[:2])

print(X.shape)
print(Yp.shape)
print(Yv.shape)

In [None]:
# Saving the trained model to the Kaggle working directory

import os

SAVE_DIR = "/kaggle/working"
model.save(os.path.join(SAVE_DIR, "chess_model.keras"))

!ls /kaggle/working/

**LOAD, TRAIN, SAVE, DOWNLOAD**, and repeat this process for the entire dataset. Before each training run, increment 'start_game' by 'games_per_batch' for every batch already trained on, so that training resumes where it stopped.
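
One way to keep track of where each run should resume is to compute the `start_game` values up front. The sketch below is only bookkeeping in plain Python; `games_per_run` (how many games one training session covers) is a hypothetical parameter that the notebook does not define, and it is assumed to be a multiple of `games_per_batch`.

```python
def resume_points(n_games, games_per_run, games_per_batch=8):
    """Return the start_game value for each successive training run."""
    if games_per_run % games_per_batch != 0:
        raise ValueError("games_per_run should be a multiple of games_per_batch")
    return list(range(0, n_games, games_per_run))

# Example: 100 games, 32 games (4 batches of 8) per run
print(resume_points(100, games_per_run=32))   # [0, 32, 64, 96]
```

Each value in the list can be passed as `start_game` to `train_generator` after loading the model saved by the previous run.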