# TRAIN

In [1]:
import numpy as np
import h5py
import chess
import chess.pgn
import chess.engine
import chess.svg

import io
import time
import torch
from torch.utils.data import Dataset, DataLoader
from IPython.display import display,SVG,clear_output 
from torchvision.transforms import ToTensor
from torch.utils.data import random_split

import torch.nn as nn
import torch.nn.functional as F


import torch.optim as optim

import torchvision
import torchvision.datasets as datasets

from tqdm import tqdm
import matplotlib.pyplot as plt
import pylab as pl

In [2]:
# Diccionarios
capa_pieza = {'p': 1, 'r': 3, 'n': 5, 'b': 7, 'q': 9, 'k': 11, 'P': 0, 'R': 2, 'N': 4, 'B': 6, 'Q': 8, 'K': 10}

numero_letra = {0: 'a', 1: 'b', 2: 'c', 3: 'd', 4: 'e', 5: 'f', 6: 'g', 7: 'h'}

letra_numero = {'a': 0, 'b': 1, 'c': 2, 'd':3 , 'e': 4, 'f': 5, 'g':6 , 'h': 7}


# representación tablero
def representacion_tablero(board,verbose=False):
    fen = board.fen().split()
    tablero_piezas_fen = fen[0]
    turno_fen = fen[1]
    enroque_fen = fen[2]
    enpassant_fen = fen[3]

    tablero_r = np.zeros((8,8,22),dtype=np.float32)

    fila=0
    columna=0
    
    for caracter in tablero_piezas_fen:
        
        if not caracter.isdigit():
         
            if caracter == '/':
                fila+=1
                columna=0
           
            else:
    
                tablero_r[fila][columna][capa_pieza[caracter]]=1
                columna+=1
                
        else:
            columna+= int(caracter)

    if turno_fen == 'w':
        tablero_r[:, :, 12] = 1
    if 'K' in enroque_fen:
        tablero_r[:, :, 13] = 1
    if 'k' in enroque_fen:
        tablero_r[:, :, 14] = 1
    if 'Q' in enroque_fen:
        tablero_r[:, :, 15] = 1
    if 'q' in enroque_fen:
        tablero_r[:, :, 16] = 1
           

    if enpassant_fen != "-":
        tablero_r[8-int(enpassant_fen[1])][letra_numero[enpassant_fen[0]]][17] = 1


    for square in chess.SQUARES:
        row = 7 - (square // 8)
        col = square % 8  
        tablero_r[row, col, 18] = len(board.attackers(chess.WHITE, square))/10
        tablero_r[row, col, 19] = len(board.attackers(chess.BLACK, square))/10

    for move in board.legal_moves:
        to_square = move.to_square
        row = 7 - (to_square // 8)
        col = to_square % 8
        tablero_r[row][col][20] = 1

    if board.is_check():
        tablero_r[:, :, 21] = 1


    if turno_fen == 'b':
        tablero_r = np.flip(tablero_r[:, :, :22], axis=(0, 1)).copy()

    if verbose:       
        for capa in range(22):
             for fila in range(8):
                 for columna in range(8):
                     print(tablero_r[fila][columna][capa], end="")
                     print(" ",end="")
                 print(" ")
             print("\n")



    return tablero_r



# representación movimiento
def move_to_policy(uci_move,turn,verbose=False):
    move = np.zeros((8,8,2),dtype=np.float32)
    casilla_origen = uci_move[:2]
    casilla_destino = uci_move[2:4]

    casilla_origen_columna = letra_numero[casilla_origen[0]]
    casilla_origen_fila = casilla_origen[1]
    
    casilla_destino_columna = letra_numero[casilla_destino[0]]
    casilla_destino_fila = casilla_destino[1]


    casilla_origen_fila = 8 - int(casilla_origen_fila)

    casilla_destino_fila = 8 - int(casilla_destino_fila)


    move[casilla_origen_fila][casilla_origen_columna][0]=1
    move[casilla_destino_fila][casilla_destino_columna][1]=1

    if turn != chess.WHITE:
        move = np.flip(move[:, :, :2], axis=(0, 1)).copy()

    if verbose:
        for capa in range(2):
             for fila in range(8):
                 for columna in range(8):
                     print(move[fila][columna][capa], end="")
                     print(" ",end="")
                 print(" ")
             print("\n")
        
        
    return move

In [8]:
class CustomDataset(Dataset):
    def __init__(self, partidas_file, transform=None, target_transform=None):

        self.partidas_file = partidas_file
        self.transform = transform
        self.target_transform = target_transform
        self.posiciones = None
        self.keys = None
        self.archivo = None
        
        with h5py.File(partidas_file, 'r') as archivo:
 
            game_keys = np.array(list(archivo['posiciones'].keys()))
            array_posiciones = np.zeros(len(game_keys),dtype=np.uint32)
            positions = 0
            for i, game_key in enumerate(game_keys):
                positions += archivo['posiciones'][game_key].shape[0]
                array_posiciones[i]=  positions
           
        self.posiciones =  array_posiciones
        self.keys =  game_keys
        self.total_positions = self.posiciones[-1]
        self.half_positions = self.total_positions // 50# Calcular la mitad del dataset


    def __len__(self):
        return self.half_positions

    def __getitem__(self, idx):

        if self.archivo is None:
            self.archivo = h5py.File(self.partidas_file, 'r')
        
        game_idx, position_idx = self.binary_search_iterative(idx)
        
        grupo_posiciones = self.archivo['posiciones']
        grupo_movimientos = self.archivo['movimientos']
        grupo_evaluaciones = self.archivo['evaluaciones']

        key_posiciones = self.keys[game_idx]
        key_movimientos = key_posiciones.replace("board", "mov")
        key_evaluaciones = key_posiciones.replace("board", "eval")

        posicion = grupo_posiciones[key_posiciones][position_idx]
        movimiento = grupo_movimientos[key_movimientos][position_idx]
        evaluacion = grupo_evaluaciones[key_evaluaciones][position_idx]

        if self.transform:
            posicion = self.transform(posicion)
            movimiento = self.transform(movimiento)

        if self.target_transform:
            pass
            #evaluacion = self.target_transform(evaluacion)

        return posicion, movimiento, evaluacion


    def binary_search_iterative(self, x):
        
        left, right = 0, len(self.posiciones) - 1
        while left <= right:
            mid = (left + right) // 2
            if self.posiciones[mid] == x:
                indice = 0
                return mid+1,indice
            elif self.posiciones[mid] < x:
                left = mid + 1
            else:
                right = mid - 1
       
    
        if left!=0:
            indice = x-self.posiciones[left-1]
        else:
            indice = x
        return left,indice
        
partidas_file = 'dataset.h5'  # Reemplaza con la ruta a tu archivo h5
to_tensor = ToTensor()

# Crear el dataset
dataset = CustomDataset(partidas_file,transform=to_tensor)
        
# Define el tamaño de los conjuntos de entrenamiento y prueba
train_size = int(0.9 * len(dataset))  # 90% para entrenamiento
test_size = len(dataset) - train_size  # 10% para prueba

# Divide el dataset en entrenamiento y prueba
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

print(f"Tamaño del conjunto de entrenamiento: {len(train_dataset)}")
print(f"Tamaño del conjunto de prueba: {len(test_dataset)}")
train_loader = DataLoader(train_dataset, batch_size=256)
test_loader = DataLoader(test_dataset, shuffle=True,batch_size=256)

iterador =  iter(train_loader) 
positions,mov,eval = next(iterador)

print("Estructura del batch: ")
print(f"Posición batch shape: {positions.size()}")
print(f"Movimiento batch shape: {mov.size()}")
print(f"Evaluación batch shape: {eval.size()}")

Tamaño del conjunto de entrenamiento: 17555
Tamaño del conjunto de prueba: 1951
Estructura del batch: 
Posición batch shape: torch.Size([256, 22, 8, 8])
Movimiento batch shape: torch.Size([256, 2, 8, 8])
Evaluación batch shape: torch.Size([256])


In [4]:
class ResNet(nn.Module):
    def __init__(self, num_resBlocks, num_hidden):
        super().__init__()
        self.startBlock = nn.Sequential(
            nn.Conv2d(22, num_hidden, kernel_size=3, padding=1,bias=False),
            nn.BatchNorm2d(num_hidden),
            nn.ReLU()
        )
        
        self.backBone = nn.ModuleList(
            [ResBlock(num_hidden) for i in range(num_resBlocks)]
        )
        
        self.policyHead = nn.Sequential(
            nn.Conv2d(num_hidden, num_hidden, kernel_size=3,padding=1,bias=False),
            nn.BatchNorm2d(num_hidden),
            nn.ReLU(),
            nn.Conv2d(num_hidden, 2, kernel_size=3,padding=1),
        )

        self.valueHead = nn.Sequential(
            nn.Conv2d(num_hidden, 3, kernel_size=3,padding=1,bias=False),
            nn.BatchNorm2d(3),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(3 * 8 * 8, 1),
            nn.Tanh()
        )

    def forward(self, x):
        x = self.startBlock(x)
        for resBlock in self.backBone:
            x = resBlock(x)
        policy = self.policyHead(x)
        value =  self.valueHead(x)
        return policy,value
    
class ResBlock(nn.Module):
    def __init__(self, num_hidden):
        super().__init__()
        self.conv1 = nn.Conv2d(num_hidden, num_hidden, kernel_size=3, padding=1,bias=False)
        self.bn1 = nn.BatchNorm2d(num_hidden)
        self.conv2 = nn.Conv2d(num_hidden, num_hidden, kernel_size=3, padding=1,bias=False)
        self.bn2 = nn.BatchNorm2d(num_hidden)
        
    def forward(self, x):
        residual = x
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.bn2(self.conv2(x))
        x += residual
        x = F.relu(x)
        return x
    

In [5]:
def train(model, device, train_loader, optimizer):
    
    model.train()
    loss_v = 0

    for posicion, movimiento, evaluacion in train_loader:
    
        posicion, movimiento,evaluacion = posicion.to(device), movimiento.to(device), evaluacion.to(device)
        optimizer.zero_grad()
        output,value = model(posicion) 

        lossP = F.cross_entropy(output[:,0,:], movimiento[:,0,:])
        lossP1 = F.cross_entropy(output[:,1,:], movimiento[:,1,:])
        lossV = F.mse_loss(value.view(-1), evaluacion)

        loss = lossP+lossP1+lossV
        loss.backward()
        optimizer.step()
        loss_v += loss.item()
      

    loss_v /= len(train_loader)
    return loss_v


def test(model, device, test_loader):

    model.eval()
    test_loss = 0
    
    with torch.no_grad():
        for posicion, movimiento, evaluacion in test_loader:
            
            posicion, movimiento,evaluacion = posicion.to(device), movimiento.to(device), evaluacion.to(device)
            output,value = model(posicion) 

            lossP = F.cross_entropy(output[:,0,:], movimiento[:,0,:])
            lossP1 = F.cross_entropy(output[:,1,:], movimiento[:,1,:])
            lossV = F.mse_loss(value.view(-1), evaluacion)

            loss = lossP+lossP1+lossV
            test_loss +=  loss.item()
          
    test_loss /= len(test_loader)
    return test_loss

In [6]:
#torch.manual_seed(33)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

epochs = 10

modelSimple = ResNet(10, 256).to(device)

lr = 0.01

optimizer = optim.SGD(modelSimple.parameters(), lr=lr,weight_decay=0.001)


# Guardam el valor de peèrdua mig de cada iteració (època)
train_l = np.zeros((epochs))
test_l = np.zeros((epochs))

    #pbar = tqdm(range(1, epochs+1))

tiempo_inicial = time.time()  # Obtener el tiempo inicial

    # Bucle d'entrenament
for epoch in range(epochs):
    print(f"\n=== Epoch {epoch + 1}/{epochs} ===")
    
    train_l[epoch] = train(modelSimple, device, train_loader, optimizer)
    test_l[epoch]  = test(modelSimple, device, test_loader)
    
    print(f"Pérdida de entrenamiento: {train_l[epoch]}")
    print(f"Pérdida de validación : {test_l[epoch]}")

tiempo_final = time.time()  
    # Obtener el tiempo final
tiempo_transcurrido = tiempo_final - tiempo_inicial  # Calcular el tiempo transcurrido
        
print("Tiempo transcurrido:", tiempo_transcurrido, "segundos")


=== Epoch 1/10 ===
Pérdida de entrenamiento: 1.1278706745824951
Pérdida de validación : 0.8966252729296684

=== Epoch 2/10 ===
Pérdida de entrenamiento: 0.7211151429708453
Pérdida de validación : 0.6589106097817421

=== Epoch 3/10 ===
Pérdida de entrenamiento: 0.5051594782566678
Pérdida de validación : 0.5921506434679031

=== Epoch 4/10 ===
Pérdida de entrenamiento: 0.38405791702477826
Pérdida de validación : 0.521225344389677

=== Epoch 5/10 ===
Pérdida de entrenamiento: 0.3184693481611169
Pérdida de validación : 0.4998319074511528

=== Epoch 6/10 ===
Pérdida de entrenamiento: 0.27249957988227624
Pérdida de validación : 0.4816940873861313

=== Epoch 7/10 ===
Pérdida de entrenamiento: 0.23816967183265134
Pérdida de validación : 0.47409604489803314

=== Epoch 8/10 ===
Pérdida de entrenamiento: 0.20708697710348212
Pérdida de validación : 0.47348760440945625

=== Epoch 9/10 ===
Pérdida de entrenamiento: 0.18330738790657208
Pérdida de validación : 0.46843351796269417

=== Epoch 10/10 ===


In [9]:
torch.save(modelSimple.state_dict(), 'modelo_pesos.pth')

In [11]:
modelSimple.eval()

correct_posicion_inicial = 0
correct_posicion_final = 0
correct_eval = 0
total = 0

with torch.no_grad():
    for posicion, movimiento, evaluacion in test_loader:     
        posicion, movimiento, evaluacion = posicion.to(device), movimiento.to(device), evaluacion.to(device)
        output, value = modelSimple(posicion) 

        # Obtener los índices máximos para cada muestra en el batch
        _, indices_origen = torch.max(output[:, 0, :, :].view(output.size(0), -1), dim=1)
        _, indices_movimiento_origen = torch.max(movimiento[:, 0, :, :].view(movimiento.size(0), -1), dim=1)

        _, indices_final = torch.max(output[:, 1, :, :].view(output.size(0), -1), dim=1)
        _, indices_movimiento_final = torch.max(movimiento[:, 1, :, :].view(movimiento.size(0), -1), dim=1)

        # Contar las predicciones correctas
        correct_posicion_inicial += torch.sum(indices_origen == indices_movimiento_origen).item()
        correct_posicion_final += torch.sum(indices_final == indices_movimiento_final).item()

        # Redondear las predicciones del modelo a los valores enteros más cercanos (-1, 0, 1)
        rounded_value = torch.where(value >= 0.2, torch.tensor(1.0), torch.where(value <= -0.2, torch.tensor(-1.0), torch.tensor(0.0)))

        # Contar las predicciones de evaluación correctas
        correct_eval += torch.sum(rounded_value.squeeze() == evaluacion).item()

        total += posicion.size(0)  # Sumar el tamaño del batch
 
# Calcular la precisión total
accuracy_origen = correct_posicion_inicial / total
accuracy_final = correct_posicion_final / total  
accuracy_eval = correct_eval / total

print(f'Accuracy casilla inicial: {accuracy_origen}')
print(f'Accuracy casilla final: {accuracy_final}')
print(f'Accuracy evaluación: {accuracy_eval}')

Accuracy casilla inicial: 0.34495130702203997
Accuracy casilla final: 0.25627883136852897
Accuracy evaluación: 0.8206048180420298
