In [29]:
import polars as pl
import chess
import torch
import torch.nn as nn
import numpy as np
import json

In [30]:
# print all rows and columns of a Polars DataFrame
pl.Config.set_tbl_rows(100)
pl.Config.set_tbl_cols(100)

polars.config.Config

In [31]:
raw_data=pl.read_csv("data/raw/lichess_db_puzzle.csv")
raw_data.describe

<bound method DataFrame.describe of shape: (5_682_618, 10)
┌─────────┬─────────┬─────────┬────────┬─────────┬─────────┬─────────┬─────────┬─────────┬─────────┐
│ PuzzleI ┆ FEN     ┆ Moves   ┆ Rating ┆ RatingD ┆ Popular ┆ NbPlays ┆ Themes  ┆ GameUrl ┆ Opening │
│ d       ┆ ---     ┆ ---     ┆ ---    ┆ eviatio ┆ ity     ┆ ---     ┆ ---     ┆ ---     ┆ Tags    │
│ ---     ┆ str     ┆ str     ┆ i64    ┆ n       ┆ ---     ┆ i64     ┆ str     ┆ str     ┆ ---     │
│ str     ┆         ┆         ┆        ┆ ---     ┆ i64     ┆         ┆         ┆         ┆ str     │
│         ┆         ┆         ┆        ┆ i64     ┆         ┆         ┆         ┆         ┆         │
╞═════════╪═════════╪═════════╪════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╡
│ 00008   ┆ r6k/pp2 ┆ f2g3    ┆ 1829   ┆ 76      ┆ 95      ┆ 8958    ┆ crushin ┆ https:/ ┆ null    │
│         ┆ r2p/4Rp ┆ e6e7    ┆        ┆         ┆         ┆         ┆ g hangi ┆ /liches ┆         │
│         ┆ 1Q/3p4/ ┆ b2b1    ┆ 

In [32]:
#limiter aux 100 premières lignes
raw_data=raw_data.head(20000)

In [33]:
#affiche la FEN et le GameUrl de la ligne 50
#affiche entièrement le FEN

fen_str = raw_data[50, "FEN"]
display(fen_str)
print(raw_data[50,["PuzzleId"]])


'r3brk1/5pp1/p1nqpn1p/P2pN3/2pP4/2P1PN2/5PPP/RB1QK2R b KQ - 4 16'

shape: (1, 1)
┌──────────┐
│ PuzzleId │
│ ---      │
│ str      │
╞══════════╡
│ 002IE    │
└──────────┘


In [34]:
useless_col=["GameUrl","Rating","RatingDeviation","NbPlays","OpeningTags"]
raw_data = raw_data.drop(useless_col)
# Ajoute la colonne "n_moves" à chaque ligne
raw_data = raw_data.with_columns(
    pl.col("Moves").str.split(" ").list.len().alias("n_moves")
)

In [35]:
distribution = (
    raw_data.group_by("n_moves")
    .len()  # Dans les versions récentes de Polars, .len() remplace .count()
    .sort("n_moves")
)

# Optionnel : Si vous voulez un affichage plus "propre" ligne par ligne
for row in distribution.iter_rows(named=True):
   print(f"{row['n_moves']} coups : {row['len']} problèmes")


2 coups : 2914 problèmes
4 coups : 10238 problèmes
6 coups : 5189 problèmes
8 coups : 1169 problèmes
10 coups : 333 problèmes
12 coups : 108 problèmes
14 coups : 27 problèmes
16 coups : 10 problèmes
18 coups : 9 problèmes
20 coups : 3 problèmes


In [36]:
#filter to keep only positions with less than 14 moves
filtered_data = raw_data.filter(pl.col("n_moves") < 10)

In [37]:
useless_themes = ["bishopEndgame", "endgame", "enPassant", "knightEndgame", "long", "master", 
                  "masterVsMaster", "middlegame", "oneMove", "opening", "pawnEndgame", 
                  "queenEndgame", "queenRookEndgame", "rookEndgame", "short", "superGM", 
                  "veryLong", "mix", "playerGames", "puzzleDownloadInformation","mat","castling","promotion","underpromotion"]

