# Chess Engine with PyTorch

## Imports

In [1]:
import os
import numpy as np 
import time
import torch
import torch.nn as nn 
import torch.optim as optim 
from torch.utils.data import DataLoader     
from chess import pgn 
from tqdm import tqdm 
import chess
from auxiliary_func import create_input_for_nn, encode_moves
from dataset import ChessDataset
from model import ChessModel
from multiprocessing import Pool, cpu_count

In [2]:
os.system("taskset -p 0xff %d" % os.getpid())

pid 168115's current affinity mask: fffff
pid 168115's new affinity mask: ff


0

# Data preprocessing

## Load data

In [3]:
def load_pgn(file_path, limit_per_file=None):
    games = []
    with open(file_path, 'r') as pgn_file:
        count = 0
        while True:
            game = pgn.read_game(pgn_file)
            if game is None:
                break
            games.append(game)
            count += 1
            if limit_per_file is not None and count >= limit_per_file:
                break
    return games

# Lister tous les fichiers PGN
files = [file for file in os.listdir("../data/pgn") if file.endswith(".pgn")]
LIMIT_OF_FILES = min(len(files), 28)
LIMIT_PER_FILE = 10000  # Limite de parties par fichier

# Charger les fichiers avec une barre de progression
games = []
for i, file in enumerate(tqdm(files[:LIMIT_OF_FILES], desc="Loading PGN files")):
    games.extend(load_pgn(f"../data/pgn/{file}", limit_per_file=LIMIT_PER_FILE))

print(f"GAMES PARSED: {len(games)}")

Loading PGN files: 100%|██████████| 1/1 [00:16<00:00, 16.41s/it]

GAMES PARSED: 10000





In [4]:
print(f"GAMES PARSED: {len(games)}")

GAMES PARSED: 10000


## Convert data into tensors

In [5]:
X, y = create_input_for_nn(games)

print(f"NUMBER OF SAMPLES: {len(y)}")

NUMBER OF SAMPLES: 840640


In [8]:
X = X[0:2500000]
y = y[0:2500000]

In [9]:
y, move_to_int = encode_moves(y)
num_classes = len(move_to_int)

In [10]:
X = torch.tensor(X, dtype=torch.float32)
y = torch.tensor(y, dtype=torch.long)

# Preliminary actions

In [11]:
# Create Dataset and DataLoader
dataset = ChessDataset(X, y)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True)

# Check for GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Using device: {device}')

