In [2]:
%pip install chess

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 25.0.1 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from sklearn.model_selection import train_test_split
import chess
from torch.utils.tensorboard import SummaryWriter
import time
import os
import numpy as np
torch.manual_seed(42)

<torch._C.Generator at 0x26ed89f6150>

In [3]:
# # params
# # 512 * 3865
# PATH_DATA = '/kaggle/input/train-your-own-stockfish-nnue/train.csv'
# LEN_DATA = 512 * 3865
# BATCH_SIZE = 512
# NUM_EPOCHS = 300
# # early stopping
# TOLERANCE = 10
# MIN_DELTA = 50
# # write
# PATH_RUNS = 'runs/nnue_3_experiment'
# # device
# device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# device

# params
# 512 * 3865
PATH_DATA = '../assets/chess-data/fen/train.csv'
LEN_DATA = 512 
BATCH_SIZE = 24
NUM_EPOCHS = 2
# early stopping
TOLERANCE = 2
MIN_DELTA = 1000
# write
PATH_RUNS = 'runs/nnue_3_experiment'
# device
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device


device(type='cpu')

In [None]:
class NNUE(nn.Module):
    def __init__(self):
        super(NNUE, self).__init__()
        self.fc1 = nn.Linear(768, 8)
        self.fc2 = nn.Linear(8, 8)
        self.fc3 = nn.Linear(8, 1)

    def clipped_relu(self, x):
        return torch.clamp(x, 0, 1)

    def forward(self, x):
        x = self.fc1(x)
        x = self.clipped_relu(x)
        x = self.fc2(x)
        x = self.clipped_relu(x)
        x = self.fc3(x)
        return x

# model = NNUE()
# print(model)
# dummy_input = torch.randn(1, 768) # Batch size 1, 768 features
# output = model(dummy_input)
# print(output)

In [None]:
def preprocess_fen(fen):
    board = chess.Board(fen)
    feature = torch.zeros(64, 6, 2)
    for square in chess.SQUARES:
        piece = board.piece_at(square)
        if piece is not None:
            color = int(piece.color)
            piece_type = piece.piece_type - 1
            feature[square, piece_type, color] = 1

    # [64,6,2] -> [768] shape (batch size, 768 features)
    feature = feature.reshape(-1)
    return feature

class ChessDataset(Dataset):
    def __init__(self, features, targets):
        self.features = features
        self.targets = targets
    
    def __len__(self):
        return len(self.targets)
    
    def __getitem__(self, idx):
        feature = self.features[idx]
        target = self.targets[idx]

        feature = preprocess_fen(feature)
        target = torch.tensor(target, dtype=torch.float32)

        return feature, target
    
# fen = "rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 0"
# print(preprocess_fen("rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 0"))

In [None]:
df = pd.read_csv(PATH_DATA)
print('Original shape:', df.shape)
df = df[:LEN_DATA]
print('Used shape:', df.shape)

X = df['FEN'].to_list()
y = df['Evaluation'].to_list()

In [None]:
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)
train_dataset = ChessDataset(X_train, y_train)
test_dataset = ChessDataset(X_test, y_test)

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=True)
print(f"train_loader: {len(train_loader)}\t test_loader: {len(test_loader)}")

In [None]:
# train_dataset[0]

In [None]:
class EarlyStopping:
    def __init__(self, tolerance=10, min_delta=10000):
        self.tolerance = tolerance
        self.counter = 0
        self.prev = False
        self.early_stop = False

        self.min_delta = min_delta
        self.min_train_loss = float('inf')

    def condition(self, train_loss, validation_loss):
        return abs(train_loss - validation_loss) <= self.min_delta and self.min_train_loss <= train_loss
    
    def __call__(self, train_loss, validation_loss):
        self.min_train_loss = min(self.min_train_loss, train_loss)
        if self.condition(train_loss, validation_loss):
            if self.prev == True:
                self.counter +=1
            else:
                self.counter = 1
            self.prev = True

            if self.counter >= self.tolerance:  
                self.early_stop = True
        else:
            self.prev = False

In [None]:
model = NNUE()
checkpoint = torch.load(f'./checkpoint/nnue_3_1978880d_512bs_300es_61e.pth', weights_only=True, map_location=torch.device(device))
model.load_state_dict(checkpoint['model_state_dict'])
model = model.to(device)

optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()

writer = SummaryWriter(PATH_RUNS)
os.makedirs('checkpoint', exist_ok=True)
model

In [None]:
early_stopping = EarlyStopping(tolerance=TOLERANCE, min_delta=MIN_DELTA)
min_avg_val_loss = checkpoint['loss']

