# Chess-NN

In [1]:
import os
import io
import chess
import chess.pgn
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from PIL import Image
import matplotlib.pyplot as plt

In [2]:
import numpy as np

In [3]:
from tqdm import tqdm

In [4]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [5]:
def count_games(pgn_file):
    count = 0
    
    with open(pgn_file) as pgn:
        while True:
            game = chess.pgn.read_game(pgn)
            if game is None:
                break  # Fin del archivo
            count += 1
    
    return count

In [6]:
def extract_fen_positions(pgn_file):
    fen_positions = []
    total_games = count_games(pgn_file)
    
    with open(pgn_file) as pgn:
        for _ in tqdm(range(total_games), desc="Processing games"):
            game = chess.pgn.read_game(pgn)
            if game is None:
                break  # Fin del archivo
            
            board = game.board()
            for move in game.mainline_moves():
                board.push(move)
                fen_positions.append(board.fen())
    
    return fen_positions


In [7]:
# Funciones para convertir el el tablero de ajedrez a tensor y visceversa
def board_to_tensor(board):
    pieces = ['P', 'N', 'B', 'R', 'Q', 'K', 'p', 'n', 'b', 'r', 'q', 'k']
    tensor = torch.zeros(12, 8, 8)
    for i, piece in enumerate(pieces):
        for pos in board.pieces(chess.Piece.from_symbol(piece).piece_type, chess.WHITE if piece.isupper() else chess.BLACK):
            tensor[i, pos // 8, pos % 8] = 1
    return tensor

def tensor_to_move(tensor):
    move_index = tensor.argmax().item()
    from_square = move_index // 64
    to_square = move_index % 64
    return chess.Move(from_square, to_square)

In [8]:
class ChessDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.fen_files = os.listdir(self.root_dir)

    def __len__(self):
        return len(self.fen_files)

    def __getitem__(self, idx):
        fen_path = os.path.join(self.root_dir, self.fen_files[idx])
        board = chess.Board(fen_path[:-4])  # Asumiendo que el nombre del archivo es el FEN
        image = Image.open(fen_path)
        
        if self.transform:
            image = self.transform(image)

        return board, image

In [9]:
class ChessPGNDataset(Dataset):
    def __init__(self, fen_positions, transform=None):
        self.fen_positions = fen_positions
        self.transform = transform

    def __len__(self):
        return len(self.fen_positions)

    def __getitem__(self, idx):
        fen = self.fen_positions[idx]
        board = chess.Board(fen)
        
        # Aquí puedes aplicar la transformación si es necesario, por ejemplo, si estás utilizando imágenes en lugar de FEN
        # Para simplificar, asumiremos que estás utilizando notaciones FEN directamente en el modelo
        if self.transform:
            board = self.transform(board)
        # Convierte el objeto `chess.Board` en un tensor
        board_tensor = board_to_tensor(board)

        return board_tensor

In [10]:
class ChessDataset(Dataset):
    def __init__(self, fen_list, uci_move_list, transform=None):
        self.fen_list = fen_list
        self.uci_move_list = uci_move_list
        self.transform = transform

    def __len__(self):
        return len(self.fen_list)

    def __getitem__(self, idx):
        fen = self.fen_list[idx]
        board = chess.Board(fen)
        move = self.uci_move_list[idx]
        
        if self.transform:
            # Aplicar transformaciones en el tablero si es necesario
            # Aquí, asumimos que la transformación se realiza en el tablero, no en la imagen
            board = self.transform(board)

        return board, move


In [11]:
def move_to_onehot(move, size):
    """Convierte un movimiento a un vector one-hot de tamaño size."""
    onehot = np.zeros(size)
    onehot[move] = 1
    return onehot

class ChessDataset(Dataset):
    def __init__(self, fen_list, uci_move_list, size, transform=None):
        self.fen_list = fen_list
        self.uci_move_list = uci_move_list
        self.size = size
        self.transform = transform

    def __len__(self):
        return len(self.fen_list)

    def __getitem__(self, idx):
        fen = self.fen_list[idx]
        board = chess.Board(fen)
        move = self.uci_move_list[idx]
        
        if self.transform:
            board = self.transform(board)

        board_tensor = board_to_tensor(board)
        move_tensor = torch.tensor(move_to_onehot(move, self.size))
        
        return board_tensor, move_tensor

In [12]:
class ChessDataset(Dataset):
    def __init__(self, fen_list, move_list, transform=None):
        self.fen_list = fen_list
        self.move_list = move_list
        self.transform = transform
        
    def __len__(self):
        return len(self.fen_list)
    
    def __getitem__(self, idx):
        fen = self.fen_list[idx]
        board = chess.Board().set_board_fen(fen)
        move = self.uci_move_list[idx]

        if self.transform:
            board = self.transform(board)

        board_tensor = board_to_tensor(board)
        move_tensor = torch.tensor(move_to_onehot(move, size=NUM_POSSIBLE_MOVES))

        return board_tensor, move_tensor


class ToTensor:
    def __call__(self, board):
        return board.pieces

In [13]:
class ChessAI(nn.Module):
    def __init__(self):
        super(ChessAI, self).__init__()
        self.conv1 = nn.Conv2d(12, 64, kernel_size=3, padding=1)
        self.relu1 = nn.ReLU(inplace=True)
        self.fc1 = nn.Linear(64 * 8 * 8, 4096)
        self.relu2 = nn.ReLU(inplace=True)
        self.fc2 = nn.Linear(4096, 4672)  # Modificar la cantidad de salidas


    def forward(self, x):
        x = self.relu1(self.conv1(x))
        x = x.view(-1, 64 * 8 * 8)
        x = self.relu2(self.fc1(x))
        x = self.fc2(x)
        x = F.softmax(x, dim=1)  # Agregar capa Softmax
        return x


In [14]:
# pgn_file = "C:/Users/mated/Documents/GitHub/CHESS_DATA/lichess_db_standard_rated_2017-03.pgn"
pgn_file = "C:/Users/mated/Documents/GitHub/CHESS_DATA/lichess_db_standard_rated_2013-01.pgn"

In [None]:
dataset = ChessDataset("C:/Users/mated/Documents/GitHub/CHESS_DATA/lichess_db_standard_rated_2013-01.pgn")
dataloader = torch.utils.data.DataLoader(dataset, batch_size=64, shuffle=True)


In [None]:
class ChessDataset(Dataset):
    def __init__(self, pgn_file):
        self.games = []
        with open(pgn_file) as f:
            game = []
            for line in f:
                if line.startswith('[Event'):
                    if game:
                        self.games.append(game)
                    game = []
                if line.startswith('1.'):
                    moves = line.strip().split()[1:]
                    game.append(moves)
            if game:
                self.games.append(game)

        self.data = []
        for game in self.games:
            board = chess.Board()
            fen_moves = []
            for moves in game:
                for move in moves:
                    fen_moves.append((board.fen(), move))
                    board.push_uci(move)
            self.data.append(fen_moves)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        return self.data[index]


In [15]:
# fen_positions = extract_fen_positions(pgn_file)

In [16]:
from sklearn.model_selection import train_test_split

# # Divide las posiciones FEN en datos de entrenamiento y prueba
# train_fen_positions, test_fen_positions = train_test_split(fen_positions, test_size=0.2, random_state=42)

In [17]:
# # Crea los conjuntos de datos
# train_dataset = ChessPGNDataset(train_fen_positions)
# test_dataset = ChessPGNDataset(test_fen_positions)

# # Crea los DataLoader
# train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
# test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [18]:
def extract_fen_and_moves_from_pgn(pgn_file):
    fen_list = []
    uci_move_list = []
    total_games = count_games(pgn_file)
    
    with open(pgn_file) as pgn:
        for _ in tqdm(range(total_games), desc="Processing games"):
            game = chess.pgn.read_game(pgn)
            if game is None:
                break  # Fin del archivo
            
            board = game.board()
            for move in game.mainline_moves():
                board.push(move)
                fen_list.append(board.fen())
                uci_move_list.append(move.uci())
    
    return fen_list, uci_move_list


In [19]:
# Cargar las posiciones FEN y los movimientos UCI a partir del archivo PGN
fen_list, uci_move_list = extract_fen_and_moves_from_pgn(pgn_file)

Processing games: 100%|██████████| 121332/121332 [13:30<00:00, 149.68it/s]


In [20]:
batch_size = 32

In [21]:
transform = ToTensor()

In [22]:
# Dividir las listas de FEN y movimientos UCI en conjuntos de entrenamiento y prueba
# train_fen_positions, test_fen_positions = train_test_split(fen_list, test_size=0.2, random_state=42)
# train_uci_move_list, test_uci_move_list = train_test_split(uci_move_list, test_size=0.2, random_state=42)
train_fen_list, test_fen_list, train_uci_move_list, test_uci_move_list = train_test_split(fen_list, uci_move_list, test_size=0.2, random_state=42)

In [23]:
# Crear conjuntos de datos y DataLoaders para entrenamiento y prueba
train_dataset = ChessDataset(train_fen_list, train_uci_move_list, transform=transform)
test_dataset = ChessDataset(test_fen_list, test_uci_move_list, transform=transform)

In [24]:
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=0)

