In [7]:
import pandas as pd
import numpy as np
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.preprocessing import StandardScaler

In [39]:
df = pd.read_csv('chess.csv', nrows=100000)
df["Evaluation"] = df["Evaluation"].apply(lambda x: float(x[1:]) if x.startswith("#") else float(x))
df.head()

Unnamed: 0,FEN,Evaluation
0,rnbqkbnr/pppppppp/8/8/4P3/8/PPPP1PPP/RNBQKBNR ...,-10.0
1,rnbqkbnr/pppp1ppp/4p3/8/4P3/8/PPPP1PPP/RNBQKBN...,56.0
2,rnbqkbnr/pppp1ppp/4p3/8/3PP3/8/PPP2PPP/RNBQKBN...,-9.0
3,rnbqkbnr/ppp2ppp/4p3/3p4/3PP3/8/PPP2PPP/RNBQKB...,52.0
4,rnbqkbnr/ppp2ppp/4p3/3p4/3PP3/8/PPPN1PPP/R1BQK...,-26.0


In [40]:
def fen_to_bitboards(fen):
    """
    Convert a FEN string to a bitboard representation.

    Parameters:
    - fen (str): A FEN string representing the chess position.

    Returns:
    - List[int]: A list of 12 integers representing the bitboards for each piece type.
                 Order:
                 [White Pawns, White Knights, White Bishops, White Rooks,
                  White Queens, White King, Black Pawns, Black Knights,
                  Black Bishops, Black Rooks, Black Queens, Black King]
    """
    # Initialize 12 bitboards to 0
    bitboards = [0] * 12

    # Mapping from piece to bitboard index
    piece_to_index = {
        'P': 0, 'N': 1, 'B': 2, 'R': 3, 'Q': 4, 'K': 5,
        'p': 6, 'n': 7, 'b': 8, 'r': 9, 'q': 10, 'k': 11
    }

    # Extract the piece placement field from FEN
    piece_placement = fen.split(' ')[0]
    ranks = piece_placement.split('/')

    if len(ranks) != 8:
        raise ValueError("Invalid FEN: Should have 8 ranks.")

    # Iterate over each rank starting from rank 8 to rank 1
    for rank_idx, rank in enumerate(ranks):
        file_idx = 0  # Files go from 'a' to 'h' (0 to 7)
        for char in rank:
            if char.isdigit():
                # Empty squares; skip the number of squares indicated
                file_idx += int(char)
            elif char in piece_to_index:
                # Calculate the square index (0 to 63)
                square = (7 - rank_idx) * 8 + file_idx
                bit = 1 << square
                index = piece_to_index[char]
                bitboards[index] |= bit
                file_idx += 1
            else:
                raise ValueError(f"Invalid character in FEN: '{char}'")
        if file_idx != 8:
            raise ValueError(f"Invalid FEN: Rank {rank_idx + 1} does not have exactly 8 squares.")

    return bitboards

fen_to_bitboards("rnbqkbnr/pppppppp/8/8/4P3/8/PPPP1PPP/RNBQKBNR b KQkq - 0 1")

[268496640,
 66,
 36,
 129,
 8,
 16,
 71776119061217280,
 4755801206503243776,
 2594073385365405696,
 9295429630892703744,
 576460752303423488,
 1152921504606846976]

In [41]:
def bitboards_to_tensor(bitboards):
    tensor = []
    for bb in bitboards:
        binary_str = bin(bb)[2:].zfill(64)
        binary_list = [int(bit) for bit in binary_str]
        tensor.append(binary_list)
    return torch.tensor(tensor, dtype=torch.float32)

# Custom Dataset
class ChessDataset(Dataset):
    def __init__(self, dataframe):
        self.bitboards = dataframe['Bitboard'].apply(bitboards_to_tensor).tolist()
        self.evaluations = torch.tensor(dataframe['Eval_scaled'].values, dtype=torch.float32).unsqueeze(1)
    
    def __len__(self):
        return len(self.evaluations)
    
    def __getitem__(self, idx):
        return self.bitboards[idx], self.evaluations[idx]

# Neural Network Model
class NNUEModel(nn.Module):
    def __init__(self):
        super(NNUEModel, self).__init__()
        self.input_size = 12 * 64
        self.hidden_size = 256
        self.output_size = 1
        
        self.fc1 = nn.Linear(self.input_size, self.hidden_size)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(self.hidden_size, self.hidden_size)
        self.relu2 = nn.ReLU()
        self.fc3 = nn.Linear(self.hidden_size, self.output_size)
        
    def forward(self, x):
        x = x.view(-1, self.input_size)
        x = self.fc1(x)
        x = self.relu1(x)
        x = self.fc2(x)
        x = self.relu2(x)
        x = self.fc3(x)
        return x
    
