In [3]:
import pandas as pd
import numpy as np
import chess.pgn
import chess
from random import randint
from random import shuffle
import tensorflow as tf
from tensorflow import keras
from keras import layers, callbacks, losses
from keras import regularizers
from keras import backend as K

In [4]:
def fen2bitstring(fen):
    board = chess.Board(fen)
    bitboard = np.zeros(64*6*2+5)

    piece_idx = {'p': 0, 'n': 1, 'b': 2, 'r': 3, 'q': 4, 'k': 5}

    for i in range(64):
        if board.piece_at(i):
            color = int(board.piece_at(i).color) + 1
            bitboard[(piece_idx[board.piece_at(i).symbol().lower()] + i * 6) * color] = 1

    bitboard[-1] = int(board.turn)
    bitboard[-2] = int(board.has_kingside_castling_rights(True))
    bitboard[-3] = int(board.has_kingside_castling_rights(False))
    bitboard[-4] = int(board.has_queenside_castling_rights(True))
    bitboard[-5] = int(board.has_queenside_castling_rights(False))

    return bitboard
    

In [5]:
def randPosiGeneratorFromGame(game):
    random_positions_array = []
    positions = []
    board = game.board()

    for move in game.mainline_moves():
        if not board.is_capture(move):
            position = board.fen()
            bitstring_position = fen2bitstring(position)
            positions.append(bitstring_position)

        board.push(move)

        no_capture_posis = len(positions)
    indices = list(range(5, no_capture_posis))
    shuffle(indices)
    selected_indices = indices[:15]

    random_positions_array = [positions[index] for index in selected_indices]

    return random_positions_array

In [None]:
pgn_file = open("CCRLdb.pgn")

random_positions_white_win_bitstring = []
random_positions_black_win_bitstring = []

i = 0
while i < (500000):
    game = chess.pgn.read_game(pgn_file)
    if game.headers["Result"] == "1-0":
        temp1 = randPosiGeneratorFromGame(game)
        random_positions_white_win_bitstring += temp1
    elif game.headers["Result"] == "0-1":
        temp2 = randPosiGeneratorFromGame(game)
        random_positions_black_win_bitstring += temp2
    if game is None:
        break
    i = i + 1
    print(f"{i}\t")
pgn_file.close()

In [7]:
len(random_positions_white_win_bitstring)

2637225

In [8]:
min(len(random_positions_black_win_bitstring), len(random_positions_white_win_bitstring))

1944450

In [9]:
x = 0
y = min(len(random_positions_black_win_bitstring), len(random_positions_white_win_bitstring))//3
w = min(len(random_positions_black_win_bitstring), len(random_positions_white_win_bitstring))//3
t = min(len(random_positions_black_win_bitstring), len(random_positions_white_win_bitstring))

In [10]:
white_win_bitstring_tensor = tf.convert_to_tensor(random_positions_white_win_bitstring[x:y], dtype=tf.float32)

black_win_bitstring_tensor = tf.convert_to_tensor(random_positions_black_win_bitstring[x:y], dtype=tf.float32)


In [11]:
white_win_dataset = tf.data.Dataset.from_tensor_slices(white_win_bitstring_tensor)
black_win_dataset = tf.data.Dataset.from_tensor_slices(black_win_bitstring_tensor)

In [12]:
concatenated_dataset = white_win_dataset.concatenate(black_win_dataset)

In [13]:
num_samples = len(concatenated_dataset)
train_dataset = concatenated_dataset.shuffle(num_samples)
print('Train Dataset Size:', len(train_dataset))

Train Dataset Size: 1296300


In [14]:
train_data = np.array(list(train_dataset.as_numpy_iterator()))

In [15]:
def scheduler1(epoch, lr):
    return lr * 0.98

# Create the learning rate scheduler callback
lr_scheduler = callbacks.LearningRateScheduler(scheduler1)


In [16]:
optimiser1=tf.keras.optimizers.legacy.Adam(lr = 0.005)

  super().__init__(name, **kwargs)


