In [12]:
import pandas as pd
import numpy as np
import os
import re
import glob
import dask.dataframe as dd
import torch
import chess
from pathlib import Path
from tqdm import tqdm
import gc

In [2]:
def generate_full_uci_move_vocabulary() -> tuple[dict,dict]:

    """
    Generates a set of all the posible movements in a chess board of 64 squares, 
    create two dictionaries which will represent the board move in uci format and their respective idx value and viceversa
    
    Returns
    -------
        uci_to_idx : dict
                     All the possible uci moves in a chess board, uci format as keys and idx as values
        idx_to_uci : dict 
                     All the possible uci moves in a chess board, uci format as keys and idx as values
    
    """
    move_set = set()
    
    for from_sq in chess.SQUARES:
        for to_sq in chess.SQUARES:
            if from_sq == to_sq:
                continue

            move = chess.Move(from_sq, to_sq)
            move_set.add(move.uci())
            
            from_rank = chess.square_rank(from_sq) # Get the row in which the piece is coming from
            to_rank = chess.square_rank(to_sq) # Get the row in which will be moved the piece
            # if to_rank in [0, 7] and from_rank in [1,6]:  # posibles promociones
            if (from_rank == 1 and to_rank == 0) or (from_rank == 6 and to_rank ==7):
                
                for promo in [chess.QUEEN, chess.ROOK, chess.BISHOP, chess.KNIGHT]:
                    move_set.add(chess.Move(from_sq, to_sq, promotion=promo).uci())
                    
    move_list = sorted(move_set)
    uci_to_idx = {uci: idx for idx, uci in enumerate(move_list)}
    idx_to_uci = {idx: uci for uci, idx in uci_to_idx.items()}
    return uci_to_idx, idx_to_uci