# Model Initialization
model = ChessModel(num_classes=num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

Using device: cuda


In [12]:
def evaluate_material(board: chess.Board):
    piece_values = {
        'p': 1, 'n': 3, 'b': 3, 'r': 5, 'q': 9, 'k': 0,  # Noir
        'P': 1, 'N': 3, 'B': 3, 'R': 5, 'Q': 9, 'K': 0   # Blanc
    }
    material = 0
    for piece in board.board_fen():
        if piece.isalpha():
            material += piece_values[piece]
    return material

# Training

In [None]:
def custom_loss_function(outputs, labels, boards, criterion, device):
    """
    Combinaison de la perte classique et d'une pénalité pour perte matérielle.
    """
    # Perte classique (CrossEntropyLoss)
    loss = criterion(outputs, labels)
    
    # Calcul de la pénalité pour perte matérielle
    material_penalty = 0.0
    for i, board in enumerate(boards):
        # Évaluer le matériel avant le coup
        material_before = evaluate_material(board)
        
        # Récupérer les coups légaux
        legal_moves = list(board.legal_moves)
        
        # Vérifier si l'indice est valide
        move = labels[i].item()
        if move < 0 or move >= len(legal_moves):
            print(f"Invalid move index: {move}, skipping...")
            continue
        
        # Simuler le coup correspondant à l'output
        uci_move = legal_moves[move].uci()
        board.push_uci(uci_move)
        
        # Évaluer le matériel après le coup
        material_after = evaluate_material(board)
        
        # Calculer la perte matérielle
        material_penalty += max(0, material_before - material_after)
        
        # Annuler le coup pour restaurer l'état initial
        board.pop()
    
    # Normaliser la pénalité
    material_penalty = torch.tensor(material_penalty / len(boards), device=device)
    
    # Combiner les deux pertes
    total_loss = loss + 0.01 * material_penalty  # Ajustez le coefficient (0.01) selon vos besoins
    return total_loss

In [20]:
num_epochs = 50
for epoch in range(num_epochs):
    start_time = time.time()
    model.train()
    running_loss = 0.0
    for inputs, labels in tqdm(dataloader):
        inputs, labels = inputs.to(device), labels.to(device)  # Move data to GPU
        optimizer.zero_grad()

        # Obtenir les prédictions du modèle
        outputs = model(inputs)  # Raw logits

        # Créer une liste des échiquiers correspondants
        boards = [game.board() for game in games]  # Assurez-vous que `games` est accessible ici

        # Calculer la perte personnalisée
        loss = custom_loss_function(outputs, labels, boards, criterion, device)
        loss.backward()
        
        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        
        optimizer.step()
        running_loss += loss.item()
    end_time = time.time()
    epoch_time = end_time - start_time
    minutes: int = int(epoch_time // 60)
    seconds: int = int(epoch_time) - minutes * 60
    print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {running_loss / len(dataloader):.4f}, Time: {minutes}m{seconds}s')

  0%|          | 0/13135 [00:00<?, ?it/s]

Invalid move index: 1465, skipping...
Invalid move index: 212, skipping...
Invalid move index: 176, skipping...
Invalid move index: 1457, skipping...
Invalid move index: 1825, skipping...
Invalid move index: 1460, skipping...
Invalid move index: 410, skipping...
Invalid move index: 390, skipping...
Invalid move index: 819, skipping...
Invalid move index: 111, skipping...
Invalid move index: 262, skipping...
Invalid move index: 698, skipping...
Invalid move index: 1228, skipping...
Invalid move index: 1524, skipping...
Invalid move index: 1465, skipping...
Invalid move index: 1334, skipping...
Invalid move index: 1703, skipping...
Invalid move index: 780, skipping...
Invalid move index: 877, skipping...
Invalid move index: 1091, skipping...
Invalid move index: 1394, skipping...
Invalid move index: 1830, skipping...
Invalid move index: 728, skipping...
Invalid move index: 587, skipping...
Invalid move index: 1131, skipping...
Invalid move index: 1223, skipping...
Invalid move index: 618,




IndexError: index 64 is out of bounds for dimension 0 with size 64

In [20]:
num_epochs = 50
for epoch in range(num_epochs):
    start_time = time.time()
    model.train()
    running_loss = 0.0
    for inputs, labels in tqdm(dataloader):
        inputs, labels = inputs.to(device), labels.to(device)  # Move data to GPU
        optimizer.zero_grad()

        outputs = model(inputs)  # Raw logits

        # Compute loss
        loss = criterion(outputs, labels)
        loss.backward()
        
        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        
        optimizer.step()
        running_loss += loss.item()
    end_time = time.time()
    epoch_time = end_time - start_time
    minutes: int = int(epoch_time // 60)
    seconds: int = int(epoch_time) - minutes * 60
    print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {running_loss / len(dataloader):.4f}, Time: {minutes}m{seconds}s')

100%|██████████| 2701/2701 [00:05<00:00, 453.63it/s]


Epoch 1/50, Loss: 5.9271, Time: 0m5s


100%|██████████| 2701/2701 [00:05<00:00, 453.07it/s]


Epoch 2/50, Loss: 5.2826, Time: 0m5s


 41%|████      | 1104/2701 [00:02<00:03, 457.79it/s]


KeyboardInterrupt: 

# Save the model and mapping

In [18]:
# Save the model
torch.save(model.state_dict(), "../../models/TORCH_100EPOCHS.pth")

In [16]:
import pickle

with open("../../models/heavy_move_to_int", "wb") as file:
    pickle.dump(move_to_int, file)