#retirer les lignes où tous les thèmes sont dans useless_themes
filtered_data = (
    filtered_data
    .with_columns(pl.col("Themes").str.split(" ")) 
    .filter(
        pl.col("Themes").list.set_difference(useless_themes).list.len() > 0
    )
)

In [None]:

# Compte le nombre de positions par thème
theme_distribution = (
    filtered_data
    .explode("Themes")
    .group_by("Themes")
    .len()
    .sort("len", descending=True)
)

for row in theme_distribution.iter_rows(named=True):
   print(f"{row['Themes']} : {row['len']} problèmes")

#fait uen liste des thèmes avec moins de 10 problèmes
themes_to_remove = [row['Themes'] for row in theme_distribution.iter_rows(named=True) if row['len'] < 10]
themes_to_remove = list(set(themes_to_remove) - set(useless_themes))
print("Thèmes à retirer (moins de 10 problèmes):", themes_to_remove)

#pour chaque ligne, retirer les thèmes avec moins de 10 problèmes ou useless
final_data = filtered_data.with_columns(
    pl.col("Themes").list.filter(~pl.element().is_in(themes_to_remove+useless_themes))
).filter(pl.col("Themes").list.len() > 0)

short : 10202 problèmes
endgame : 9593 problèmes
middlegame : 8839 problèmes
crushing : 7362 problèmes
mate : 6344 problèmes
advantage : 5674 problèmes
long : 5155 problèmes
oneMove : 2914 problèmes
mateIn1 : 2905 problèmes
master : 2725 problèmes
mateIn2 : 2715 problèmes
fork : 2524 problèmes
kingsideAttack : 1745 problèmes
sacrifice : 1424 problèmes
defensiveMove : 1177 problèmes
veryLong : 1160 problèmes
pin : 1138 problèmes
advancedPawn : 1092 problèmes
rookEndgame : 1054 problèmes
discoveredAttack : 1002 problèmes
opening : 991 problèmes
deflection : 750 problèmes
hangingPiece : 749 problèmes
quietMove : 726 problèmes
backRankMate : 683 problèmes
attraction : 646 problèmes
mateIn3 : 639 problèmes
pawnEndgame : 637 problèmes
exposedKing : 579 problèmes
promotion : 403 problèmes
discoveredCheck : 401 problèmes
skewer : 389 problèmes
queensideAttack : 292 problèmes
bishopEndgame : 261 problèmes
pillsburysMate : 259 problèmes
clearance : 252 problèmes
masterVsMaster : 252 problèmes
op

In [26]:

# 1. On récupère la liste unique des thèmes
all_themes = final_data.select(pl.col("Themes").explode()).unique().sort("Themes").to_series().to_list()
theme_to_index = {theme: idx for idx, theme in enumerate(all_themes)}

# 2. On transforme la colonne "Themes" en une matrice NumPy (One-Hot Encoding multi-label)
# On utilise une liste de listes temporaire pour la conversion vers NumPy
def create_label_matrix(series, theme_map):
    num_rows = len(series)
    num_labels = len(theme_map)
    # On crée une matrice de zéros avec NumPy
    matrix = np.zeros((num_rows, num_labels), dtype=np.int64)
    
    for i, themes in enumerate(series):
        for theme in themes:
            if theme in theme_map:
                matrix[i, theme_map[theme]] = 1
    return matrix

# Extraction de la colonne "Themes" vers NumPy
# 1. Générer la matrice NumPy (comme on l'a fait avant)
themes_series = final_data["Themes"].to_list()
y_matrix = create_label_matrix(themes_series, theme_to_index)

# 2. L'ajouter au DataFrame proprement
# On convertit la matrice NumPy en une liste de listes pour que Polars l'accepte
filtered_data = filtered_data.with_columns(
    pl.Series("theme_vector", y_matrix.tolist())
)

