# Machine Learning Chess Model
## Authors: Remington Ward, Eric Lykins

This file trains and creates a machine learning model to play chess. The model is a convolutional nueral network using nd sklearn. 
This file handles reading in the data, cleaning the data, training the nueral network and saving it to a file.


 ### -- Resources and references --
 Scikit-learn library
 https://medium.com/@waleedmousa975/a-step-by-step-guide-to-developing-a-chess-game-with-an-ai-opponent-using-python-e06374fcc04a

 Keep models persistent so we do not need to retrain it every run
 https://scikit-learn.org/stable/model_persistence.html

 Building a learning model with Scikit-learn
 https://www.geeksforgeeks.org/learning-model-building-scikit-learn-python-machine-learning-library/



 ### -- DATASET USED--
 chess_games.csv
 Chess Game data set
 https://www.kaggle.com/datasets/arevel/chess-games


# Download the data and load it into a dataframe

In [1]:
import os
import gdown

# Helper to download files from google drive
# Credit to this post: https://stackoverflow.com/questions/38511444/python-download-files-from-google-drive-using-url
def download_from_drive(file_id, destination):
    """Downloads the google drive file with file_id into the destination"""
    url = f'https://drive.google.com/uc?id={file_id}'
    gdown.download(url, destination, quiet=True)

if (not os.path.exists("datasets/")):
    os.mkdir("datasets/")

CHESS_GAMES_DRIVE_ID = "1u9fpjNzZjTtByCfBlM_JV4f1HZmA8LZu"
"""Google drive file id to download the data"""
CHESS_GAMES_DATA_PATH = "datasets/chess_games.csv"
"""Dataset from: https://www.kaggle.com/datasets/arevel/chess-games"""
# Download if the file doesn't exist
if not os.path.exists(CHESS_GAMES_DATA_PATH):
    download_from_drive(CHESS_GAMES_DRIVE_ID, CHESS_GAMES_DATA_PATH)

In [2]:
# Load in chess games dataset
import pandas as pd
data = pd.read_csv(CHESS_GAMES_DATA_PATH)

# Clean the data and remove unnecessary columns

In [None]:
# Clean up dataset
# - Remove unnecessary columns
# - Get rid of incomplete data
dataNoTF = data[data.Termination!="Time forfeit"]
dataOnlyGoodColumns = dataNoTF.drop(['Event', 'White', 'Black', 'UTCDate', 'UTCTime'], axis=1)

dataOnlyGoodColumns['LowestElo'] = dataOnlyGoodColumns[['WhiteElo', 'BlackElo']].min(axis=1)
sortedLElo = dataOnlyGoodColumns.sort_values(by='LowestElo')
sortedLElo = sortedLElo[-100000:]


# Add information on Board positions to each move

In [None]:
# How much of the data we will use
NUMBER_OF_INPUT_GAMES = 2000
testData = sortedLElo.head(NUMBER_OF_INPUT_GAMES)

In [None]:
# Generate data on a per move basis
import chess
import chess.engine
import os 
import numpy as np

print(os.getcwd())
""
PATH = "../lib/stockfish/stockfish.exe"
board = chess.Board()
engine = chess.engine.SimpleEngine.popen_uci(PATH)
em = True


def board_to_array(board):
    # Create an 8x8 array where each cell contains a unique value for each piece type and color
    piece_values = {
        'P': 1, 'N': 2, 'B': 3, 'R': 4, 'Q': 5, 'K': 6,  # White pieces
        'p': 7, 'n': 8, 'b': 9, 'r': 10, 'q': 11, 'k': 12 # Black pieces
    }
    # Initialize an empty board with zeros
    array = np.zeros((8, 8))
    for i in range(8):
        for j in range(8):
            piece = board.piece_at(chess.square(i, j))
            if piece:
                array[i, j] = piece_values.get(piece.symbol(), 0)
    return array


movesByGame = testData
rows_list = []
columns = ['Moves', 'BoardState']
def get_data_from_board(board):
    return {
        'Moves': f"{chess.square_name(move_obj.from_square)}{chess.square_name(move_obj.to_square)}",
        'BoardState': board_to_array(board),
    }
df = pd.DataFrame(columns=columns)
    