def fen_to_tensor(fen:str) -> torch.Tensor:
    """
    Converts a FEN position into a torch tensor of shape (12,8,8),
    12 matrix of 8x8 positions, in which each type of piece eaither PNBRQK or pnbrqk,
    will ocupate a place in the matrix, each matrix for each set of piece representation.

    Parameters
    ----------
    fen : str
          The notation FEN to convert into numerical values
    Returns
    -------
    board_tensor : torch.Tensor
                   The representation of FEN notation in 12 matrix of 8x8

    """

    board = chess.Board(fen)
    
    piece_to_index = {piece:idx for idx,piece in enumerate('PNBRQKpnbrqk')} # represents the piece and index of each value of the str

    #TODO: In version 2; add extra canals to indicate if there is castling available(4) canals,passant square(1), halfmove clock(1)
    
    board_tensor = torch.zeros((12,8,8),dtype=torch.float32)
    for square in chess.SQUARES:
        piece = board.piece_at(square)
        if piece:
            idx = piece_to_index[piece.symbol()]
            row = 7 - (square // 8)
            col = square %8
            board_tensor[idx,row,col] = 1.0
    return board_tensor


def get_legal_moves_vocab(fen:str) -> tuple[dict[str,int],dict[int,str]]:
    """
    Generates a set of legal posible moves for a given position 

    IMPORTANT ---> All the dict generated are LOCAL and could not match with the global dict --> generate_full_uci_move_vocabulary()

    Parameters
    ----------
        fen: FEN notation of the current position
    Returns
    -------
        uci_to_idx: Dict {uci_move : idx}
        idc_to_uci: Dict {idx : uci_move}
    """

    board = chess.Board(fen)
    legal_moves = list(board.legal_moves)
    
    legal_moves_sorted = sorted(legal_moves, key=lambda m: m.uci())

    uci_to_idx = {move.uci():  idx for idx, move in enumerate(legal_moves_sorted)}
    idx_to_uci = {idx: move.uci() for idx,move in enumerate(legal_moves_sorted)}
    return uci_to_idx, idx_to_uci

 ## POSIBLEMENTE DESCARTADO, MEJORA ALTERNATIVA CON FUNCION  --> get_legal_moves_vocab enfoque "SPARSE"
# def get_legal_mask(board: chess.Board, uci_to_index: dict) -> torch.Tensor:
#     mask = torch.zeros(len(uci_to_index), dtype=torch.float32)
#     for move in board.legal_moves:
#         uci = move.uci()
#         if uci in uci_to_index:
#             mask[uci_to_index[uci]] = 1.0
#     return mask  # Shape: (n_moves,)

    


    move_list = sorted(move_set)
    uci_to_index = {uci: idx for idx, uci in enumerate(move_list)}
    index_to_uci = {idx: uci for uci, idx in uci_to_index.items()}
    return uci_to_index, index_to_uci
# Globales cargados una vez al inicio

def move_to_index(uci_move: str) -> int:
    return uci_to_index.get(uci_move, -1)  # -1 si no está

def index_to_move(idx: int) -> str:
    return index_to_uci.get(idx, "0000")  # dummy por si acaso

class ChessSequenceDataset(torch.utils.data.Dataset):


    def __init__(self,*,uci_to_idx=None,df=None,games=None):
        """
        Initializes the ChessSequenceDataset with either a dataframe of game moves or a list of game tensors.
    
        Parameters
        ----------
            uci_to_idx:      dict {uci_move : idx}
                             Dictionary mapping UCI (Universal Chess Interface) moves to a numerical value. The dictionary represents all possible move classes.
                             
            df:    pd.DataFrame
                             Pandas dataframe which will have a variety of columns, game_id,pyl,fen,and move_uci are relevant for this constructor.
                             
            games: list(list(Torch.tensor))
                             List of half moves represented as Torch.tensors which are the snapshots of a given FEN(Forsyth-Edwards Notation) nested in other list with full games.
                             Specifications:
                             n_games: number of games
                             n_moves_per_game: moves per game
                             dim: dimensions of each move's tensor 8*8*12
    
                             The total dimension: n_games*n_moves_per_game*dim
                    
        Notes
        -----
        Either 'df' or 'games' should be provided to create a dataset instance for training an LSTM model.
        
        """
        if games is not None:
            self.games = games
            self.uci_to_idx = uci_to_idx
        elif df is not None and uci_to_idx is not None:
            self.games = []
            self.uci_to_idx = uci_to_idx
            grouped = df.groupby('game_id')
    
            for game_id, group in grouped:
                group_sorted = group.sort_values(by='pyl',ascending=True)#group.sort_values(by='pyl',ascending=True)
                sequence = []
    
                for _,row in group_sorted.iterrows():
                    fen = row['fen']
                    uci = row['move_uci']
    
                    move_idx = uci_to_idx.get(uci,-1)
                    if move_idx ==-1:
                        continue
                    try:
                        fen_tensor = fen_to_tensor(fen)
                    except Exception as e:
                        print(f'Error in FEN {fen} --->{e}')
                        continue
                    sequence.append((fen_tensor,move_idx))
                if len(sequence)>0:
                    self.games.append(sequence)
    @classmethod
    def from_multiple_files(cls,file_list,uci_to_idx,stop_idx):
        """
        Initializes the ChessSequenceDataset with a file list of games previously processed, within a range of a given index

        Parameters
        ----------
            file_list: list
                list of paths of files which contain the processed games
            uci_to_idx: dict {uci_move: idx}
                Dictionary mapping UCI (Universal Chess Interface) moves to a numerical value. The dictionary represents all possible move classes.
            stop_idx: int
                Determine if the loading should stop in a certain number of processed dataset tho save resources.
                
        """
        
        all_games = []
        for _,file in enumerate(file_list):
            if _ > stop_idx:
                break
            games = torch.load(file)
            all_games.extend(games)

        return cls(games=all_games,uci_to_idx=uci_to_index)
            
    def __len__(self):
        return len(self.games)
    def __getitem__(self,idx):
        return self.games[idx]
        
def identity_collate(batch):
    return batch[0]  # simplemente devuelve la secuencia tal cual

class ChessLSTMPolicyNet(torch.nn.Module):
    def __init__(self,input_channels=12,lstm_hidden_size=512,lstm_layers=1,output_dim=4544):
        """
        Initialize the ChessLSTMPolicyNet model, inheriting class methods of module torch.nn.Module

        Parameters
        ----------
            input_channels: int(12)
                Channels of the possible pieces to play in chess, 6 for white and 6 for black
            lstm_hidden_size: int(512)
                Quantity of LSTM neurons which will be the model trained on
            lstm_layers: int(1)
                Quantity of layers LSTM
            output_dim: int(4544)
                Quantity of possible move classes
        """
        

        
        super().__init__()

        
        self.conv = torch.nn.Sequential(
            torch.nn.Conv2d(input_channels,64,kernel_size=3,padding=1),
            torch.nn.ReLU(),
            torch.nn.Conv2d(64,128,kernel_size=3,padding=1),
            torch.nn.ReLU()
        )
        self.flattened_size = 128*8*8
        self.lstm = torch.nn.LSTM(
            input_size= self.flattened_size,
            hidden_size=lstm_hidden_size,
            num_layers=lstm_layers,
            batch_first=True
        )


        self.fc = torch.nn.Linear(lstm_hidden_size,output_dim)


    def forward(self,x):
            B, T, C, H, W = x.shape
            x = x.view(B * T, C, H, W)           # (B*T, C, 8, 8)
            x = self.conv(x)                     # (B*T, 128, 8, 8)
            x = x.view(B, T, -1)                 # (B, T, 8192)

            lstm_out, _ = self.lstm(x)           # (B, T, hidden)
            logits = self.fc(lstm_out)           # (B, T, output_dim)
            return logits
def train_lstm(model, dataloader, criterion, optimizer,device, epochs=5,start_epoch=0, checkpoint_path='checkpoint.pth'):
    """
    Function to train the model for a given dataset, saving each epoch result to segregate the traning into smaller blocks of processing

    Parameters
    ----------
        model: ChessLSTMPolicyNet
            Architecture of the model that define the flow of how the dataloader will be processed for machine learning.
        dataloader: torch.utils.data.DataLoader
            Iterator of lenght of n_games/batch_size, normally batch_size will be equal to 1, considering that is needed process just complete sequences of a game 
        criterion: torch.nn.CrossEntropyLoss()
            Loss function to calculate the loss 
        optimizer: torch.optim.Adam()
            Opimizer which will help us to optimze the weights in the neural network based on the results of the loss function
        device:
            Device to perform the training, usually a GPU
        epochs:
            Number of epochs to iterate the training
        start_epoch:
            Index from where will be start the training
        checkpoint_path: 
            File with the saved training epochs """

    
    model.train()

    for epoch in range(start_epoch,start_epoch+epochs):
        total_loss = 0
        total_correct = 0
        total_moves = 0

        for sequence in dataloader:
            # batch: list of 1 element (sequence of (fen_tensor, move_idx))
            if not all(isinstance(x,tuple) and isinstance(x[0],torch.Tensor) for x in sequence):
                print('Invalid sequence detected and omitted...')
                continue
            try: 
                
                inputs = torch.stack([x[0] for x in sequence])  # (T, C, 8, 8)
                targets = torch.tensor([x[1] for x in sequence],dtype=torch.long)  # (T,)

            # Reshape to (B, T, C, 8, 8)
                inputs = inputs.unsqueeze(0).to(device)
                targets = targets.unsqueeze(0).to(device)

                optimizer.zero_grad()
                outputs = model(inputs)  # (B, T, output_dim)

            # Aplanar para CrossEntropy
                logits = outputs.view(-1, outputs.size(-1))     # (T, output_dim)
                target_flat = targets.view(-1)                  # (T,)

                loss = criterion(logits, target_flat)
                loss.backward()
                optimizer.step()
    
                total_loss += loss.item()
                preds = torch.argmax(logits, dim=1)
                total_correct += (preds == target_flat).sum().item()
                total_moves += target_flat.size(0)
            except Exception as e:
                print(f'Something went wrong error {e}, skipping sequence')
                continue

        acc = total_correct / total_moves
        print(f"Epoch {epoch+1}: Loss = {total_loss:.4f}, Accuracy = {acc:.4f}")
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': total_loss
        }, checkpoint_path)