In [183]:
#crée une fonction qui transforme une position plus un mouvemement en une nouvelle position FEN
def apply_move_to_fen(fen, move_uci):
    """
    Applique un mouvement à une position donnée en FEN et retourne la nouvelle position en FEN.
    
    Args:
        fen (str): Position en notation FEN
        move_uci (str): Mouvement en notation UCI (ex: 'e2e4')
        
    Returns:
        str: Nouvelle position en notation FEN après application du mouvement
    """
    board = chess.Board(fen)
    move = chess.Move.from_uci(move_uci)
    
    if move in board.legal_moves:
        board.push(move)
        return board.fen()
    else:
        raise ValueError("Mouvement illégal pour la position donnée.")

In [184]:
#pour chaque ligne, crée une nouvelle colonne "all_FEN" qui correspond à une liste de toutes les positions FEN obtenues après chaque mouvement
def generate_all_fens(fen, moves):
    move_list = moves.split(" ")
    fens = [fen]
    current_fen = fen
    
    for move in move_list:
        current_fen = apply_move_to_fen(current_fen, move)
        fens.append(current_fen)
    
    return fens

filtered_data = filtered_data.with_columns(
    pl.struct(["FEN", "Moves"])
    .map_elements(
        lambda x: generate_all_fens(x["FEN"], x["Moves"]), 
        return_dtype=pl.List(pl.Utf8)  # On précise que la fonction renvoie une liste de chaînes (FENs)
    )
    .alias("all_FEN")
)

In [185]:
#pour chaque position crée un tensor qui décrit les noeuds (cases du plateau)
"""
0-5,Type de pièce,"[1,0,0,0,0,0] pour un Pion, [0,0,0,0,1,0] pour une Reine"
6,Couleur,"1 (Blanc), -1 (Noir), 0 (Vide)"
7-8,"Position x,y","0.12, 0.50 (coordonnées normalisées entre 0 et 1)"
9,Sécurité,Score d'attaquants vs défenseurs (ex: -0.2)
"""
# crée un tensor qui représente les noeuds controlés par chaque pièce
"0 case de départ des attaqus"
"1 case ciblée par les attaques"



