In [5]:
import os

def get_all_files_in_dir(dir_path: str, extension:str = None) -> list[str]:
    """Recursively list the files found under ``dir_path``.

    Args:
        dir_path: Root directory to walk.
        extension: Extension to keep, with or without the leading dot
            (e.g. ``'pgn'``). When ``None``, every file is returned.

    Returns:
        The matching file paths, each built with ``os.path.join`` from the
        directory reported by ``os.walk`` and the file name.
    """
    suffix = None
    if extension is not None:
        suffix = '.' + extension.lstrip('.')

    files = []
    for root, _dirs, file_names in os.walk(dir_path):
        for file_name in file_names:
            # Match on the end of the name so 'games.pgn.bak' is not mistaken
            # for a '.pgn' file; no suffix means "keep everything".
            if suffix is None or file_name.endswith(suffix):
                files.append(os.path.join(root, file_name))
    return files

# get_all_files_in_dir('../data', 'pgn')


## Filter games with eval

In [7]:
import json
def process_game_rows( game_rows: list[str]) -> list[str]:
    """Normalise the lines of one game so they can be concatenated with others.

    A leading blank line (``'\\n'``) is removed and a trailing blank line is
    added when missing. The list is modified in place and also returned.

    Args:
        game_rows: Lines of a single game, each ending with its newline.

    Returns:
        The same list object. An empty list, or a list that held only one
        blank line, comes back empty instead of raising ``IndexError``.
    """
    if game_rows and game_rows[0] == '\n':
        game_rows.pop(0)
    # Re-check emptiness: the pop above may have removed the only element.
    if game_rows and game_rows[-1] != '\n':
        game_rows.append('\n')
    return game_rows

def save_filtered_data_line_start_index(serie_name: str, index:int):
    """Store ``index`` as the filtering resume point of ``serie_name``.

    The value is written under the ``"filtered_data_start_index"`` key of the
    series' entry in ``../data/keep_track.json``. Other keys of that entry and
    other series are preserved.

    Args:
        serie_name: Key of the series in the tracking file.
        index: Value to record.
    """
    file_path = '../data/keep_track.json'
    try:
        with open(file_path, 'r') as f:
            data = json.load(f)
    except FileNotFoundError:
        # First use: start from an empty tracking table.
        data = {}
    # setdefault avoids a KeyError when the series has never been recorded.
    data.setdefault(serie_name, {})["filtered_data_start_index"] = index
    with open(file_path, 'w') as f:
        json.dump(data, f)
        
def get_filtered_data_line_start_index(serie_name: str) -> int:
    """Return the filtering resume point recorded for ``serie_name``.

    Reads ``"filtered_data_start_index"`` from the series' entry in
    ``../data/keep_track.json``. When the tracking file, the series, or the
    key is missing, the key is initialised to ``0`` and written back.

    Args:
        serie_name: Key of the series in the tracking file.

    Returns:
        The stored index, or ``0`` when none was recorded yet.
    """
    file_path = '../data/keep_track.json'
    try:
        with open(file_path, 'r') as f:
            data = json.load(f)
    except FileNotFoundError:
        # First use: start from an empty tracking table.
        data = {}

    # The entry may already exist with only the CSV key, so check the key
    # itself rather than just the presence of the series.
    entry = data.setdefault(serie_name, {})
    if "filtered_data_start_index" not in entry:
        entry["filtered_data_start_index"] = 0
        with open(file_path, 'w') as f:
            json.dump(data, f)

    return entry["filtered_data_start_index"]


def save_games(games: list[str], save_path: str, mode: str = 'a'):
    """Write the given lines, concatenated without separator, to ``save_path``.

    Args:
        games: Text fragments to write, in order.
        save_path: Destination file.
        mode: ``open`` mode used for the destination (appends by default).
    """
    payload = "".join(games)
    with open(save_path, mode) as handle:
        handle.write(payload)

def reset_file(file_path: str):
    """Empty ``file_path``, creating it when it does not exist yet."""
    # Opening in 'w' mode truncates the file; nothing needs to be written.
    with open(file_path, 'w'):
        pass