# For each game play the game on a board and record the positons
for index, game in movesByGame.iterrows():
    board.reset()
    # Split the moves and filter out move numbers and annotations
    moves = game['AN'].replace('.', ' ').split()
    # Remove digits and filter out annotations
    clean_moves = []
    if moves[-1] == game['Result']:
        moves.pop()
    for move in moves:
        if move.isdigit(): #or move.startswith('{') or move.startswith('[') or move.endswith(']') or move.endswith('}'):
            continue
        clean_moves.append(move)

    game_result = game['Result']
    is_white_win = game_result == '1-0'
    is_black_win = game_result == '0-1'

    for move_index, move in enumerate(clean_moves):
        try:
            move_obj = board.parse_san(move)
            if move_obj in board.legal_moves:
                #print(board)
                board.push(move_obj)
                rows_list.append(get_data_from_board(board))
            else:
                print(f"Illegal move found in game {index}: {move}")
                break
        except ValueError:
            break
df = pd.DataFrame(rows_list)


c:\Programming\Projects\Python\Chess2\sp2024_cs4200_finalproject_g02\code\machine_learning


# Encode the data in a way the model will understand

In [None]:
# Create an indexer to map moves to integers and back
# Moves are represented by the start square and end square only
from bidict import bidict
moves_indexer = bidict({
    f"{chess.square_name(s1)}{chess.square_name(s2)}" : s1*100+s2 for s1 in chess.SQUARES for s2 in chess.SQUARES
})
print(moves_indexer)
df['Moves'] = df['Moves'].map(moves_indexer)
display(df.head(5))

# Split the data into training and testing sets

In [None]:
from sklearn.model_selection import train_test_split

def SplitData(df, test_split=0.3):
    # Split the data into training and testing sets
    train_df, test_df = train_test_split(df, test_size=test_split, random_state=42)
    
    # Optionally, return the split data
    return train_df, test_df

def SplitDataLogReg(df, test_data=0.3):
    X = df.drop('Moves', axis=1)
    y = df['Moves']

    return train_test_split(X, y, test_size=0.3, random_state=42)
    

In [None]:
from sklearn.preprocessing import LabelEncoder
from scipy.sparse import csr_matrix, hstack, vstack

print("Splitting data...")
train_data, test_data = SplitData(df)
X_train, X_test, y_train, y_test = SplitDataLogReg(df)
print("Splitting Complete!")
display(test_data)

Unnamed: 0,Moves,BoardState
0,e2e4,"[[4.0, 1.0, 0.0, 0.0, 0.0, 0.0, 7.0, 10.0], [2..."
1,e7e6,"[[4.0, 1.0, 0.0, 0.0, 0.0, 0.0, 7.0, 10.0], [2..."
2,d2d4,"[[4.0, 1.0, 0.0, 0.0, 0.0, 0.0, 7.0, 10.0], [2..."
3,d7d5,"[[4.0, 1.0, 0.0, 0.0, 0.0, 0.0, 7.0, 10.0], [2..."
4,e4d5,"[[4.0, 1.0, 0.0, 0.0, 0.0, 0.0, 7.0, 10.0], [2..."


