In [78]:
import zstandard as zstd
import chess.pgn
import io
import re
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

import chess.svg
from IPython.display import SVG, display

In [79]:
# Utility functions

def show_board(board: chess.Board):
    """Render ``board`` inline in the notebook as an SVG diagram."""
    svg_markup = chess.svg.board(board=board)
    display(SVG(svg_markup))

In [None]:
# SimpleNNUE Model Description
#
# This model is a lightweight feedforward neural network inspired by NNUE 
# architectures used in modern chess engines.
#
# Architecture:
# - Input: 768-dim binary vector representing piece-square features 
#   (12 planes × 64 squares), always from the side to move perspective.
#
# - Hidden Layer:
#   - Fully connected (Linear) layer of size (768 → hidden_size)
#   - Activation: SCReLU (Squared Clipped ReLU)
#     f(x) = clamp(x, 0, 1)^2
#
# - Output Layer:
#   - Fully connected layer of size (hidden_size → 1)
#   - Activation: Sigmoid, maps output to probability (0, 1)
#
# - Output: A single scalar interpreted as the win probability 
#   for the side to move.
#
# Purpose:
# - Trained on game outcomes seen from the side to move's perspective:
#   win = 1, loss = 0, draw = 0.5. Because draws are labelled 0.5, the
#   output behaves more like an expected score than a pure win probability.

In [89]:
class SimpleNNUE(nn.Module):
    """Single-hidden-layer network over a 768-dimensional input.

    Pipeline: Linear(768 -> hidden_size) -> SCReLU -> Linear(hidden_size -> 1)
    -> sigmoid, where SCReLU(v) = clamp(v, 0, 1) ** 2.

    Args:
        hidden_size: Width of the hidden layer (default 1024).
    """

    def __init__(self, hidden_size=1024):
        super().__init__()
        # The attribute names `hidden` and `output` are read by the weight
        # export code in this notebook, so they must not be renamed.
        self.hidden = nn.Linear(768, hidden_size, bias=True)
        self.output = nn.Linear(hidden_size, 1, bias=True)

    def forward(self, x):
        """Map a (batch, 768) input to a (batch,) tensor of values in (0, 1)."""
        pre_activation = self.hidden(x)
        # SCReLU: clip to [0, 1], then square.
        activated = pre_activation.clamp(0, 1).pow(2)
        logit = self.output(activated)
        # Drop the trailing size-1 dimension so each sample yields one scalar.
        return torch.sigmoid(logit).squeeze(1)


In [92]:
# Piece types in feature-plane order: a piece type's position in this list
# selects its plane within each colour's group of six planes.
PIECE_ORDER = [
    chess.PAWN, chess.KNIGHT, chess.BISHOP,
    chess.ROOK, chess.QUEEN, chess.KING,
]

def extract_vector(board: chess.Board):
    """Encode a position as a 768-element 0/1 vector from the mover's view.

    When Black is to move the board is replaced by ``board.mirror()`` (flipped
    vertically with piece colours swapped), so the side to move is always
    encoded as White. The caller's board is not modified.

    Layout: index = 64 * plane + square, where planes 0-5 hold the White
    pieces (after any mirroring) in ``PIECE_ORDER`` order and planes 6-11
    hold the Black pieces in the same order.

    Returns:
        np.ndarray of shape (768,) and dtype int8.
    """
    oriented = board.mirror() if board.turn == chess.BLACK else board

    features = np.zeros(768, dtype=np.int8)
    # piece_map() yields only occupied squares, so no emptiness check is needed.
    for sq, occupant in oriented.piece_map().items():
        plane = PIECE_ORDER.index(occupant.piece_type)
        if occupant.color != chess.WHITE:
            plane += 6
        features[64 * plane + sq] = 1
    return features


def extract_vectors_from_game(game):
    """Turn one parsed PGN game into ``(vector, label)`` training samples.

    Every mainline position reached after a move is encoded with
    ``extract_vector``, except positions where the side to move is in check,
    which are skipped. The label is the game result from the perspective of
    the side to move in that position: 1 for a win, 0 for a loss, 0.5 for a
    draw.

    Returns:
        A list of ``(np.ndarray, label)`` tuples; empty when the ``Result``
        header is missing or not one of "1-0", "0-1", "1/2-1/2".
    """
    board = game.board()

    # Map each recognised result string to the winning colour (None = draw).
    winner_by_result = {"1-0": chess.WHITE, "0-1": chess.BLACK, "1/2-1/2": None}
    outcome = game.headers.get("Result")
    if outcome not in winner_by_result:
        return []  # skip unrecognized or incomplete games
    winner = winner_by_result[outcome]

    samples = []
    for node in game.mainline():
        board.push(node.move)
        if board.is_check():
            continue

        encoded = extract_vector(board)

        # board.turn is the side to move in the position just encoded.
        if winner is None:
            label = 0.5
        else:
            label = 1 if board.turn == winner else 0

        samples.append((encoded, label))

    return samples