def train_epoch(model, dataloader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    for bitboards, evaluations in dataloader:
        bitboards = bitboards.to(device)
        evaluations = evaluations.to(device)
        
        # Zero the gradients
        optimizer.zero_grad()
        
        # Forward pass
        outputs = model(bitboards)
        loss = criterion(outputs, evaluations)
        
        # Backward pass and optimization
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item() * bitboards.size(0)
    
    epoch_loss = running_loss / len(dataloader.dataset)
    return epoch_loss

def validate_epoch(model, dataloader, criterion, device):
    model.eval()
    running_loss = 0.0
    with torch.no_grad():
        for bitboards, evaluations in dataloader:
            bitboards = bitboards.to(device)
            evaluations = evaluations.to(device)
            
            outputs = model(bitboards)
            loss = criterion(outputs, evaluations)
            
            running_loss += loss.item() * bitboards.size(0)
    
    epoch_loss = running_loss / len(dataloader.dataset)
    return epoch_loss

def test_model(model, dataloader, criterion, device):
    model.eval()
    running_loss = 0.0
    with torch.no_grad():
        for bitboards, evaluations in dataloader:
            bitboards = bitboards.to(device)
            evaluations = evaluations.to(device)
            
            outputs = model(bitboards)
            loss = criterion(outputs, evaluations)
            
            running_loss += loss.item() * bitboards.size(0)
    
    test_loss = running_loss / len(dataloader.dataset)
    return test_loss

test_loss = test_model(model, test_loader, criterion, device)
print(f'Test Loss: {test_loss:.4f}')


Test Loss: 0.3958


In [42]:
# df = df[(-100 <= df['Evaluation']) & (df['Evaluation'] <= 100)]
df['Bitboard'] = df['FEN'].apply(fen_to_bitboards)
print(df.head())

                                                 FEN  Evaluation  \
0  rnbqkbnr/pppppppp/8/8/4P3/8/PPPP1PPP/RNBQKBNR ...       -10.0   
1  rnbqkbnr/pppp1ppp/4p3/8/4P3/8/PPPP1PPP/RNBQKBN...        56.0   
2  rnbqkbnr/pppp1ppp/4p3/8/3PP3/8/PPP2PPP/RNBQKBN...        -9.0   
3  rnbqkbnr/ppp2ppp/4p3/3p4/3PP3/8/PPP2PPP/RNBQKB...        52.0   
4  rnbqkbnr/ppp2ppp/4p3/3p4/3PP3/8/PPPN1PPP/R1BQK...       -26.0   

                                            Bitboard  
0  [268496640, 66, 36, 129, 8, 16, 71776119061217...  
1  [268496640, 66, 36, 129, 8, 16, 67290111619891...  
2  [402712320, 66, 36, 129, 8, 16, 67290111619891...  
3  [402712320, 66, 36, 129, 8, 16, 65038346165944...  
4  [402712320, 2112, 36, 129, 8, 16, 650383461659...  


In [43]:
scaler = StandardScaler()
df['Eval_scaled'] = scaler.fit_transform(df[['Evaluation']])

# Create dataset and dataloaders
dataset = ChessDataset(df)
train_size = int(0.8 * len(dataset))
val_size = int(0.1 * len(dataset))
test_size = len(dataset) - train_size - val_size
train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

batch_size = 128
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Initialize model, loss, optimizer
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = NNUEModel().to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)

# Training loop
num_epochs = 50
best_val_loss = float('inf')
patience = 5
trigger_times = 0

for epoch in range(1, num_epochs + 1):
    train_loss = train_epoch(model, train_loader, criterion, optimizer, device)
    val_loss = validate_epoch(model, val_loader, criterion, device)
    
    print(f'Epoch {epoch}: Train Loss = {train_loss:.4f}, Val Loss = {val_loss:.4f}')
    
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        trigger_times = 0
        torch.save(model.state_dict(), 'best_nnue_model.pth')
    else:
        trigger_times += 1
        if trigger_times >= patience:
            print('Early stopping!')
            break

# Load best model and evaluate
model.load_state_dict(torch.load('best_nnue_model.pth'))
test_loss = test_model(model, test_loader, criterion, device)
print(f'Test Loss: {test_loss:.4f}')

Epoch 1: Train Loss = 0.7387, Val Loss = 0.5156
Epoch 2: Train Loss = 0.4270, Val Loss = 0.3889
Epoch 3: Train Loss = 0.3103, Val Loss = 0.4184
Epoch 4: Train Loss = 0.2542, Val Loss = 0.3913
Epoch 5: Train Loss = 0.2227, Val Loss = 0.3283
Epoch 6: Train Loss = 0.1893, Val Loss = 0.3641
Epoch 7: Train Loss = 0.1592, Val Loss = 0.3456
Epoch 8: Train Loss = 0.1491, Val Loss = 0.4170
Epoch 9: Train Loss = 0.1431, Val Loss = 0.3383
Epoch 10: Train Loss = 0.1337, Val Loss = 0.3145
Epoch 11: Train Loss = 0.1123, Val Loss = 0.3448
Epoch 12: Train Loss = 0.1127, Val Loss = 0.3447
Epoch 13: Train Loss = 0.1103, Val Loss = 0.3192
Epoch 14: Train Loss = 0.1022, Val Loss = 0.3802
Epoch 15: Train Loss = 0.0945, Val Loss = 0.3485
Early stopping!
Test Loss: 0.3700


  model.load_state_dict(torch.load('best_nnue_model.pth'))