In [25]:
for i, (fen, uci_move)  in enumerate(test_dataloader):
    print(f'Los tamaños de los elementos son: fen={fen.shape}, uni_move={uci_move.shape}')
    if i > 2:
        break

ValueError: expected position part of fen, got multiple parts: '2q2rk1/1n2bpp1/2pp3p/1p2p2n/1P2P3/2PP1N1P/1BBQ1PPK/R7 b - - 1 21'

In [None]:
# # Creamos conjuntos de datos: dataloaders
# transform = transforms.Compose([transforms.Resize((8, 8)), transforms.ToTensor()])

# train_dataset = ChessDataset("train", transform=transform)
# test_dataset = ChessDataset("test", transform=transform)

# train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=2)
# test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=True, num_workers=2)


In [None]:
import torchmetrics

In [None]:
model = ChessAI()
model = model.to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
# metrics = [nn.MSELoss(), torchmetrics.Accuracy(), torchmetrics.F1(num_clases=2)]#(num_classes=4672)]
from sklearn.metrics import mean_squared_error, mean_absolute_error

# metrics = [
#     mean_squared_error,
#     mean_absolute_error,
#     nn.CrossEntropyLoss()
#     # torchmetrics.Accuracy(num_classes=4672),
#     # torchmetrics.F1(num_classes=4672, average='macro'),
# ]