In [17]:
def train_autoencoders(train_data):  #[[e1, d1], [c1], [e2, d2], [c2], [e3, d3], [c3], [e4, d4], [o]]
    weights = []
    # Layer 1: 773-600-773 Autoencoder

    input1 = layers.Input(shape=(773,))
    dropout1 = layers.Dropout(0.25)(input1)
    encoder1 = layers.Dense(600, activation='relu')(dropout1)
    decoder1 = layers.Dense(773, activation='relu')(encoder1)
    layer1 = tf.keras.Model(inputs=input1, outputs=decoder1)
    layer1.compile(optimizer=optimiser1, loss = losses.MeanSquaredError())
    K.set_value(layer1.optimizer.learning_rate, 0.005)
    layer1.fit(train_data, train_data, epochs=100, batch_size = 5000, callbacks=[lr_scheduler])
    weights.append(layer1.layers[2].get_weights())
    
    
    # Layer 2: 600-400-600 Autoencoder
    
    input2 = layers.Input(shape=(773,))
    dropout2 = layers.Dropout(0.25)(input2)
    l1_2 = layers.Dense(600, activation = 'relu')(dropout2)
    encoder2 = layers.Dense(400, activation='relu')(l1_2)
    decoder2 = layers.Dense(600, activation='relu')(encoder2)
    decoder2_1 = layers.Dense(773, activation = 'relu')(decoder2)
    layer2 = tf.keras.Model(inputs=input2, outputs=decoder2_1)
    layer2.layers[2].set_weights(weights[0])
    layer2.compile(optimizer=optimiser1, loss = losses.MeanSquaredError())
    K.set_value(layer2.optimizer.learning_rate, 0.005)
    layer2.fit(train_data, train_data, epochs=100, batch_size = 5000, callbacks=[lr_scheduler])
    weights.append(layer2.layers[3].get_weights())
    

    # Layer 3: 400-200-400 Autoencoder

    input3 = layers.Input(shape=(773,))
    dropout3 = layers.Dropout(0.25)(input3)
    l1_3 = layers.Dense(600, activation = 'relu')(dropout3)
    l2_3 = layers.Dense(400, activation = 'relu')(l1_3)
    encoder3 = layers.Dense(200, activation='relu')(l2_3)
    decoder3 = layers.Dense(400, activation='relu')(encoder3)
    decoder3_1 = layers.Dense(600, activation = 'relu')(decoder3)
    decoder3_2 = layers.Dense(773, activation = 'relu')(decoder3_1)
    layer3 = tf.keras.Model(inputs=input3, outputs=decoder3_2)
    layer3.layers[2].set_weights(weights[0])
    layer3.layers[3].set_weights(weights[1])
    layer3.compile(optimizer=optimiser1, loss = losses.MeanSquaredError())
    K.set_value(layer3.optimizer.learning_rate, 0.005)
    layer3.fit(train_data, train_data, epochs=100, batch_size = 5000, callbacks=[lr_scheduler])
    weights.append(layer3.layers[4].get_weights())
    

    # Layer 4: 200-100-200 Autoencoder

    input4 = layers.Input(shape=(773,))
    dropout4 = layers.Dropout(0.25)(input4)
    l1_4  =layers.Dense(600, activation = 'relu')(dropout4)
    l2_4 = layers.Dense(400, activation = 'relu')(l1_4)
    l3_4 = layers.Dense(200, activation = 'relu')(l2_4)
    encoder4 = layers.Dense(100, activation='relu')(l3_4)
    decoder4 = layers.Dense(200, activation='relu')(encoder4)
    decoder4_1 = layers.Dense(400, activation = 'relu')(decoder4)
    decoder4_2 = layers.Dense(600, activation = 'relu')(decoder4_1)
    decoder4_3 = layers.Dense(773, activation = 'relu')(decoder4_2)
    layer4 = tf.keras.Model(inputs=input4, outputs=decoder4_3)
    layer4.layers[2].set_weights(weights[0])
    layer4.layers[3].set_weights(weights[1])
    layer4.layers[4].set_weights(weights[2])
    layer4.compile(optimizer=optimiser1, loss = losses.MeanSquaredError())
    K.set_value(layer4.optimizer.learning_rate, 0.005)
    layer4.fit(train_data, train_data, epochs=100, batch_size = 5000, callbacks=[lr_scheduler])
    weights.append(layer4.layers[5].get_weights())
    
    return weights

In [18]:
pos2vec_weights = train_autoencoders(train_data)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78

In [19]:
weights_encoder1 = pos2vec_weights[0]
weights_encoder2 = pos2vec_weights[1]
weights_encoder3 = pos2vec_weights[2]
weights_encoder4 = pos2vec_weights[3]

In [20]:
# Weights are taken after training on pos2vec first
def create_pos2vec_model():
    input_layer = layers.Input(shape = (773,))
    dropouti = layers.Dropout(0.25)(input_layer)
    x1 = layers.Dense(600, activation='relu')(dropouti)
    dropout1 = layers.Dropout(0.25)(x1)
    x2 = layers.Dense(400, activation='relu')(dropout1)
    dropout2 = layers.Dropout(0.25)(x2)
    x3 = layers.Dense(200, activation='relu')(dropout2)
    dropout3 = layers.Dropout(0.25)(x3)
    x4 = layers.Dense(100, activation='relu')(dropout3)
    
    dbn_model = tf.keras.Model(inputs=input_layer, outputs=x4)
    
    # Set the weights for the DBN layers
    dbn_model.layers[2].set_weights(weights_encoder1)  # First encoder layer
    dbn_model.layers[4].set_weights(weights_encoder2)  # Second encoder layer
    dbn_model.layers[6].set_weights(weights_encoder3)  # Third encoder layer
    dbn_model.layers[8].set_weights(weights_encoder4)  # Fourth encoder layer
    #dbn_model.trainable = False

    return dbn_model