def create_position_tensor(fen):
    """
    Crée un tensor représentant une position d'échecs à partir d'un FEN.
    
    Args:
        fen (str): Position en notation FEN
        
    Returns:
        tuple: (node_features, edge_index)
            - node_features: Tensor de forme (64, 10) pour les caractéristiques des cases
            - edge_index: Tensor de forme (2, N) pour les arêtes (attaques)
    """
    board = chess.Board(fen)
    
    # Initialiser le tensor des noeuds (64 cases, 10 features)
    node_features = torch.zeros((64, 10), dtype=torch.float32)
    
    # Mapping des types de pièces
    piece_type_map = {
        chess.PAWN: [1, 0, 0, 0, 0, 0],
        chess.KNIGHT: [0, 1, 0, 0, 0, 0],
        chess.BISHOP: [0, 0, 1, 0, 0, 0],
        chess.ROOK: [0, 0, 0, 1, 0, 0],
        chess.QUEEN: [0, 0, 0, 0, 1, 0],
        chess.KING: [0, 0, 0, 0, 0, 1]
    }
    
    # Remplir les features des noeuds
    for square in chess.SQUARES:
        piece = board.piece_at(square)
        
        # Type de pièce (indices 0-5)
        if piece:
            node_features[square, 0:6] = torch.tensor(piece_type_map[piece.piece_type])
        
        # Couleur (indice 6)
        if piece:
            node_features[square, 6] = 1.0 if piece.color == chess.WHITE else -1.0
        else:
            node_features[square, 6] = 0.0
        
        # Position x, y normalisées (indices 7-8)
        file = chess.square_file(square)  # 0-7
        rank = chess.square_rank(square)  # 0-7
        node_features[square, 7] = file / 7.0  # Normaliser entre 0 et 1
        node_features[square, 8] = rank / 7.0
        
        # Sécurité: score d'attaquants vs défenseurs (indice 9)
        # 1. Définition des constantes de valeur (à mettre hors de la boucle si possible)
        PIECE_VALUES = {
            chess.PAWN: 1, chess.KNIGHT: 3, chess.BISHOP: 3,
            chess.ROOK: 5, chess.QUEEN: 9, chess.KING: 100
        }

        # --- REMPLACER VOTRE BLOC PAR CELUI-CI ---

        piece = board.piece_at(square)
        if piece:
            # Paramètres de base
            my_color = piece.color
            opp_color = not my_color
            val_piece = PIECE_VALUES[piece.piece_type]
            is_my_turn = (board.turn == my_color)

            # Récupération des attaquants et défenseurs
            atk_set = board.attackers(opp_color, square)
            def_set = board.attackers(my_color, square)
            
            if not atk_set:
                # Cas 1 : Aucune menace directe
                safety_score = 1.0
            else:
                # Trouver l'attaquant le moins cher (LVA)
                min_atk_val = min(PIECE_VALUES[board.piece_at(s).piece_type] for s in atk_set)
                
                if not def_set:
                    # Cas 2 : Attaquée et non défendue (En l'air)
                    safety_score = -1.0 if not is_my_turn else -0.5
                elif min_atk_val < val_piece:
                    # Cas 3 : Menace de gain matériel (ex: Pion attaque Tour)
                    safety_score = -0.8 if not is_my_turn else -0.4
                elif len(atk_set) > len(def_set):
                    # Cas 4 : Surnombre d'attaquants
                    safety_score = -0.6 if not is_my_turn else -0.2
                else:
                    # Cas 5 : Protégée et échange a priori neutre ou défavorable pour l'ennemi
                    safety_score = 0.4 if is_my_turn else 0.1
        else:
            # Case vide
            safety_score = 0.0

        node_features[square, 9] = safety_score
    
    # Créer les arêtes (edge_index) pour les attaques
    edge_list = []
    
    for square in chess.SQUARES:
        piece = board.piece_at(square)
        if piece:
            # Obtenir toutes les cases attaquées par cette pièce
            attacks = board.attacks(square)
            
            for target_square in attacks:
                target_piece = board.piece_at(target_square)
                
                # Ajouter une arête de la case source vers la case cible
                edge_list.append([square, target_square])
    
    # Convertir en tensor de forme (2, N)
    if edge_list:
        edge_index = torch.tensor(edge_list, dtype=torch.long).t()
    else:
        edge_index = torch.zeros((2, 0), dtype=torch.long)
    
    return node_features, edge_index


In [186]:
def combine_position_and_tactical_info(nodes, edges):
    """
    Combine les caractéristiques des positions avec l'influence tactique.
    
    Args:
        nodes (Tensor): Caractéristiques des noeuds de forme (64, 10)
        edges (Tensor): Arêtes de forme (2, N)
        
    Returns:
        Tensor: Caractéristiques combinées prêtes pour le Transformer
    """
    # On projette les 10 features vers 64 dimensions (d_model standard)
    projection = nn.Linear(10, 64) 
    node_embeddings = projection(nodes) # Résultat: [64, 64]

    
    num_nodes = nodes.size(0)
    # sources: qui attaque, targets: qui est attaqué
    sources, targets = edges[0], edges[1]
    
    # On récupère les caractéristiques des attaquants
    source_features = nodes[sources] 
    
    # On additionne les caractéristiques des attaquants sur la case cible
    # Pour chaque case (0-63), on somme les vecteurs des pièces qui l'attaquent
    tactical_influence = torch.zeros_like(nodes)
    tactical_influence.index_add_(0, targets, source_features)# [64, 10]
    # On combine la position actuelle et l'influence tactique

    combined_features = torch.cat([nodes, tactical_influence], dim=-1) # [64, 20]

    

    return combined_features

In [187]:
def fen_to_transformer_input(fen):
    """
    Convertit une position FEN en entrée pour un modèle Transformer.
    
    Args:
        fen (str): Position en notation FEN
        
    Returns:
        Tensor: Entrée de forme (64, 20) pour le Transformer
    """
    nodes, edges = create_position_tensor(fen)
    
    # Combine les informations de position et tactiques
    combined_features = combine_position_and_tactical_info(nodes, edges)
    
    return combined_features.detach().cpu().numpy().tolist()

