In [199]:
%cd /home/assaf/jupyter/chess
%reset -f

import chess
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt 
import pickle
import torch
from torch.utils.data import Dataset, DataLoader
from chess_utils import matrix_to_board

import torch.nn as nn



/home/assaf/jupyter/chess


### Build a DataSet

In [200]:


class ChessDataset(Dataset):
    def __init__(self, filename):
        with open(filename, "rb") as f:
            self.states_df = pickle.load(f)

    def __len__(self):
        return len(self.states_df)

    def __getitem__(self, idx):
        # display(self.states_df.iloc[idx])
        if self.states_df.iloc[idx].winner == 'black':
            winner = -1 
        elif self.states_df.iloc[idx].winner == 'white':
            winner = 1 
        else:
            winner = 0
        if self.states_df.iloc[idx].turn %2:   #white turn 
            brd_state =  self.states_df.iloc[idx].matrix
        else:
            brd_state =  np.flipud(-self.states_df.iloc[idx].matrix).copy() 
            winner = -winner
        n = torch.tensor(brd_state+7).unsqueeze(0).to(torch.int64)
        brd_state3d = torch.zeros(15, 8, 8, dtype=torch.float32)
        brd_state3d.scatter_(0, n, 1.0)
        brd_state3d = brd_state3d[[0,1,2,3,4,5,6,8,9,10,11,12,13,14],:,:]
        steps = self.states_df.iloc[idx].turns-self.states_df.iloc[idx].turn 
        result = torch.tensor((winner, steps), dtype=torch.float32)
        return brd_state3d, result
        # return torch.tensor(brd_state3d, dtype=torch.float32).unsqueeze(0), result

train_ds = ChessDataset("train_states_ds.pkl")
test_ds = ChessDataset("test_states_ds.pkl")
len(train_ds), len(test_ds)

# train_ds[1][0]

(1080598, 10021)

In [201]:
class MyResNet(nn.Module):
    def __init__(self):
        super(MyResNet, self).__init__()
        
        do = 0.1

        self.cnn_0 = nn.Conv2d(in_channels=14,  out_channels=64,  kernel_size=1, padding=0, stride=1) 

        self.cnn_1 = nn.Conv2d(in_channels=64,  out_channels=64,  kernel_size=5, padding=2, stride=1)
        self.cnn_2 = nn.Conv2d(in_channels=64,  out_channels=64,  kernel_size=5, padding=2, stride=1)
        self.cnn_3 = nn.Conv2d(in_channels=64,  out_channels=128, kernel_size=3, padding=1, stride=1) #here comes pooling 8->4
        self.cnn_4 = nn.Conv2d(in_channels=128, out_channels=128, kernel_size=3, padding=1, stride=1)
        self.cnn_5 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, padding=1, stride=1) #here comes pooling 4->2
        self.cnn_6 = nn.Conv2d(in_channels=256, out_channels=256, kernel_size=3, padding=1, stride=1)
        self.cnn_7 = nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, padding=1, stride=1) #here comes pooling 2->1

        self.ln2d_1 = nn.LayerNorm([64, 8, 8])
        self.ln2d_2 = nn.LayerNorm([64, 8, 8])
        self.ln2d_3 = nn.LayerNorm([128, 4, 4])
        self.ln2d_4 = nn.LayerNorm([128, 4, 4])
        self.ln2d_5 = nn.LayerNorm([256, 2, 2])
        self.ln2d_6 = nn.LayerNorm([256, 2, 2])
        self.ln2d_7 = nn.LayerNorm([512, 1, 1])

        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        self.output_layer = nn.Linear(512, 2)
        self.dropout = nn.Dropout(p=do)

    def forward(self, x):
        x = self.cnn_0(x)

        x = torch.relu(self.cnn_1(x))
        x = self.ln2d_1(x)
        x = self.dropout(x)

        x = torch.relu(self.cnn_2(x))
        x = self.ln2d_2(x)
        x = self.dropout(x)

        x = torch.relu(self.cnn_3(x))
        x = self.pool(x)
        x = self.ln2d_3(x)
        x = self.dropout(x)

        x = torch.relu(self.cnn_4(x))
        x = self.ln2d_4(x)
        x = self.dropout(x)

        x = torch.relu(self.cnn_5(x))
        x = self.pool(x)
        x = self.ln2d_5(x)
        x = self.dropout(x)

        x = torch.relu(self.cnn_6(x))
        x = self.ln2d_6(x)
        x = self.dropout(x)

        x = torch.relu(self.cnn_7(x))
        x = self.pool(x)
        x = self.ln2d_7(x)
        x = self.dropout(x)

        x = torch.flatten(x,x.ndim-3)

        x = self.output_layer(x)
        # print(x.shape)

        return x

In [202]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f'Using device: {device}')
# device = 'cpu'
lr = 1e-4
weight_decay = 1e-3
batch_size = 128
epochs = 100

