In [1]:
pip install chess
pip install scikit-learn
pip install tensorflow
pip install python-chess

In [None]:
import math  # Import the math module for NaN checking
import os
import re

import torch
import chess.pgn
import numpy as np
from tensorflow import python
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

import torch.nn as nn
import torch.optim as optim

# Use the Apple Metal (MPS) backend when PyTorch reports it as available, otherwise the CPU.
is_mps_available = torch.backends.mps.is_available()
if is_mps_available:
    device = torch.device("mps")
else:
    device = torch.device("cpu")
print("Using device:", device)


In [None]:
# Raw source PGN file (path is relative to the notebook's working directory)
data_file = 'aug2023_data_2gb.pgn'

# Alternative input, presumably a smaller dump for quicker runs -- uncomment to use it
#data_file = '500mb_data.pgn'

# Output PGN that receives the first k games containing [%eval ...] annotations
destination_pgn = 'k_games_with_eval.pgn'

In [None]:
# Read the whole source PGN into memory in one go
with open(data_file, 'r') as file:
    content = file.read()

# A new game starts wherever a blank line is followed by an "[Event" header;
# the lookahead leaves the header itself at the start of the next chunk
game_boundary = re.compile(r'\n\n(?=\[Event)')
games = game_boundary.split(content)

# Function to check if a player in the game has a high enough ELO rating
def has_high_elo(game_text, min_elo=None):
    """Tell whether at least one player in a PGN game meets a rating threshold.

    Args:
        game_text: Raw PGN text of one game, headers included.
        min_elo: Minimum rating either player must reach. ``None`` (the
            default) disables the filter, so every game is accepted.

    Returns:
        True when the filter is disabled, or when the ``WhiteElo`` or
        ``BlackElo`` header holds a number >= ``min_elo``. Headers that are
        missing or non-numeric (e.g. ``"?"``) are ignored rather than raising.
    """
    if min_elo is None:
        return True

    # Only purely numeric ratings match, so "?" placeholders are skipped.
    ratings = re.findall(r'\[(?:White|Black)Elo "(\d+)"\]', game_text)
    return any(int(rating) >= min_elo for rating in ratings)

# Function to check if a game has evaluation annotations
def has_eval_annotations(game_text):
    """Return True when the text contains a brace comment that starts with an ``[%eval ...]`` tag."""
    eval_comment = re.search(r'\{\s*\[%eval [^\}]*\]\s*\}', game_text)
    return eval_comment is not None

# Keep only the games that carry evaluation annotations and pass the ELO check
eval_games = [game for game in games if has_eval_annotations(game) and has_high_elo(game)]

# Print the number of games with evaluations
print(f"Extracted {len(eval_games)} games with evaluation annotations.")

# Maximum number of games to save
k = 1000

# A non-positive k saves nothing (the slice would otherwise count from the end).
games_to_save = eval_games[:max(k, 0)]

with open(destination_pgn, 'w') as file:
    for game in games_to_save:
        # Splitting the source removed the blank line between games; write it
        # back so each game is again followed by an empty line.
        file.write(game.strip() + "\n\n")

# Number of games actually written
i = len(games_to_save)
print(f"Saved {i} games.")

In [None]:
# Path passed to the PGN helper calls below; it points at the filtered PGN file
pgn_file_path = destination_pgn

In [None]:
# add empty lines between games in pgn file to fix extraction issues

def add_empty_line_between_games(pgn_file_path):
    """Rewrite a PGN file in place so consecutive games are separated by a blank line.

    A separator counts as missing when a line ending in a result token
    ("1-0", "0-1", "1/2-1/2" or "*" for an unfinished game) is followed
    directly by the next game's "[Event" header. Games that are already
    separated are left untouched, so calling this repeatedly is harmless.

    Args:
        pgn_file_path: Path of the PGN file to fix.
    """
    # Read the entire PGN file
    with open(pgn_file_path, 'r') as file:
        pgn_content = file.read()

    # The lookahead keeps the "[Event" header in place; only the newline is doubled.
    fixed_content = re.sub(r'(1-0|0-1|1/2-1/2|\*)\n(?=\[Event)', r'\1\n\n', pgn_content)

    # Write back only when something changed
    if fixed_content != pgn_content:
        with open(pgn_file_path, 'w') as file:
            file.write(fixed_content)


