In [166]:
import chess.pgn

MIN_ELO = 1900
MAX_ELO = 2100
GENERATE_GAMES = 2000

TIME_CONTROL = "480+0"


In [167]:
# read games from a .pgn file

def process_game(game: chess.pgn.Game) -> None:
    # iterate over game moves and store fen in a list
    fen_list = []
    time_list = []
    board = game.board()
        
    node = game.next()
    while node != None:
      board.push(node.move)

      # skip last move
      if node.next() == None:
        break
      
      fen_list.append(" ".join(board.fen().split(" ")[:2]))
      time_list.append(node.clock())
      node = node.next()


    
    # return list of fens
    return fen_list, time_list

moves = []
times = []
with open("../data/data_w_clock.pgn") as pgn:
    while len(moves) < GENERATE_GAMES:
        game = None
        game = chess.pgn.read_game(pgn)

        if game is None:
            break

        # if elo is too low, skip game
        elo = (int(game.headers["WhiteElo"]) + int(game.headers["BlackElo"])) / 2
        if elo < MIN_ELO and elo > MAX_ELO:
          continue

        # if time control is not 480+0, skip game
        if game.headers["TimeControl"] != TIME_CONTROL:
          continue

        # process game
        fens, t_out = process_game(game)
        moves.extend(fens)
        times.extend(t_out)

print(len(moves))
print(len(times))


2055
2055


In [168]:
import numpy as np
def fen_to_npy(fen: str) -> np.array:
    board = chess.Board(fen)
    npy = np.zeros((12, 8,8))
    for square, piece in board.piece_map().items():
        x = square%8
        y = square//8
        piece_owner = 0 if piece.color == chess.WHITE else 1
        piece_type = piece.piece_type -1 + 6*piece_owner

        npy[piece_type][y][x] = 1
    return npy

In [169]:
one_hot_boards = np.array([fen_to_npy(fen) for fen in moves][:-1])
times = np.array(times)

In [170]:
# calculate move times for each move by subtracting the time of the next move from the current move
move_times = np.array([abs(times[i+1] - times[i]) for i in range(len(times)-1)])
move_times = move_times.reshape(-1,1)

In [171]:
# split data into train and test
split_index = int(len(one_hot_boards)*0.8)

train_boards = one_hot_boards[:split_index]
train_times = move_times[:split_index]

test_boards = one_hot_boards[split_index:]
test_times = move_times[split_index:]


# convert to torch tensors
import torch
train_boards = torch.from_numpy(train_boards).float()
train_times = torch.from_numpy(train_times).float()

test_boards = torch.from_numpy(test_boards).float()
test_times = torch.from_numpy(test_times).float()

# print shapes
print(train_boards.shape)
print(train_times.shape)

torch.Size([1643, 12, 8, 8])
torch.Size([1643, 1])


In [182]:
import torch.functional as F
import torch
import torch.nn as nn

relu = F.F.relu


# define the neural network
class Net(torch.nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        # fully connected layers
        self.fc1 = nn.Linear(12*8*8, 2028)
        self.fc3 = nn.Linear(2028, 256)
        self.fc4 = nn.Linear(256, 228)
        self.fc5 = nn.Linear(228, 64)
        self.fc6 = nn.Linear(64, 32)
        self.fc7 = nn.Linear(32, 1)

        self.dropout = nn.Dropout(0.3)



    def forward(self, x):
        # reshape
        x = x.view(-1, 12*8*8)

        # forward pass
        x = relu(self.fc1(x))
        x = self.dropout(x)
        # x = relu(self.fc2(x))
        # x = self.dropout(x)
        x = relu(self.fc3(x))
        x = self.dropout(x)
        x = relu(self.fc4(x))
        x = self.dropout(x)
        x = relu(self.fc5(x))
        x = self.dropout(x)
        x = relu(self.fc6(x))
        x = self.dropout(x)
        x = self.fc7(x)

        return x
    
    def predict(self, x):
        # reshape
        x = x.view(-1, 12*8*8)

        # forward pass
        x = relu(self.fc1(x))
        x = relu(self.fc3(x))
        x = relu(self.fc4(x))
        x = relu(self.fc5(x))
        x = relu(self.fc6(x))
        x = self.fc7(x)


        return x

In [175]:
net = Net()

# define the loss function
criterion = torch.nn.L1Loss()
# criterion = torch.nn.L1Loss( reduction='mean', reduce=True)

# define the optimizer
optimizer = torch.optim.Adagrad(net.parameters(), lr=0.002)

for epoch in range(550):
    optimizer.zero_grad()
    outputs = net.forward(train_boards)
    loss = criterion(outputs, train_times)
    loss.backward()
    optimizer.step()
    print('epoch {}, loss {}'.format(epoch, (loss.item())))

epoch 0, loss 41.792572021484375
epoch 1, loss 41.70122146606445
epoch 2, loss 41.45191192626953
epoch 3, loss 40.59287643432617
epoch 4, loss 39.01887893676758
epoch 5, loss 37.02381134033203
epoch 6, loss 34.86542892456055
epoch 7, loss 33.391380310058594
epoch 8, loss 31.954452514648438
epoch 9, loss 30.647424697875977
epoch 10, loss 28.786441802978516
epoch 11, loss 27.353059768676758
epoch 12, loss 25.890972137451172
epoch 13, loss 24.73990821838379
epoch 14, loss 24.043481826782227
epoch 15, loss 23.249967575073242
epoch 16, loss 23.19440269470215
epoch 17, loss 21.53426742553711
epoch 18, loss 21.20599937438965
epoch 19, loss 20.406757354736328
epoch 20, loss 20.461585998535156
epoch 21, loss 19.7215518951416
epoch 22, loss 19.43614387512207
epoch 23, loss 19.27555274963379
epoch 24, loss 18.30621337890625
epoch 25, loss 18.358171463012695
epoch 26, loss 18.072864532470703
epoch 27, loss 17.65209197998047
epoch 28, loss 17.53446388244629
epoch 29, loss 17.092655181884766
epoch 3

In [183]:
# save the model
torch.save(net.state_dict(), "model_times.pth")


In [184]:
# evaluate all the games in the test set
test_outputs = net.predict(test_boards)

fens = moves[split_index:]
fens = fens[:-1]


# sort the games by the predicted move time
sorted_games = sorted(zip(fens, test_outputs), key=lambda x: x[1].item(), reverse=True)

# print the games with the longest predicted move times
for fen, time in sorted_games[:100]:
    board = chess.Board(fen)
    print(time.item())
    display(board)

AttributeError: 'Net' object has no attribute 'fc2'