In [188]:
#apply fen_to_transformers_input au df
filtered_data = filtered_data.with_columns(
    pl.col("all_FEN")
    .map_elements(lambda fen_list: [fen_to_transformer_input(fen) for fen in fen_list], return_dtype=pl.List(pl.List(pl.List(pl.Float32))))
    .alias("all_tensor")
)

In [27]:
# --- CONSTANTES PRÉ-CALCULÉES ---
_COORDS = np.array([[ (s % 8) / 7.0, (s // 8) / 7.0] for s in range(64)], dtype=np.float32)
VAL_ARRAY = np.array([0, 1, 3, 3, 5, 9, 100], dtype=np.float32)

def fast_create_position_tensor(board):
    """Calcule le tenseur (64, 20) pour l'état actuel du board."""
    node_features = np.zeros((64, 10), dtype=np.float32)
    node_features[:, 7:9] = _COORDS
    
    piece_map = board.piece_map()
    sources, targets = [], []

    for square, piece in piece_map.items():
        p_type, p_color = piece.piece_type, piece.color
        node_features[square, p_type - 1] = 1.0
        node_features[square, 6] = 1.0 if p_color else -1.0
        
        # Sécurité (9) via Bitboards
        atk_mask = board.attackers_mask(not p_color, square)
        if not atk_mask:
            safety = 1.0
        else:
            min_atk_val = 100
            for pt in range(1, 7):
                if atk_mask & board.pieces_mask(pt, not p_color):
                    min_atk_val = VAL_ARRAY[pt]
                    break
            
            def_mask = board.attackers_mask(p_color, square)
            is_my_turn = (board.turn == p_color)
            if def_mask == 0:
                safety = -1.0 if not is_my_turn else -0.5
            elif min_atk_val < VAL_ARRAY[p_type]:
                safety = -0.8 if not is_my_turn else -0.4
            elif atk_mask.bit_count() > def_mask.bit_count():
                safety = -0.6 if not is_my_turn else -0.2
            else:
                safety = 0.4 if is_my_turn else 0.1
        node_features[square, 9] = safety

        # Edges
        move_mask = board.attacks_mask(square)
        targets.extend(list(chess.SquareSet(move_mask)))
        sources.extend([square] * move_mask.bit_count())

    # Influence Tactique
    nodes_t = torch.from_numpy(node_features)
    tactical_influence = torch.zeros((64, 10), dtype=torch.float32)
    if sources:
        tactical_influence.index_add_(0, torch.tensor(targets), nodes_t[sources])
    
    return torch.cat([nodes_t, tactical_influence], dim=-1).numpy()

def generate_game_tensors(fen, moves_string):
    """
    Transforme une partie complète directement en une liste de tenseurs.
    """
    board = chess.Board(fen)
    moves = moves_string.split() if moves_string else []
    
    # 1. Tenseur de la position initiale
    results = [fast_create_position_tensor(board).tolist()]
    
    # 2. Tenseurs pour chaque coup joué
    for move_uci in moves:
        try:
            board.push_uci(move_uci)
            results.append(fast_create_position_tensor(board).tolist())
        except ValueError:
            # En cas de coup invalide, on duplique la position précédente pour garder la longueur
            results.append(results[-1])
            
    return results

# --- APPLICATION POLARS ---
final_data = filtered_data.with_columns(
    all_tensor = pl.struct(["FEN", "Moves"]).map_elements(
        lambda x: generate_game_tensors(x["FEN"], x["Moves"]),
        return_dtype=pl.List(pl.List(pl.List(pl.Float32)))
    )
)

In [28]:
#sauvegarde le dataframe final
final_data.write_parquet("data/processed/position_classifier_data.parquet")

#sauvegarde le mapping thème-index
# Ensure the directory exists before running this
with open("data/processed/theme_to_index.json", "w") as f:
    json.dump(theme_to_index, f, indent=4) # Added indent for readability