In [3]:
def save_dataset_chunks(df, uci_to_idx,chunk_size=50_000, output_prefix='chunk_sequence'):
    game_ids = df['game_id'].unique()
    total_games = len(game_ids)
    num_chunks = (total_games + chunk_size -1) // chunk_size

    for i in range(num_chunks):
        save_path = Path.cwd().parent /'data'/'torch_datasets'/f'{output_prefix}_{i:04d}.pt'
        save_path.parent.mkdir(parents=True,exist_ok=True)
        print(f'Veryfing if chunk number: {i:04d} exists...')

        if os.path.exists(save_path):
            print(f'Chunk {i:04d} exist, skipping...')
            continue
        else:
            start = i*chunk_size
            end = min(start + chunk_size,total_games)
            chunks_ids = game_ids[start:end]
            chunk_df = df.loc[df['game_id'].isin(chunks_ids)]
    
            print(f'Processing chunk number {i}/{num_chunks} with {len(chunks_ids)} games...')
            dataset = ChessSequenceDataset(chunk_df,uci_to_idx)
    
            torch.save(dataset.games,save_path)
            print(f'[💾] Saved: {save_path} with {len(dataset)} valid games')
    

        
    

In [4]:
torch_datasets = glob.glob(str(Path.cwd().parent / 'data'/'torch_datasets'/'*.pt'))