def filter_games_with_evaluations(file_path: str, save_folder:str, save_every:int = 10000, print_every:int = 10000, limit:int = None):
    """Copy the games that contain evaluations from a PGN file to ``save_folder``.

    The input is scanned line by line. Lines are buffered until one starting
    with ``'1'`` is met, which is treated as the move line closing the current
    game. The buffered game is kept when that line contains the substring
    ``'eval'`` and dropped otherwise. The output file is emptied first, then
    kept games are appended to it in batches.

    Args:
        file_path: Path of the PGN file to read; ``/`` and ``\\`` separators
            are both accepted.
        save_folder: Output folder, including its trailing separator (the
            input file name is appended to it as-is).
        save_every: Kept games are flushed to disk every ``save_every`` games
            scanned.
        print_every: A progress line is printed every ``print_every`` games
            scanned.
        limit: Maximum number of games to scan; ``None`` scans the whole file.

    Returns:
        The path of the filtered PGN file.
    """
    # Normalise separators so the file name is extracted the same way whether
    # the path was written with '/' or '\\'.
    file_name = file_path.replace('\\', '/').split('/')[-1]
    serie_name = file_name.replace('.pgn', '')  # File name without extension
    save_path = save_folder + file_name
    eval_games, nb_games, start_index = [], 0, 0
    reset_file(save_path)

    print(f"Starting at line {start_index} for serie {serie_name}")
    with open(file_path, 'r') as f:
        game_rows = []
        for line_index, line in enumerate(f):
            if limit is not None and nb_games >= limit:
                break
            if line_index < start_index:
                continue
            game_rows.append(line)
            if line.startswith('1'):
                # A line starting with '1' is the move line of the game
                if nb_games % print_every == 0:
                    print(f"Game {nb_games} for serie {serie_name}")
                if 'eval' in line:
                    # The game contains evaluations: keep it
                    eval_games += process_game_rows(game_rows)
                game_rows = []
                if nb_games % save_every == 0:
                    # Flush the kept games every save_every games.
                    # NOTE: resume bookkeeping in keep_track.json is not wired in here.
                    save_games(eval_games, save_path, 'a')
                    eval_games = []
                nb_games += 1

    # Flush whatever was kept since the last periodic save.
    save_games(eval_games, save_path, 'a')

    return save_path

# Filter the data_2014_07 PGN file; the paths use Windows-style backslash separators.
filter_games_with_evaluations("../data\\raw_pgn\\data_2014_07.pgn", "../data\\filtered_pgn\\", save_every=10000, print_every=100000)


Starting at line 0 for serie data_2014_07
Game 0 for serie data_2014_07
Game 100000 for serie data_2014_07
Game 200000 for serie data_2014_07
Game 300000 for serie data_2014_07
Game 400000 for serie data_2014_07
Game 500000 for serie data_2014_07
Game 600000 for serie data_2014_07
Game 700000 for serie data_2014_07
Game 800000 for serie data_2014_07
Game 900000 for serie data_2014_07
Game 1000000 for serie data_2014_07


'../data\\filtered_pgn\\data_2014_07.pgn'

### Convert each position of a PGN game to FEN and save it to a CSV file

In [3]:
import chess.pgn 
import csv
import pandas as pd
import os
import json

def process_game(game: "chess.pgn.Game", history: list[dict] = None):
    """Collect the evaluated positions along the main line of a game.

    Starting from ``game``, the first variation is followed until the end.
    Every node that carries an evaluation yields one record; nodes without an
    evaluation are skipped.

    Args:
        game: Root node of the parsed game.
        history: Unused; kept (now optional) so existing callers keep working.

    Returns:
        A list of ``{"fen": <FEN of the position>, "eval": <score / 100>}``
        dicts in move order. The score is taken from the evaluation's
        ``relative`` score with mates mapped through ``mate_score=100000``;
        python-chess documents ``score()`` as centipawns, so the stored value
        is in pawns.
    """
    positions = []
    node = game
    while not node.is_end():
        node = node.variation(0)
        # Read the annotation once and avoid shadowing the builtin `eval`.
        pov_score = node.eval()
        if pov_score is None:
            continue
        positions.append({
            "fen": node.board().fen(),
            "eval": pov_score.relative.score(mate_score=100000) / 100,
        })
    return positions

