In [125]:
import pathlib
import chess
import chess.pgn
import chess.engine
import numpy as np
import h5py

In [126]:
# General space saving algorithms that are often used in chess programming sites

def board_to_arr(board: chess.Board):
    black, white = board.occupied_co

    bitboards = np.array([
        black & board.pawns,
        black & board.knights,
        black & board.bishops,
        black & board.rooks,
        black & board.queens,
        black & board.kings,
        white & board.pawns,
        white & board.knights,
        white & board.bishops,
        white & board.rooks,
        white & board.queens,
        white & board.kings,
    ], dtype=np.uint64)

    return bitboards

def bitboards_to_array(bb: np.ndarray) -> np.ndarray:
    bb = np.asarray(bb, dtype=np.uint64)[:, np.newaxis]
    s = 8 * np.arange(7, -1, -1, dtype=np.uint64)
    b = (bb >> s).astype(np.uint8)
    b = np.unpackbits(b, bitorder="little")
    return b.reshape(-1, 8, 8)


## Create Model Architecture

In [258]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion*planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out


class Bottleneck(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(Bottleneck, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, self.expansion*planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(self.expansion*planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion*planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = F.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out


class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_kernels, v_planes=2):
        super(ResNet, self).__init__()
        self.in_planes = 12
        self.value_planes = v_planes

        self.conv1 = nn.Conv2d(self.in_planes, self.in_planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(self.in_planes)
        self.layer1 = self._make_layer(block, num_kernels[0], num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, num_kernels[1], num_blocks[1], stride=1)
        self.layer3 = self._make_layer(block, num_kernels[2], num_blocks[2], stride=1)
        self.layer4 = self._make_layer(block, num_kernels[3], num_blocks[3], stride=1)

        # value head
        self.finalLayerValue = nn.Sequential(
            nn.Conv2d(num_kernels[3], self.value_planes, kernel_size=1, stride=1, padding=0),  # 64, 1
            nn.BatchNorm2d(self.value_planes),
            nn.ReLU()
            )
        self.valueLinear = nn.Linear(64*self.value_planes*block.expansion, 1)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        
        out = self.finalLayerValue(out)
        out = out.view(out.size(0), -1)
        out = self.valueLinear(out)
        return out


In [259]:
def ResNetSmall():
    return ResNet(BasicBlock, [1,1,1,2], [32,32,32,32], v_planes=2)

def ResNetRegular():
    return ResNet(BasicBlock, [3,3,3,3], [256,256,256,256], v_planes=8)

In [260]:
model = ResNetSmall()

In [261]:
saveName = 'positions.h5'

with h5py.File(saveName, 'r') as hf:
    X = hf["X"][:]
    y = hf["y"][:]
    
position = torch.from_numpy(bitboards_to_array(X[1])).unsqueeze(0).float()   
model(position.float())


tensor([[-0.2150]], grad_fn=<AddmmBackward0>)

### Dataloader Generator

In [262]:
class TrainingDataset(torch.utils.data.Dataset):

    def __init__(self, X, y):
        self.X = X
        self.y = torch.from_numpy(y)

    def __getitem__(self, index):

        return torch.from_numpy(bitboards_to_array(self.X[index])).float(), self.y[index]

    def __len__(self):
        return len(self.X)


### Train Model

In [305]:
with h5py.File('positions.h5', 'r') as hf:
    X = hf["X"][:]
    y = hf["y"][:]

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

# Define your model
model = ResNetSmall()

# Set the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Define the loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Set the number of epochs
num_epochs = 100

trainloader = torch.utils.data.DataLoader(dataset=TrainingDataset(X, y), batch_size=64, shuffle=True)

# Training loop
for epoch in range(num_epochs):
    running_loss = 0.0
    for inputs, labels in trainloader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        #print(f"Batch Loss: {loss.item()}")

    epoch_loss = running_loss / len(trainloader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}")

  return F.mse_loss(input, target, reduction=self.reduction)


Epoch [1/100], Loss: 0.0032
Epoch [2/100], Loss: 0.0030
Epoch [3/100], Loss: 0.0030
Epoch [4/100], Loss: 0.0030
Epoch [5/100], Loss: 0.0030
Epoch [6/100], Loss: 0.0030
Epoch [7/100], Loss: 0.0030
Epoch [8/100], Loss: 0.0030
Epoch [9/100], Loss: 0.0030
Epoch [10/100], Loss: 0.0030
Epoch [11/100], Loss: 0.0030
Epoch [12/100], Loss: 0.0030
Epoch [13/100], Loss: 0.0030


In [274]:
from scipy.stats import spearmanr

def spearman_corr(x, y):
    x_indices = list(range(len(x)))
    y_indices = list(range(len(y)))

    x, x_indices = map(list, zip(*sorted(zip(x, x_indices))))
    y, y_indices = map(list, zip(*sorted(zip(y, y_indices))))

    correlation, _ = spearmanr(x_indices, y_indices)
    
    return correlation