metrics = {
    'mse': nn.MSELoss(),
    'mse': nn.MSELoss(),
    'mae': nn.L1Loss()
    # 'accuracy': torchmetrics.Accuracy(num_classes=4672),
    # 'f1_score': torchmetrics.F1(num_classes=4672, average='macro')
}


In [None]:
from torchsummary import summary

summary(model, input_size=(12, 8, 8))

In [None]:
# num_epochs = 50
# train_losses = []
# test_losses = []

# train_metrics = {key: [] for key in metrics.keys()}
# test_metrics = {key: [] for key in metrics.keys()}

# for epoch in range(num_epochs):
#     model.train()
#     train_loss = 0
#     train_metric_vals = {key: 0 for key in metrics.keys()}
#     for i, (board, image) in enumerate(train_dataloader):
#         optimizer.zero_grad()
        
#         input_tensor = board_to_tensor(board).unsqueeze(0).to(device)
#         output_tensor = model(input_tensor)
#         target_tensor = image.view(1, -1).to(device)

#         loss = criterion(output_tensor, target_tensor)
#         train_loss += loss.item()

#         for metric_name, metric_fn in metrics.items():
#             metric_val = metric_fn(output_tensor, target_tensor)
#             train_metric_vals[metric_name] += metric_val.item()

#         loss.backward()
#         optimizer.step()
        
