In [1]:
%pip install --quiet python-chess


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m


In [2]:
%pip install --quiet torch


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m


Нужные импорты

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
import chess
import pandas as pd
import random
from typing import List

Для начала энкодер досок.

Кодирует доску в виде 15-канального изображения. Происходит что-то типо OneHotEncoder на уровне фигур и цветов. Например один из каналов выглядит так:

```
0 0 0 0 0 0 0 0
1 1 1 1 1 1 1 1
0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0
```

Это слой с чёрными пешками.

Также кодируется информация о взятии на проходе, рокировке и превращении пешки

In [4]:
import chess
import torch
import numpy as np
from typing import Literal, Union, List, Tuple


class MatrixEncoder:
    def encode(self, board: chess.Board) -> np.ndarray:
        # 12 каналов для фигур
        board_state = np.zeros((15, 8, 8), dtype=np.float32)

        # 1. Кодируем состояние доски
        for square in chess.SQUARES:
            piece = board.piece_at(square)
            if piece is not None:
                # Определяем канал:
                # 0-5: пешка, конь, слон, ладья, ферзь, король
                channel = piece.piece_type - 1
                if piece.color == chess.BLACK:
                    channel += 6
                row = square // 8
                col = square % 8
                board_state[channel, row, col] = 1.0

        # 2. Дополнительные признаки
        if board.has_kingside_castling_rights(chess.WHITE):
            board_state[12][7, 4] = 1.0  # Король белых на e1
        if board.has_queenside_castling_rights(chess.WHITE):
            board_state[12][7, 4] = 1.0  # Король белых на e1
        if board.has_kingside_castling_rights(chess.BLACK):
            board_state[12][7, 0] = -1.0  # Король чёрных на e8
        if board.has_queenside_castling_rights(chess.BLACK):
            board_state[12][7, 0] = -1.0  # Король чёрных на e8

        if board.ep_square is not None:
            ep_row = board.ep_square // 8
            ep_col = board.ep_square % 8
            board_state[13][ep_row, ep_col] = 1.0

        if board.peek() and board.peek().promotion is None:
            last_move = board.peek()
            if abs(last_move.from_square - last_move.to_square) == 16:  # Ход на две клетки
                double_move_row = last_move.to_square // 8
                double_move_col = last_move.to_square % 8
                board_state[14][double_move_row, double_move_col] = 1.0

        return board_state

    def get_encoded_shape(self):
        return (15, 8, 8)

Теперь архитектура модели

Residual блоки пока особо смысла не имеют, но можно попробовать обучить более глубокую модель.

Также было было предложено сравнить с обычным перцептроном