# Fix missing blank lines between games in the filtered PGN (the file is rewritten in place)
add_empty_line_between_games(pgn_file_path)

In [None]:
# write FEN and evals to .csv file
def write_fens_and_evals_directly_to_file(pgn_file_path, output_file_path):
    """Write one ``FEN,eval`` line per mainline move of every game in a PGN file.

    The eval column holds a float for numeric ``[%eval ...]`` tags, the raw
    ``#...`` text for mate scores, and ``None`` when the move's comment has no
    usable eval tag.
    """

    def extract_eval(comment):
        # Pull the value of the first "[%eval ...]" tag out of a move comment
        if "[%eval" not in comment:
            return None
        try:
            token = comment.split("[%eval ")[1].split("]")[0].strip()
            # Mate scores ("#3", "#-2") stay as text; everything else must be numeric
            return token if token.startswith("#") else float(token)
        except (IndexError, ValueError):
            return None

    with open(pgn_file_path) as pgn, open(output_file_path, 'w') as output_file:
        while True:
            game = chess.pgn.read_game(pgn)
            if game is None:
                # No more games in the PGN file
                break

            position = game.board()
            for node in game.mainline():
                # Replay the move so the FEN is the position after it
                position.push(node.move)
                output_file.write(f"{position.fen()},{extract_eval(node.comment)}\n")

# read FEN and evals from .csv file
def read_fens_and_evals_from_file(file_path):
    """Load ``(fen, eval)`` pairs from a file of ``FEN,eval`` lines.

    Args:
        file_path: Path of the text file to read, one ``FEN,eval`` pair per line.

    Returns:
        A list of ``(fen, eval_score)`` tuples in file order. ``eval_score`` is
        a float for any finite number, including negative values, and ``None``
        for everything else (``"None"``, mate scores such as ``"#3"``, garbage).

    Raises:
        ValueError: If a non-blank line contains no comma.
    """
    all_fens_and_evals = []
    with open(file_path, 'r') as file:
        for line_number, raw_line in enumerate(file, start=1):
            line = raw_line.strip()
            if not line:
                # Tolerate blank lines (e.g. a trailing newline at end of file)
                continue

            # Split on the last comma; the eval is always the final column
            fen, separator, eval_str = line.rpartition(',')
            if not separator:
                raise ValueError(f"Line {line_number} has no ',' separator: {line!r}")

            # float() also accepts signed values, which the earlier isdigit()
            # check rejected, turning every negative evaluation into None.
            try:
                eval_score = float(eval_str)
            except ValueError:
                eval_score = None
            else:
                if not math.isfinite(eval_score):
                    eval_score = None

            all_fens_and_evals.append((fen, eval_score))
    return all_fens_and_evals

In [None]:
# write FEN and evals per move for each game to a .csv file

output_file_path = 'output_file.csv'

# Generate the CSV only when it is missing, so a fresh Restart & Run All works
# while an existing export is reused. Delete the file to force regeneration.
if not os.path.exists(output_file_path):
    write_fens_and_evals_directly_to_file(pgn_file_path, output_file_path)

In [None]:
# Load the (FEN, eval) pairs back from the CSV file and report how many rows were read

all_fens_and_evals_extracted = read_fens_and_evals_from_file(output_file_path)
print(len(all_fens_and_evals_extracted))

In [None]:
### FEN -> feature vector encoding

# Channel order of the twelve piece planes: uppercase symbols first, then lowercase
piece_order = 'PNBRQKpnbrqk'
piece_to_index = dict(zip(piece_order, range(len(piece_order))))