#     train_loss /= len(train_dataloader)
#     train_losses.append(train_loss)
#     for metric_name in metrics.keys():
#         train_metric_vals[metric_name] /= len(train_dataloader)
#         train_metrics[metric_name].append(train_metric_vals[metric_name])

#     model.eval()
#     test_loss = 0
#     test_metric_vals = {key: 0 for key in metrics.keys()}
#     for i, (board, image) in enumerate(test_dataloader):
#         input_tensor = board_to_tensor(board).unsqueeze(0).to(device)
#         output_tensor = model(input_tensor)
#         target_tensor = image.view(1, -1).to(device)

#         loss = criterion(output_tensor, target_tensor)
#         test_loss += loss.item()

#         for metric_name, metric_fn in metrics.items():
#             metric_val = metric_fn(output_tensor, target_tensor)
#             test_metric_vals[metric_name] += metric_val.item()

#     test_loss /= len(test_dataloader)
#     test_losses.append(test_loss)
#     for metric_name in metrics.keys():
#         test_metric_vals[metric_name] /= len(test_dataloader)
#         test_metrics[metric_name].append(test_metric_vals[metric_name])

#     print(f"Epoch: {epoch + 1}/{num_epochs}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}")
#     for metric_name in metrics.keys():
#         print(f"    {metric_name.capitalize()}: Train {train_metric_vals[metric_name]:.4f}, Test {test_metric_vals[metric_name]:.4f}")


In [None]:
num_epochs = 50
train_losses = []
test_losses = []

train_metrics = {key: [] for key in metrics.keys()}
test_metrics = {key: [] for key in metrics.keys()}

for epoch in range(num_epochs):
    model.train()
    train_loss = 0
    train_metric_vals = {key: 0 for key in metrics.keys()}
    for i, (board, move) in enumerate(train_dataloader):
        optimizer.zero_grad()

        input_tensor = board_to_tensor(board).to(device)
        output_tensor = model(input_tensor)
        target_tensor = torch.tensor(move).to(device)

        loss = criterion(output_tensor, target_tensor)
        train_loss += loss.item()

        for metric_name, metric_fn in metrics.items():
            if metric_name == 'mse':
                metric_val = metric_fn(output_tensor, target_tensor.float())
            else:
                metric_val = metric_fn(output_tensor.argmax(dim=1), target_tensor)
            train_metric_vals[metric_name] += metric_val.item()

        loss.backward()
        optimizer.step()

    train_loss /= len(train_dataloader)
    train_losses.append(train_loss)
    for metric_name in metrics.keys():
        train_metric_vals[metric_name] /= len(train_dataloader)
        train_metrics[metric_name].append(train_metric_vals[metric_name])

    model.eval()
    test_loss = 0
    test_metric_vals = {key: 0 for key in metrics.keys()}
    for i, (board, move) in enumerate(test_dataloader):
        input_tensor = board_to_tensor(board).to(device)
        output_tensor = model(input_tensor)
        target_tensor = torch.tensor(move).to(device)

        loss = criterion(output_tensor, target_tensor)
        test_loss += loss.item()

        for metric_name, metric_fn in metrics.items():
            if metric_name == 'mse':
                metric_val = metric_fn(output_tensor, target_tensor.float())
            else:
                metric_val = metric_fn(output_tensor.argmax(dim=1), target_tensor)
            test_metric_vals[metric_name] += metric_val.item()

    test_loss /= len(test_dataloader)
    test_losses.append(test_loss)
    for metric_name in metrics.keys():
        test_metric_vals[metric_name] /= len(test_dataloader)
        test_metrics[metric_name].append(test_metric_vals[metric_name])

    print(f"Epoch: {epoch + 1}/{num_epochs}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}")
    for metric_name in metrics.keys():
        print(f"    {metric_name.capitalize()}: Train {train_metric_vals[metric_name]:.4f}, Test {test_metric_vals[metric_name]:.4f}")