def _game_meets_min_elo(game, min_elo):
    """Return True when both Elo headers parse as integers that are >= min_elo."""
    try:
        white_elo = int(game.headers.get("WhiteElo", 0))
        black_elo = int(game.headers.get("BlackElo", 0))
    except ValueError:
        return False  # non-numeric rating header: treat the game as filtered out
    return not (white_elo < min_elo or black_elo < min_elo)


def process_pgn_zst(file_path, max_games=None, min_elo=2000):
    """Stream a zstd-compressed PGN file and build a training set from it.

    Games are decompressed and parsed one at a time. A game contributes
    samples only if both players pass the Elo filter; its positions and
    labels come from ``extract_vectors_from_game``.

    Args:
        file_path: Path to the ``.pgn.zst`` file.
        max_games: Stop once this many games have passed the Elo filter.
            ``None`` (or 0) means no limit.
        min_elo: Minimum rating required of both players. Games whose rating
            headers are missing or not integers are skipped.

    Returns:
        ``(positions, labels)``: an int32 array of shape ``(N, 768)`` and a
        float32 array of shape ``(N,)``. When nothing was collected the
        arrays have shapes ``(0, 768)`` and ``(0,)`` so downstream code can
        still rely on the feature dimension.
    """
    feature_size = 768
    # A failing stream (e.g. corrupt compressed data) can raise on every
    # read without advancing; bound the retries so the loop always ends.
    max_consecutive_read_errors = 10

    dctx = zstd.ZstdDecompressor()
    samples = []
    game_count = 0
    consecutive_read_errors = 0

    with open(file_path, 'rb') as f:
        with dctx.stream_reader(f) as reader:
            text_stream = io.TextIOWrapper(reader, encoding='utf-8')
            while True:
                try:
                    game = chess.pgn.read_game(text_stream)
                except Exception as e:
                    consecutive_read_errors += 1
                    print(f"Error parsing game #{game_count}: {e}")
                    if consecutive_read_errors >= max_consecutive_read_errors:
                        print(
                            f"Giving up after {consecutive_read_errors} consecutive "
                            "read errors; returning the positions collected so far."
                        )
                        break
                    continue
                consecutive_read_errors = 0

                if game is None:
                    break  # end of file

                try:
                    if not _game_meets_min_elo(game, min_elo):
                        continue
                    samples.extend(extract_vectors_from_game(game))
                except Exception as e:
                    # Best effort: one bad game must not abort the whole import.
                    print(f"Error parsing game #{game_count}: {e}")
                    continue

                game_count += 1
                if max_games and game_count >= max_games:
                    break

    if not samples:
        # np.array([]) would have shape (0,), losing the feature dimension.
        return (
            np.zeros((0, feature_size), dtype=np.int32),
            np.zeros(0, dtype=np.float32),
        )

    positions = np.array([vec for vec, _ in samples], dtype=np.int32)
    labels = np.array([lab for _, lab in samples], dtype=np.float32)
    return positions, labels



In [None]:
# Build the training set from the first 100 games in which both players are
# rated at least 2000.
positions, labels = process_pgn_zst(
    "lichess_db_standard_rated_2015-11.pgn.zst",
    max_games=100,
    min_elo=2000,
)


In [94]:
# Sanity check: one 768-wide feature row per label.
print(f"{positions.shape} {labels.shape}")

(7202, 768) (7202,)


In [95]:

# Train SimpleNNUE on the extracted positions.
#
# NOTE(review): no random seed is set in the visible cells, so weight
# initialisation and shuffle order (and therefore the printed losses) will
# differ between runs.
# NOTE(review): every sample is used for training; nothing is held out for
# validation, so the printed loss only measures fit to the training data.

