<a href="https://colab.research.google.com/github/hstananana/covid-dashboard-pkg/blob/master/chessbot4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install chess

Collecting chess
  Downloading chess-1.10.0-py3-none-any.whl (154 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/154.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m154.4/154.4 kB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: chess
Successfully installed chess-1.10.0


In [2]:
import chess
import numpy as np
import pandas as pd

class PIECE_INT:
    PAWN = 1
    KNIGHT = 2
    BISHOP = 3
    ROOK = 4
    QUEEN = 5
    KING = 6

def get_piece_int(piece):
    if piece == "P":
        return PIECE_INT.PAWN
    elif piece == "N":
        return PIECE_INT.KNIGHT
    elif piece == "B":
        return PIECE_INT.BISHOP
    elif piece == "R":
        return PIECE_INT.ROOK
    elif piece == "Q":
        return PIECE_INT.QUEEN
    elif piece == "K":
        return PIECE_INT.KING
    elif piece == "p":
        return -PIECE_INT.PAWN
    elif piece == "n":
        return -PIECE_INT.KNIGHT
    elif piece == "b":
        return -PIECE_INT.BISHOP
    elif piece == "r":
        return -PIECE_INT.ROOK
    elif piece == "q":
        return -PIECE_INT.QUEEN
    elif piece == "k":
        return -PIECE_INT.KING
    else:
        return 0
def convert_fen_to_dict(fen):
    board = chess.Board(fen)
    board_list = []
    for i in range(64):
        piece = board.piece_at(i)
        if piece is not None:
            board_list.append(get_piece_int(piece.symbol()))
        else:
            board_list.append(0)
    turn = board.turn
    if turn:
        board_list.append(1)
    else:
        board_list.append(-1)
    castling_rights = board.castling_rights
    if castling_rights & chess.BB_H1:
        board_list.append(1)
    else:
        board_list.append(0)
    if castling_rights & chess.BB_A1:
        board_list.append(1)
    else:
        board_list.append(0)
    if castling_rights & chess.BB_H8:
        board_list.append(1)
    else:
        board_list.append(0)
    if castling_rights & chess.BB_A8:
        board_list.append(1)
    else:
        board_list.append(0)
    en_passant = board.ep_square
    if en_passant is not None:
        board_list.append(en_passant)
    else:
        board_list.append(0)
    halfmove_clock = board.halfmove_clock
    board_list.append(halfmove_clock)
    fullmove_number = board.fullmove_number
    board_list.append(fullmove_number)
    return board_list

def convert_dict_to_fen(board_dict):
    board_string = ""
    for i in range(64):
        piece = board_dict[i]
        if piece == PIECE_INT.PAWN:
            board_string += "P"
        elif piece == PIECE_INT.KNIGHT:
            board_string += "N"
        elif piece == PIECE_INT.BISHOP:
            board_string += "B"
        elif piece == PIECE_INT.ROOK:
            board_string += "R"
        elif piece == PIECE_INT.QUEEN:
            board_string += "Q"
        elif piece == PIECE_INT.KING:
            board_string += "K"
        elif piece == -PIECE_INT.PAWN:
            board_string += "p"
        elif piece == -PIECE_INT.KNIGHT:
            board_string += "n"
        elif piece == -PIECE_INT.BISHOP:
            board_string += "b"
        elif piece == -PIECE_INT.ROOK:
            board_string += "r"
        elif piece == -PIECE_INT.QUEEN:
            board_string += "q"
        elif piece == -PIECE_INT.KING:
            board_string += "k"
        else:
            board_string += "1"
    board_string += " "
    if board_dict[64] == 1:
        board_string += "w"
    else:
        board_string += "b"
    board_string += " "
    if board_dict[65] == 1:
        board_string += "K"
    if board_dict[66] == 1:
        board_string += "Q"
    if board_dict[67] == 1:
        board_string += "k"
    if board_dict[68] == 1:
        board_string += "q"
    if board_dict[65] == 0 and board_dict[66] == 0 and board_dict[67] == 0 and board_dict[68] == 0:
        board_string += "-"
    board_string += " "
    if board_dict[69] != 0:
        board_string += chess.square_name(board_dict[69])
    else:
        board_string += "-"
    board_string += " "
    board_string += str(board_dict[70])
    board_string += " "
    board_string += str(board_dict[71])
    return board_string

def convert_fen_to_array(fen):
    board = chess.Board(fen)
    board_array = np.zeros((8,8))
    for i in range(64):
        piece = board.piece_at(i)
        if piece is not None:
            board_array[i//8][i%8] = get_piece_int(piece.symbol())
        else:
            board_array[i//8][i%8] = 0

    # create a new row for information about the board
    board_array = np.vstack((board_array, np.zeros((1,8))))
    turn = board.turn
    if turn:
        board_array[8][0] = 1
    else:
        board_array[8][0] = -1
    castling_rights = board.castling_rights
    if castling_rights & chess.BB_H1:
        board_array[8][1] = 1
    else:
        board_array[8][1] = 0
    if castling_rights & chess.BB_A1:
        board_array[8][2] = 1
    else:
        board_array[8][2] = 0
    if castling_rights & chess.BB_H8:
        board_array[8][3] = 1
    else:
        board_array[8][3] = 0
    if castling_rights & chess.BB_A8:
        board_array[8][4] = 1
    else:
        board_array[8][4] = 0
    en_passant = board.ep_square
    if en_passant is not None:
        board_array[8][5] = en_passant
    else:
        board_array[8][5] = 0
    halfmove_clock = board.halfmove_clock
    board_array[8][6] = halfmove_clock
    fullmove_number = board.fullmove_number
    board_array[8][7] = fullmove_number
    return board_array

def convert_fen_to_dataframe(fen):
    board = chess.Board(fen)
    board_dataframe = pd.DataFrame()
    for i in range(64):
        piece = board.piece_at(i)
        if piece is not None:
            position = chess.square_name(i)
            board_dataframe[position] = [get_piece_int(piece.symbol())]
        else:
            position = chess.square_name(i)
            board_dataframe[position] = [0]
    turn = board.turn
    if turn:
        board_dataframe['turn'] = [1]
    else:
        board_dataframe['turn'] = [-1]
    castling_rights = board.castling_rights
    if castling_rights & chess.BB_H1:
        board_dataframe['castling_rights_H1'] = [1]
    else:
        board_dataframe['castling_rights_H1'] = [0]
    if castling_rights & chess.BB_A1:
        board_dataframe['castling_rights_A1'] = [1]
    else:
        board_dataframe['castling_rights_A1'] = [0]
    if castling_rights & chess.BB_H8:
        board_dataframe['castling_rights_H8'] = [1]
    else:
        board_dataframe['castling_rights_H8'] = [0]
    if castling_rights & chess.BB_A8:
        board_dataframe['castling_rights_A8'] = [1]
    else:
        board_dataframe['castling_rights_A8'] = [0]
    en_passant = board.ep_square
    if en_passant is not None:
        board_dataframe['en_passant'] = [en_passant]
    else:
        board_dataframe['en_passant'] = [0]
    halfmove_clock = board.halfmove_clock
    board_dataframe['halfmove_clock'] = [halfmove_clock]
    fullmove_number = board.fullmove_number
    board_dataframe['fullmove_number'] = [fullmove_number]
    return board_dataframe


def convert_dataframe_to_board(board_dataframe):
    board = chess.Board()
    board.clear()
    for i in range(64):
        position = chess.square_name(i)
        piece = board_dataframe[position]
        if piece != 0:
            if piece > 0:
                board.set_piece_at(i, chess.Piece(piece, chess.WHITE))
            else:
                board.set_piece_at(i, chess.Piece(-piece, chess.BLACK))
    turn = board_dataframe['turn']
    if turn == -1:
        board.turn = False
    castling_rights = 0
    if board_dataframe['castling_rights_H1'] == 1:
        castling_rights |= chess.BB_H1
    if board_dataframe['castling_rights_A1'] == 1:
        castling_rights |= chess.BB_A1
    if board_dataframe['castling_rights_H8'] == 1:
        castling_rights |= chess.BB_H8
    if board_dataframe['castling_rights_A8'] == 1:
        castling_rights |= chess.BB_A8
    board.castling_rights = castling_rights
    en_passant = board_dataframe['en_passant']
    if en_passant != 0:
        board.ep_square = en_passant
    halfmove_clock = board_dataframe['halfmove_clock']
    board.halfmove_clock = halfmove_clock
    fullmove_number = board_dataframe['fullmove_number']
    board.fullmove_number = fullmove_number
    return board

def convert_array_to_fen(board_array):
    board_string = ""
    for i in range(64):
        piece = board_array[i//8][i%8]
        if piece == PIECE_INT.PAWN:
            board_string += "P"
        elif piece == PIECE_INT.KNIGHT:
            board_string += "N"
        elif piece == PIECE_INT.BISHOP:
            board_string += "B"
        elif piece == PIECE_INT.ROOK:
            board_string += "R"
        elif piece == PIECE_INT.QUEEN:
            board_string += "Q"
        elif piece == PIECE_INT.KING:
            board_string += "K"
        elif piece == -PIECE_INT.PAWN:
            board_string += "p"
        elif piece == -PIECE_INT.KNIGHT:
            board_string += "n"
        elif piece == -PIECE_INT.BISHOP:
            board_string += "b"
        elif piece == -PIECE_INT.ROOK:
            board_string += "r"
        elif piece == -PIECE_INT.QUEEN:
            board_string += "q"
        elif piece == -PIECE_INT.KING:
            board_string += "k"
        else:
            board_string += "1"
    board_string += " "
    if board_array[8][0] == 1:
        board_string += "w"
    else:
        board_string += "b"
    board_string += " "
    if board_array[8][1] == 1:
        board_string += "K"
    if board_array[8][2] == 1:
        board_string += "Q"
    if board_array[8][3] == 1:
        board_string += "k"
    if board_array[8][4] == 1:
        board_string += "q"
    if board_array[8][1] == 0 and board_array[8][2] == 0 and board_array[8][3] == 0 and board_array[8][4] == 0:
        board_string += "-"
    board_string += " "
    if board_array[8][5] != 0:
        board_string += chess.square_name(board_array[8][5])
    else:
        board_string += "-"
    board_string += " "
    board_string += str(board_array[8][6])
    board_string += " "
    board_string += str(board_array[8][7])
    return board_string

'''
test_fen = "rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq -"
test_dict = convert_fen_to_dict(test_fen)
print(test_dict)
print(convert_dict_to_fen(test_dict))
test_array = convert_fen_to_array(test_fen)
print(test_array)
print(convert_array_to_fen(test_array))

'''

'\ntest_fen = "rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq -"\ntest_dict = convert_fen_to_dict(test_fen)\nprint(test_dict)\nprint(convert_dict_to_fen(test_dict))\ntest_array = convert_fen_to_array(test_fen)\nprint(test_array)\nprint(convert_array_to_fen(test_array))\n\n'

In [5]:
import os
from pathlib import Path
import chess
import numpy as np
from keras.models import Sequential
from keras.layers import Dense, Flatten
import datetime
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
import numpy as np
import chess
import matplotlib.pyplot as plt
import pandas as pd
from optimizers import *
#import adam and sgd
from torch.optim import Adam, SGD

# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device: {device}")

# Create a tensorboard writer
writer = SummaryWriter()


default_board = chess.Board()
default_board_fen = default_board.fen()
default_board_fen = convert_fen_to_array(default_board_fen)
default_board_fen = np.asarray(default_board_fen).astype('float32')


'''
for file in os.listdir('./minedGames'):
    if file.endswith('.csv'):
        # print the file name
        print(file)
        data = pd.concat([data, pd.read_csv('./minedGames/' + file)])


dataList = []
for file in os.listdir('./pgns'):
    if file.endswith('.csv'):
        # print the file name
        print(file)
        # multiply the evaluation by 100 to align it with the other evaluations
        fileData = pd.read_csv('./pgns/' + file)
        #fileData['evaluation'] = fileData['evaluation'] * 100
        dataList.append(fileData)
'''

# using dd, a dask dataframe, to read the csv file in lichessEvalsFormatted
dataList = []
for file in os.listdir('./lichessEvalsFormatted'):
    if file.endswith('.csv'):
        print(file)
        chunksize = 10 ** 6
        for chunk in pd.read_csv('./lichessEvalsFormatted/'+file, chunksize=chunksize):
            # convert all columns to float16
            for column in chunk.columns:
                if chunk[column].dtype == 'float64' or chunk[column].dtype == 'float32':
                    chunk[column] = chunk[column].astype('float16')
                if chunk[column].dtype == 'int64' or chunk[column].dtype == 'int32':
                    chunk[column] = chunk[column].astype('int16')
            # remove any rows with where the evaluation type is 'mate'
            chunk = chunk[chunk.evaluation_type != 'mate']
            dataList.append(chunk)
            print(chunk.head())

data = pd.concat(dataList)

# create a dataset class to store the data
class ChessDataset(torch.utils.data.Dataset):
    def __init__(self, data):
        self.data = data
    def __len__(self):
        return len(self.data)
    def __getitem__(self, idx):
        return self.data.iloc[idx]

# create a dataset object
data = pd.concat(dataList)
print(data.head())

#dataset = ChessDataset(data)





# divide all evaluations by 100 to convert them to centipawns
#y = y / 100

# normalize y so that all values greater than 15 are 15 and all values less than -15 are -15
# normalize y so that all values are between -1 and 1
#y = (y - y.min()) / (y.max() - y.min()) * 2 - 1


data = pd.concat(dataList)
data = data.dropna()
print(data.head())
print(data.shape)

x = data.drop(['evaluation', 'evaluation_type', 'Unnamed: 0'], axis=1)
y = data['evaluation']

# normalize y so that all values greater than 15 are 15 and all values less than -15 are -15
y = np.clip(y, -15, 15)




from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=42)

X_train = pd.get_dummies(X_train, drop_first=False)
X_test = pd.get_dummies(X_test, drop_first=False)

X_train = X_train.astype('float16')
X_test = X_test.astype('float16')

# convert the data to a tensor
X_train = torch.tensor(X_train.values, dtype=torch.bfloat16)

y_train = torch.tensor(y_train.values, dtype=torch.bfloat16)
# reshape y_train to be a 2D tensor
y_train = y_train.view(-1, 1)

# print the first 5 rows of y_train
print(y_train[:5])


X_test = torch.tensor(X_test.values, dtype=torch.bfloat16)

y_test = torch.tensor(y_test.values, dtype=torch.bfloat16)
# reshape y_test to be a 2D tensor
y_test = y_test.view(-1, 1)


print(X_train.shape, X_test.shape, y_train.shape, y_test.shape)

# create a data loader
from torch.utils.data import DataLoader, TensorDataset

train_loader = DataLoader(dataset= TensorDataset(X_train, y_train), batch_size=1024, shuffle=True)

test_loader = DataLoader(dataset= TensorDataset(X_test, y_test), batch_size=1024, shuffle=True)

def create_optimizer(args, model_params):
    args.optim = args.optim.lower()
    if args.optim == 'sgd':
        return SGD(model_params, lr=args.lr, momentum=args.momentum,
                         weight_decay=args.weight_decay)
    elif args.optim == 'adam':
        return Adam(model_params, lr=args.lr, betas=(args.beta1, args.beta2),
                          weight_decay=args.weight_decay, eps=args.eps)
    elif args.optim == 'fromage':
        return Fromage(model_params, args.lr)
    elif args.optim == 'radam':
        return RAdam(model_params, args.lr, betas=(args.beta1, args.beta2),
                          weight_decay=args.weight_decay, eps=args.eps)
    elif args.optim == 'adamw':
        return AdamW(model_params, args.lr, betas=(args.beta1, args.beta2),
                          weight_decay=args.weight_decay, eps=args.eps)
    elif args.optim == 'adabelief':
        return AdaBelief(model_params, args.lr, betas=(args.beta1, args.beta2),
                          weight_decay=args.weight_decay, eps=args.eps)
    elif args.optim == 'yogi':
        return Yogi(model_params, args.lr, betas=(args.beta1, args.beta2),
                          weight_decay=args.weight_decay)
    elif args.optim == 'msvag':
        return MSVAG(model_params, args.lr, betas=(args.beta1, args.beta2),
                          weight_decay=args.weight_decay)
    elif args.optim == 'ladam':
        return LAdam(model_params, args.lr, betas=(args.beta1, args.beta2),
                          weight_decay=args.weight_decay, eps=args.eps)
    elif args.optim == 'ladabelief':
        return LAdaBelief(model_params, args.lr, betas=(args.beta1, args.beta2),
                          weight_decay=args.weight_decay, eps=args.eps)
    elif args.optim == 'aida':
        return Aida(model_params, args.lr, betas=(args.beta1, args.beta2),
                          weight_decay=args.weight_decay, eps=args.eps)
    elif args.optim == "setadam":
        return SETAdam(model_params, args.lr, betas=(args.beta1, args.beta2),
                            weight_decay=args.weight_decay, eps=args.eps)


class Args:
    def __init__(self):
        self.optim = 'setadam'
        self.lr = 1e-4
        self.momentum = 0.9
        self.weight_decay = 1e-9
        self.beta1 = 0.9
        self.beta2 = 0.999
        self.eps = 1e-7
    def optim(self):
        return self.optim
    def lr(self):
        return self.lr
    def momentum(self):
        return self.momentum
    def weight_decay(self):
        return self.weight_decay
    def beta1(self):
        return self.beta1
    def beta2(self):
        return self.beta2
    def eps(self):
        return self.eps
args = Args()


# Define the neural network model
class RegressionModel(nn.Module):
    def __init__(self, input_size):
        super(RegressionModel, self).__init__()
        self.fc1 = nn.Linear(input_size, input_size)
        self.fc2 = nn.Linear(input_size, input_size)
        self.fc3 = nn.Linear(input_size, input_size)
        self.fc4 = nn.Linear(input_size, input_size)
        self.fc5 = nn.Linear(input_size, input_size)
        self.fc6 = nn.Linear(input_size, input_size)
        self.fc10 = nn.Linear(input_size, 1)

    def forward(self, x):
        # ensure that the input is a bfloat16
        x = x.to(torch.float32)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.relu(self.fc3(x))
        x = torch.relu(self.fc4(x))
        x = torch.relu(self.fc5(x))
        x = torch.relu(self.fc6(x))
        x = self.fc10(x)
        return x


# Create the model
criterion = nn.L1Loss()

shape = X_train[0].shape
# print the first row of the first batch
model = RegressionModel(shape[0]).to(device)
print("X_train.shape[1]:", shape[0])
# Define the loss function and optimizer
optimizer = create_optimizer(args, model.parameters())
# Train the model
epochs = 20
losses = []
averageLosses = []
startTime = datetime.datetime.now()

for epoch in range(epochs):
    startTime = datetime.datetime.now()
    model.train()
    firstLossOfEpoch = 0
    lastLossOfEpoch = 0
    noprogressEpochs = 0
    for i, (inputs, labels) in enumerate(train_loader):
        inputs, labels = inputs.to(device), labels.to(device)
        # Zero the parameter gradients
        optimizer.zero_grad()
        # Forward pass
        outputs = model(inputs.float())
        loss = criterion(outputs, labels)
        # Backward pass and optimize
        loss.backward()
        optimizer.step()
        losses.append(loss.item())
        averageLosses.append(loss.item())
        writer.add_scalar('training loss', loss.item(), epoch * len(train_loader) + i)
        # if the error is extremely large, print the error and the inputs
        '''
        if loss.item() > 0.1:
            print(f'Error: {loss.item()}')
            print(f'Inputs: {inputs}')
        '''
        if i % 1000 == 0 and i != 0:
            averageLoss = sum(averageLosses) / len(averageLosses)
            averageLosses = []
            predictedTime = datetime.datetime.now() + datetime.timedelta(seconds=(len(train_loader) - i) * (datetime.datetime.now() - startTime).seconds / (i))
            # draw a graph with the predicted value and the actual value
            writer.add_scalar('predicted vs actual', outputs[0].item(), epoch * len(train_loader) + i)
            writer.add_scalar('predicted vs actual', labels[0].item(), epoch * len(train_loader) + i)
            print(f'Epoch [{epoch + 1}/{epochs}], Step [{i + 1}/{len(train_loader)}], Loss: {loss.item():.4f}, Average Loss: {averageLoss:.4f}, Time Remaining: {predictedTime - datetime.datetime.now()}')
            if firstLossOfEpoch == 0:
                firstLossOfEpoch = loss.item()
            else:
                lastLossOfEpoch = loss.item()

            if i == len(train_loader) - 1:
                lastLossOfEpoch = loss.item()
    print(f'Epoch [{epoch + 1}/{epochs}], Loss: {loss.item():.4f}')
    if firstLossOfEpoch <= lastLossOfEpoch:
        noprogressEpochs += 1
    else:
        noprogressEpochs = 0
    if noprogressEpochs >= 3:
        print("No progress for 3 epochs. Stopping training.")
        break
    if epoch % 5 == 0:
        # save the model
        dateStr = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
        torch.save(model.state_dict(), f'./models/model_epoch{epoch, args.optim, args.lr, dateStr}.pt')



'''
# Test the model
board = chess.Board()
board.push_san('e4')
board.push_san('e5')
board.push_san('Nf3')
board.push_san('Nc6')
board.push_san('Bb5')

board_fen = board.fen()
board_fen = convert_fen_to_dataframe(board_fen)
board_fen = pd.get_dummies(board_fen, drop_first=False)
board_fen = board_fen.astype(float)
board_fen = torch.tensor(board_fen.values).float()

print(board_fen.shape, X_train.shape)
print(board_fen)
output = model(board_fen[0])
print(output.item())
stockfish.set_fen_position(board.fen())
print(stockfish.get_evaluation()["value"])
'''
optimizerAlgorithm = args.optim
# check if there is already a model with the same optimizer
if os.path.exists(f'./models/{optimizerAlgorithm}.pt'):
    optimizerAlgorithm = f'{optimizerAlgorithm}_1'
    while os.path.exists(f'./models/{optimizerAlgorithm}.pt'):
        optimizerAlgorithm = f'{optimizerAlgorithm[:-2]}_{int(optimizerAlgorithm[-1]) + 1}'
# Save the model
optimizersEps = args.eps
batchSize = train_loader.batch_size
modelName = optimizerAlgorithm+"_epoch"+str(epochs)+"_eps"+str(optimizersEps)+"_lr"+str(args.lr)+"_batchSize"+str(batchSize)+"_modelsize 6"
torch.save(model.state_dict(), f'./models/{modelName}.pt')

# evaluate the model using the test data
model.eval()

with torch.no_grad():
    for i, (inputs, labels) in enumerate(test_loader):
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        averageLosses.append(loss.item())
        writer.add_scalar('test loss', loss.item(), i)
        if i % 1000 == 0:
            print(f'Loss: {loss.item()}')
print(f'Loss: {loss.item()}')


# Close the writer
writer.close()

import matplotlib.pyplot as plt

# Plot the loss
plt.plot(averageLosses, label='Loss averages over 1000 steps')
plt.xlabel('Steps')
plt.ylabel('Loss')
plt.show()

Device: cuda


FileNotFoundError: [Errno 2] No such file or directory: './lichessEvalsFormatted'