In [None]:
plt.plot(train_losses, label="Train Loss")
plt.plot(test_losses, label="Test Loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.show()

In [None]:
torch.save(model.state_dict(), "chess_ai_model.pth")

In [None]:
import matplotlib.pyplot as plt

# Graficar función de pérdida
plt.figure(figsize=(12,8))
plt.plot(train_losses, label='Train Loss')
plt.plot(test_losses, label='Test Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Testing Loss over Time')
plt.legend()
plt.show()

# Graficar métricas
for metric_name, metric_values in metric_dict.items():
    plt.figure(figsize=(8,5))
    plt.plot(metric_values, label=metric_name)
    plt.xlabel('Epoch')
    plt.ylabel(metric_name)
    plt.title(f'{metric_name} over Time')
    plt.legend()
    plt.show()


In [None]:
# Función de pérdida
plt.figure(figsize=(10,6))
plt.plot(train_losses_ce, label='Train CE')
plt.plot(test_losses_ce, label='Test CE')
plt.plot(train_losses_mse, label='Train MSE')
plt.plot(test_losses_mse, label='Test MSE')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Loss Function Comparison')
plt.legend()
plt.show()

# Métricas
plt.figure(figsize=(10,6))
plt.plot(train_accuracies_ce, label='Train Accuracy CE')
plt.plot(test_accuracies_ce, label='Test Accuracy CE')
plt.plot(train_accuracies_mse, label='Train Accuracy MSE')
plt.plot(test_accuracies_mse, label='Test Accuracy MSE')
plt.plot(train_f1_scores_ce, label='Train F1-Score CE')
plt.plot(test_f1_scores_ce, label='Test F1-Score CE')
plt.plot(train_f1_scores_mse, label='Train F1-Score MSE')
plt.plot(test_f1_scores_mse, label='Test F1-Score MSE')
plt.xlabel('Epoch')
plt.ylabel('Metric Value')
plt.title('Metric Comparison')
plt.legend()
plt.show()

In [None]:
import pandas as pd

metrics_dict = {'train_loss': train_losses,
                'train_mse': train_mses,
                'train_acc': train_accs,
                'train_f1': train_f1s,
                'test_loss': test_losses,
                'test_mse': test_mses,
                'test_acc': test_accs,
                'test_f1': test_f1s}

metrics_df = pd.DataFrame(metrics_dict)

In [None]:
import matplotlib.pyplot as plt

fig, axs = plt.subplots(nrows=2, ncols=2, figsize=(10, 8))

metrics_df[['train_loss', 'test_loss']].plot(ax=axs[0, 0], title='Loss')
metrics_df[['train_mse', 'test_mse']].plot(ax=axs[0, 1], title='MSE')
metrics_df[['train_acc', 'test_acc']].plot(ax=axs[1, 0], title='Accuracy')
metrics_df[['train_f1', 'test_f1']].plot(ax=axs[1, 1], title='F1 Score')

plt.tight_layout()
plt.show()

In [None]:
# Gráfico de comparación de pérdidas y métricas por época

# Definir los datos
epochs = list(range(1, num_epochs+1))
train_loss_values = [train_metrics_dict['loss'][i] for i in range(num_epochs)]
test_loss_values = [test_metrics_dict['loss'][i] for i in range(num_epochs)]
mse_values = [test_metrics_dict['mse'][i] for i in range(num_epochs)]
accuracy_values = [test_metrics_dict['accuracy'][i] for i in range(num_epochs)]
f1score_values = [test_metrics_dict['f1score'][i] for i in range(num_epochs)]

# Crear la figura y los ejes
fig, ax1 = plt.subplots(figsize=(10,6))

# Plotear las pérdidas
color = 'tab:red'
ax1.set_xlabel('Épocas')
ax1.set_ylabel('Pérdida', color=color)
ax1.plot(epochs, train_loss_values, color=color, linestyle='--', label='Train Loss')
ax1.plot(epochs, test_loss_values, color=color, label='Test Loss')
ax1.tick_params(axis='y', labelcolor=color)

# Crear los ejes secundarios
ax2 = ax1.twinx()

# Plotear las métricas
color = 'tab:blue'
ax2.set_ylabel('Métricas', color=color)
ax2.plot(epochs, mse_values, color=color, linestyle=':', label='MSE')
ax2.plot(epochs, accuracy_values, color=color, linestyle='-.', label='Accuracy')
ax2.plot(epochs, f1score_values, color=color, linestyle='--', label='F1-Score')
ax2.tick_params(axis='y', labelcolor=color)

# Añadir leyendas y título
plt.legend()
plt.title('Comparación de pérdidas y métricas por época')

# Mostrar la gráfica
plt.show()

In [None]:
# Gráfico de comparación de pérdidas de entrenamiento y prueba

# Definir los datos
epochs = list(range(1, num_epochs+1))
train_loss_values = [train_metrics_dict['loss'][i] for i in range(num_epochs)]
test_loss_values = [test_metrics_dict['loss'][i] for i in range(num_epochs)]

# Crear la figura y los ejes
fig, ax = plt.subplots(figsize=(10,6))

# Plotear las pérdidas
ax.plot(epochs, train_loss_values, label='Train Loss')
ax.plot(epochs, test_loss_values, label='Test Loss')

# Añadir leyendas y título
plt.legend()
plt.title('Comparación de pérdidas de entrenamiento y prueba por época')

# Mostrar la gráfica
plt.show()


In [None]:
# Gráfico de métricas de prueba por época

# Definir los datos
epochs = list(range(1, num_epochs+1))
mse_values = [test_metrics_dict['mse'][i] for i in range(num_epochs)]
accuracy_values = [test_metrics_dict['accuracy'][i] for i in range(num_epochs)]
f1score_values = [test_metrics_dict['f1score'][i] for i in range(num_epochs)]

# Crear la figura y los ejes
fig, ax = plt.subplots(figsize=(10,6))

# Plotear las métricas
for i, (board, image) in enumerate(train_dataloader):
    optimizer.zero_grad()
        
    input_tensor = board_to_tensor(board).unsqueeze(0).to(device)
    output_tensor = model(input_tensor)
    target_tensor = image.view(1, -1).to(device)

    loss = criterion(output_tensor, target_tensor)
    loss.backward()
    optimizer.step()
        
    train_loss += loss.item()
        
    # Calculate accuracy
    _, predicted = torch.max(output_tensor.data, 1)
    total += target_tensor.size(1)
    correct += (predicted == target_tensor).sum().item()

train_accuracy = correct / total
train_accuracies.append(train_accuracy)

De manera similar, podemos calcular la precisión en el conjunto de prueba al final de cada época:

In [None]:
model.eval()
test_loss = 0
correct = 0
total = 0
with torch.no_grad():
    for i, (board, image) in enumerate(test_dataloader):
        input_tensor = board_to_tensor(board).unsqueeze(0).to(device)
        output_tensor = model(input_tensor)
        target_tensor = image.view(1, -1).to(device)

        loss = criterion(output_tensor, target_tensor)
        test_loss += loss.item()

        # Calculate accuracy
        _, predicted = torch.max(output_tensor.data, 1)
        total += target_tensor.size(1)
        correct += (predicted == target_tensor).sum().item()

test_accuracy = correct / total
test_accuracies.append(test_accuracy)

In [None]:
plt.plot(train_accuracies, label="Train Accuracy")
plt.plot(test_accuracies, label="Test Accuracy")
plt.legend()
plt.title("Accuracy by Epoch")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.show()


In [None]:
board = chess.Board("rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1")
input_tensor = board_to_tensor(board).unsqueeze(0).to(device)
output_tensor = model(input_tensor)
image = tensor_to_image(output_tensor.squeeze().cpu())

fig, ax = plt.subplots(1, 2)
ax[0].imshow(board)
ax[1].imshow(image)
ax[0].axis("off")
ax[1].axis("off")
plt.show()