def fen_to_one_hot(fen):
    """Encode a FEN string as one flat numeric vector.

    Layout, in order:
      * 8 x 8 x 12 one-hot piece planes, flattened (768 values)
      * side to move (1 for 'w', 0 otherwise)
      * four castling flags in the order K, Q, k, q
      * 8 x 8 en passant target plane, flattened (64 values)
      * halfmove clock and fullmove number as raw integers
    """
    fields = fen.split(' ')

    # Piece placement: fill one plane per piece symbol
    planes = np.zeros((8, 8, len(piece_order)), dtype=np.float32)
    for rank_idx, rank_text in enumerate(fields[0].split('/')):
        file_idx = 0
        for symbol in rank_text:
            if not symbol.isdigit():
                planes[rank_idx, file_idx, piece_to_index[symbol]] = 1
                file_idx += 1
            else:
                # A digit stands for that many consecutive empty squares
                file_idx += int(symbol)

    side_to_move = int(fields[1] == 'w')
    castling_flags = [fields[2].count(flag) for flag in 'KQkq']

    # Mark the en passant target square, if any, on its own 8x8 plane
    ep_plane = np.zeros((8, 8), dtype=np.float32)
    ep_square = fields[3]
    if ep_square != '-':
        ep_plane[8 - int(ep_square[1]), ord(ep_square[0]) - ord('a')] = 1

    move_counters = [int(fields[4]), int(fields[5])]

    # Join all parts into a single input vector
    return np.concatenate([
        planes.reshape(-1),
        [side_to_move],
        castling_flags,
        ep_plane.reshape(-1),
        move_counters,
    ])


In [1]:
# Convert all FENs to one hot encoding, store one-hot and evals in parallel lists to be fed into model

one_hot_board_representations = []
stockfish_evaluations = []

# Iterate over every (FEN, eval) tuple that was read from the CSV
for fen, raw_eval in all_fens_and_evals_extracted:
    evaluation = raw_eval

    # Mate scores written as "#N" / "#-N" are mapped onto the clamp limits:
    # "#-..." becomes -15 and "#..." becomes +15. The earlier check wrapped the
    # comparison in str(), which is always truthy, so every mate turned into -15.
    if isinstance(evaluation, str) and evaluation.startswith('#'):
        evaluation = -15 if evaluation[1:2] == '-' else 15

    # Skip positions without a usable evaluation (None or NaN)
    if evaluation is None or math.isnan(evaluation):
        continue

    # Clamp evaluations to [-15, 15]
    evaluation = max(-15, min(15, evaluation))

    # Encode only the positions that are kept; the two lists stay index-aligned
    one_hot_board_representations.append(fen_to_one_hot(fen))
    stockfish_evaluations.append(evaluation)

In [None]:
# Build the training / validation arrays from the encoded positions and their evaluations
num_samples = 100  # Sample count from the dummy-data experiment; not used in this cell

# Recompute the flag first, then derive the device from it
is_mps_available = torch.backends.mps.is_available()
device = torch.device("mps" if is_mps_available else "cpu")
print("Is Apple GPU available:", is_mps_available)

# Convert the data to NumPy arrays. float32 matches the .float() cast applied
# when batches are turned into tensors and halves the memory of the default float64.
X = np.array(one_hot_board_representations, dtype=np.float32)
y = np.array(stockfish_evaluations, dtype=np.float32)

# Split the data into training and validation sets (fixed seed for reproducibility)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Fully connected regression network with four hidden stages
class ANNModel(nn.Module):
    """Feed-forward regressor: input -> 512 -> 256 -> 128 -> 64 -> 1.

    Every hidden stage applies Linear, ReLU, BatchNorm1d and Dropout(0.5) in
    that order; the last layer is a plain Linear producing one value per sample.
    Layers are exposed as ``fc1..fc5``, ``bn1..bn4`` and ``dropout1..dropout4``.
    """

    def __init__(self, input_size):
        super().__init__()
        widths = (input_size, 512, 256, 128, 64)
        # Register fcN / bnN / dropoutN stage by stage, in that order
        for stage, (fan_in, fan_out) in enumerate(zip(widths, widths[1:]), start=1):
            setattr(self, f"fc{stage}", nn.Linear(fan_in, fan_out))
            setattr(self, f"bn{stage}", nn.BatchNorm1d(fan_out))
            setattr(self, f"dropout{stage}", nn.Dropout(0.5))
        self.fc5 = nn.Linear(64, 1)

    def forward(self, x):
        for stage in range(1, 5):
            dense = getattr(self, f"fc{stage}")
            norm = getattr(self, f"bn{stage}")
            drop = getattr(self, f"dropout{stage}")
            x = drop(norm(torch.relu(dense(x))))
        return self.fc5(x)

