# Ref
https://github.com/shafitek/DeepChess-AI/blob/master/scripts/DeepChess.py

In [1]:
import pandas as pd
import numpy as np
import os 
import chess
import chess.engine
import chess.svg
import random
import torch 
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from models.MiniDeepchess import SiameseNetwork, AutoEncoder
from tqdm import tqdm

In [2]:
class AutoEncoderChessDataset(Dataset):
    """Class-balanced, shuffled dataset of board states for AutoEncoder training.

    Both input arrays are copied, shuffled and truncated to the size of the
    smaller one, so the dataset holds the same number of "white won" and
    "white lost" samples.
    """

    def __init__(self, whiteWonStates, whiteLostStates):
        """
        Build the balanced sample pool and its labels.

        Args:
            whiteWonStates (np.ndarray): Board states from games white won.
            whiteLostStates (np.ndarray): Board states from games white lost
                (i.e. black won).
        """
        perClass = min(len(whiteWonStates), len(whiteLostStates))

        # Work on copies so the caller's arrays are not shuffled in place.
        wonPool = whiteWonStates.copy()
        lostPool = whiteLostStates.copy()
        np.random.shuffle(wonPool)
        np.random.shuffle(lostPool)
        wonPool, lostPool = wonPool[:perClass], lostPool[:perClass]

        # Labels: white won -> 1, white lost (black won) -> 0.
        states = np.concatenate((wonPool, lostPool), axis=0)
        targets = np.concatenate(
            (np.ones((perClass, 1)), np.zeros((perClass, 1))),
            axis=0,
        )

        # One shared permutation keeps every state aligned with its label.
        order = np.random.permutation(len(states))
        self.data = states[order]
        self.labels = targets[order]

    def __len__(self):
        """Return the total number of samples (both classes together)."""
        return len(self.data)

    def __getitem__(self, index):
        """
        Return one board state and its label as float tensors.

        Returns:
            x (Tensor): Board state.
            y (Tensor): Label (1 if white won, 0 if white lost).
        """
        state = torch.from_numpy(self.data[index]).float()
        # Each label row has shape (1,); squeeze turns it into a scalar tensor.
        label = torch.tensor(self.labels[index]).float().squeeze()
        return state, label


In [None]:
# Fraction of each outcome array used for training; the rest is held out.
percentTrain = 0.8

# One array of board states per game outcome ("white lost" == black won).
whiteWin = np.load("./data/whiteWin.npy")
whiteLost = np.load("./data/blackWin.npy")

# Split each array at its own cut point: leading part -> train, tail -> test.
winSplit = int(len(whiteWin) * percentTrain)
lostSplit = int(len(whiteLost) * percentTrain)
whiteWinTrain, whiteWinTest = whiteWin[:winSplit], whiteWin[winSplit:]
whiteLostTrain, whiteLostTest = whiteLost[:lostSplit], whiteLostTest = whiteLost[:lostSplit], whiteLost[lostSplit:]

print(f"Train: {len(whiteWinTrain)} white wins, {len(whiteLostTrain)} black wins")
print(f"Test: {len(whiteWinTest)} white wins, {len(whiteLostTest)} black wins")

# Training set and loader are built first, then the held-out pair.
dataset_train = AutoEncoderChessDataset(whiteWonStates=whiteWinTrain, whiteLostStates=whiteLostTrain)
dataloader_train = DataLoader(dataset_train, batch_size=512, shuffle=True)

dataset_test = AutoEncoderChessDataset(whiteWonStates=whiteWinTest, whiteLostStates=whiteLostTest)
dataloader_test = DataLoader(dataset_test, batch_size=512, shuffle=True)

Train: 1600000 white wins, 1600000 black wins
Test: 400000 white wins, 400000 black wins


In [4]:
# Layer widths handed to AutoEncoder, from the input size down to the code size.
model = AutoEncoder(layer=[773, 600, 400, 200, 100])

# Resume from an earlier checkpoint when one exists on disk.
model_path = "./checkpoints/AutoEncoder.pth"
if os.path.exists(model_path):
    print("Loading AutoEncoder model from checkpoint...")
    saved_state = torch.load(model_path, map_location='cpu')
    model.load_state_dict(saved_state)

# Switch to evaluation mode; as the last expression of the cell this also
# displays the module structure in the notebook.
model.eval()

Loading AutoEncoder model from checkpoint...