# Convert to PyTorch tensors
# `positions` / `labels` are the arrays returned by process_pgn_zst; both are
# cast to float32 because the model and BCELoss operate on floats.
X = torch.tensor(positions, dtype=torch.float32)
y = torch.tensor(labels, dtype=torch.float32)

# Create dataset and dataloader
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=64, shuffle=True)

# Define model
model = SimpleNNUE(hidden_size=1024)

# Define optimizer and loss function
# Targets are 0, 1 or 0.5 (draws); BCELoss accepts any target in [0, 1].
optimizer = optim.Adam(model.parameters(), lr=0.001)
loss_fn = nn.BCELoss()  # Binary Cross-Entropy Loss for sigmoid output

# Move to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
model.to(device)

# Training loop
epochs = 10
for epoch in range(epochs):
    model.train()
    # Accumulates the per-batch mean loss (BCELoss defaults to
    # reduction="mean"), so the printed value is a SUM over batches and
    # scales with the number of batches; divide by len(loader) for an average.
    total_loss = 0

    for batch_x, batch_y in loader:
        # Batches are moved to the device one at a time; X and y stay on the CPU.
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)

        optimizer.zero_grad()
        outputs = model(batch_x)
        loss = loss_fn(outputs, batch_y)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f"Epoch {epoch + 1}/{epochs}, Loss: {total_loss:.4f}")

Using device: cuda
Epoch 1/10, Loss: 67.2652
Epoch 2/10, Loss: 39.8400
Epoch 3/10, Loss: 26.4357
Epoch 4/10, Loss: 21.3649
Epoch 5/10, Loss: 18.4142
Epoch 6/10, Loss: 16.6601
Epoch 7/10, Loss: 15.6389
Epoch 8/10, Loss: 15.0096
Epoch 9/10, Loss: 14.7221
Epoch 10/10, Loss: 13.8288


In [111]:
def save_weights_to_txt(model, filename="nnue_weights.txt"):
    """Dump the model's parameters to a whitespace-separated text file.

    The file has four sections separated by single blank lines, with every
    value formatted to six decimal places:

    1. ``model.hidden.weight``: one line per row of the weight matrix
    2. ``model.hidden.bias``: a single line
    3. ``model.output.weight``: one line per row of the weight matrix
    4. ``model.output.bias``: a single value (read with ``.item()``, so it
       must hold exactly one element)

    Args:
        model: Object exposing ``hidden`` and ``output`` layers with
            ``weight`` and ``bias`` tensors (e.g. ``SimpleNNUE``).
        filename: Destination path; an existing file is overwritten.
    """
    def to_numpy(tensor):
        return tensor.detach().cpu().numpy()

    def as_line(values):
        return " ".join(f"{v:.6f}" for v in values) + "\n"

    section_break = "\n"

    with open(filename, "w") as out:
        # Section 1: hidden-layer weight matrix, row by row
        out.writelines(as_line(row) for row in to_numpy(model.hidden.weight))
        out.write(section_break)

        # Section 2: hidden-layer biases on one line
        out.write(as_line(to_numpy(model.hidden.bias)))
        out.write(section_break)

        # Section 3: output-layer weight matrix, row by row
        out.writelines(as_line(row) for row in to_numpy(model.output.weight))
        out.write(section_break)

        # Section 4: scalar output bias
        bias_value = model.output.bias.item()
        out.write(f"{bias_value:.6f}\n")

# Export the trained parameters as plain text.
save_weights_to_txt(model, filename="nnue_weights.txt")


In [114]:
# Spot-check the trained network on a single hand-picked position.
fen = "r1bq1rk1/1p3pp1/p1nbp2p/3n4/P1BP4/2N2N2/1P1BQPPP/2RR2K1 w - - 2 14"
#fen = "rnbqkb1r/ppp2ppp/4pn2/8/2BP4/4PN2/PP3PPP/RNBQK2R b KQkq - 0 5"

board = chess.Board(fen)

# Encode the position, then add a leading batch dimension of size 1.
features = extract_vector(board)
x = torch.tensor(features, dtype=torch.float32).unsqueeze(0).to(device)

model.eval()
with torch.no_grad():
    prediction = model(x).item()
print(f"Predicted win probability for the side to move: {prediction:.4f}")


Predicted win probability for the side to move: 0.0902


In [100]:
# Example of using sigmoid on a scalar value
logit = torch.tensor(0.21)
print(torch.sigmoid(logit).item())

0.5523079037666321