for epoch in range(NUM_EPOCHS):
    start_time = time.time()
    # training phase
    model.train()
    train_loss = 0.0
    for batch in train_loader:
        features, targets = batch
        features = features.to(device)
        targets = targets.to(device)
        
        outputs = model(features)
        # outputs: (batchsize, 1) -> (batchsize), target; (batchsize)
        loss = criterion(outputs.reshape(-1), targets)
        train_loss += loss.item()
        
        optimizer.zero_grad()
        loss.backward() 
        optimizer.step()
    avg_train_loss = train_loss / len(train_loader)
    writer.add_scalar('Loss/train', avg_train_loss, epoch)

    # evaluation phase
    model.eval()  # Set model to evaluation mode
    val_loss = 0.0
    with torch.no_grad():
        for batch in test_loader:
            features, targets = batch
            features = features.to(device)
            targets = targets.to(device)
            
            outputs = model(features)
    
            loss = criterion(outputs.reshape(-1), targets)
            val_loss += loss.item()
    avg_val_loss = val_loss / len(test_loader)
    writer.add_scalar('Loss/val', avg_val_loss, epoch)
    
    early_stopping(avg_train_loss, avg_val_loss)
    if early_stopping.early_stop:
        print(f"Early stopping triggered after {epoch+1} epochs.")
        break
        
    # Save model checkpoint if validation loss improves
    if avg_val_loss < min_avg_val_loss:
        min_avg_val_loss = avg_val_loss
        path = f'./checkpoint/nnue_3_{LEN_DATA}d_{BATCH_SIZE}bs_{NUM_EPOCHS}es_{epoch+1}e.pth'
        checkpoint = {
            'epoch': epoch + 1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': min_avg_val_loss
        }
        torch.save(checkpoint, path)
        
    end_time = time.time()
    print(f"Epoch [{epoch+1}/{NUM_EPOCHS}], Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}, Time: {round(end_time - start_time)}s")

writer.close()


In [None]:
# saved_model = NNUE()
# saved_model.load_state_dict(torch.load(f'./checkpoint/nnue_3_1978880d_512bs_300es_61e.pth', weights_only=True, map_location=torch.device(device)))
# saved_model.eval()
# test_inp = preprocess_fen("rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 0")
# test_out = saved_model(test_inp)
# test_out

In [None]:
# checkpoint = torch.load(f'./checkpoint/nnue_3_512d_24bs_2es_1e.pth', map_location=torch.device(device))
# checkpoint

In [6]:
%reload_ext tensorboard

In [7]:
%load_ext tensorboard
%tensorboard --logdir runs --port 6006

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


Reusing TensorBoard on port 6006 (pid 12492), started 0:02:25 ago. (Use '!kill 12492' to kill it.)

In [None]:
# class MinimaxNNUE:
#     def __init__(self, depth=3, path_file='./checkpoint/nnue_512batchsize_101epochs'):
#         self.depth = depth
#         self.nnue_3 = NNUE()
#         self.nnue_3.load_state_dict(torch.load(path_file, weights_only=True, map_location=torch.device(device)))
#         self.nnue_3.eval()
        
#     def evaluate_board(self, board):
#         """Evaluate the board position based on material and piece-square tables."""
#         if board.is_checkmate():
#             return -10000 if board.turn == chess.WHITE else 10000
#         if board.is_stalemate() or board.is_insufficient_material():
#             return 0
        
#         inp = preprocess_fen(board.fen())
#         score = self.nnue_3(inp)
#         return score

#     def alphabeta(self, board, depth, alpha, beta, maximizing_player):
#         """Minimax with Alpha-Beta pruning."""
#         if depth == 0 or board.is_game_over():
#             return self.evaluate_board(board)
        
#         if maximizing_player:
#             max_eval = float('-inf')
#             for move in board.legal_moves:
#                 board.push(move)
#                 eval = self.alphabeta(board, depth - 1, alpha, beta, False)
#                 board.pop()
#                 max_eval = max(max_eval, eval)
#                 alpha = max(alpha, eval)
#                 if beta <= alpha:
#                     break  # Beta cutoff
#             return max_eval
#         else:
#             min_eval = float('inf')
#             for move in board.legal_moves:
#                 board.push(move)
#                 eval = self.alphabeta(board, depth - 1, alpha, beta, True)
#                 board.pop()
#                 min_eval = min(min_eval, eval)
#                 beta = min(beta, eval)
#                 if beta <= alpha:
#                     break  # Alpha cutoff
#             return min_eval

#     def find_best_move(self, board):
#         """Find the best move for the current position."""
#         if board.is_game_over():
#             return None
        
#         best_move = None
#         best_value = float('-inf') if board.turn == chess.WHITE else float('inf')
#         alpha = float('-inf')
#         beta = float('inf')
        
#         for move in board.legal_moves:
#             board.push(move)
#             value = self.alphabeta(board, self.depth - 1, alpha, beta, not board.turn)
#             board.pop()
            
#             if board.turn == chess.WHITE:  # Maximizing (White)
#                 if value > best_value:
#                     best_value = value
#                     best_move = move
#                 alpha = max(alpha, value)
#             else:  # Minimizing (Black)
#                 if value < best_value:
#                     best_value = value
#                     best_move = move
#                 beta = min(beta, value)
        
#         return best_move