# Pick the best available accelerator: CUDA first, then Apple MPS, then the CPU.
# Checking only CUDA here discarded the MPS device selected earlier in the notebook.
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

# Size the input layer from the data instead of hard-coding the feature count
model = ANNModel(X_train.shape[1]).to(device)

# Function to train the PyTorch model
def train_model(model, train_input, train_target, val_input, val_target, num_epochs=10, learning_rate=0.005, batch_size=32):
    """Train ``model`` with Adam on an MSE loss and plot the loss curves.

    Batches are taken in order (no per-epoch shuffling). After each epoch the
    whole validation set is evaluated in a single forward pass.

    Args:
        model: The ``nn.Module`` to train; it is updated in place.
        train_input: NumPy array of training features, one row per sample.
        train_target: NumPy array of training targets, one value per sample.
        val_input: NumPy array of validation features.
        val_target: NumPy array of validation targets.
        num_epochs: Number of passes over the training data.
        learning_rate: Learning rate passed to Adam.
        batch_size: Number of samples per optimisation step.

    Raises:
        ValueError: If ``train_input`` is empty or ``batch_size`` is below 1.
    """
    train_size = len(train_input)
    if train_size == 0:
        raise ValueError("train_input must contain at least one sample")
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    # Run on the device the model already lives on rather than a notebook global,
    # so tensors and parameters can never end up on different devices.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # The validation tensors do not change between epochs; build them once
    val_inputs = torch.from_numpy(val_input).float().to(run_device)
    val_targets = torch.from_numpy(val_target).float().view(-1, 1).to(run_device)

    # Lists for storing loss values
    training_losses = []
    validation_losses = []

    for epoch in range(num_epochs):
        print(f"Epoch {epoch + 1}/{num_epochs}")

        # Training Phase
        model.train()
        train_loss_accum = 0.0

        for batch_start in range(0, train_size, batch_size):
            batch_end = min(batch_start + batch_size, train_size)
            train_inputs = torch.from_numpy(train_input[batch_start:batch_end]).float().to(run_device)
            train_targets = torch.from_numpy(train_target[batch_start:batch_end]).float().view(-1, 1).to(run_device)

            optimizer.zero_grad()
            train_outputs = model(train_inputs)
            train_loss = criterion(train_outputs, train_targets)
            train_loss.backward()
            optimizer.step()

            # Weight each batch by its size so a short final batch is counted correctly
            train_loss_accum += train_loss.item() * (batch_end - batch_start)

        # Per-sample mean. Dividing by train_size // batch_size raised
        # ZeroDivisionError for small datasets and overstated the loss
        # whenever the last batch was partial.
        avg_train_loss = train_loss_accum / train_size
        training_losses.append(avg_train_loss)

        # Validation Phase
        model.eval()
        with torch.no_grad():
            val_outputs = model(val_inputs)
            val_loss = criterion(val_outputs, val_targets)
            validation_losses.append(val_loss.item())

        print(f'Training Loss: {avg_train_loss}  Validation Loss: {val_loss.item()}')

    # Plot the training and validation loss
    plt.plot(training_losses, label='Training Loss')
    plt.plot(validation_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss Over Epochs')
    plt.legend()
    plt.show()

# Fit the network on the training split, validating on the held-out split;
# the loss curves are plotted once training finishes
train_model(
    model,
    X_train,
    y_train,
    X_test,
    y_test,
    batch_size=32,
)

In [None]:
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

# Function to evaluate the model on test data
def evaluate_model(model, test_input, test_target):
    model.eval()
    with torch.no_grad():
        test_inputs = torch.from_numpy(test_input).float().to(device)
        test_targets = torch.from_numpy(test_target).float().view(-1, 1).to(device)
        predictions = model(test_inputs)
        predictions = predictions.cpu().numpy()

    mse = mean_squared_error(test_target, predictions)
    mae = mean_absolute_error(test_target, predictions)
    r2 = r2_score(test_target, predictions)

    print(f"Test MSE: {mse}")
    print(f"Test MAE: {mae}")