In [21]:
def create_siamese_network():
    dbn = create_pos2vec_model()
    
    input1 = layers.Input(shape = (773,))
    input2 = layers.Input(shape = (773,))
    
    features1 = dbn(input1)
    features2 = dbn(input2)
    
    concatenated_features = layers.Concatenate()([features1, features2])
    
    fc1 = layers.Dense(400, activation = 'relu')(concatenated_features)
    dropout1 = layers.Dropout(0.25)(fc1)
    fc2 = layers.Dense(200, activation = 'relu')(dropout1)
    dropout2 = layers.Dropout(0.25)(fc2)
    fc3 = layers.Dense(100, activation = 'relu')(dropout2)
    #fc4 = layers.Dense(50, activation = 'relu')(fc3)
    #fc5 = layers.Dense(25, activation = 'relu')(fc4)
    #fc6 = layers.Dense(12, activation = 'relu')(fc5)
    output_layer = layers.Dense(2, activation = 'softmax')(fc3)
    
    siamese_model = tf.keras.Model(inputs=[input1, input2], outputs=output_layer)
    
    return siamese_model

In [22]:
untrained_siamese_model = create_siamese_network()
untrained_siamese_model.save("siamese_untrained_500000.h5")



  saving_api.save_model(


In [23]:
win = [1]
white_win_labels = []
for i in range(len(random_positions_white_win_bitstring)):
    white_win_labels.append(win)
loss = [0]
black_win_labels = []
for i in range(len(random_positions_black_win_bitstring)):
    black_win_labels.append(loss)

In [24]:
white_win_bitstring_tensor = tf.convert_to_tensor(random_positions_white_win_bitstring[w:t], dtype=tf.float32)

white_win_labels_tensor = tf.convert_to_tensor(white_win_labels[w:t], dtype=tf.int32)

black_win_bitstring_tensor = tf.convert_to_tensor(random_positions_black_win_bitstring[w:t], dtype=tf.float32)

black_win_labels_tensor = tf.convert_to_tensor(black_win_labels[w:t], dtype=tf.int32)

In [25]:
white_win_dataset = tf.data.Dataset.from_tensor_slices(white_win_bitstring_tensor)
black_win_dataset = tf.data.Dataset.from_tensor_slices(black_win_bitstring_tensor)

white_win_labels_dataset = tf.data.Dataset.from_tensor_slices(white_win_labels_tensor)
black_win_labels_dataset = tf.data.Dataset.from_tensor_slices(black_win_labels_tensor)

In [26]:
paired_dataset = tf.data.Dataset.zip((white_win_dataset, black_win_dataset))
paired_labels_dataset = tf.data.Dataset.zip((white_win_labels_dataset, black_win_labels_dataset))

In [27]:
len(paired_dataset)

1296300

In [28]:
from sklearn.model_selection import train_test_split

# Convert ZipDataset to numpy arrays
paired_dataset_array = np.array(list(paired_dataset.as_numpy_iterator()))
paired_labels_dataset_array = np.array(list(paired_labels_dataset.as_numpy_iterator()))
# Split ratio for train-test split
split_ratio = 0.8

# Split the paired dataset and labels dataset into train and test
train_dataset, test_dataset, train_labels_dataset, test_labels_dataset = train_test_split(
    paired_dataset_array, paired_labels_dataset_array, test_size=1-split_ratio)

# Print the sizes of train and test datasets
print('Train Dataset Size:', len(train_dataset))
print('Train Labels Dataset Size:', len(train_labels_dataset))
print('Test Dataset Size:', len(test_dataset))
print('Test Labels Dataset Size:', len(test_labels_dataset))

Train Dataset Size: 1037040
Train Labels Dataset Size: 1037040
Test Dataset Size: 259260
Test Labels Dataset Size: 259260


In [29]:
for i in range(len(train_dataset)//2):
    train_dataset[2*i][0], train_dataset[2*i][1] = train_dataset[2*i][1].copy(), train_dataset[2*i][0].copy()
    train_labels_dataset[2*i][0], train_labels_dataset[2*i][1] = train_labels_dataset[2*i][1].copy(), train_labels_dataset[2*i][0].copy()

for i in range(len(test_dataset) // 2):
    test_dataset[2*i][0], test_dataset[2*i][1] = test_dataset[2*i][1].copy(), test_dataset[2*i][0].copy()
    test_labels_dataset[2*i][0], test_labels_dataset[2*i][1] = test_labels_dataset[2*i][1].copy(), test_labels_dataset[2*i][0].copy()

In [30]:
def scheduler2(epoch, lr):
    return lr * 0.99

# Create the learning rate scheduler callback
lr_scheduler = callbacks.LearningRateScheduler(scheduler2)

In [31]:
optimiser2 = tf.keras.optimizers.legacy.Adam(lr=0.01)


  super().__init__(name, **kwargs)


In [32]:
siamese_model = create_siamese_network()
# Extract inputs from the paired_dataset_array
paired_dataset_input1 = train_dataset[:, 0]
paired_dataset_input2 = train_dataset[:, 1]

# Create and compile the Siamese model
siamese_model.compile(optimizer=optimiser2, loss=tf.keras.losses.CategoricalCrossentropy(), metrics=['categorical_accuracy'])
K.set_value(siamese_model.optimizer.learning_rate, 0.01)
siamese_model.fit([paired_dataset_input1, paired_dataset_input2], train_labels_dataset, epochs=500, batch_size=5000, callbacks = [lr_scheduler], validation_data=([test_dataset[:, 0], test_dataset[:, 1]], test_labels_dataset), validation_batch_size=500, validation_freq=10)

Epoch 1/500
Epoch 2/500
Epoch 3/500
Epoch 4/500
Epoch 5/500
Epoch 6/500
Epoch 7/500
Epoch 8/500
Epoch 9/500
Epoch 10/500
Epoch 11/500
Epoch 12/500
Epoch 13/500
Epoch 14/500
Epoch 15/500
Epoch 16/500
Epoch 17/500
Epoch 18/500
Epoch 19/500
Epoch 20/500
Epoch 21/500
Epoch 22/500
Epoch 23/500
Epoch 24/500
Epoch 25/500
Epoch 26/500
Epoch 27/500
Epoch 28/500
Epoch 29/500
Epoch 30/500
Epoch 31/500
Epoch 32/500
Epoch 33/500
Epoch 34/500
Epoch 35/500
Epoch 36/500
Epoch 37/500
Epoch 38/500
Epoch 39/500
Epoch 40/500
Epoch 41/500
Epoch 42/500
Epoch 43/500
Epoch 44/500
Epoch 45/500
Epoch 46/500
Epoch 47/500
Epoch 48/500
Epoch 49/500
Epoch 50/500
Epoch 51/500
Epoch 52/500
Epoch 53/500
Epoch 54/500
Epoch 55/500
Epoch 56/500
Epoch 57/500
Epoch 58/500
Epoch 59/500
Epoch 60/500
Epoch 61/500
Epoch 62/500
Epoch 63/500
Epoch 64/500
Epoch 65/500
Epoch 66/500
Epoch 67/500
Epoch 68/500
Epoch 69/500
Epoch 70/500
Epoch 71/500
Epoch 72/500
Epoch 73/500
Epoch 74/500
Epoch 75/500
Epoch 76/500
Epoch 77/500
Epoch 78

<keras.src.callbacks.History at 0xbf80d3890>

In [33]:
# Evaluate the Siamese model on the test dataset
test_loss, test_accuracy = siamese_model.evaluate([test_dataset[:, 0], test_dataset[:, 1]], test_labels_dataset)

# Print the accuracy
print('Test Accuracy:', test_accuracy*100)

Test Accuracy: 78.8671612739563


In [34]:
# Evaluate the Siamese model on the test dataset
test_loss, test_accuracy = siamese_model.evaluate([paired_dataset_input1, paired_dataset_input2], train_labels_dataset)

# Print the accuracy
print('Test Accuracy:', test_accuracy*100)

Test Accuracy: 87.07696795463562


In [35]:
def preprocess_for_siamese_input(fen1, fen2):
    bitstring1 = np.array([fen2bitstring(fen1)])
    bitstring2 = np.array([fen2bitstring(fen2)])
    siamese_input = [bitstring1, bitstring2]
    return siamese_input

In [36]:
fen1 = "3r3k/p4pq1/7Q/4pB2/7P/2N5/PPP2P2/6RK b - - 0 34"  #position with queen blunder
fen2 = "3r3k/p4pQ1/7p/4pB2/7P/2N5/PPP2P2/6RK b - - 0 34"   #best move acc. to stockfish

fin1 = preprocess_for_siamese_input(fen1, fen2)
x = siamese_model.predict(fin1)
x



array([[0.53256875, 0.46743116]], dtype=float32)

In [37]:
fin2 = preprocess_for_siamese_input(fen2, fen1)
y = siamese_model.predict(fin2)
y



array([[0.46378598, 0.53621405]], dtype=float32)

In [38]:
siamese_model.save("siamese_final_500000_2.h5")

  saving_api.save_model(