AutoEncoder(
  (encoder): Sequential(
    (0): Linear(in_features=773, out_features=600, bias=True)
    (1): BatchNorm1d(600, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): LeakyReLU(negative_slope=0.01)
    (3): Linear(in_features=600, out_features=400, bias=True)
    (4): BatchNorm1d(400, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): LeakyReLU(negative_slope=0.01)
    (6): Linear(in_features=400, out_features=200, bias=True)
    (7): BatchNorm1d(200, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (8): LeakyReLU(negative_slope=0.01)
    (9): Linear(in_features=200, out_features=100, bias=True)
    (10): BatchNorm1d(100, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (11): LeakyReLU(negative_slope=0.01)
  )
  (decoder): Sequential(
    (0): Linear(in_features=100, out_features=200, bias=True)
    (1): BatchNorm1d(200, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): Le

In [5]:
# Use the GPU when one is available, otherwise stay on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# The train/test loops move every batch to `device`, so the model must live on
# the same device; without this the forward pass fails with a device mismatch
# whenever CUDA is available. The move is done before the optimizer is built
# so the optimizer is created over the parameters in their final location.
model = model.to(device)

# Adam with its default hyper-parameters over all model parameters.
optimizer = optim.Adam(model.parameters())

In [6]:
def loss_function(recon_x, x):
    """Return the binary cross-entropy between `recon_x` and `x`, summed over all elements.

    Args:
        recon_x: Reconstructed values, used as the prediction.
        x: Target values.

    Returns:
        The summed (not averaged) loss as computed by
        `F.binary_cross_entropy`.
    """
    summed_bce = F.binary_cross_entropy(input=recon_x, target=x, reduction="sum")
    return summed_bce

def mse_loss_function(recon_x, x):
    """Return the squared error between `recon_x` and `x`, summed over all elements.

    Args:
        recon_x: Reconstructed values, used as the prediction.
        x: Target values.

    Returns:
        The summed (not averaged) loss as computed by `F.mse_loss`.
    """
    summed_squared_error = F.mse_loss(input=recon_x, target=x, reduction="sum")
    return summed_squared_error

In [18]:
def train(model, dataloader, optimizer, mse_loss_function, device, model_path):
    """Run one optimisation pass over `dataloader` and return the mean batch loss.

    Args:
        model: Module whose forward call returns a pair; the first element is
            treated as the reconstruction of the input batch.
        dataloader: Iterable of `(x, label)` batches; labels are ignored.
        optimizer: Optimizer stepped once per batch.
        mse_loss_function: Callable `(reconstruction, x) -> loss tensor`.
        device: Device every input batch is moved to before the forward pass.
        model_path: Not used inside this function; kept so existing call
            sites keep working.

    Returns:
        float: Sum of the per-batch loss values divided by the number of
        batches in `dataloader`.
    """
    model.train()
    accumulated = 0.0

    for batch, _ in tqdm(dataloader):
        batch = batch.to(device)

        # Clear gradients left over from the previous step.
        optimizer.zero_grad()

        # Reconstruct the batch and score it against itself.
        reconstruction, _ = model(batch)
        batch_loss = mse_loss_function(reconstruction, batch)

        # Back-propagate and update the weights.
        batch_loss.backward()
        optimizer.step()

        accumulated += batch_loss.item()

    return accumulated / len(dataloader)


@torch.no_grad()
def test(model, dataloader, mse_loss_function, device):
    """Evaluate `model` on `dataloader` without gradients and return the mean batch loss.

    Args:
        model: Module whose forward call returns a pair; the first element is
            treated as the reconstruction of the input batch.
        dataloader: Iterable of `(x, label)` batches; labels are ignored.
        mse_loss_function: Callable `(reconstruction, x) -> loss tensor`.
        device: Device every input batch is moved to before the forward pass.

    Returns:
        float: Sum of the per-batch loss values divided by the number of
        batches in `dataloader`. The same value is also printed.
    """
    model.eval()
    accumulated = 0.0

    for batch, _ in dataloader:
        batch = batch.to(device)
        reconstruction, _ = model(batch)
        accumulated += mse_loss_function(reconstruction, batch).item()

    mean_batch_loss = accumulated / len(dataloader)
    print(f"Test Loss: {mean_batch_loss:.6f}")
    return mean_batch_loss

In [None]:
# Upper bound on the number of epochs; early stopping normally ends the run sooner.
epochs = 1000
# Number of consecutive epochs without a better test loss tolerated before stopping.
patience = 5
best_loss = float('inf')
# Initialised up front: if the very first test loss is not below `best_loss`
# (e.g. it is NaN), the "no improvement" branch runs before any reset and
# would otherwise hit an undefined name.
patience_counter = 0

# Make sure the checkpoint directory exists before training starts, so the
# first save cannot fail after a full epoch of work.
checkpoint_dir = os.path.dirname(model_path)
if checkpoint_dir:
    os.makedirs(checkpoint_dir, exist_ok=True)

for epoch in range(epochs):
    train_loss = train(model, dataloader_train, optimizer, mse_loss_function, device, model_path)
    test_loss = test(model, dataloader_test, mse_loss_function, device)

    print(f"Epoch {epoch + 1}/{epochs}, Train Loss: {train_loss:.6f}, Test Loss: {test_loss:.6f}")

    if test_loss < best_loss:
        # New best test loss: checkpoint the weights and reset the counter.
        best_loss = test_loss
        patience_counter = 0
        torch.save(model.state_dict(), model_path)
        print("‚úÖ Saved best model")
    else:
        patience_counter += 1
        print(f"‚è∏Ô∏è No improvement. Patience counter: {patience_counter}/{patience}")
        if patience_counter >= patience:
            print("üõë Early stopping triggered")
            break

 71%|███████   | 17637/25000 [02:31<00:59, 124.66it/s]