Los datos que se usan en este proyecto se pueden obtener en la siguiente carpeta de drive:
https://drive.google.com/drive/folders/1JtJOwMGcV6QVgm3wcaJRUa6ImQ_C_-d3?usp=sharing

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import chess # chess==1.3.0
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import time
import random
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [323]:
"""
Procesamos los datos de entrada originales que estan en formato FEN para 
convertilos un vector de 64 enteros y las salidas originales las normalizamos.
"""

data_path = 'data/fen_to_stockfish_evaluation.csv'

def process_data():
    """ 
    Procesamos la informacion del .csv para convertir el tablero en formato
    fen a un vector de 64 entradas     
    """
    def converter(fen_board):
        board = chess.Board(fen_board)
        input_vec = []
        for row in range(7,-1,-1):
            for col in range(8):
                piece = board.piece_at(chess.square(col,row))
                
                if piece is None:
                    val = 0
                else:
                    val = piece.piece_type
                    if not piece.color:
                        val *= -1
                input_vec.append(val)
        return input_vec
    
    data_frame = pd.read_csv(data_path,names=['fen','eval'],converters={'fen':converter})
    size = data_frame.shape[0]
    array = data_frame.to_numpy()
    fen = np.array(list(array[:,0]))
    eval_ = array[:,1].reshape((size,1))
    eval_ = (eval_-min(eval_))/(max(eval_)-min(eval_))
    
    array = np.zeros((size,65))
    array[:,:-1] = fen
    array[:,-1:] = eval_
        
    data_frame = pd.DataFrame(array)
    
    
    data_frame.to_csv('processed_data_normalizada.csv', index=False)
    
process_data()

In [3]:
"""
Creamos una subclase de Dataset, para poder obtener muestras de los datos
de forma mas eficiente
"""

p_data_path = 'data/processed_data_normalizada.csv'

class Data_set(Dataset):

    def __init__(self, csv_file):
        self.data_frame = pd.read_csv(csv_file)

    def __len__(self):
        return len(self.data_frame)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        fen = self.data_frame.iloc[idx, :-1]
        fen = fen.to_numpy()
        fen = fen.astype('int')
        
        eval_ = self.data_frame.iloc[idx, -1:]
        eval_ = eval_.to_numpy()
        eval_ = eval_.astype('float')
        
        sample = (torch.tensor(fen,dtype=torch.float,device=device),
                  torch.tensor(eval_,dtype=torch.float,device=device))
        

        return sample

# Cargamos los datos en p_data_path
dataset = Data_set(p_data_path)

In [4]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        
        self.h1 = nn.Linear(64,1024)
        self.h2 = nn.Linear(1024,1024)
        self.h3 = nn.Linear(1024,1024)
        self.outlayer = nn.Linear(1024,1)
        
        self.activation = nn.ELU()
        self.batchNormalization = nn.BatchNorm1d(64)
        self.hiddenNormalization = nn.BatchNorm1d(1024)
        
    def forward(self, batch):
        
        out = self.activation(self.h1(batch))
        out = self.hiddenNormalization(out)
        out = self.activation(self.h2(out))
        out = self.hiddenNormalization(out)
        out = self.activation(self.h3(out))
        out = self.hiddenNormalization(out)
        out = self.activation(self.outlayer(out))
        
        return out
    
    def train(self, epochs, data, criterion, optimizer, batch_size, data_size):
        errors = np.zeros(epochs)
        num_batches = data_size // batch_size
        print(f'Numero de lotes por epoch {num_batches}, tamaño de los lotes {batch_size}')
        time_stamp = time.time()
        for epoch in range(epochs):
            print(f'epoch {epoch}/{epochs}')
            running_loss = 0
            for batch_i in range(num_batches):
                if batch_i % int(num_batches*0.1) == 0:
                    print(f'{int((batch_i*100)/num_batches)}% epoch')
                
                batch = data[batch_i*batch_size : batch_i*batch_size + batch_size]
                inputs, vals = batch
                inputs = self.batchNormalization(inputs)
                optimizer.zero_grad()
                outputs = self(inputs)
                loss = criterion(outputs,vals)
                loss.backward()
                optimizer.step()
                running_loss += loss.item()*inputs.size(0)
            epoch_loss = running_loss / data_size
            print(f'epoch_loss: {epoch_loss}')
            errors[epoch] = epoch_loss
            torch.save(net.state_dict(),"./net.pth")
            
        print(f'time(min): {(time.time()-time_stamp)/60}')
        
        if epochs > 1:
            X_axis = np.arange(epochs)
            plt.figure(1)
            plt.plot(X_axis, errors)
            