In [5]:
class ResidualBlock(nn.Module):
    def __init__(self, channels):
        super().__init__()
        self.conv1 = nn.Conv2d(channels, channels, kernel_size=3, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(channels)
        self.conv2 = nn.Conv2d(channels, channels, kernel_size=3, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(channels)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        out += residual
        out = self.relu(out)
        return out

class Board2Vec(nn.Module):
    def __init__(self, hidden_dim, output_dim):
        super().__init__()
        # Входной блок с 15 каналов (6 фигур × 2 цвета + информация про рокировку, взятие на проходе и promotion)
        self.initial = nn.Sequential(
            nn.Conv2d(15, hidden_dim, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(hidden_dim),
            nn.ReLU(inplace=True)
        )

        # Резидуальные блоки
        self.block1 = ResidualBlock(hidden_dim)
        self.block2 = ResidualBlock(hidden_dim)

        # Глобальный пуллинг и финальные слои
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(inplace=True),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim, output_dim)
        )

    def forward(self, boards: torch.Tensor):
        # boards: (batch_size, 15, 8, 8)
        x = self.initial(boards)
        x = self.block1(x)
        x = self.block2(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)

        # Конкатенация с дополнительными признаками
        x = self.fc(x)
        return x

Подготовка датасета происходит в другом [ноутбуке](prepare_dataset.ipynb)

Здесь происходит загрузка датасета. В функции getitem выбирается случайный файл и случайная позиция в нём (таргет). Далее выбирается случайная позиция из окна контекста (контекст). Наконец набирается некоторое количество негативных примеров - либо вне окна контекста, либо из предзаготовленного пула

In [6]:
import torch
import numpy as np
import os
import random
import logging
from torch.utils.data import Dataset, DataLoader
from functools import lru_cache
from time import time

# Настройка логгера
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.FileHandler('chess_dataset.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
logger.disabled = True

class ChessDataset(Dataset):
    def __init__(self, data_dir, context_size=5, negatives_count=10, min_game_length=11):
        start_time = time()
        logger.info("Инициализация ChessDataset с data_dir=%s, context_size=%d, negatives_count=%d, min_game_length=%d",
                    data_dir, context_size, negatives_count, min_game_length)
        
        self.data_dir = data_dir
        self.context_size = context_size
        self.negatives_count = negatives_count
        self.min_game_length = max(min_game_length, 2 * context_size + 1)
        
        # Собираем только подходящие игры
        self.game_files = [f for f in os.listdir(data_dir) if f.endswith('.npy')]
        logger.info("Найдено %d .npy файлов", len(self.game_files))
        
        self.valid_game_files = []
        for game_file in self.game_files:
            try:
                game_path = os.path.join(self.data_dir, game_file)
                game_length = len(np.load(game_path, mmap_mode='r'))
                if game_length >= self.min_game_length:
                    self.valid_game_files.append(game_file)
                else:
                    logger.warning("Игра %s слишком короткая (длина=%d, требуется=%d), пропущена",
                                  game_file, game_length, self.min_game_length)
            except Exception as e:
                logger.error("Ошибка при проверке файла %s: %s", game_file, str(e))
        
        if not self.valid_game_files:
            logger.error("Нет игр, удовлетворяющих минимальной длине!")
            raise ValueError("Нет игр, удовлетворяющих минимальной длине!")
        
        logger.info("Найдено %d валидных игр", len(self.valid_game_files))
        
        # Кэшируем загрузку игр
        self._load_game = lru_cache(maxsize=10)(self._load_game)

        # Создаем пул игр для добора негативных примеров
        self.pool_size = min(20, len(self.valid_game_files))
        self.pool_files = random.sample(self.valid_game_files, self.pool_size)
        self.pool_data = [self._load_game(f) for f in self.pool_files]
        logger.info("Создан пул из %d игр для негативных примеров", self.pool_size)
        
        logger.info("Инициализация завершена за %.2f секунд", time() - start_time)
    
    def _load_game(self, game_file):
        """Загружает игру с помощью mmap"""
        start_time = time()
        # logger.debug("Загрузка игры %s", game_file)
        try:
            game_data = np.load(os.path.join(self.data_dir, game_file), mmap_mode='r')
            # logger.debug("Игра %s загружена за %.2f секунд, длина=%d",
                        # game_file, time() - start_time, len(game_data))
            return game_data
        except Exception as e:
            logger.error("Ошибка при загрузке игры %s: %s", game_file, str(e))
            raise
    
    def __len__(self):
        """Примерная оценка количества примеров"""
        length = len(self.valid_game_files) * 50
        logger.info("Возвращена примерная длина датасета: %d", length)
        return length
    
    def _get_random_position(self):
        """Выбирает случайную игру и позицию"""
        start_time = time()
        game_file = random.choice(self.valid_game_files)
        game_data = self._load_game(game_file)
        pos_idx = random.randint(self.context_size, len(game_data) - self.context_size - 1)
        # logger.debug("Выбрана позиция: игра=%s, индекс=%d, время=%.2f секунд",
                    # game_file, pos_idx, time() - start_time)
        return game_file, pos_idx
    
    def __getitem__(self, _):
        """Генерирует пример"""
        start_time = time()
        # logger.debug("Запрошен пример")
        
        try:
            # Выбор случайной позиции
            game_file, pos_idx = self._get_random_position()
            game_data = self._load_game(game_file)
            
            # Таргет
            target = game_data[pos_idx]
            # logger.debug("Таргет выбран, форма=%s", str(target.shape))
            
            # Контекст
            start = max(0, pos_idx - self.context_size)
            end = min(len(game_data), pos_idx + self.context_size + 1)
            context = game_data[random.randint(start, end - 1)]
            # logger.debug("Контекст выбран, форма=%s", str(context.shape))
            
            # Негативные примеры
            neg_indices = []
            for _ in range(self.negatives_count):
                neg_idx = random.randint(0, len(game_data) - 1)
                if abs(neg_idx - pos_idx) > self.context_size + 3:
                    neg_indices.append(neg_idx)
            
            # Если не хватает негативных примеров, добираем из пула
            if len(neg_indices) < self.negatives_count:
                needed = self.negatives_count - len(neg_indices)
                # logger.debug("Не хватает %d негативных примеров, добираем из пула", needed)
                
                for _ in range(needed):
                    # Выбираем случайную игру из пула
                    pool_game_data = random.choice(self.pool_data)
                    # Выбираем случайную позицию из этой игры
                    neg_idx = random.randint(0, len(pool_game_data) - 1)
                    neg_indices.append((pool_game_data, neg_idx))
            
            # Собираем все негативные примеры
            negatives = []
            for idx in neg_indices:
                if isinstance(idx, tuple):  # пример из пула
                    game, i = idx
                    negatives.append(game[i])
                else:  # пример из текущей игры
                    negatives.append(game_data[idx])
            
            negatives = np.stack(negatives)
            # logger.debug("Негативные примеры выбраны, форма=%s", str(negatives.shape))
            
            # Конвертация в тензоры
            target = torch.from_numpy(target.copy()).float()
            context = torch.from_numpy(context.copy()).float()
            negatives = torch.from_numpy(negatives.copy()).float()
            
            # logger.info("Пример сгенерирован за %.2f секунд: target_shape=%s, context_shape=%s, negatives_shape=%s",
                       # time() - start_time, str(target.shape), str(context.shape), str(negatives.shape))
            
            return target, context, negatives
        
        except Exception as e:
            logger.error("Ошибка при генерации примера: %s", str(e))
            raise
    
def create_dataloader(data_dir, batch_size=32, num_workers=4, **kwargs):
    start_time = time()
    logger.info("Создание DataLoader с batch_size=%d, num_workers=%d", batch_size, num_workers)
    
    try:
        dataset = ChessDataset(data_dir, **kwargs)
        dataloader = DataLoader(
            dataset,
            batch_size=batch_size,
            num_workers=num_workers,
            pin_memory=True,
            persistent_workers=True if num_workers > 0 else False
        )
        logger.info("DataLoader создан за %.2f секунд", time() - start_time)
        return dataloader
    except Exception as e:
        logger.error("Ошибка при создании DataLoader: %s", str(e))
        raise

In [7]:
def criterion(target_embed: torch.Tensor, context_embed: torch.Tensor, negatives_embed: torch.Tensor):
    # Положительные примеры: скалярное произведение между target и context
    pos_scores = torch.mul(target_embed, context_embed).sum(dim=1)
    pos_loss = -torch.nn.functional.logsigmoid(pos_scores)

    # Негативные примеры: скалярное произведение между target и negatives
    neg_scores = torch.bmm(negatives_embed, target_embed.unsqueeze(2)).squeeze(2)
    neg_loss = -torch.nn.functional.logsigmoid(-neg_scores).sum(dim=1)

    # Общая потеря: усредняем по батчу
    loss = (pos_loss + neg_loss).mean()
    return loss

---

In [None]:
import torch
import torch.optim as optim
from tqdm import tqdm
import json
import os
import csv

BATCH_SIZE = 64
CONTEXT_SIZE = 6
NUM_WORKERS = 0
NEGATIVES_COUNT = 5
MIN_GAME_LENGTH = 15

HIDDEN_DIM = 128
OUTPUT_DIM = 64

NUM_EPOCHS = 5
LEARNING_RATE = 0.01

# ========== Data Loaders ==========
train_loader = create_dataloader(
    '/home/jupyter/datasphere/project/games/train',
    batch_size=BATCH_SIZE,
    context_size=CONTEXT_SIZE,
    num_workers=NUM_WORKERS,
    negatives_count=NEGATIVES_COUNT,
    min_game_length=MIN_GAME_LENGTH
)
test_loader = create_dataloader(
    '/home/jupyter/datasphere/project/games/test',
    batch_size=BATCH_SIZE,
    context_size=CONTEXT_SIZE,
    num_workers=NUM_WORKERS,
    negatives_count=NEGATIVES_COUNT,
    min_game_length=MIN_GAME_LENGTH
)
val_loader = create_dataloader(
    '/home/jupyter/datasphere/project/games/val',
    batch_size=BATCH_SIZE,
    context_size=CONTEXT_SIZE,
    num_workers=NUM_WORKERS,
    negatives_count=NEGATIVES_COUNT,
    min_game_length=MIN_GAME_LENGTH
)

# ========== Save hyperparameters ==========
config = {
    "HIDDEN_DIM": HIDDEN_DIM,
    "OUTPUT_DIM": OUTPUT_DIM,
    "LEARNING_RATE": LEARNING_RATE,
    "NEGATIVES_COUNT": NEGATIVES_COUNT,
    "NUM_EPOCHS": NUM_EPOCHS,
}
os.makedirs("checkpoints", exist_ok=True)
with open("checkpoints/config.json", "w") as f:
    json.dump(config, f, indent=4)

# ========== Setup CSV Logger ==========
os.makedirs("logs", exist_ok=True)
csv_log_path = "logs/train_log.csv"
with open(csv_log_path, mode='w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(["epoch", "step", "train_loss", "val_loss"])  # Write header

# ========== Training ==========
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Board2Vec(HIDDEN_DIM, OUTPUT_DIM).to(device)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

log_every = 50  # steps

for epoch in range(NUM_EPOCHS):
    model.train()
    total_loss = 0.0
    train_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{NUM_EPOCHS} [Train]", leave=False)

    for step, batch in enumerate(train_bar):
        target, context, negatives = batch
        target = target.to(device)
        context = context.to(device)
        negatives = negatives.view(-1, 15, 8, 8).to(device)

        optimizer.zero_grad()
        target_embed = model(target)
        context_embed = model(context)
        negatives_embed = model(negatives).reshape((-1, NEGATIVES_COUNT, OUTPUT_DIM))
        loss = criterion(target_embed, context_embed, negatives_embed)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        total_loss += batch_loss

        # Log every `log_every` steps
        if step % log_every == 0:
            with open(csv_log_path, mode='a', newline='') as f:
                writer = csv.writer(f)
                writer.writerow([epoch + 1, step, batch_loss, ""])  # Leave val_loss empty for now
        
        train_bar.set_postfix(loss=f"{batch_loss:.4f}")

    avg_loss = total_loss / len(train_loader)

    # ========== Validation ==========
    model.eval()
    val_loss = 0.0
    val_bar = tqdm(val_loader, desc=f"Epoch {epoch+1}/{NUM_EPOCHS} [Val]", leave=False)

    with torch.no_grad():
        for batch in val_bar:
            target, context, negatives = batch
            target = target.to(device)
            context = context.to(device)
            negatives = negatives.view(-1, 15, 8, 8).to(device)

            target_embed = model(target)
            context_embed = model(context)
            negatives_embed = model(negatives).reshape((-1, NEGATIVES_COUNT, OUTPUT_DIM))
            loss = criterion(target_embed, context_embed, negatives_embed)
            val_loss += loss.item()
            val_bar.set_postfix(loss=f"{loss.item():.4f}")
    
    avg_val_loss = val_loss / len(val_loader)

    # Log the final train and validation loss for the epoch
    with open(csv_log_path, mode='a', newline='') as f:
        writer = csv.writer(f)
        writer.writerow([epoch + 1, "final", avg_loss, avg_val_loss])

    # ========== Save model ==========
    checkpoint_path = f"checkpoints/board2vec_epoch{epoch+1}.pt"
    torch.save(model.state_dict(), checkpoint_path)

                                                                                   

KeyboardInterrupt: 