### Comment réproduire ? 

Pour reproduire les données , modifier dans le bloc 2 : 
- PGN_ZST_PATH = path("Chemin/...) ; adapter le paramètre selon le chemin de votre fichier extrait
- TARGET_DATE = "AAAA.MM.JJ" ; Date comprise dans cette extraction
- TARGET_N_GAMES = xxx ; Nombre de partie à extraire
- le SALT :"random_salt" ;  Anonymiser l'iD des joueurs 


### Étape 1 – Préparer l’environnement de travail

Cette section charge les bibliothèques nécessaires pour lire le fichier de parties,
traiter du texte, manipuler des données sous forme de tableau et suivre l’avancement.

Concrètement, on prépare les outils qui permettront ensuite de transformer un gros fichier brut
en un tableau propre et exploitable.


In [None]:
import io
import re
import hashlib
from pathlib import Path
import pandas as pd
import zstandard as zstd
from tqdm import tqdm

### Étape 2 – Définir les paramètres du projet

Ici, on fixe les règles du jeu pour l’extraction :

- où se trouve le fichier de parties (fichier brut)
- quelle date on veut analyser
- combien de parties maximum on veut récupérer
- quel type de parties on garde (ici des parties “rapides”, selon une règle de durée)

On met aussi en place un identifiant secret (appelé “salt”) pour anonymiser les joueurs,
afin de ne pas publier de données personnelles.

Enfin, on vérifie que le fichier est bien présent sur l’ordinateur et on affiche sa taille.


In [None]:
# ===== CONFIG =====
PGN_ZST_PATH = Path("Chemin/lichess_db_standard_rated_xxxx-xx.pgn.zst")

TARGET_DATE = "AAAA.MM.JJ"
TARGET_N_GAMES = xxxx

# Règle officielle Lichess: rapid si 479 <= estimated < 1500
RAPID_MIN_SEC = 479
RAPID_MAX_SEC = 1500

SALT = ""

print("Fichier existe ?", PGN_ZST_PATH.exists())
if PGN_ZST_PATH.exists():
    print("Taille actuelle (GiB):", round(PGN_ZST_PATH.stat().st_size / (1024**3), 3))


Fichier existe ? True
Taille actuelle (GiB): 2.828


### Étape 3 – Fonctions utiles pour nettoyer et filtrer

Cette partie crée des petites fonctions pratiques :

- anonymiser un identifiant de joueur pour éviter d’afficher son pseudo
- convertir certaines valeurs en nombres quand c’est possible
- interpréter le “time control” (la cadence) pour estimer la durée d’une partie
- décider si une partie correspond au type recherché (ici : “rapide”)

Ces fonctions servent à rendre le traitement plus lisible et plus fiable.


In [3]:

def anon_player_id(lichess_id: str) -> str:
    return hashlib.sha256((SALT + lichess_id).encode("utf-8")).hexdigest()

def to_int(x):
    try:
        return int(x)
    except:
        return None

def estimated_seconds_from_tc(tc: str):
    if not tc or tc == "-" or "+" not in tc:
        return None
    try:
        base_s, inc_s = tc.split("+")
        base_s = int(base_s)
        inc_s = int(inc_s)
        return base_s + 40 * inc_s
    except:
        return None

def is_rapid_timecontrol(tc: str) -> bool:
    est = estimated_seconds_from_tc(tc)
    return (est is not None) and (RAPID_MIN_SEC <= est < RAPID_MAX_SEC)


### Étape 4 – Comprendre une partie et en extraire l’essentiel

Chaque partie d’échecs est stockée sous forme de texte avec :
- des informations en en-tête (joueurs, classement, résultat, ouverture, etc.)
- puis la liste des coups

Cette étape sert à :
- récupérer les informations importantes de l’en-tête
- isoler la partie “coups” du reste
- compter le nombre de coups
- transformer le résultat en une valeur compréhensible (victoire, défaite, nul)


In [4]:
TAG_RE = re.compile(r'^\[(\w+)\s+"(.*)"\]\s*$', re.MULTILINE)

def parse_tags(pgn_text: str) -> dict:
    return dict(TAG_RE.findall(pgn_text))

def get_movetext(pgn_text: str) -> str:
    """
    Récupère le movetext en séparant tags et coups.
    IMPORTANT : on ne peut PAS utiliser rfind(']') car les commentaires [%clk ...] contiennent ']' !
    """
    parts = pgn_text.split("\n\n", 1)  # tags puis ligne vide puis coups
    return parts[1].strip() if len(parts) == 2 else ""


def count_moves(movetext: str) -> int:
    movetext = re.sub(r"\{[^}]*\}", " ", movetext)
    movetext = re.sub(r"\([^)]*\)", " ", movetext)
    return len(re.findall(r"\b\d+\.", movetext))

def player_result(result_tag: str, color: str):
    if result_tag == "1-0":
        return "win" if color == "white" else "loss"
    if result_tag == "0-1":
        return "loss" if color == "white" else "win"
    if result_tag in ("1/2-1/2", "½-½"):
        return "draw"
    return None


### Étape 5 – Lire le fichier progressivement (sans tout charger d’un coup)

Le fichier brut peut être très volumineux. Plutôt que de le charger entièrement,
on le lit petit à petit.

Cette étape :
- décompresse le fichier au fur et à mesure
- détecte le début de chaque nouvelle partie
- renvoie les parties une par une

C’est ce qui permet de traiter des centaines de milliers de parties
sans saturer la mémoire de l’ordinateur.


In [5]:


def stream_games_from_zst(path: Path):
    """
    Stream un .pgn.zst local (même partiel) et yield chaque partie complète.
    Découpe robuste : une nouvelle partie commence quand une ligne commence par "[Event "
    """
    with path.open("rb") as fh:
        dctx = zstd.ZstdDecompressor()
        with dctx.stream_reader(fh) as reader:
            text_stream = io.TextIOWrapper(reader, encoding="utf-8", errors="ignore")
            buf = []
            for line in text_stream:
                if line.startswith("[Event ") and buf:
                    yield "".join(buf)
                    buf = [line]
                else:
                    buf.append(line)
            if buf:
                yield "".join(buf)


### Étape 6 – Construire un tableau de parties filtrées

Ici, on parcourt les parties une par une, puis on applique des filtres simples :

- garder uniquement les parties jouées à la date choisie
- garder uniquement les parties “rapides”
- éviter les doublons
- ignorer les parties incomplètes

Pour chaque partie conservée, on extrait quelques informations clés :
identifiant de la partie, cadence, nombre de coups, classements, résultat, ouverture, etc.

À la fin, on obtient un tableau (DataFrame) où chaque ligne correspond à une partie.
C’est ce tableau qui servira ensuite pour l’analyse.


In [6]:
def build_games_df():
    seen_games = set()
    rows = []
    date_matches = 0

    for game_text in tqdm(stream_games_from_zst(PGN_ZST_PATH), desc="Parsing games (1 row = 1 game)"):
        tags = parse_tags(game_text)

        # Filtre date
        if tags.get("UTCDate") != TARGET_DATE:
            continue
        date_matches += 1

        # Filtre rapid (selon ta fonction is_rapid_timecontrol)
        tc = tags.get("TimeControl")
        if not is_rapid_timecontrol(tc):
            continue

        # game_id depuis Site
        site = tags.get("Site")  # ex: https://lichess.org/WBUBfWck
        game_id = site.rsplit("/", 1)[-1] if site and "/" in site else site

        if not game_id or game_id in seen_games:
            continue
        seen_games.add(game_id)

        white = tags.get("White")
        black = tags.get("Black")
        if not white or not black:
            continue

        w_elo = to_int(tags.get("WhiteElo"))
        b_elo = to_int(tags.get("BlackElo"))

        result_tag = tags.get("Result")         # "1-0", "0-1", "1/2-1/2"
        termination = tags.get("Termination")
        eco = tags.get("ECO")
        opening = tags.get("Opening")
        utc_time = tags.get("UTCTime")
        num_moves = count_moves(get_movetext(game_text))

        # Résultat au niveau partie
        if result_tag == "1-0":
            winner = "white"
        elif result_tag == "0-1":
            winner = "black"
        elif result_tag in ("1/2-1/2", "½-½"):
            winner = "draw"
        else:
            winner = None

        rows.append({
            "game_id": game_id,
            "utc_date": TARGET_DATE,
            "utc_time": utc_time,
            "time_control": tc,
            "termination": termination,
            "eco": eco,
            "opening_name": opening,
            "num_moves": num_moves,

            "white_elo": w_elo,
            "black_elo": b_elo,
            "elo_diff": (w_elo - b_elo) if (w_elo is not None and b_elo is not None) else None,

            "result": result_tag,   # au format PGN
            "winner": winner
        })

        if len(rows) >= TARGET_N_GAMES:
            break

    df_games = pd.DataFrame(rows)
    return df_games, date_matches


df_games, date_matches = build_games_df()

print("Date matches (toutes cadences confondues) pour", TARGET_DATE, ":", date_matches)
print("Games rows:", len(df_games))
print("Unique games:", df_games["game_id"].nunique() if len(df_games) else 0)

df_games.head()



Parsing games (1 row = 1 game): 1941354it [00:34, 55970.52it/s]


Date matches (toutes cadences confondues) pour 2025.12.01 : 1941355
Games rows: 300000
Unique games: 300000


Unnamed: 0,game_id,utc_date,utc_time,time_control,termination,eco,opening_name,num_moves,white_elo,black_elo,elo_diff,result,winner
0,WBUBfWck,2025.12.01,00:00:14,600+0,Normal,A00,Ware Opening,17,726,787,-61,0-1,black
1,nXafeSF8,2025.12.01,00:00:14,600+0,Normal,D00,Queen's Pawn Game,57,816,793,23,1-0,white
2,JdJxgg7t,2025.12.01,00:00:14,600+0,Normal,C30,King's Gambit,49,1539,1539,0,1-0,white
3,MQYARpxn,2025.12.01,00:00:14,600+0,Normal,C42,Petrov's Defense: Damiano Variation,117,683,560,123,1-0,white
4,sUNemc7p,2025.12.01,00:00:14,600+0,Normal,D02,"Queen's Pawn Game: London System, with e6",97,1762,1738,24,1-0,white


### Étape 7 – Exporter le résultat en CSV

Dernière étape : on enregistre le tableau final sous forme de fichier CSV.

Le CSV est un format simple et universel :
il peut être ouvert dans Excel, Google Sheets, ou utilisé dans un outil de dataviz.

L’idée est de transformer le fichier brut initial en un fichier propre,
limité aux informations utiles pour la suite du projet.


In [None]:
out_path = Path(f"games_rapid_{TARGET_DATE}_n{len(df_games)}.csv")
df_games.to_csv(out_path, index=False)
out_path

PosixPath('games_rapid_2025.12.01_n300000.csv')