# Creamos una red nueva e intentamos cargar los pesos de una guardada si esta existe
net=Net() 
try:
    std_dict = torch.load('net.pth')
    net.load_state_dict(std_dict)
    print('Red cargada')
except:
    pass
net.to(device)

Net(
  (h1): Linear(in_features=64, out_features=1024, bias=True)
  (h2): Linear(in_features=1024, out_features=1024, bias=True)
  (h3): Linear(in_features=1024, out_features=1024, bias=True)
  (outlayer): Linear(in_features=1024, out_features=1, bias=True)
  (activation): ELU(alpha=1.0)
  (batchNormalization): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (hiddenNormalization): BatchNorm1d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
)

In [5]:
# Entrenamiento
params = net.parameters()
optimizer = optim.SGD(params, lr=0.0001, momentum=0.1, nesterov=True)
criterion = nn.MSELoss()
batch_size = 256
# train_data_size = 256*1000
train_data_size = int(len(dataset)*0.8)
net.train(1,dataset,criterion,optimizer,batch_size,train_data_size)

torch.save(net.state_dict(),"./net.pth")
print('Model Saved')

Numero de lotes por epoch 13757, tamaño de los lotes 256
epoch 0/1
0% epoch
9% epoch
19% epoch
29% epoch
39% epoch
49% epoch
59% epoch
69% epoch
79% epoch
89% epoch
99% epoch
epoch_loss: 0.05351503683840172
time(min): 22.0408526579539
Model Saved


In [6]:
net=Net()
try:
    std_dict = torch.load('./net.pth')
    net.load_state_dict(std_dict)
    print('Red cargada')
except:
    print('No se pudo cargar la red')
net.to(device)

Red cargada


Net(
  (h1): Linear(in_features=64, out_features=1024, bias=True)
  (h2): Linear(in_features=1024, out_features=1024, bias=True)
  (h3): Linear(in_features=1024, out_features=1024, bias=True)
  (outlayer): Linear(in_features=1024, out_features=1, bias=True)
  (activation): ELU(alpha=1.0)
  (batchNormalization): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (hiddenNormalization): BatchNorm1d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
)

In [7]:
# Verificacion del modelo

delta = 0.02

def contarCorrectas(net,batch,vals):
    '''Dado un batch y sus etiquetas, cuenta el numero de respuestas
    correctas de una red, el parametro func aplica una modificacion al 
    tensor que contiene los datos'''
    
    salidas=net(batch)
    cantidadCorrectas=(torch.abs(salidas-vals) < delta).sum()
    return cantidadCorrectas
    
def calcularPrecisionGlobal(net,data_set,batch_size,range_):
    '''Calcula la precision de una red dado un data_loader,
    recibe una funcion que transforma los datos en caso de ser necesario'''
    correctas=0
    num_batches = (range_[1]-range_[0]) // batch_size
    for batch_i in range(num_batches):
            batch = data_set[range_[0]+batch_i*batch_size : range_[0]+batch_i*batch_size + batch_size]
            inputs, vals = batch
            correctas+=contarCorrectas(net,inputs,vals)        
    correctas=correctas.data.tolist()
    return (100*correctas)/(num_batches*batch_size)

train_data_size = int(len(dataset)*0.8)
test_data_size = int(len(dataset)*0.1)
val_data_size = int(len(dataset)*0.1)
range_train = (0,256000)
range_test = (train_data_size,train_data_size+test_data_size)
range_val = (train_data_size+test_data_size, train_data_size+test_data_size+val_data_size)
prec_train = calcularPrecisionGlobal(net,dataset,512,range_train)
prec_test = calcularPrecisionGlobal(net,dataset,256,range_test)
prec_val = calcularPrecisionGlobal(net,dataset,256,range_test)
print("Precision en conjunto de entrenamiento: %.4f%%"%(prec_train))
print("Precision en conjunto de prueba: %.4f%%"%(prec_test))
print("Precision en conjunto de validacion: %.4f%%"%(prec_val))

Precision en conjunto de entrenamiento: 13.8125%
Precision en conjunto de prueba: 12.4046%
Precision en conjunto de validacion: 12.4046%


In [8]:
# Muestra aleatoria

i = random.randint(0,len(dataset)-10) 

m = net.batchNormalization
print(dataset[i:i+10][1])
print(net(m(dataset[i:i+10][0])))

tensor([[0.5302],
        [0.5272],
        [0.5411],
        [0.5382],
        [0.5520],
        [0.5464],
        [0.5473],
        [0.5466],
        [0.5451],
        [0.5461]])
tensor([[0.5567],
        [0.1330],
        [0.5260],
        [0.3145],
        [0.5900],
        [0.7342],
        [0.7721],
        [0.2822],
        [0.2686],
        [0.6954]], grad_fn=<EluBackward>)