def save_positions(positions: list[dict], save_path: str, mode: str = 'a'):
    """Write position records to a CSV file with ``fen`` and ``eval`` columns.

    When ``save_path`` does not exist yet, the file is created in ``'w'`` mode
    regardless of ``mode``. The header row is written whenever the file is
    opened in ``'w'`` mode.

    Args:
        positions: Records holding the ``fen`` and ``eval`` keys.
        save_path: Destination CSV file.
        mode: ``open`` mode used when the file already exists.
    """
    effective_mode = mode if os.path.isfile(save_path) else 'w'

    with open(save_path, effective_mode, newline='') as out:
        table = csv.DictWriter(out, fieldnames=['fen', 'eval'])
        if effective_mode == 'w':
            table.writeheader()
        table.writerows(positions)

def save_csv_line_start_index(serie_name: str, index:int):
    """Store ``index`` as the CSV-conversion resume point of ``serie_name``.

    The value is written under the ``"csv_positions_start_index"`` key of the
    series' entry in ``../data/keep_track.json``. Other keys of that entry and
    other series are preserved.

    Args:
        serie_name: Key of the series in the tracking file.
        index: Value to record.
    """
    file_path = '../data/keep_track.json'
    try:
        with open(file_path, 'r') as f:
            data = json.load(f)
    except FileNotFoundError:
        # First use: start from an empty tracking table.
        data = {}
    # setdefault avoids a KeyError when the series has never been recorded.
    data.setdefault(serie_name, {})["csv_positions_start_index"] = index
    with open(file_path, 'w') as f:
        json.dump(data, f)
        
def get_csv_line_start_index(serie_name: str) -> int:
    """Return the CSV-conversion resume point recorded for ``serie_name``.

    Reads ``"csv_positions_start_index"`` from the series' entry in
    ``../data/keep_track.json``. When the tracking file, the series, or the
    key is missing, the key is initialised to ``0`` and written back.

    Args:
        serie_name: Key of the series in the tracking file.

    Returns:
        The stored index, or ``0`` when none was recorded yet.
    """
    file_path = '../data/keep_track.json'
    try:
        with open(file_path, 'r') as f:
            data = json.load(f)
    except FileNotFoundError:
        # First use: start from an empty tracking table.
        data = {}

    # The entry may already exist with only the filtering key, so check the
    # key itself rather than just the presence of the series.
    entry = data.setdefault(serie_name, {})
    if "csv_positions_start_index" not in entry:
        entry["csv_positions_start_index"] = 0
        with open(file_path, 'w') as f:
            json.dump(data, f)

    return entry["csv_positions_start_index"]

def convert_pgn_to_csv(file_path: str, save_folder:str, save_every:int = 10000, print_every:int = 10000, limit:int = None):
    """Convert the evaluated positions of a PGN file to a ``fen``/``eval`` CSV.

    The number of games already handled is read from the tracking file, that
    many games are skipped, and the remaining games are parsed with
    ``chess.pgn.read_game``. Positions are appended to the CSV in batches and
    the tracking file is updated after each batch and at the end.

    Args:
        file_path: Path of the PGN file to read; ``/`` and ``\\`` separators
            are both accepted.
        save_folder: Output folder, including its trailing separator (the CSV
            file name is appended to it as-is).
        save_every: Positions are flushed and progress recorded every
            ``save_every`` games.
        print_every: A progress line is printed every ``print_every`` games.
        limit: Maximum number of games to convert in this call; ``None``
            converts until the end of the file.

    Returns:
        The path of the CSV file.
    """
    # Normalise separators so the file name is extracted the same way whether
    # the path was written with '/' or '\\'.
    file_name = file_path.replace('\\', '/').split('/')[-1]
    save_path = save_folder + file_name.split('.')[0] + '.csv'
    # Single key for both reading and writing progress, so the two can never
    # point at different entries of the tracking file.
    serie_name = file_name.replace('.pgn', '')  # File name without extension
    all_positions, nb_games = [], 0
    nb_game_start_index = get_csv_line_start_index(serie_name)
    with open(file_path) as pgn:
        reached_eof = False
        # Skip exactly the games already converted. A line starting with '1'
        # is counted as the move line ending a game.
        while nb_games < nb_game_start_index:
            row = pgn.readline()
            if row == '':  # End of file
                reached_eof = True
                break
            if row[0] == '1':
                nb_games += 1

        while not reached_eof:
            if limit is not None and nb_games - nb_game_start_index >= limit:
                break

            if nb_games % print_every == 0:
                print(f"nb_games: {nb_games}")

            game = chess.pgn.read_game(pgn)
            if game is None:
                break
            all_positions += process_game(game, all_positions)
            nb_games += 1
            if nb_games % save_every == 0:
                save_positions(all_positions, save_path, 'a')
                save_csv_line_start_index(serie_name, nb_games)
                all_positions = []

    # Flush whatever was collected since the last periodic save.
    save_positions(all_positions, save_path, 'a')
    save_csv_line_start_index(serie_name, nb_games)

    return save_path