model = MyResNet().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

train_dataloader = DataLoader(train_ds, batch_size=batch_size, num_workers=8, shuffle=True)
test_dataloader = DataLoader(test_ds, batch_size=batch_size, num_workers=8, shuffle=False)

def criterion(output, targets):
    msel = nn.MSELoss()
    loss = msel(torch.tanh(output[:,0]),targets[:,0]) , msel(output[:,1],targets[:,1])
    return loss

def predict(y,target, tau=0.33):
    z=y*0
    z[y>tau] = 1
    z[y<-tau] = -1
    return torch.isclose(z,target, atol=1e-4, rtol=1e-4)

mloss = []
for epoch in range(epochs):
    model.train()
    p = 0
    acc_sum = 0
    loss_sum=np.array((0.,0.,0.))
    for features, targets in train_dataloader:
        features = features.to(device)
        targets = targets.to(device)
        output = model(features)
        loss2 = criterion(output, targets)
        loss = loss2[0]


        optimizer.zero_grad()  
        loss.backward()  
        optimizer.step()  

        loss_sum += np.array((loss.item(),loss2[0].item(),loss2[1].item()))
        acc_sum += predict(output[:,0],targets[:,0]).sum().item()
        p += 1
    print(epoch, loss_sum/p, acc_sum/len(train_ds))
    mloss.append(loss_sum/p)


    model.eval()
    p = 0
    acc_sum = 0
    loss_sum=np.array((0.,0.,0.))
    with torch.no_grad():
        for features, targets in test_dataloader:
            features = features.to(device)
            targets = targets.to(device)
            output = model(features)
            loss2 = criterion(output, targets)
            loss = loss2[0]

            loss_sum += np.array((loss.item(),loss2[0].item(),loss2[1].item()))
            acc_sum += predict(output[:,0],targets[:,0]).sum().item()
            p += 1
    print('test-results', epoch, loss_sum/p, acc_sum/len(test_ds))


Using device: cuda:0
0 [7.88316619e-01 7.88316619e-01 2.38591017e+03] 0.3007196015539544
test-results 0 [7.51224475e-01 7.51224475e-01 2.69176157e+03] 0.3054585370721485
1 [7.29955212e-01 7.29955212e-01 2.38502328e+03] 0.3715340950103554
test-results 1 [7.12530010e-01 7.12530010e-01 2.69176157e+03] 0.36174034527492266
2 [7.11603377e-01 7.11603377e-01 2.38505212e+03] 0.3962666967734532
test-results 2 [6.87065000e-01 6.87065000e-01 2.69176157e+03] 0.37960283404849815
3 [6.94440734e-01 6.94440734e-01 2.38506391e+03] 0.4174790255025458
test-results 3 [7.03112608e-01 7.03112608e-01 2.69176157e+03] 0.41063766091208465
4 [6.77811842e-01 6.77811842e-01 2.38504187e+03] 0.43643612148088373
test-results 4 [7.08055680e-01 7.08055680e-01 2.69176157e+03] 0.3918770581778266
5 [6.61607962e-01 6.61607962e-01 2.38499735e+03] 0.45513687791389584
test-results 5 [7.10362957e-01 7.10362957e-01 2.69176157e+03] 0.43039616804710107
6 [6.47051695e-01 6.47051695e-01 2.38492552e+03] 0.47168697332402987
test-resul

In [None]:
(matrix_to_board(train_ds[76][0]))

n  = (train_ds[76][0])
print(n.shape)
n = torch.round(train_ds[76][0]+7).to(torch.int64)
out = torch.zeros(14, 8, 8, dtype=torch.float32)
out.scatter_(0, n, 1.0)

out

torch.Size([1, 8, 8])


tensor([[[0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.]],

        [[0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [1., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.]],

        [[0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
       

In [None]:
import torch.nn as nn 
import torch
pool = nn.MaxPool2d(kernel_size=2, stride=2)

a = torch.arange(8).unsqueeze(1)
b = torch.arange(8).unsqueeze(1)/10
x = a.T+b
x = x.unsqueeze(0)
y = pool(x)
print(x,y)

In [None]:
model = MyResNet()
inp = train_ds[76][0]
# display(inp.shape)
inp = torch.stack([inp])
# inp = torch.cat([inp, inp])
# display(inp.shape)
# display(inp)
output = model(inp)
output , train_ds[76][1]

a=np.array((1,2,3))
a+(1,22,3)

import os
os.cpu_count()

20

In [89]:
def predict(y,target, tau=0.33):
    z=y*0
    z[y>tau] = 1
    z[y<-tau] = -1
    return z
x=torch.arange(-1,1,0.1)

torch.stack([x,predict(x,x)]).T
torch.isclose(predict(x,x) ,x*0)

tensor([False, False, False, False, False, False, False,  True,  True,  True,
         True,  True,  True,  True, False, False, False, False, False, False])