bidict({'a1a1': 0, 'a1b1': 1, 'a1c1': 2, 'a1d1': 3, 'a1e1': 4, 'a1f1': 5, 'a1g1': 6, 'a1h1': 7, 'a1a2': 8, 'a1b2': 9, 'a1c2': 10, 'a1d2': 11, 'a1e2': 12, 'a1f2': 13, 'a1g2': 14, 'a1h2': 15, 'a1a3': 16, 'a1b3': 17, 'a1c3': 18, 'a1d3': 19, 'a1e3': 20, 'a1f3': 21, 'a1g3': 22, 'a1h3': 23, 'a1a4': 24, 'a1b4': 25, 'a1c4': 26, 'a1d4': 27, 'a1e4': 28, 'a1f4': 29, 'a1g4': 30, 'a1h4': 31, 'a1a5': 32, 'a1b5': 33, 'a1c5': 34, 'a1d5': 35, 'a1e5': 36, 'a1f5': 37, 'a1g5': 38, 'a1h5': 39, 'a1a6': 40, 'a1b6': 41, 'a1c6': 42, 'a1d6': 43, 'a1e6': 44, 'a1f6': 45, 'a1g6': 46, 'a1h6': 47, 'a1a7': 48, 'a1b7': 49, 'a1c7': 50, 'a1d7': 51, 'a1e7': 52, 'a1f7': 53, 'a1g7': 54, 'a1h7': 55, 'a1a8': 56, 'a1b8': 57, 'a1c8': 58, 'a1d8': 59, 'a1e8': 60, 'a1f8': 61, 'a1g8': 62, 'a1h8': 63, 'b1a1': 100, 'b1b1': 101, 'b1c1': 102, 'b1d1': 103, 'b1e1': 104, 'b1f1': 105, 'b1g1': 106, 'b1h1': 107, 'b1a2': 108, 'b1b2': 109, 'b1c2': 110, 'b1d2': 111, 'b1e2': 112, 'b1f2': 113, 'b1g2': 114, 'b1h2': 115, 'b1a3': 116, 'b1b3': 117, 

Unnamed: 0,Moves,BoardState
29178,502,"[[0.0, 0.0, 0.0, 0.0, 7.0, 0.0, 0.0, 0.0], [4...."
28500,5446,"[[4.0, 1.0, 0.0, 0.0, 0.0, 0.0, 7.0, 10.0], [2..."
22878,2549,"[[0.0, 1.0, 0.0, 2.0, 7.0, 0.0, 0.0, 0.0], [0...."
9320,1229,"[[0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 7.0, 10.0], [0..."
18329,5958,"[[0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 7.0, 10.0], [6..."
...,...,...
22308,2230,"[[0.0, 0.0, 4.0, 0.0, 0.0, 0.0, 0.0, 0.0], [0...."
25072,526,"[[4.0, 1.0, 0.0, 0.0, 0.0, 0.0, 7.0, 10.0], [2..."
28180,1812,"[[4.0, 1.0, 0.0, 0.0, 0.0, 0.0, 7.0, 0.0], [0...."
28770,3736,"[[0.0, 1.0, 0.0, 0.0, 0.0, 7.0, 0.0, 0.0], [0...."


# Save the data to csv files

In [None]:
print("Writing data to .csv files...")
X_train.to_csv('data/X_train.csv', index=False)
X_test.to_csv('data/X_test.csv', index=False)
y_train.to_csv('data/y_train.csv', index=False)
y_test.to_csv('data/y_test.csv', index=False)
train_data.to_csv('data/train_data.csv', index=False)
test_data.to_csv('data/test_data.csv', index=False)
print("Done writing")

# Load Data from csv files

In [None]:
# Load each dataset from a CSV file into a DataFrame
import pandas as pd
X_train = pd.read_csv('data/X_train.csv') 
X_test = pd.read_csv('data/X_test.csv')
y_train = pd.read_csv('data/y_train.csv')
y_test = pd.read_csv('data/y_test.csv')
train_data = pd.read_csv('data/train_data.csv')
test_data = pd.read_csv('data/test_data.csv')

print(test_data)

      Moves                                         BoardState
0       502  [[ 0.  0.  0.  0.  7.  0.  0.  0.]\n [ 4.  0. ...
1      5446  [[ 4.  1.  0.  0.  0.  0.  7. 10.]\n [ 2.  1. ...
2      2549  [[ 0.  1.  0.  2.  7.  0.  0.  0.]\n [ 0.  0. ...
3      1229  [[ 0.  1.  0.  0.  0.  0.  7. 10.]\n [ 0.  0. ...
4      5958  [[ 0.  1.  0.  0.  0.  0.  7. 10.]\n [ 6.  1. ...
...     ...                                                ...
8966   2230  [[ 0.  0.  4.  0.  0.  0.  0.  0.]\n [ 0.  0. ...
8967    526  [[ 4.  1.  0.  0.  0.  0.  7. 10.]\n [ 2.  1. ...
8968   1812  [[ 4.  1.  0.  0.  0.  0.  7.  0.]\n [ 0.  1. ...
8969   3736  [[ 0.  1.  0.  0.  0.  7.  0.  0.]\n [ 0.  1. ...
8970   5850  [[ 0.  1.  0.  0.  0.  0.  0.  0.]\n [ 0.  0. ...

[8971 rows x 2 columns]


# Resize data and check that it is in the correct format

In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import MaxAbsScaler

# Assume X_train and X_test are loaded as pandas DataFrame or numpy arrays

# Check data types and convert to float if necessary
if isinstance(X_train, pd.DataFrame):
    X_train = X_train.apply(pd.to_numeric, errors='coerce')  # Convert non-numeric to NaN
    X_test = X_test.apply(pd.to_numeric, errors='coerce')
elif isinstance(X_train, np.ndarray):
    X_train = X_train.astype(float)
    X_test = X_test.astype(float)

# Handle missing values
X_train = np.nan_to_num(X_train)  # Replace NaN with zero and inf with finite numbers
X_test = np.nan_to_num(X_test)

# Check and reshape data if it's a 1D array to 2D if necessary
if X_train.ndim == 1:
    X_train = X_train.reshape(-1, 1)
if X_test.ndim == 1:
    X_test = X_test.reshape(-1, 1)

# Initialize and apply MaxAbsScaler
scaler = MaxAbsScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

y_train = y_train.squeeze()  
y_test = y_test.squeeze()    


# Proceed with your model training/testing
print("Data is scaled and ready for model training/testing.")


Data is scaled and ready for model training/testing.


# Create and train the model

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.preprocessing import MaxAbsScaler
import numpy as np
import time
import os

# Set environment variables to ensure CUDA operations are synchronized and visible
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"

# Device configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


print("Shapes:")
print("X_train:", X_train.shape)
print("y_train:", y_train.shape)

# Load and preprocess data
scaler = MaxAbsScaler()
X_train_scaled = scaler.fit_transform(X_train)  # Ensure X_train is a 2D numpy array
X_test_scaled = scaler.transform(X_test)

# Convert scaled data to torch tensors and reshape for CNN input
def convert_and_reshape(data, side_length):
    tensor = torch.tensor(data, dtype=torch.float32)
    return tensor.reshape(-1, 1, side_length, side_length)

side_length = int(np.sqrt(X_train_scaled.shape[1]))
X_train_torch = convert_and_reshape(X_train_scaled, side_length)
X_test_torch = convert_and_reshape(X_test_scaled, side_length)

# Convert labels to tensors
y_train_torch = torch.tensor(y_train.to_numpy(), dtype=torch.long)
y_test_torch = torch.tensor(y_test.to_numpy(), dtype=torch.long)

# Import the CNN Model
from cnn import CNN

model = CNN(side_length=side_length, num_channels=1, num_classes=y_train_torch.max().item() + 1).to(device)

# Setup training components
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

print("Shapes of all tensors:")
print("X_train_torch:", X_train_torch.shape)
print("y_train_torch:", y_train_torch.shape)

train_data = TensorDataset(X_train_torch, y_train_torch)
train_loader = DataLoader(train_data, batch_size=256, shuffle=True)

# Training loop
num_epochs = 3
for epoch in range(num_epochs):
    for data, target in train_loader:
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

    # Log progress
    print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {loss.item():.4f}')




Shapes:
X_train: (20932, 1)
y_train: (20932,)
Shapes of all tensors:
X_train_torch: torch.Size([20932, 1, 1, 1])
y_train_torch: torch.Size([20932])
Epoch 1/3, Loss: 6.8400
Epoch 2/3, Loss: 6.7619
Epoch 3/3, Loss: 6.7710
Test Accuracy: 1.30%


# Test the model

In [None]:
# Evaluate the model
model.eval()
correct = total = 0
with torch.no_grad():
    for data, target in DataLoader(TensorDataset(X_test_torch, y_test_torch), batch_size=256):
        data, target = data.to(device), target.to(device)
        outputs = model(data)
        _, predicted = torch.max(outputs, 1)
        total += target.size(0)
        correct += (predicted == target).sum().item()

accuracy = 100 * correct / total
print(f"Test Accuracy: {accuracy:.2f}%")

# Top 5 moves from starting position

In [None]:
def get_top_moves(board_state, model, device, index_to_move):
    # Prepare the board state (assuming preprocessing is already done)
    board_state = torch.tensor(board_state).float()  # Convert to tensor
    board_state = board_state.to(device)  # Transfer to the correct device
    board_state = board_state.unsqueeze(0)  # Add batch dimension if single instance

    # Model prediction
    model.eval()
    with torch.no_grad():
        output = model(board_state)

    # Get top 5 moves
    _, top_moves = torch.topk(output, 5)
    top_moves = top_moves.cpu().numpy().tolist()[0]  # Convert to CPU and to list

    # Map indices to actual moves
    move_descriptions = [index_to_move(index) for index in top_moves]
    return move_descriptions

def index_to_move(index):
    try:
        return moves_indexer.inv[index]
    except KeyError:
        return f"INVALID MOVE {index}"


# Usage
board = chess.Board()
board_state = board_to_array(board)
top_moves = get_top_moves(board_state, model, device, index_to_move)
print("Top 5 Moves:", top_moves)


Top 5 Moves: ['g1f3', 'd2d4', 'b1c3', 'e2e4', 'e7e6']


# Save the model

In [None]:
# save the model
# Save only the state dictionary
# torch.save(model.state_dict(), 'saved_models/model_1.pth')
torch.save(model, 'saved_models/model_1.pth')