# Convert data_test.pgn, stopping after at most 5000 games in this call and flushing every 100 games.
convert_pgn_to_csv("../data\\filtered_pgn\\data_test.pgn", "../data\\csv_positions\\", save_every=100, print_every=200, limit=5000)

nb_games: 200
nb_games: 400
nb_games: 600
nb_games: 800
nb_games: 1000
nb_games: 1200
nb_games: 1400
nb_games: 1600


'../data\\csv_positions\\data_test.csv'

In [30]:
import pandas as pd

def remove_duplicates_in_csv(file_path:str, save_path:str):
    """Drop the rows of a CSV whose ``fen`` value was already seen.

    Args:
        file_path: CSV file to read; it must have a ``fen`` column.
        save_path: Where the de-duplicated CSV is written (without the index).
    """
    unique_positions = pd.read_csv(file_path).drop_duplicates(subset=['fen'])
    unique_positions.to_csv(save_path, index=False)

# De-duplicate the CSV in place: the input and output paths are the same file.
remove_duplicates_in_csv("../data\\csv_positions\\data_2013_09.csv", "../data\\csv_positions\\data_2013_09.csv")


In [11]:
def data_pipeline(raw_data_folder, filtered_data_folder, csv_folder):
    """Run filtering, CSV conversion and de-duplication on every raw PGN file.

    Args:
        raw_data_folder: Folder searched recursively for 'pgn' files.
        filtered_data_folder: Folder receiving the filtered PGN files.
        csv_folder: Folder receiving the CSV files.
    """
    for raw_pgn_path in get_all_files_in_dir(raw_data_folder, 'pgn'):
        # Stage 1: keep only the games that have evaluations.
        # NOTE: this call relies on filter_games_with_evaluations accepting a `limit` keyword.
        filtered_pgn_path = filter_games_with_evaluations(raw_pgn_path, filtered_data_folder, limit=200000)
        # Stage 2: write the positions of the filtered games to a CSV file.
        positions_csv_path = convert_pgn_to_csv(filtered_pgn_path, csv_folder)
        # Stage 3: drop duplicated FENs, overwriting the CSV file.
        remove_duplicates_in_csv(positions_csv_path, positions_csv_path)

In [12]:
# Folder layout, built by string concatenation with Windows-style backslash separators.
# Each folder string ends with a separator because file names are appended to it directly.
data_folder = "..\\data\\"
raw_data_folder = data_folder + "raw_pgn\\"
filtered_data_folder = data_folder + "filtered_pgn\\"
csv_folder = data_folder + "csv_positions\\"

# Run the whole pipeline on every PGN file found under the raw folder.
data_pipeline(raw_data_folder, filtered_data_folder, csv_folder)

nb_games: 0, nb_games_with_eval: 0


nb_games: 10000, nb_games_with_eval: 1743
nb_games: 20000, nb_games_with_eval: 3340
nb_games: 30000, nb_games_with_eval: 4942
nb_games: 40000, nb_games_with_eval: 6594
nb_games: 50000, nb_games_with_eval: 8184
nb_games: 60000, nb_games_with_eval: 9855
nb_games: 70000, nb_games_with_eval: 11419
nb_games: 80000, nb_games_with_eval: 13015
nb_games: 90000, nb_games_with_eval: 14690
nb_games: 100000, nb_games_with_eval: 16329
nb_games: 110000, nb_games_with_eval: 17996
nb_games: 120000, nb_games_with_eval: 19614
nb_games: 130000, nb_games_with_eval: 21309
nb_games: 140000, nb_games_with_eval: 22946
nb_games: 150000, nb_games_with_eval: 24631
nb_games: 160000, nb_games_with_eval: 26259
nb_games: 170000, nb_games_with_eval: 28001
nb_games: 180000, nb_games_with_eval: 29715
nb_games: 190000, nb_games_with_eval: 31341


KeyboardInterrupt: 