In [16]:
parquet_datasets = glob.glob(str(Path.cwd().parent / 'data' / 'processed' /'*.parquet'))

In [18]:
df_sample = pd.read_parquet(parquet_datasets[0])

In [19]:
uci_to_idx,idx_to_uci = generate_full_uci_move_vocabulary()

In [23]:
unique_games = df_sample['game_id'].unique()

In [25]:
sample_games =np.random.choice(unique_games,size=5000,replace=False)

In [27]:
df_sampled = df_sample.loc[df_sample['game_id'].isin(sample_games)].copy()

In [30]:
if uci_to_idx:
    dataset = ChessSequenceDataset(uci_to_idx=uci_to_idx,df=df_sampled)

In [73]:
t = 0
for i in range(0,10):
    out_box = 1024*(1/2)**t
    inner_box = -(1024*(1/2)**t)+1024
    t +=1

    print(f'Inner Box {inner_box:<03} Outter Box {out_box:<03}')

Inner Box 0.0 Outter Box 1024.0
Inner Box 512.0 Outter Box 512.0
Inner Box 768.0 Outter Box 256.0
Inner Box 896.0 Outter Box 128.0
Inner Box 960.0 Outter Box 64.0
Inner Box 992.0 Outter Box 32.0
Inner Box 1008.0 Outter Box 16.0
Inner Box 1016.0 Outter Box 8.0
Inner Box 1020.0 Outter Box 4.0
Inner Box 1022.0 Outter Box 2.0


In [76]:
t = 0
for i in range(0,10):
    out_box = 1024*(1/2)**t
    inner_box = 1024 *((1-(1/2)**i)/(1-(1/2)))
    t +=1

    print(f'Inner Box {inner_box:<03} Outter Box {out_box:<03}')

Inner Box 0.0 Outter Box 1024.0
Inner Box 1024.0 Outter Box 512.0
Inner Box 1536.0 Outter Box 256.0
Inner Box 1792.0 Outter Box 128.0
Inner Box 1920.0 Outter Box 64.0
Inner Box 1984.0 Outter Box 32.0
Inner Box 2016.0 Outter Box 16.0
Inner Box 2032.0 Outter Box 8.0
Inner Box 2040.0 Outter Box 4.0
Inner Box 2044.0 Outter Box 2.0
